From e761ff81fc596c980b244a8e2d90c2db40772571 Mon Sep 17 00:00:00 2001 From: SessionHero01 <180888785+SessionHero01@users.noreply.github.com> Date: Fri, 13 Dec 2024 10:34:35 +0800 Subject: [PATCH] Only send group messages after keys obtained (#848) --- .../messaging/jobs/MessageSendJob.kt | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 81b14099c2..9549d0c735 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -3,6 +3,11 @@ package org.session.libsession.messaging.jobs import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.withTimeout import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE_BYTES @@ -12,6 +17,9 @@ import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.utilities.Data import org.session.libsession.snode.utilities.await +import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsession.utilities.ConfigUpdateNotification +import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.Log @@ -77,6 +85,12 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { try { withTimeout(20_000L) { + // Shouldn't send message to group when the group has no keys available + if (destination is Destination.ClosedGroup) { + MessagingModuleConfiguration.shared.configFactory + .waitForGroupEncryptionKeys(AccountId(destination.publicKey)) + } + MessageSender.send(this@MessageSendJob.message, destination, isSync).await() } @@ -92,6 +106,17 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { } } + private suspend fun ConfigFactoryProtocol.waitForGroupEncryptionKeys(groupId: AccountId) { + (configUpdateNotifications + .filter { it is ConfigUpdateNotification.GroupConfigsUpdated && it.groupId == groupId } + as Flow<*> + ).onStart { emit(Unit) } + .filter { + withGroupConfigs(groupId) { configs -> configs.groupKeys.keys().isNotEmpty() } + } + .first() + } + private fun handleSuccess(dispatcherName: String) { delegate?.handleJobSucceeded(this, dispatcherName) }