diff --git a/app/build.gradle b/app/build.gradle index 74738b8801..dc0e327af9 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -158,7 +158,7 @@ dependencies { testImplementation 'org.robolectric:shadows-multidex:4.2' } -def canonicalVersionCode = 159 +def canonicalVersionCode = 161 def canonicalVersionName = "1.10.2" def postFixSize = 10 diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index cc61634f9a..64a9929630 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -327,7 +327,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc .setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(this))) .setDependencyInjector(this) .build()); - JobQueue.getShared().resumePendingJobs(); } private void initializeDependencyInjection() { @@ -455,7 +454,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc poller.setUserPublicKey(userPublicKey); return; } - LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this); poller = new Poller(); closedGroupPoller = new ClosedGroupPoller(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index f39e9d12d6..9575d53ad0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -184,8 +184,8 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) } - override fun markJobAsFailed(jobId: String) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) + override fun markJobAsFailedPermanently(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsFailedPermanently(jobId) } override fun getAllPendingJobs(type: String): Map { @@ -576,7 +576,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, val database = DatabaseFactory.getThreadDatabase(context) if (!openGroupID.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false) - return database.getOrCreateThreadIdFor(recipient) + return database.getThreadIdIfExistsFor(recipient) } else if (!groupPublicKey.isNullOrEmpty()) { val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false) return database.getOrCreateThreadIdFor(recipient) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index a8d00d690c..86970cbcdc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -8,7 +8,6 @@ import nl.komponents.kovenant.Promise import nl.komponents.kovenant.all import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.jobs.MessageReceiveJob -import org.session.libsession.messaging.open_groups.OpenGroup import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller @@ -17,7 +16,6 @@ import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.DatabaseFactory -import java.io.IOException import java.util.concurrent.TimeUnit class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { @@ -25,45 +23,23 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor companion object { const val TAG = "BackgroundPollWorker" - private const val RETRY_ATTEMPTS = 3 - - @JvmStatic - fun scheduleInstant(context: Context) { - val workRequest = OneTimeWorkRequestBuilder() - .setConstraints(Constraints.Builder() - .setRequiredNetworkType(NetworkType.CONNECTED) - .build() - ) - .build() - - WorkManager - .getInstance(context) - .enqueue(workRequest) - } - @JvmStatic fun schedulePeriodic(context: Context) { Log.v(TAG, "Scheduling periodic work.") - val workRequest = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES) - .setConstraints(Constraints.Builder() - .setRequiredNetworkType(NetworkType.CONNECTED) - .build() - ) - .build() - - WorkManager - .getInstance(context) - .enqueueUniquePeriodicWork( - TAG, - ExistingPeriodicWorkPolicy.KEEP, - workRequest - ) + val builder = PeriodicWorkRequestBuilder(5, TimeUnit.MINUTES) + builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) + val workRequest = builder.build() + WorkManager.getInstance(context).enqueueUniquePeriodicWork( + TAG, + ExistingPeriodicWorkPolicy.REPLACE, + workRequest + ) } } override fun doWork(): Result { if (TextSecurePreferences.getLocalNumber(context) == null) { - Log.v(TAG, "Background poll is canceled due to the Session user is not set up yet.") + Log.v(TAG, "User not registered yet.") return Result.failure() } @@ -71,44 +47,41 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor Log.v(TAG, "Performing background poll.") val promises = mutableListOf>() - // Private chats + // DMs val userPublicKey = TextSecurePreferences.getLocalNumber(context)!! - val privateChatsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes -> + val dmsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes -> envelopes.map { envelope -> // FIXME: Using a job here seems like a bad idea... MessageReceiveJob(envelope.toByteArray(), false).executeAsync() } } - promises.addAll(privateChatsPromise.get()) + promises.addAll(dmsPromise.get()) // Closed groups promises.addAll(ClosedGroupPoller().pollOnce()) // Open Groups - val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { (_,chat)-> - OpenGroup(chat.channel, chat.server, chat.displayName, chat.isDeletable) - } + val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().values for (openGroup in openGroups) { val poller = OpenGroupPoller(openGroup) promises.add(poller.pollForNewMessages()) } - val openGroupsV2 = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) + val v2OpenGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) - openGroupsV2.values.map { groups -> + v2OpenGroups.values.map { groups -> OpenGroupV2Poller(groups) }.forEach { poller -> - promises.add(poller.compactPoll(true).map{ /*Unit*/ }) + promises.add(poller.compactPoll(true).map { }) } - // Wait till all the promises get resolved + // Wait until all the promises are resolved all(promises).get() return Result.success() } catch (exception: Exception) { - Log.v(TAG, "Background poll failed due to error: ${exception.message}.", exception) - - return if (runAttemptCount < RETRY_ATTEMPTS) Result.retry() else Result.failure() + Log.e(TAG, "Background poll failed due to error: ${exception.message}.", exception) + return Result.retry() } } @@ -117,8 +90,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor override fun onReceive(context: Context, intent: Intent) { if (intent.action == Intent.ACTION_BOOT_COMPLETED) { Log.v(TAG, "Boot broadcast caught.") - BackgroundPollWorker.scheduleInstant(context) - BackgroundPollWorker.schedulePeriodic(context) + schedulePeriodic(context) } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index 04ed0e4c95..22878d46fd 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -31,7 +31,7 @@ class PublicChatManager(private val context: Context) { refreshChatsAndPollers() for ((threadID, _) in chats) { val poller = pollers[threadID] - areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else true + areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else areAllCaughtUp } return areAllCaughtUp } @@ -42,6 +42,9 @@ class PublicChatManager(private val context: Context) { val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService) poller.isCaughtUp = false } + for ((_,poller) in v2Pollers) { + poller.isCaughtUp = false + } } public fun startPollersIfNeeded() { diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt index 9be7b3e461..4916ef483d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/SessionProtocolImpl.kt @@ -23,7 +23,6 @@ class SessionProtocolImpl(private val context: Context) : SessionProtocol { override fun decrypt(ciphertext: ByteArray, x25519KeyPair: ECKeyPair): Pair { val recipientX25519PrivateKey = x25519KeyPair.privateKey.serialize() val recipientX25519PublicKey = Hex.fromStringCondensed(x25519KeyPair.hexEncodedPublicKey.removing05PrefixIfNeeded()) - Log.d("Test", "recipientX25519PublicKey: $recipientX25519PublicKey") val signatureSize = Sign.BYTES val ed25519PublicKeySize = Sign.PUBLICKEYBYTES diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index 3647a9937c..f684a778aa 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -25,19 +25,19 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa fun persistJob(job: Job) { val database = databaseHelper.writableDatabase val contentValues = ContentValues(4) - contentValues.put(jobID, job.id) + contentValues.put(jobID, job.id!!) contentValues.put(jobType, job.getFactoryKey()) contentValues.put(failureCount, job.failureCount) contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize())) - database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) + database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf( job.id!! )) } fun markJobAsSucceeded(jobID: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) + databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } - fun markJobAsFailed(jobID: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) + fun markJobAsFailedPermanently(jobID: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID )) } fun getAllPendingJobs(type: String): Map { @@ -74,8 +74,8 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa val database = databaseHelper.readableDatabase var cursor: android.database.Cursor? = null try { - cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) - return cursor != null && cursor.moveToFirst() + cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id!! )) + return cursor == null || !cursor.moveToFirst() } catch (e: Exception) { // Do nothing } finally { diff --git a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt index 3897892555..d9520fc348 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt @@ -45,7 +45,7 @@ interface StorageProtocol { // Jobs fun persistJob(job: Job) fun markJobAsSucceeded(jobId: String) - fun markJobAsFailed(jobId: String) + fun markJobAsFailedPermanently(jobId: String) fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index e0849d1daf..690caf512c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -103,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val messageSendJob = storage.getMessageSendJob(messageSendJobID) MessageSender.handleFailedMessageSend(this.message, e) if (messageSendJob != null) { - storage.markJobAsFailed(messageSendJobID) + storage.markJobAsFailedPermanently(messageSendJobID) } } @@ -131,6 +131,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess override fun create(data: Data): AttachmentUploadJob { val serializedMessage = data.getByteArray(MESSAGE_KEY) val kryo = Kryo() + kryo.isRegistrationRequired = false val input = Input(serializedMessage) val message: Message = kryo.readObject(input, Message::class.java) input.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt index 332eca002d..ca2c8c9629 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt @@ -12,6 +12,7 @@ interface Job { // Keys used for database storage private val ID_KEY = "id" private val FAILURE_COUNT_KEY = "failure_count" + internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes } fun execute() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index a89338cf3c..a2f47556bc 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsignal.utilities.logging.Log +import java.lang.IllegalStateException import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -17,29 +18,49 @@ import kotlin.math.roundToLong class JobQueue : JobDelegate { private var hasResumedPendingJobs = false // Just for debugging private val jobTimestampMap = ConcurrentHashMap() - private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() + private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher() private val scope = GlobalScope + SupervisorJob() private val queue = Channel(UNLIMITED) val timer = Timer() + private fun CoroutineScope.processWithDispatcher(channel: Channel, dispatcher: CoroutineDispatcher) = launch(dispatcher) { + for (job in channel) { + if (!isActive) break + job.delegate = this@JobQueue + job.execute() + } + } + init { // Process jobs - scope.launch(dispatcher) { + scope.launch { + val rxQueue = Channel(capacity = 1024) + val txQueue = Channel(capacity = 1024) + val attachmentQueue = Channel(capacity = 1024) + + val receiveJob = processWithDispatcher(rxQueue, rxDispatcher) + val txJob = processWithDispatcher(txQueue, txDispatcher) + val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher) + while (isActive) { - queue.receive().let { job -> - if (job.canExecuteParallel()) { - launch(multiDispatcher) { - job.delegate = this@JobQueue - job.execute() - } - } else { - job.delegate = this@JobQueue - job.execute() + for (job in queue) { + when (job) { + is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job) + is AttachmentDownloadJob -> attachmentQueue.send(job) + is MessageReceiveJob -> rxQueue.send(job) + else -> throw IllegalStateException("Unexpected job type.") } } } + + // The job has been cancelled + receiveJob.cancel() + txJob.cancel() + attachmentJob.cancel() + } } @@ -49,13 +70,6 @@ class JobQueue : JobDelegate { val shared: JobQueue by lazy { JobQueue() } } - private fun Job.canExecuteParallel(): Boolean { - return this.javaClass in arrayOf( - AttachmentUploadJob::class.java, - AttachmentDownloadJob::class.java - ) - } - fun add(job: Job) { addWithoutExecuting(job) queue.offer(job) // offer always called on unlimited capacity @@ -109,10 +123,24 @@ class JobQueue : JobDelegate { } override fun handleJobFailed(job: Job, error: Exception) { - job.failureCount += 1 + // Canceled val storage = MessagingModuleConfiguration.shared.storage - if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")} - if (job.failureCount == job.maxFailureCount) { + if (storage.isJobCanceled(job)) { + return Log.i("Loki", "${job::class.simpleName} canceled.") + } + // Message send jobs waiting for the attachment to upload + if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) { + val retryInterval: Long = 1000 * 4 + Log.i("Loki", "Message send job waiting for attachment upload to finish.") + timer.schedule(delay = retryInterval) { + Log.i("Loki", "Retrying ${job::class.simpleName}.") + queue.offer(job) + } + return + } + // Regular job failure + job.failureCount += 1 + if (job.failureCount >= job.maxFailureCount) { handleJobFailedPermanently(job, error) } else { storage.persistJob(job) @@ -132,7 +160,7 @@ class JobQueue : JobDelegate { private fun handleJobFailedPermanently(jobId: String) { val storage = MessagingModuleConfiguration.shared.storage - storage.markJobAsFailed(jobId) + storage.markJobAsFailedPermanently(jobId) } private fun getRetryInterval(job: Job): Long { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 26620b3057..b93aa13dc2 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE import org.session.libsession.messaging.messages.Destination import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.visible.VisibleMessage @@ -11,6 +12,9 @@ import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsignal.utilities.logging.Log class MessageSendJob(val message: Message, val destination: Destination) : Job { + + object AwaitingAttachmentUploadException : Exception("Awaiting attachment upload.") + override var delegate: JobDelegate? = null override var id: String? = null override var failureCount: Int = 0 @@ -45,7 +49,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { JobQueue.shared.add(job) } } - if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing + if (attachmentsToUpload.isNotEmpty()) { + this.handleFailure(AwaitingAttachmentUploadException) + return + } // Wait for all attachments to upload before continuing } MessageSender.send(this.message, this.destination).success { this.handleSuccess() @@ -80,7 +87,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override fun serialize(): Data { val kryo = Kryo() kryo.isRegistrationRequired = false - val output = Output(ByteArray(4096), 10_000_000) + val output = Output(ByteArray(4096), MAX_BUFFER_SIZE) // Message kryo.writeClassAndObject(output, message) output.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt index d5a1674bad..8aa69a5859 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt @@ -78,6 +78,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { override fun create(data: Data): NotifyPNServerJob { val serializedMessage = data.getByteArray(MESSAGE_KEY) val kryo = Kryo() + kryo.isRegistrationRequired = false val input = Input(serializedMessage) val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java) input.close() diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt index e8e1ebde1e..71a590c7e4 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageHandler.kt @@ -1,7 +1,6 @@ package org.session.libsession.messaging.sending_receiving import android.text.TextUtils -import okhttp3.HttpUrl import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.AttachmentDownloadJob import org.session.libsession.messaging.jobs.JobQueue @@ -156,7 +155,12 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS // Get or create thread val threadID = storage.getOrCreateThreadIdFor(message.syncTarget - ?: message.sender!!, message.groupPublicKey, openGroupID) + ?: message.sender!!, message.groupPublicKey, openGroupID) + + if (threadID < 0) { + // thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread + throw MessageReceiver.Error.NoThread + } val openGroup = threadID.let { storage.getOpenGroup(it.toString()) @@ -236,7 +240,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS } val openGroupServerID = message.openGroupServerMessageID if (openGroupServerID != null) { - storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty())) + storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !message.isMediaMessage()) } // Cancel any typing indicators if needed cancelTypingIndicatorsIfNeeded(message.sender!!) diff --git a/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt b/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt index 2ec42cdf5b..72ceee9f3b 100644 --- a/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt +++ b/libsession/src/main/java/org/session/libsession/snode/utilities/Random.kt @@ -6,6 +6,7 @@ import java.security.SecureRandom * Uses `SecureRandom` to pick an element from this collection. */ fun Collection.getRandomElementOrNull(): T? { + if (isEmpty()) return null val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure return elementAtOrNull(index) } diff --git a/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt b/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt index 68bc4380c5..b1c1cd2af7 100644 --- a/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt +++ b/libsignal/src/main/java/org/session/libsignal/service/loki/Random.kt @@ -6,6 +6,7 @@ import java.security.SecureRandom * Uses `SecureRandom` to pick an element from this collection. */ fun Collection.getRandomElementOrNull(): T? { + if (isEmpty()) return null val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure return elementAtOrNull(index) }