Cleaned up the poller logic a bit

pull/612/head
Morgan Pretty 3 years ago
parent f8b2f73f7b
commit ecbded3819

@ -184,31 +184,21 @@ public final class ClosedGroupPoller {
guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) } guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) }
var promises: [Promise<Void>] = [] var promises: [Promise<Void>] = []
var messageCount: Int = 0 let allMessages: [SnodeReceivedMessage] = messageResults
let totalMessagesCount: Int = messageResults .reduce([]) { result, next in
.map { result -> Int in switch next {
switch result { case .fulfilled(let messages): return result.appending(contentsOf: messages)
case .fulfilled(let messages): return messages.count default: return result
default: return 0
} }
} }
.reduce(0, +) var messageCount: Int = 0
let totalMessagesCount: Int = allMessages.count
messageResults.forEach { result in Storage.shared.write { db in
guard case .fulfilled(let messages) = result else { return } let processedMessages: [ProcessedMessage] = allMessages
guard !messages.isEmpty else { return } .compactMap { message -> ProcessedMessage? in
var jobToRun: Job?
Storage.shared.write { db in
var jobDetailMessages: [MessageReceiveJob.Details.MessageInfo] = []
messages.forEach { message in
do { do {
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) return try Message.processRawReceivedMessage(db, rawMessage: message)
jobDetailMessages = jobDetailMessages
.appending(processedMessage?.messageInfo)
} }
catch { catch {
switch error { switch error {
@ -219,28 +209,30 @@ public final class ClosedGroupPoller {
MessageReceiverError.duplicateControlMessage, MessageReceiverError.duplicateControlMessage,
MessageReceiverError.selfSend: MessageReceiverError.selfSend:
break break
default: SNLog("Failed to deserialize envelope due to error: \(error).") default: SNLog("Failed to deserialize envelope due to error: \(error).")
} }
return nil
} }
} }
messageCount += jobDetailMessages.count
jobToRun = Job(
variant: .messageReceive,
behaviour: .runOnce,
threadId: groupPublicKey,
details: MessageReceiveJob.Details(
messages: jobDetailMessages,
isBackgroundPoll: isBackgroundPoll
)
)
// If we are force-polling then add to the JobRunner so they are persistent and will retry on
// the next app run if they fail but don't let them auto-start
JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll)
}
messageCount = processedMessages.count
let jobToRun: Job? = Job(
variant: .messageReceive,
behaviour: .runOnce,
threadId: groupPublicKey,
details: MessageReceiveJob.Details(
messages: processedMessages.map { $0.messageInfo },
isBackgroundPoll: isBackgroundPoll
)
)
// If we are force-polling then add to the JobRunner so they are persistent and will retry on
// the next app run if they fail but don't let them auto-start
JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll)
// We want to try to handle the receive jobs immediately in the background // We want to try to handle the receive jobs immediately in the background
if isBackgroundPoll { if isBackgroundPoll {
promises = promises.appending( promises = promises.appending(

@ -136,49 +136,44 @@ public final class Poller {
var messageCount: Int = 0 var messageCount: Int = 0
Storage.shared.write { db in Storage.shared.write { db in
var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] messages
.compactMap { message -> ProcessedMessage? in
messages.forEach { message in do {
do { return try Message.processRawReceivedMessage(db, rawMessage: message)
let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) }
let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId) catch {
switch error {
threadMessages[key] = (threadMessages[key] ?? []) // Ignore duplicate & selfSend message errors (and don't bother logging
.appending(processedMessage?.messageInfo) // them as there will be a lot since we each service node duplicates messages)
} case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
catch { MessageReceiverError.duplicateMessage,
switch error { MessageReceiverError.duplicateControlMessage,
// Ignore duplicate & selfSend message errors (and don't bother logging MessageReceiverError.selfSend:
// them as there will be a lot since we each service node duplicates messages) break
case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
MessageReceiverError.duplicateMessage, default: SNLog("Failed to deserialize envelope due to error: \(error).")
MessageReceiverError.duplicateControlMessage, }
MessageReceiverError.selfSend:
break
default: SNLog("Failed to deserialize envelope due to error: \(error).") return nil
} }
} }
} .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) }
.forEach { threadId, threadMessages in
messageCount = threadMessages messageCount += threadMessages.count
.values
.reduce(into: 0) { prev, next in prev += next.count } JobRunner.add(
db,
threadMessages.forEach { threadId, threadMessages in job: Job(
JobRunner.add( variant: .messageReceive,
db, behaviour: .runOnce,
job: Job( threadId: threadId,
variant: .messageReceive, details: MessageReceiveJob.Details(
behaviour: .runOnce, messages: threadMessages.map { $0.messageInfo },
threadId: threadId, isBackgroundPoll: false
details: MessageReceiveJob.Details( )
messages: threadMessages,
isBackgroundPoll: false
) )
) )
) }
}
} }
SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (duplicates: \(messages.count - messageCount))") SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (duplicates: \(messages.count - messageCount))")

@ -688,9 +688,11 @@ private final class JobQueue {
} }
private func scheduleNextSoonestJob() { private func scheduleNextSoonestJob() {
let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue
let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in
try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false) try Job.filterPendingJobs(variants: jobVariants, excludeFutureJobs: false)
.select(.nextRunTimestamp) .select(.nextRunTimestamp)
.filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
.asRequest(of: TimeInterval.self) .asRequest(of: TimeInterval.self)
.fetchOne(db) .fetchOne(db)
} }

Loading…
Cancel
Save