From 8439d5711523305f378ba41c0abb75fbe0cd9301 Mon Sep 17 00:00:00 2001 From: jubb Date: Mon, 10 May 2021 17:07:10 +1000 Subject: [PATCH 1/8] refactor: let the periodic work run more frequently and never fail from excessive retries preventing from re-running. remove resume pending jobs from ApplicationContext onCreate and handle in home activity's onCreate instead. prevent some illegal argument exceptions from Random.kt by returning null if empty --- .../securesms/ApplicationContext.java | 2 -- .../loki/api/BackgroundPollWorker.kt | 29 ++++--------------- .../securesms/loki/api/PublicChatManager.kt | 5 +++- .../libsession/snode/utilities/Random.kt | 1 + .../session/libsignal/service/loki/Random.kt | 1 + 5 files changed, 11 insertions(+), 27 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index cc61634f9a..64a9929630 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -327,7 +327,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(this))) .setDependencyInjector(this) .build()); - JobQueue.getShared().resumePendingJobs(); } private void initializeDependencyInjection() { @@ -455,7 +454,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc poller.setUserPublicKey(userPublicKey); return; } - LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this); poller = new Poller(); closedGroupPoller = new ClosedGroupPoller(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index 7b4f2c2aa6..c020b5333d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -17,7 +17,6 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.DatabaseFactory -import java.io.IOException import java.util.concurrent.TimeUnit class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { @@ -25,26 +24,10 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor companion object { const val TAG = "BackgroundPollWorker" - private const val RETRY_ATTEMPTS = 3 - - @JvmStatic - fun scheduleInstant(context: Context) { - val workRequest = OneTimeWorkRequestBuilder() - .setConstraints(Constraints.Builder() - .setRequiredNetworkType(NetworkType.CONNECTED) - .build() - ) - .build() - - WorkManager - .getInstance(context) - .enqueue(workRequest) - } - @JvmStatic fun schedulePeriodic(context: Context) { Log.v(TAG, "Scheduling periodic work.") - val workRequest = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES) + val workRequest = PeriodicWorkRequestBuilder(5, TimeUnit.MINUTES) .setConstraints(Constraints.Builder() .setRequiredNetworkType(NetworkType.CONNECTED) .build() @@ -55,7 +38,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor .getInstance(context) .enqueueUniquePeriodicWork( TAG, - ExistingPeriodicWorkPolicy.KEEP, + ExistingPeriodicWorkPolicy.REPLACE, workRequest ) } @@ -105,9 +88,8 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor return Result.success() } catch (exception: Exception) { - Log.v(TAG, "Background poll failed due to error: ${exception.message}.", exception) - - return if (runAttemptCount < RETRY_ATTEMPTS) Result.retry() else Result.failure() + Log.e(TAG, "Background poll failed due to error: ${exception.message}.", exception) + return Result.retry() } } @@ -116,8 +98,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor override fun onReceive(context: Context, intent: Intent) { if (intent.action == Intent.ACTION_BOOT_COMPLETED) { Log.v(TAG, "Boot broadcast caught.") - BackgroundPollWorker.scheduleInstant(context) - BackgroundPollWorker.schedulePeriodic(context) + schedulePeriodic(context) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index 04ed0e4c95..22878d46fd 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -31,7 +31,7 @@ class PublicChatManager(private val context: Context) { refreshChatsAndPollers() for ((threadID, _) in chats) { val poller = pollers[threadID] - areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else true + areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else areAllCaughtUp } return areAllCaughtUp } @@ -42,6 +42,9 @@ class PublicChatManager(private val context: Context) { val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService) poller.isCaughtUp = false } + for ((_,poller) in v2Pollers) { + poller.isCaughtUp = false + } } public fun startPollersIfNeeded() { diff --git a/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt b/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt index 2ec42cdf5b..72ceee9f3b 100644 --- a/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt +++ b/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt @@ -6,6 +6,7 @@ import java.security.SecureRandom * Uses `SecureRandom` to pick an element from this collection. */ fun Collection.getRandomElementOrNull(): T? { + if (isEmpty()) return null val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure return elementAtOrNull(index) } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt index 68bc4380c5..b1c1cd2af7 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt @@ -6,6 +6,7 @@ import java.security.SecureRandom * Uses `SecureRandom` to pick an element from this collection. */ fun Collection.getRandomElementOrNull(): T? { + if (isEmpty()) return null val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure return elementAtOrNull(index) } From 9f099771605cc528a333b88d72e778ca3e3c3589 Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 12 May 2021 10:43:17 +1000 Subject: [PATCH 2/8] refactor: remove registration required for job serialization and test logs, don't try to read class object if the message send class is not of expected type --- .../securesms/loki/api/SessionProtocolImpl.kt | 1 - .../libsession/messaging/jobs/AttachmentUploadJob.kt | 1 + .../java/org/session/libsession/messaging/jobs/Job.kt | 1 + .../org/session/libsession/messaging/jobs/JobQueue.kt | 1 + .../session/libsession/messaging/jobs/MessageSendJob.kt | 9 ++++++++- .../libsession/messaging/jobs/NotifyPNServerJob.kt | 1 + .../sending_receiving/ReceivedMessageHandler.kt | 1 - .../main/java/org/session/libsession/snode/SnodeAPI.kt | 2 +- 8 files changed, 13 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt index 9be7b3e461..4916ef483d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt @@ -23,7 +23,6 @@ class SessionProtocolImpl(private val context: Context) : SessionProtocol { override fun decrypt(ciphertext: ByteArray, x25519KeyPair: ECKeyPair): Pair { val recipientX25519PrivateKey = x25519KeyPair.privateKey.serialize() val recipientX25519PublicKey = Hex.fromStringCondensed(x25519KeyPair.hexEncodedPublicKey.removing05PrefixIfNeeded()) - Log.d("Test", "recipientX25519PublicKey: $recipientX25519PublicKey") val signatureSize = Sign.BYTES val ed25519PublicKeySize = Sign.PUBLICKEYBYTES diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index 1d8b1a7170..c6e73e12e1 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -135,6 +135,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess override fun create(data: Data): AttachmentUploadJob { val serializedMessage = data.getByteArray(KEY_MESSAGE) val kryo = Kryo() + kryo.isRegistrationRequired = false val input = Input(serializedMessage) val message: Message = kryo.readObject(input, Message::class.java) input.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt index 4693fddf4a..aefe7cc907 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt @@ -11,6 +11,7 @@ interface Job { // Keys used for database storage private val KEY_ID = "id" private val KEY_FAILURE_COUNT = "failure_count" + internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes } fun execute() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index cffb2db7d6..ba21280700 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -50,6 +50,7 @@ class JobQueue : JobDelegate { private fun Job.canExecuteParallel(): Boolean { return this.javaClass in arrayOf( + MessageSendJob::class.java, AttachmentUploadJob::class.java, AttachmentDownloadJob::class.java ) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 83822c4fc7..7b64f6bb77 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.visible.VisibleMessage @@ -79,7 +80,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override fun serialize(): Data { val kryo = Kryo() kryo.isRegistrationRequired = false - val output = Output(ByteArray(4096), 10_000_000) + val output = Output(ByteArray(4096), MAX_BUFFER_SIZE) kryo.writeClassAndObject(output, message) output.close() val serializedMessage = output.toBytes() @@ -102,7 +103,13 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { val serializedMessage = data.getByteArray(KEY_MESSAGE) val serializedDestination = data.getByteArray(KEY_DESTINATION) val kryo = Kryo() + kryo.isRegistrationRequired = false var input = Input(serializedMessage) + val messageClass = kryo.readClass(input) + if (messageClass == null || !Message::class.java.isAssignableFrom(messageClass.type)) { + // if the message class doesn't exist or it doesn't implement `Message` parent class + throw Exception("deserialized messageClass was ${messageClass.type}") + } val message = kryo.readClassAndObject(input) as Message input.close() input = Input(serializedDestination) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt index fb99f54f56..720dd091ac 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt @@ -80,6 +80,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { override fun create(data: Data): NotifyPNServerJob { val serializedMessage = data.getByteArray(KEY_MESSAGE) val kryo = Kryo() + kryo.isRegistrationRequired = false val input = Input(serializedMessage) val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java) input.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 2a0b13ae3e..c63e86e56e 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -1,7 +1,6 @@ package org.session.libsession.messaging.sending_receiving import android.text.TextUtils -import okhttp3.HttpUrl import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.AttachmentDownloadJob import org.session.libsession.messaging.jobs.JobQueue diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index a5094a63ee..cb0d5ee837 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -92,7 +92,7 @@ object SnodeAPI { "method" to "get_n_service_nodes", "params" to mapOf( "active_only" to true, - "limit" to 256, +// "limit" to 256, "fields" to mapOf( "public_ip" to true, "storage_port" to true, "pubkey_x25519" to true, "pubkey_ed25519" to true ) ) ) From 18818bf8da16b1ac87bbf491129266bea024fdeb Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 12 May 2021 11:24:08 +1000 Subject: [PATCH 3/8] refactor: re-add the node limit --- .../src/main/java/org/session/libsession/snode/SnodeAPI.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index cb0d5ee837..a5094a63ee 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -92,7 +92,7 @@ object SnodeAPI { "method" to "get_n_service_nodes", "params" to mapOf( "active_only" to true, -// "limit" to 256, + "limit" to 256, "fields" to mapOf( "public_ip" to true, "storage_port" to true, "pubkey_x25519" to true, "pubkey_ed25519" to true ) ) ) From edc1454609180dd8690dca22a8cff5da34dfb8dc Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 12 May 2021 16:48:18 +1000 Subject: [PATCH 4/8] fix: unnamed open groups being processed by creating new threads after deletion job db not marking successful/unsuccessful properly handling send and receive better / in order --- .../securesms/database/Storage.kt | 2 +- .../loki/database/SessionJobDatabase.kt | 8 +-- .../libsession/messaging/jobs/JobQueue.kt | 59 ++++++++++++------- .../messaging/jobs/MessageSendJob.kt | 8 ++- .../ReceivedMessageHandler.kt | 7 ++- 5 files changed, 55 insertions(+), 29 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 6a2ff6e8df..f93670ae88 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -581,7 +581,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, val database = DatabaseFactory.getThreadDatabase(context) if (!openGroupID.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false) - return database.getOrCreateThreadIdFor(recipient) + return database.getThreadIdIfExistsFor(recipient) } else if (!groupPublicKey.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false) return database.getOrCreateThreadIdFor(recipient) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index 3647a9937c..d8c072dd5f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -29,15 +29,15 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa contentValues.put(jobType, job.getFactoryKey()) contentValues.put(failureCount, job.failureCount) contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize())) - database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) + database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!)) } fun markJobAsSucceeded(jobID: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) + databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } fun markJobAsFailed(jobID: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) + databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } fun getAllPendingJobs(type: String): Map { @@ -75,7 +75,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa var cursor: android.database.Cursor? = null try { cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) - return cursor != null && cursor.moveToFirst() + return cursor == null || !cursor.moveToFirst() } catch (e: Exception) { // Do nothing } finally { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 246a766fce..fab49384fd 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -17,29 +17,50 @@ import kotlin.math.roundToLong class JobQueue : JobDelegate { private var hasResumedPendingJobs = false // Just for debugging private val jobTimestampMap = ConcurrentHashMap() - private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() + private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() private val scope = GlobalScope + SupervisorJob() private val queue = Channel(UNLIMITED) val timer = Timer() + private fun CoroutineScope.processWithDispatcher(channel: Channel, dispatcher: CoroutineDispatcher) = launch(dispatcher) { + for (job in channel) { + if (!isActive) break + job.delegate = this@JobQueue + job.execute() + } + } + init { // Process jobs - scope.launch(dispatcher) { + scope.launch { + val rxQueue = Channel(capacity = 1024) + val txQueue = Channel(capacity = 1024) + val attachmentQueue = Channel(capacity = 1024) + + val receiveJob = processWithDispatcher(rxQueue, rxDispatcher) + val txJob = processWithDispatcher(txQueue, txDispatcher) + val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher) + while (isActive) { - queue.receive().let { job -> - if (job.canExecuteParallel()) { - launch(multiDispatcher) { - job.delegate = this@JobQueue - job.execute() - } - } else { - job.delegate = this@JobQueue - job.execute() + for (job in queue) { + when (job) { + is NotifyPNServerJob, + is AttachmentUploadJob, + is MessageSendJob -> txQueue.send(job) + is AttachmentDownloadJob -> attachmentQueue.send(job) + else -> rxQueue.send(job) } } } + + // job has been cancelled + receiveJob.cancel() + txJob.cancel() + attachmentJob.cancel() + } } @@ -49,14 +70,6 @@ class JobQueue : JobDelegate { val shared: JobQueue by lazy { JobQueue() } } - private fun Job.canExecuteParallel(): Boolean { - return this.javaClass in arrayOf( - MessageSendJob::class.java, - AttachmentUploadJob::class.java, - AttachmentDownloadJob::class.java - ) - } - fun add(job: Job) { addWithoutExecuting(job) queue.offer(job) // offer always called on unlimited capacity @@ -112,8 +125,10 @@ class JobQueue : JobDelegate { override fun handleJobFailed(job: Job, error: Exception) { job.failureCount += 1 val storage = MessagingModuleConfiguration.shared.storage - if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")} - if (job.failureCount == job.maxFailureCount) { + if (storage.isJobCanceled(job)) { + return Log.i("Loki", "${job::class.simpleName} canceled.") + } + if (job.failureCount >= job.maxFailureCount) { handleJobFailedPermanently(job, error) } else { storage.persistJob(job) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 6c187b686a..1a0e4e57f4 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -12,6 +12,9 @@ import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsignal.utilities.logging.Log class MessageSendJob(val message: Message, val destination: Destination) : Job { + + object AwaitingUploadException: Exception("Awaiting attachment upload") + override var delegate: JobDelegate? = null override var id: String? = null override var failureCount: Int = 0 @@ -46,7 +49,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { JobQueue.shared.add(job) } } - if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing + if (attachmentsToUpload.isNotEmpty()) { + this.handleFailure(AwaitingUploadException) + return + } // Wait for all attachments to upload before continuing } MessageSender.send(this.message, this.destination).success { this.handleSuccess() diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 30f5d81743..41eb261b95 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -155,6 +155,11 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS val threadID = storage.getOrCreateThreadIdFor(message.syncTarget ?: message.sender!!, message.groupPublicKey, openGroupID) + if (threadID < 0) { + // thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread + throw MessageReceiver.Error.NoThread + } + val openGroup = threadID.let { storage.getOpenGroup(it.toString()) } @@ -233,7 +238,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS } val openGroupServerID = message.openGroupServerMessageID if (openGroupServerID != null) { - storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty())) + storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !message.isMediaMessage()) } // Cancel any typing indicators if needed cancelTypingIndicatorsIfNeeded(message.sender!!) From 26601dbcb208720d040dcc6c2b1c0edaee285a83 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Thu, 13 May 2021 09:24:13 +1000 Subject: [PATCH 5/8] Clean up background poll worker --- .../loki/api/BackgroundPollWorker.kt | 68 ++++++------------- 1 file changed, 20 insertions(+), 48 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index a8d00d690c..86970cbcdc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -8,7 +8,6 @@ import nl.komponents.kovenant.Promise import nl.komponents.kovenant.all import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.jobs.MessageReceiveJob -import org.session.libsession.messaging.open_groups.OpenGroup import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller @@ -17,7 +16,6 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.DatabaseFactory -import java.io.IOException import java.util.concurrent.TimeUnit class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { @@ -25,45 +23,23 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor companion object { const val TAG = "BackgroundPollWorker" - private const val RETRY_ATTEMPTS = 3 - - @JvmStatic - fun scheduleInstant(context: Context) { - val workRequest = OneTimeWorkRequestBuilder() - .setConstraints(Constraints.Builder() - .setRequiredNetworkType(NetworkType.CONNECTED) - .build() - ) - .build() - - WorkManager - .getInstance(context) - .enqueue(workRequest) - } - @JvmStatic fun schedulePeriodic(context: Context) { Log.v(TAG, "Scheduling periodic work.") - val workRequest = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES) - .setConstraints(Constraints.Builder() - .setRequiredNetworkType(NetworkType.CONNECTED) - .build() - ) - .build() - - WorkManager - .getInstance(context) - .enqueueUniquePeriodicWork( - TAG, - ExistingPeriodicWorkPolicy.KEEP, - workRequest - ) + val builder = PeriodicWorkRequestBuilder(5, TimeUnit.MINUTES) + builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) + val workRequest = builder.build() + WorkManager.getInstance(context).enqueueUniquePeriodicWork( + TAG, + ExistingPeriodicWorkPolicy.REPLACE, + workRequest + ) } } override fun doWork(): Result { if (TextSecurePreferences.getLocalNumber(context) == null) { - Log.v(TAG, "Background poll is canceled due to the Session user is not set up yet.") + Log.v(TAG, "User not registered yet.") return Result.failure() } @@ -71,44 +47,41 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor Log.v(TAG, "Performing background poll.") val promises = mutableListOf>() - // Private chats + // DMs val userPublicKey = TextSecurePreferences.getLocalNumber(context)!! - val privateChatsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes -> + val dmsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes -> envelopes.map { envelope -> // FIXME: Using a job here seems like a bad idea... MessageReceiveJob(envelope.toByteArray(), false).executeAsync() } } - promises.addAll(privateChatsPromise.get()) + promises.addAll(dmsPromise.get()) // Closed groups promises.addAll(ClosedGroupPoller().pollOnce()) // Open Groups - val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { (_,chat)-> - OpenGroup(chat.channel, chat.server, chat.displayName, chat.isDeletable) - } + val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().values for (openGroup in openGroups) { val poller = OpenGroupPoller(openGroup) promises.add(poller.pollForNewMessages()) } - val openGroupsV2 = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) + val v2OpenGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) - openGroupsV2.values.map { groups -> + v2OpenGroups.values.map { groups -> OpenGroupV2Poller(groups) }.forEach { poller -> - promises.add(poller.compactPoll(true).map{ /*Unit*/ }) + promises.add(poller.compactPoll(true).map { }) } - // Wait till all the promises get resolved + // Wait until all the promises are resolved all(promises).get() return Result.success() } catch (exception: Exception) { - Log.v(TAG, "Background poll failed due to error: ${exception.message}.", exception) - - return if (runAttemptCount < RETRY_ATTEMPTS) Result.retry() else Result.failure() + Log.e(TAG, "Background poll failed due to error: ${exception.message}.", exception) + return Result.retry() } } @@ -117,8 +90,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor override fun onReceive(context: Context, intent: Intent) { if (intent.action == Intent.ACTION_BOOT_COMPLETED) { Log.v(TAG, "Boot broadcast caught.") - BackgroundPollWorker.scheduleInstant(context) - BackgroundPollWorker.schedulePeriodic(context) + schedulePeriodic(context) } } } From 3cab81c329437072dc1308681d55d0e546c0bb26 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Thu, 13 May 2021 09:38:39 +1000 Subject: [PATCH 6/8] Fix message send job attachment upload handling --- .../securesms/database/Storage.kt | 2 +- .../loki/database/SessionJobDatabase.kt | 8 +++--- .../libsession/messaging/StorageProtocol.kt | 2 +- .../messaging/jobs/AttachmentUploadJob.kt | 2 +- .../libsession/messaging/jobs/JobQueue.kt | 26 ++++++++++++++----- .../messaging/jobs/MessageSendJob.kt | 4 +-- .../ReceivedMessageHandler.kt | 2 +- 7 files changed, 29 insertions(+), 17 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index f93670ae88..f1602a8e17 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -189,7 +189,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) } - override fun markJobAsFailed(jobId: String) { + override fun markJobAsFailedPermanently(jobId: String) { DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index d8c072dd5f..f684a778aa 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -25,18 +25,18 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa fun persistJob(job: Job) { val database = databaseHelper.writableDatabase val contentValues = ContentValues(4) - contentValues.put(jobID, job.id) + contentValues.put(jobID, job.id!!) contentValues.put(jobType, job.getFactoryKey()) contentValues.put(failureCount, job.failureCount) contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize())) - database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!)) + database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf( job.id!! )) } fun markJobAsSucceeded(jobID: String) { databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } - fun markJobAsFailed(jobID: String) { + fun markJobAsFailedPermanently(jobID: String) { databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } @@ -74,7 +74,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa val database = databaseHelper.readableDatabase var cursor: android.database.Cursor? = null try { - cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) + cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id!! )) return cursor == null || !cursor.moveToFirst() } catch (e: Exception) { // Do nothing diff --git a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt index da604264d1..d850e30664 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt @@ -45,7 +45,7 @@ interface StorageProtocol { // Jobs fun persistJob(job: Job) fun markJobAsSucceeded(jobId: String) - fun markJobAsFailed(jobId: String) + fun markJobAsFailedPermanently(jobId: String) fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index cbdcd42fca..690caf512c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -103,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val messageSendJob = storage.getMessageSendJob(messageSendJobID) MessageSender.handleFailedMessageSend(this.message, e) if (messageSendJob != null) { - storage.markJobAsFailed(messageSendJobID) + storage.markJobAsFailedPermanently(messageSendJobID) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index fab49384fd..a2f47556bc 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsignal.utilities.logging.Log +import java.lang.IllegalStateException import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -47,16 +48,15 @@ class JobQueue : JobDelegate { while (isActive) { for (job in queue) { when (job) { - is NotifyPNServerJob, - is AttachmentUploadJob, - is MessageSendJob -> txQueue.send(job) + is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job) is AttachmentDownloadJob -> attachmentQueue.send(job) - else -> rxQueue.send(job) + is MessageReceiveJob -> rxQueue.send(job) + else -> throw IllegalStateException("Unexpected job type.") } } } - // job has been cancelled + // The job has been cancelled receiveJob.cancel() txJob.cancel() attachmentJob.cancel() @@ -123,11 +123,23 @@ class JobQueue : JobDelegate { } override fun handleJobFailed(job: Job, error: Exception) { - job.failureCount += 1 + // Canceled val storage = MessagingModuleConfiguration.shared.storage if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.") } + // Message send jobs waiting for the attachment to upload + if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) { + val retryInterval: Long = 1000 * 4 + Log.i("Loki", "Message send job waiting for attachment upload to finish.") + timer.schedule(delay = retryInterval) { + Log.i("Loki", "Retrying ${job::class.simpleName}.") + queue.offer(job) + } + return + } + // Regular job failure + job.failureCount += 1 if (job.failureCount >= job.maxFailureCount) { handleJobFailedPermanently(job, error) } else { @@ -148,7 +160,7 @@ class JobQueue : JobDelegate { private fun handleJobFailedPermanently(jobId: String) { val storage = MessagingModuleConfiguration.shared.storage - storage.markJobAsFailed(jobId) + storage.markJobAsFailedPermanently(jobId) } private fun getRetryInterval(job: Job): Long { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 1a0e4e57f4..b93aa13dc2 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -13,7 +13,7 @@ import org.session.libsignal.utilities.logging.Log class MessageSendJob(val message: Message, val destination: Destination) : Job { - object AwaitingUploadException: Exception("Awaiting attachment upload") + object AwaitingAttachmentUploadException : Exception("Awaiting attachment upload.") override var delegate: JobDelegate? = null override var id: String? = null @@ -50,7 +50,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } } if (attachmentsToUpload.isNotEmpty()) { - this.handleFailure(AwaitingUploadException) + this.handleFailure(AwaitingAttachmentUploadException) return } // Wait for all attachments to upload before continuing } diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index 41eb261b95..be331891b0 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -153,7 +153,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS // Get or create thread val threadID = storage.getOrCreateThreadIdFor(message.syncTarget - ?: message.sender!!, message.groupPublicKey, openGroupID) + ?: message.sender!!, message.groupPublicKey, openGroupID) if (threadID < 0) { // thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread From 43ba8299776ba25efca376a592a1ce904064b5bc Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Thu, 13 May 2021 09:40:07 +1000 Subject: [PATCH 7/8] Fix build --- .../main/java/org/thoughtcrime/securesms/database/Storage.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index f1602a8e17..2c82d60aa7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -190,7 +190,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, } override fun markJobAsFailedPermanently(jobId: String) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) + DatabaseFactory.getSessionJobDatabase(context).markJobAsFailedPermanently(jobId) } override fun getAllPendingJobs(type: String): Map { From af84b1ef3a97ffb64779940d3ffc1ac864dd4c22 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Thu, 13 May 2021 09:45:29 +1000 Subject: [PATCH 8/8] Update build number --- app/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/build.gradle b/app/build.gradle index 74738b8801..dc0e327af9 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -158,7 +158,7 @@ dependencies { testImplementation 'org.robolectric:shadows-multidex:4.2' } -def canonicalVersionCode = 159 +def canonicalVersionCode = 161 def canonicalVersionName = "1.10.2" def postFixSize = 10