|
|
@ -27,7 +27,6 @@ import org.session.libsession.messaging.sending_receiving.handle
|
|
|
|
import org.session.libsession.messaging.sending_receiving.handleOpenGroupReactions
|
|
|
|
import org.session.libsession.messaging.sending_receiving.handleOpenGroupReactions
|
|
|
|
import org.session.libsession.messaging.sending_receiving.handleUnsendRequest
|
|
|
|
import org.session.libsession.messaging.sending_receiving.handleUnsendRequest
|
|
|
|
import org.session.libsession.messaging.sending_receiving.handleVisibleMessage
|
|
|
|
import org.session.libsession.messaging.sending_receiving.handleVisibleMessage
|
|
|
|
import org.session.libsession.messaging.sending_receiving.updateExpiryIfNeeded
|
|
|
|
|
|
|
|
import org.session.libsession.messaging.utilities.Data
|
|
|
|
import org.session.libsession.messaging.utilities.Data
|
|
|
|
import org.session.libsession.messaging.utilities.SessionId
|
|
|
|
import org.session.libsession.messaging.utilities.SessionId
|
|
|
|
import org.session.libsession.messaging.utilities.SodiumUtilities
|
|
|
|
import org.session.libsession.messaging.utilities.SodiumUtilities
|
|
|
@ -35,6 +34,7 @@ import org.session.libsession.utilities.SSKEnvironment
|
|
|
|
import org.session.libsignal.protos.UtilProtos
|
|
|
|
import org.session.libsignal.protos.UtilProtos
|
|
|
|
import org.session.libsignal.utilities.IdPrefix
|
|
|
|
import org.session.libsignal.utilities.IdPrefix
|
|
|
|
import org.session.libsignal.utilities.Log
|
|
|
|
import org.session.libsignal.utilities.Log
|
|
|
|
|
|
|
|
import kotlin.math.max
|
|
|
|
|
|
|
|
|
|
|
|
data class MessageReceiveParameters(
|
|
|
|
data class MessageReceiveParameters(
|
|
|
|
val data: ByteArray,
|
|
|
|
val data: ByteArray,
|
|
|
@ -144,19 +144,15 @@ class BatchMessageReceiveJob(
|
|
|
|
runBlocking(Dispatchers.IO) {
|
|
|
|
runBlocking(Dispatchers.IO) {
|
|
|
|
|
|
|
|
|
|
|
|
fun processMessages(threadId: Long, messages: List<ParsedMessage>) = async {
|
|
|
|
fun processMessages(threadId: Long, messages: List<ParsedMessage>) = async {
|
|
|
|
|
|
|
|
Log.d(TAG, "processMessages() threadId = $threadId, messages = $messages")
|
|
|
|
// The LinkedHashMap should preserve insertion order
|
|
|
|
// The LinkedHashMap should preserve insertion order
|
|
|
|
val messageIds = linkedMapOf<Long, Pair<Boolean, Boolean>>()
|
|
|
|
val messageIds = linkedMapOf<Long, Pair<Boolean, Boolean>>()
|
|
|
|
val myLastSeen = storage.getLastSeen(threadId)
|
|
|
|
val myLastSeen = storage.getLastSeen(threadId)
|
|
|
|
var newLastSeen = if (myLastSeen == -1L) 0 else myLastSeen
|
|
|
|
var newLastSeen = myLastSeen.takeUnless { it == -1L } ?: 0
|
|
|
|
messages.forEach { (parameters, message, proto) ->
|
|
|
|
messages.forEach { (parameters, message, proto) ->
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
when (message) {
|
|
|
|
when (message) {
|
|
|
|
is VisibleMessage -> {
|
|
|
|
is VisibleMessage -> {
|
|
|
|
MessageReceiver.updateExpiryIfNeeded(
|
|
|
|
|
|
|
|
message,
|
|
|
|
|
|
|
|
proto,
|
|
|
|
|
|
|
|
openGroupID
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
val isUserBlindedSender =
|
|
|
|
val isUserBlindedSender =
|
|
|
|
message.sender == serverPublicKey?.let {
|
|
|
|
message.sender == serverPublicKey?.let {
|
|
|
|
SodiumUtilities.blindedKeyPair(
|
|
|
|
SodiumUtilities.blindedKeyPair(
|
|
|
@ -168,12 +164,9 @@ class BatchMessageReceiveJob(
|
|
|
|
IdPrefix.BLINDED, it.publicKey.asBytes
|
|
|
|
IdPrefix.BLINDED, it.publicKey.asBytes
|
|
|
|
).hexString
|
|
|
|
).hexString
|
|
|
|
}
|
|
|
|
}
|
|
|
|
val sentTimestamp = message.sentTimestamp!!
|
|
|
|
|
|
|
|
if (message.sender == localUserPublicKey || isUserBlindedSender) {
|
|
|
|
if (message.sender == localUserPublicKey || isUserBlindedSender) {
|
|
|
|
if (sentTimestamp > newLastSeen) {
|
|
|
|
// use sent timestamp here since that is technically the last one we have
|
|
|
|
newLastSeen =
|
|
|
|
newLastSeen = max(newLastSeen, message.sentTimestamp!!)
|
|
|
|
sentTimestamp // use sent timestamp here since that is technically the last one we have
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
val messageId = MessageReceiver.handleVisibleMessage(message, proto, openGroupID,
|
|
|
|
val messageId = MessageReceiver.handleVisibleMessage(message, proto, openGroupID,
|
|
|
|
threadId,
|
|
|
|
threadId,
|
|
|
@ -221,9 +214,7 @@ class BatchMessageReceiveJob(
|
|
|
|
// last seen will be the current last seen if not changed (re-computes the read counts for thread record)
|
|
|
|
// last seen will be the current last seen if not changed (re-computes the read counts for thread record)
|
|
|
|
// might have been updated from a different thread at this point
|
|
|
|
// might have been updated from a different thread at this point
|
|
|
|
val currentLastSeen = storage.getLastSeen(threadId).let { if (it == -1L) 0 else it }
|
|
|
|
val currentLastSeen = storage.getLastSeen(threadId).let { if (it == -1L) 0 else it }
|
|
|
|
if (currentLastSeen > newLastSeen) {
|
|
|
|
newLastSeen = max(newLastSeen, currentLastSeen)
|
|
|
|
newLastSeen = currentLastSeen
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (newLastSeen > 0 || currentLastSeen == 0L) {
|
|
|
|
if (newLastSeen > 0 || currentLastSeen == 0L) {
|
|
|
|
storage.markConversationAsRead(threadId, newLastSeen, force = true)
|
|
|
|
storage.markConversationAsRead(threadId, newLastSeen, force = true)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -252,12 +243,12 @@ class BatchMessageReceiveJob(
|
|
|
|
|
|
|
|
|
|
|
|
private fun handleSuccess(dispatcherName: String) {
|
|
|
|
private fun handleSuccess(dispatcherName: String) {
|
|
|
|
Log.i(TAG, "Completed processing of ${messages.size} messages (id: $id)")
|
|
|
|
Log.i(TAG, "Completed processing of ${messages.size} messages (id: $id)")
|
|
|
|
this.delegate?.handleJobSucceeded(this, dispatcherName)
|
|
|
|
delegate?.handleJobSucceeded(this, dispatcherName)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private fun handleFailure(dispatcherName: String) {
|
|
|
|
private fun handleFailure(dispatcherName: String) {
|
|
|
|
Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully) (id: $id)")
|
|
|
|
Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully) (id: $id)")
|
|
|
|
this.delegate?.handleJobFailed(this, dispatcherName, Exception("One or more jobs resulted in failure"))
|
|
|
|
delegate?.handleJobFailed(this, dispatcherName, Exception("One or more jobs resulted in failure"))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
override fun serialize(): Data {
|
|
|
|
override fun serialize(): Data {
|
|
|
|