Reset last hash on promotion (#885)

pull/1709/head
SessionHero01 3 months ago committed by GitHub
parent eb2f92b8a7
commit 681ac32198
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -757,6 +757,9 @@ class GroupManagerV2Impl @Inject constructor(
configs.groupMembers.set(member) configs.groupMembers.set(member)
} }
} }
// Remove lastHash so we can receive all the messages in the past
lokiAPIDatabase.clearLastMessageHashes(groupId.hexString)
} }
// Delete the promotion message remotely // Delete the promotion message remotely

@ -154,7 +154,14 @@ class Poller(
val namespace = forConfig.namespace val namespace = forConfig.namespace
val processed = if (!messages.isNullOrEmpty()) { val processed = if (!messages.isNullOrEmpty()) {
SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace) SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace)
SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { rawMessageAsJSON -> SnodeAPI.removeDuplicates(
publicKey = userPublicKey,
messages = messages,
messageHashGetter = { (it as? Map<*, *>)?.get("hash") as? String },
namespace = namespace,
updateStoredHashes = true
).mapNotNull { rawMessageAsJSON ->
rawMessageAsJSON as Map<*, *> // removeDuplicates should have ensured this is always a map
val hashValue = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null val hashValue = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null
val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null
val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset

@ -12,14 +12,11 @@ import com.goterl.lazysodium.utils.Key
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.select import kotlinx.coroutines.selects.select
import com.goterl.lazysodium.utils.KeyPair
import kotlinx.coroutines.coroutineScope
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.bind
@ -986,7 +983,13 @@ object SnodeAPI {
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true, updateStoredHashes: Boolean = true, decrypt: ((ByteArray) -> Pair<ByteArray, AccountId>?)? = null): List<Pair<SignalServiceProtos.Envelope, String?>> = fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true, updateStoredHashes: Boolean = true, decrypt: ((ByteArray) -> Pair<ByteArray, AccountId>?)? = null): List<Pair<SignalServiceProtos.Envelope, String?>> =
(rawResponse["messages"] as? List<*>)?.let { messages -> (rawResponse["messages"] as? List<*>)?.let { messages ->
if (updateLatestHash) updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace) if (updateLatestHash) updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
parseEnvelopes(removeDuplicates(publicKey, messages, namespace, updateStoredHashes), decrypt) removeDuplicates(
publicKey = publicKey,
messages = parseEnvelopes(messages, decrypt),
messageHashGetter = { it.second },
namespace = namespace,
updateStoredHashes = updateStoredHashes
)
} ?: listOf() } ?: listOf()
fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) { fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) {
@ -1005,17 +1008,35 @@ object SnodeAPI {
* database#setReceivedMessageHashValues is only called here. * database#setReceivedMessageHashValues is only called here.
*/ */
@Synchronized @Synchronized
fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<Map<*, *>> { fun <M> removeDuplicates(
publicKey: String,
messages: List<M>,
messageHashGetter: (M) -> String?,
namespace: Int,
updateStoredHashes: Boolean
): List<M> {
val hashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf() val hashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
return rawMessages.filterIsInstance<Map<*, *>>().filter { rawMessage -> return messages
val hash = rawMessage["hash"] as? String .filter { message ->
hash ?: Log.d("Loki", "Missing hash value for message: ${rawMessage.prettifiedDescription()}.") val hash = messageHashGetter(message)
hash?.let(hashValues::add) == true if (hash == null) {
}.also { Log.d("Loki", "Missing hash value for message: ${message?.prettifiedDescription()}.")
if (updateStoredHashes && it.isNotEmpty()) { return@filter false
database.setReceivedMessageHashValues(publicKey, hashValues, namespace) }
val isNew = hashValues.add(hash)
if (!isNew) {
Log.d("Loki", "Duplicate message hash: $hash.")
}
isNew
}
.also {
if (updateStoredHashes && it.isNotEmpty()) {
database.setReceivedMessageHashValues(publicKey, hashValues, namespace)
}
} }
}
} }
private fun parseEnvelopes(rawMessages: List<*>, decrypt: ((ByteArray)->Pair<ByteArray, AccountId>?)?): List<Pair<SignalServiceProtos.Envelope, String?>> { private fun parseEnvelopes(rawMessages: List<*>, decrypt: ((ByteArray)->Pair<ByteArray, AccountId>?)?): List<Pair<SignalServiceProtos.Envelope, String?>> {

Loading…
Cancel
Save