Resolved the remaining known internal testing issues

Removed the 'immediatelyOnMain' extensions as they would break in some cases (eg. upstream errors with multiple 'receive(on:)' calls) resulting in logic running on unexpected threads
Updated the ReplaySubject to add subscribers in an Atomic just to be safe
Updated the code to remove the invalid open group when the user receives an error after joining
Fixed a bug with editing closed group members
Fixed broken unit tests
pull/751/head
Morgan Pretty 11 months ago
parent 6cf7cc42ab
commit 5e2e103ee1

@ -3,7 +3,6 @@
import UIKit
import YYImage
import MediaPlayer
import WebRTC
import SessionUIKit
import SessionMessagingKit
import SessionUtilitiesKit

@ -1,7 +1,6 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import UIKit
import WebRTC
import SessionUIKit
public protocol VideoPreviewDelegate: AnyObject {

@ -1,7 +1,6 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import UIKit
import WebRTC
import SessionUIKit
import SessionMessagingKit
import SignalUtilitiesKit

@ -465,7 +465,8 @@ final class EditClosedGroupVC: BaseVC, UITableViewDataSource, UITableViewDelegat
ModalActivityIndicatorViewController.present(fromViewController: navigationController) { _ in
Storage.shared
.writePublisher { db in
guard updatedMemberIds.contains(userPublicKey) else { return }
// If the user is no longer a member then leave the group
guard !updatedMemberIds.contains(userPublicKey) else { return }
try MessageSender.leave(
db,

@ -1552,6 +1552,18 @@ extension ConversationVC:
switch result {
case .finished: break
case .failure(let error):
// If there was a failure then the group will be in invalid state until
// the next launch so remove it (the user will be left on the previous
// screen so can re-trigger the join)
Storage.shared.writeAsync { db in
OpenGroupManager.shared.delete(
db,
openGroupId: OpenGroup.idFor(roomToken: room, server: server),
calledFromConfigHandling: false
)
}
// Show the user an error indicating they failed to properly join the group
let errorModal: ConfirmationModal = ConfirmationModal(
info: ConfirmationModal.Info(
title: "COMMUNITY_ERROR_GENERIC".localized(),

@ -4,7 +4,6 @@ import UIKit
import Combine
import UserNotifications
import GRDB
import WebRTC
import SessionUIKit
import SessionMessagingKit
import SessionUtilitiesKit

@ -193,11 +193,25 @@ final class JoinOpenGroupVC: BaseVC, UIPageViewControllerDataSource, UIPageViewC
receiveCompletion: { result in
switch result {
case .failure(let error):
self?.dismiss(animated: true, completion: nil) // Dismiss the loader
let title = "COMMUNITY_ERROR_GENERIC".localized()
let message = error.localizedDescription
// If there was a failure then the group will be in invalid state until
// the next launch so remove it (the user will be left on the previous
// screen so can re-trigger the join)
Storage.shared.writeAsync { db in
OpenGroupManager.shared.delete(
db,
openGroupId: OpenGroup.idFor(roomToken: roomToken, server: server),
calledFromConfigHandling: false
)
}
// Show the user an error indicating they failed to properly join the group
self?.isJoining = false
self?.showError(title: title, message: message)
self?.dismiss(animated: true) { // Dismiss the loader
self?.showError(
title: "COMMUNITY_ERROR_GENERIC".localized(),
message: error.localizedDescription
)
}
case .finished:
self?.presentingViewController?.dismiss(animated: true, completion: nil)

@ -144,7 +144,7 @@ final class OpenGroupSuggestionGrid: UIView, UICollectionViewDataSource, UIColle
OpenGroupManager.getDefaultRoomsIfNeeded()
.subscribe(on: DispatchQueue.global(qos: .default))
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: DispatchQueue.main)
.sinkUntilComplete(
receiveCompletion: { [weak self] result in
switch result {
@ -340,7 +340,7 @@ extension OpenGroupSuggestionGrid {
.eraseToAnyPublisher()
)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: DispatchQueue.main)
.sinkUntilComplete(
receiveValue: { [weak self] imageData, hasData in
guard hasData else {

@ -187,13 +187,7 @@ class SessionTableViewController<NavItemId: Equatable, Section: SessionTableSect
private func startObservingChanges() {
// Start observing for data changes
dataChangeCancellable = viewModel.observableTableData
.receive(
on: DispatchQueue.main,
// If we haven't done the initial load the trigger it immediately (blocking the main
// thread so we remain on the launch screen until it completes to be consistent with
// the old behaviour)
immediatelyIfMain: !hasLoadedInitialTableData
)
.receive(on: DispatchQueue.main)
.sink(
receiveCompletion: { [weak self] result in
switch result {
@ -334,7 +328,6 @@ class SessionTableViewController<NavItemId: Equatable, Section: SessionTableSect
.store(in: &disposables)
viewModel.leftNavItems
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.sink { [weak self] maybeItems in
self?.navigationItem.setLeftBarButtonItems(
maybeItems.map { items in
@ -356,7 +349,6 @@ class SessionTableViewController<NavItemId: Equatable, Section: SessionTableSect
.store(in: &disposables)
viewModel.rightNavItems
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.sink { [weak self] maybeItems in
self?.navigationItem.setRightBarButtonItems(
maybeItems.map { items in
@ -378,21 +370,18 @@ class SessionTableViewController<NavItemId: Equatable, Section: SessionTableSect
.store(in: &disposables)
viewModel.emptyStateTextPublisher
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.sink { [weak self] text in
self?.emptyStateLabel.text = text
}
.store(in: &disposables)
viewModel.footerView
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.sink { [weak self] footerView in
self?.tableView.tableFooterView = footerView
}
.store(in: &disposables)
viewModel.footerButtonInfo
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.sink { [weak self] buttonInfo in
if let buttonInfo: SessionButton.Info = buttonInfo {
self?.footerButton.setTitle(buttonInfo.title, for: .normal)

@ -55,8 +55,8 @@ public final class BackgroundPoller {
}
)
)
.subscribe(on: dependencies.subscribeQueue, immediatelyIfMain: true)
.receive(on: dependencies.receiveQueue, immediatelyIfMain: true)
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.collect()
.sinkUntilComplete(
receiveCompletion: { result in

@ -2,7 +2,6 @@
import Foundation
import GRDB
import WebRTC
import SessionUtilitiesKit
/// See https://developer.mozilla.org/en-US/docs/Web/API/RTCSessionDescription for more information.

@ -78,6 +78,7 @@ public final class OpenGroupManager {
result[server.lowercased()] = OpenGroupAPI.Poller(for: server.lowercased())
}
}
// Now that the pollers have been created actually start them
dependencies.cache.pollers.forEach { _, poller in poller.startIfNeeded(using: dependencies) }
}
@ -1012,8 +1013,8 @@ public final class OpenGroupManager {
)
}
.flatMap { OpenGroupAPI.send(data: $0, using: dependencies) }
.subscribe(on: dependencies.subscribeQueue, immediatelyIfMain: true)
.receive(on: dependencies.receiveQueue, immediatelyIfMain: true)
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.retry(8)
.map { info, response -> [DefaultRoomInfo]? in
dependencies.storage.write { db -> [DefaultRoomInfo] in

@ -81,8 +81,8 @@ extension OpenGroupAPI {
.map { _ in (lastPollStart, nextPollInterval) }
.eraseToAnyPublisher()
}
.subscribe(on: dependencies.subscribeQueue, immediatelyIfMain: true)
.receive(on: dependencies.receiveQueue, immediatelyIfMain: true)
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sinkUntilComplete(
receiveValue: { [weak self] lastPollStart, nextPollInterval in
let currentTime: TimeInterval = Date().timeIntervalSince1970

@ -93,7 +93,6 @@ public class Poller {
let namespaces: [SnodeAPI.Namespace] = self.namespaces
getSnodeForPolling(for: publicKey)
.subscribe(on: dependencies.subscribeQueue, immediatelyIfMain: true)
.flatMap { snode -> AnyPublisher<[Message], Error> in
Poller.poll(
namespaces: namespaces,
@ -103,7 +102,8 @@ public class Poller {
using: dependencies
)
}
.receive(on: dependencies.receiveQueue, immediatelyIfMain: true)
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sinkUntilComplete(
receiveCompletion: { [weak self] result in
switch result {
@ -134,7 +134,6 @@ public class Poller {
timer.invalidate()
self?.getSnodeForPolling(for: publicKey)
.subscribe(on: dependencies.subscribeQueue, immediatelyIfMain: true)
.flatMap { snode -> AnyPublisher<[Message], Error> in
Poller.poll(
namespaces: namespaces,
@ -144,7 +143,8 @@ public class Poller {
using: dependencies
)
}
.receive(on: dependencies.receiveQueue, immediatelyIfMain: true)
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sinkUntilComplete(
receiveCompletion: { result in
switch result {

@ -367,7 +367,11 @@ class OpenGroupManagerSpec: QuickSpec {
mockOGMCache.when { $0.hasPerformedInitialPoll }.thenReturn([:])
mockOGMCache.when { $0.timeSinceLastPoll }.thenReturn([:])
mockOGMCache.when { $0.getTimeSinceLastOpen(using: dependencies) }.thenReturn(0)
mockOGMCache
.when { [dependencies = dependencies!] cache in
cache.getTimeSinceLastOpen(using: dependencies)
}
.thenReturn(0)
mockOGMCache.when { $0.isPolling }.thenReturn(false)
mockOGMCache.when { $0.pollers }.thenReturn([:])
@ -816,7 +820,7 @@ class OpenGroupManagerSpec: QuickSpec {
var didComplete: Bool = false // Prevent multi-threading test bugs
mockStorage
.writePublisherFlatMap { (db: Database) -> AnyPublisher<Void, Error> in
.writePublisher { (db: Database) -> Bool in
openGroupManager
.add(
db,
@ -827,6 +831,16 @@ class OpenGroupManagerSpec: QuickSpec {
dependencies: dependencies
)
}
.flatMap { successfullyAddedGroup in
openGroupManager.performInitialRequestsAfterAdd(
successfullyAddedGroup: successfullyAddedGroup,
roomToken: "testRoom",
server: "testServer",
publicKey: TestConstants.serverPublicKey,
calledFromConfigHandling: true, // Don't trigger SessionUtil logic
dependencies: dependencies
)
}
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -847,7 +861,7 @@ class OpenGroupManagerSpec: QuickSpec {
var didComplete: Bool = false // Prevent multi-threading test bugs
mockStorage
.writePublisherFlatMap { (db: Database) -> AnyPublisher<Void, Error> in
.writePublisher { (db: Database) -> Bool in
openGroupManager
.add(
db,
@ -858,6 +872,16 @@ class OpenGroupManagerSpec: QuickSpec {
dependencies: dependencies
)
}
.flatMap { successfullyAddedGroup in
openGroupManager.performInitialRequestsAfterAdd(
successfullyAddedGroup: successfullyAddedGroup,
roomToken: "testRoom",
server: "testServer",
publicKey: TestConstants.serverPublicKey,
calledFromConfigHandling: true, // Don't trigger SessionUtil logic
dependencies: dependencies
)
}
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -884,7 +908,7 @@ class OpenGroupManagerSpec: QuickSpec {
var didComplete: Bool = false // Prevent multi-threading test bugs
mockStorage
.writePublisherFlatMap { (db: Database) -> AnyPublisher<Void, Error> in
.writePublisher { (db: Database) -> Bool in
openGroupManager
.add(
db,
@ -897,6 +921,18 @@ class OpenGroupManagerSpec: QuickSpec {
dependencies: dependencies
)
}
.flatMap { successfullyAddedGroup in
openGroupManager.performInitialRequestsAfterAdd(
successfullyAddedGroup: successfullyAddedGroup,
roomToken: "testRoom",
server: "testServer",
publicKey: TestConstants.serverPublicKey
.replacingOccurrences(of: "c3", with: "00")
.replacingOccurrences(of: "b3", with: "00"),
calledFromConfigHandling: true, // Don't trigger SessionUtil logic
dependencies: dependencies
)
}
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -940,7 +976,7 @@ class OpenGroupManagerSpec: QuickSpec {
var error: Error?
mockStorage
.writePublisherFlatMap { (db: Database) -> AnyPublisher<Void, Error> in
.writePublisher { (db: Database) -> Bool in
openGroupManager
.add(
db,
@ -951,6 +987,16 @@ class OpenGroupManagerSpec: QuickSpec {
dependencies: dependencies
)
}
.flatMap { successfullyAddedGroup in
openGroupManager.performInitialRequestsAfterAdd(
successfullyAddedGroup: successfullyAddedGroup,
roomToken: "testRoom",
server: "testServer",
publicKey: TestConstants.serverPublicKey,
calledFromConfigHandling: true, // Don't trigger SessionUtil logic
dependencies: dependencies
)
}
.mapError { result -> Error in error.setting(to: result) }
.sinkAndStore(in: &disposables)
@ -3334,15 +3380,16 @@ class OpenGroupManagerSpec: QuickSpec {
upload: false,
defaultUpload: nil
)
let publisher = Future<[OpenGroupAPI.Room], Error> { resolver in
resolver(Result.success([uniqueRoomInstance]))
let publisher = Future<[OpenGroupManager.DefaultRoomInfo], Error> { resolver in
resolver(Result.success([(uniqueRoomInstance, nil)]))
}
.shareReplay(1)
.eraseToAnyPublisher()
mockOGMCache.when { $0.defaultRoomsPublisher }.thenReturn(publisher)
let publisher2 = OpenGroupManager.getDefaultRoomsIfNeeded(using: dependencies)
expect(publisher2.firstValue()).to(equal(publisher.firstValue()))
expect(publisher2.firstValue()?.map { $0.room })
.to(equal(publisher.firstValue()?.map { $0.room }))
}
it("stores the open group information") {
@ -3376,13 +3423,13 @@ class OpenGroupManagerSpec: QuickSpec {
}
it("fetches rooms for the server") {
var response: [OpenGroupAPI.Room]?
var response: [OpenGroupManager.DefaultRoomInfo]?
OpenGroupManager.getDefaultRoomsIfNeeded(using: dependencies)
.handleEvents(receiveOutput: { (data: [OpenGroupAPI.Room]) in response = data })
.handleEvents(receiveOutput: { response = $0 })
.sinkAndStore(in: &disposables)
expect(response)
expect(response?.map { $0.room })
.toEventually(
equal(
[
@ -3598,17 +3645,14 @@ class OpenGroupManagerSpec: QuickSpec {
.thenReturn([OpenGroup.idFor(roomToken: "testRoom", server: "testServer"): publisher])
var result: Data?
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: "testServer",
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: "testServer",
existingData: nil,
using: dependencies
)
.handleEvents(receiveOutput: { result = $0 })
.sinkAndStore(in: &disposables)
@ -3617,17 +3661,14 @@ class OpenGroupManagerSpec: QuickSpec {
it("does not save the fetched image to storage") {
var didComplete: Bool = false
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: "testServer",
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: "testServer",
existingData: nil,
using: dependencies
)
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -3648,17 +3689,14 @@ class OpenGroupManagerSpec: QuickSpec {
it("does not update the image update timestamp") {
var didComplete: Bool = false
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: "testServer",
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: "testServer",
existingData: nil,
using: dependencies
)
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -3690,17 +3728,14 @@ class OpenGroupManagerSpec: QuickSpec {
}
dependencies = dependencies.with(onionApi: TestNeverReturningApi.self)
let publisher = mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: "testServer",
using: dependencies
)
}
let publisher = OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: "testServer",
existingData: nil,
using: dependencies
)
publisher.sinkAndStore(in: &disposables)
expect(mockOGMCache)
@ -3716,17 +3751,14 @@ class OpenGroupManagerSpec: QuickSpec {
it("fetches a new image if there is no cached one") {
var result: Data?
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
existingData: nil,
using: dependencies
)
.handleEvents(receiveOutput: { (data: Data) in result = data })
.sinkAndStore(in: &disposables)
@ -3736,17 +3768,14 @@ class OpenGroupManagerSpec: QuickSpec {
it("saves the fetched image to storage") {
var didComplete: Bool = false
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
existingData: nil,
using: dependencies
)
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -3768,17 +3797,14 @@ class OpenGroupManagerSpec: QuickSpec {
it("updates the image update timestamp") {
var didComplete: Bool = false
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
existingData: nil,
using: dependencies
)
.handleEvents(receiveCompletion: { _ in didComplete = true })
.sinkAndStore(in: &disposables)
@ -3816,17 +3842,14 @@ class OpenGroupManagerSpec: QuickSpec {
it("retrieves the cached image") {
var result: Data?
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
existingData: Data([2, 3, 4]),
using: dependencies
)
.handleEvents(receiveOutput: { (data: Data) in result = data })
.sinkAndStore(in: &disposables)
@ -3846,17 +3869,14 @@ class OpenGroupManagerSpec: QuickSpec {
var result: Data?
mockStorage
.readPublisherFlatMap { (db: Database) -> AnyPublisher<Data, Error> in
OpenGroupManager
.roomImage(
db,
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
using: dependencies
)
}
OpenGroupManager
.roomImage(
fileId: "1",
for: "testRoom",
on: OpenGroupAPI.defaultServer,
existingData: Data([2, 3, 4]),
using: dependencies
)
.handleEvents(receiveOutput: { (data: Data) in result = data })
.sinkAndStore(in: &disposables)

@ -10,7 +10,7 @@ import SessionUtilitiesKit
extension SMKDependencies {
public func with(
onionApi: OnionRequestAPIType.Type? = nil,
generalCache: Atomic<GeneralCacheType>? = nil,
generalCache: MutableGeneralCacheType? = nil,
storage: Storage? = nil,
scheduler: ValueObservationScheduler? = nil,
sodium: SodiumType? = nil,
@ -26,7 +26,7 @@ extension SMKDependencies {
) -> SMKDependencies {
return SMKDependencies(
onionApi: (onionApi ?? self._onionApi.wrappedValue),
generalCache: (generalCache ?? self._generalCache.wrappedValue),
generalCache: (generalCache ?? self._mutableGeneralCache.wrappedValue),
storage: (storage ?? self._storage.wrappedValue),
scheduler: (scheduler ?? self._scheduler.wrappedValue),
sodium: (sodium ?? self._sodium.wrappedValue),

@ -9,9 +9,9 @@ import SessionUtilitiesKit
extension OpenGroupManager.OGMDependencies {
public func with(
cache: Atomic<OGMCacheType>? = nil,
cache: OGMMutableCacheType? = nil,
onionApi: OnionRequestAPIType.Type? = nil,
generalCache: Atomic<GeneralCacheType>? = nil,
generalCache: MutableGeneralCacheType? = nil,
storage: Storage? = nil,
scheduler: ValueObservationScheduler? = nil,
sodium: SodiumType? = nil,
@ -28,7 +28,7 @@ extension OpenGroupManager.OGMDependencies {
return OpenGroupManager.OGMDependencies(
cache: (cache ?? self._mutableCache.wrappedValue),
onionApi: (onionApi ?? self._onionApi.wrappedValue),
generalCache: (generalCache ?? self._generalCache.wrappedValue),
generalCache: (generalCache ?? self._mutableGeneralCache.wrappedValue),
storage: (storage ?? self._storage.wrappedValue),
scheduler: (scheduler ?? self._scheduler.wrappedValue),
sodium: (sodium ?? self._sodium.wrappedValue),

@ -160,7 +160,7 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView
ShareNavController.attachmentPrepPublisher?
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: DispatchQueue.main)
.sinkUntilComplete(
receiveValue: { [weak self] attachments in
guard let strongSelf = self else { return }

@ -49,7 +49,7 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec {
)
cancellables.append(
viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -132,7 +132,7 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec {
)
cancellables.append(
viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -178,7 +178,7 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec {
cancellables.append(
viewModel.rightNavItems
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { navItems in items = navItems }
@ -194,7 +194,7 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec {
beforeEach {
cancellables.append(
viewModel.rightNavItems
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { navItems in items = navItems }
@ -221,7 +221,7 @@ class ThreadDisappearingMessagesSettingsViewModelSpec: QuickSpec {
cancellables.append(
viewModel.dismissScreen
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { _ in didDismissScreen = true }

@ -35,7 +35,7 @@ class ThreadSettingsViewModelSpec: QuickSpec {
)
mockGeneralCache = MockGeneralCache()
dependencies = Dependencies(
generalCache: Atomic(mockGeneralCache),
generalCache: mockGeneralCache,
storage: mockStorage,
scheduler: .immediate
)
@ -75,7 +75,7 @@ class ThreadSettingsViewModelSpec: QuickSpec {
)
disposables.append(
viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -173,7 +173,7 @@ class ThreadSettingsViewModelSpec: QuickSpec {
)
disposables.append(
viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -447,7 +447,7 @@ class ThreadSettingsViewModelSpec: QuickSpec {
)
disposables.append(
viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -489,7 +489,7 @@ class ThreadSettingsViewModelSpec: QuickSpec {
)
disposables.append(
viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }

@ -31,7 +31,7 @@ class NotificationContentViewModelSpec: QuickSpec {
)
viewModel = NotificationContentViewModel(storage: mockStorage, scheduling: .immediate)
dataChangeCancellable = viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -99,7 +99,7 @@ class NotificationContentViewModelSpec: QuickSpec {
}
viewModel = NotificationContentViewModel(storage: mockStorage, scheduling: .immediate)
dataChangeCancellable = viewModel.observableTableData
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { viewModel.updateTableData($0.0) }
@ -148,7 +148,7 @@ class NotificationContentViewModelSpec: QuickSpec {
var didDismissScreen: Bool = false
dismissCancellable = viewModel.dismissScreen
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { _ in didDismissScreen = true }

@ -25,64 +25,6 @@ public extension Publisher {
)
}
/// The standard `.subscribe(on: DispatchQueue.main)` seems to ocassionally dispatch to the
/// next run loop before actually subscribing, this method checks if it's running on the main thread already and
/// if so just subscribes directly rather than routing via `.receive(on:)`
func subscribe<S>(
on scheduler: S,
immediatelyIfMain: Bool,
options: S.SchedulerOptions? = nil
) -> AnyPublisher<Output, Failure> where S: Scheduler {
guard immediatelyIfMain && ((scheduler as? DispatchQueue) == DispatchQueue.main) else {
return self.subscribe(on: scheduler, options: options)
.eraseToAnyPublisher()
}
return self
.flatMap { value -> AnyPublisher<Output, Failure> in
guard Thread.isMainThread else {
return Just(value)
.setFailureType(to: Failure.self)
.subscribe(on: scheduler, options: options)
.eraseToAnyPublisher()
}
return Just(value)
.setFailureType(to: Failure.self)
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
/// The standard `.receive(on: DispatchQueue.main)` seems to ocassionally dispatch to the
/// next run loop before emitting data, this method checks if it's running on the main thread already and
/// if so just emits directly rather than routing via `.receive(on:)`
func receive<S>(
on scheduler: S,
immediatelyIfMain: Bool,
options: S.SchedulerOptions? = nil
) -> AnyPublisher<Output, Failure> where S: Scheduler {
guard immediatelyIfMain && ((scheduler as? DispatchQueue) == DispatchQueue.main) else {
return self.receive(on: scheduler, options: options)
.eraseToAnyPublisher()
}
return self
.flatMap { value -> AnyPublisher<Output, Failure> in
guard Thread.isMainThread else {
return Just(value)
.setFailureType(to: Failure.self)
.receive(on: scheduler, options: options)
.eraseToAnyPublisher()
}
return Just(value)
.setFailureType(to: Failure.self)
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
func tryFlatMap<T, P>(
maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Self.Output) throws -> P

@ -9,8 +9,7 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
private var buffer: [Output] = [Output]()
private let bufferSize: Int
private let lock: NSRecursiveLock = NSRecursiveLock()
private var subscriptions = [ReplaySubjectSubscription<Output, Failure>]()
private var subscriptions: Atomic<[ReplaySubjectSubscription<Output, Failure>]> = Atomic([])
private var completion: Subscribers.Completion<Failure>?
// MARK: - Initialization
@ -27,7 +26,7 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
buffer.append(value)
buffer = buffer.suffix(bufferSize)
subscriptions.forEach { $0.receive(value) }
subscriptions.wrappedValue.forEach { $0.receive(value) }
}
/// Sends a completion signal to the subscriber
@ -35,7 +34,7 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
lock.lock(); defer { lock.unlock() }
self.completion = completion
subscriptions.forEach { subscription in subscription.receive(completion: completion) }
subscriptions.wrappedValue.forEach { $0.receive(completion: completion) }
}
/// Provides this Subject an opportunity to establish demand for any new upstream subscriptions
@ -61,11 +60,11 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
/// we can revert this change
///
/// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631/20
let subscription: ReplaySubjectSubscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber)) { [buffer = buffer, completion = completion] subscription in
let subscription: ReplaySubjectSubscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber)) { [weak self, buffer = buffer, completion = completion] subscription in
self?.subscriptions.mutate { $0.append(subscription) }
subscription.replay(buffer, completion: completion)
}
subscriber.receive(subscription: subscription)
subscriptions.append(subscription)
}
}

@ -7,8 +7,8 @@ import SessionUtilitiesKit
public extension Publisher {
func sinkAndStore<C>(in storage: inout C) where C: RangeReplaceableCollection, C.Element == AnyCancellable {
self
.subscribe(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.subscribe(on: ImmediateScheduler.shared)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { _ in }
@ -22,7 +22,7 @@ public extension AnyPublisher {
var value: Output?
_ = self
.receive(on: DispatchQueue.main, immediatelyIfMain: true)
.receive(on: ImmediateScheduler.shared)
.sink(
receiveCompletion: { _ in },
receiveValue: { result in value = result }

@ -5,7 +5,7 @@ import SessionUtilitiesKit
@testable import SessionMessagingKit
class MockGeneralCache: Mock<GeneralCacheType>, GeneralCacheType {
class MockGeneralCache: Mock<MutableGeneralCacheType>, MutableGeneralCacheType {
var encodedPublicKey: String? {
get { return accept() as? String }
set { accept(args: [newValue]) }

Loading…
Cancel
Save