commit
e077956ffb
@ -0,0 +1,101 @@
|
|||||||
|
package org.thoughtcrime.securesms.loki.api
|
||||||
|
|
||||||
|
import android.content.Context
|
||||||
|
import android.graphics.Bitmap
|
||||||
|
import androidx.annotation.WorkerThread
|
||||||
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
|
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||||
|
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
||||||
|
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2
|
||||||
|
import org.session.libsession.utilities.Util
|
||||||
|
import org.session.libsignal.utilities.ThreadUtils
|
||||||
|
import org.thoughtcrime.securesms.database.DatabaseFactory
|
||||||
|
import org.thoughtcrime.securesms.groups.GroupManager
|
||||||
|
import org.thoughtcrime.securesms.util.BitmapUtil
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
|
||||||
|
object OpenGroupManager {
|
||||||
|
private val executorService = Executors.newScheduledThreadPool(4)
|
||||||
|
private var pollers = mutableMapOf<String, OpenGroupPollerV2>() // One for each server
|
||||||
|
private var isPolling = false
|
||||||
|
|
||||||
|
fun startPolling() {
|
||||||
|
if (isPolling) { return }
|
||||||
|
isPolling = true
|
||||||
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
val servers = storage.getAllV2OpenGroups().values.map { it.server }.toSet()
|
||||||
|
servers.forEach { server ->
|
||||||
|
pollers[server]?.stop() // Shouldn't be necessary
|
||||||
|
val poller = OpenGroupPollerV2(server, executorService)
|
||||||
|
poller.startIfNeeded()
|
||||||
|
pollers[server] = poller
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun stopPolling() {
|
||||||
|
pollers.forEach { it.value.stop() }
|
||||||
|
pollers.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
@WorkerThread
|
||||||
|
fun add(server: String, room: String, publicKey: String, context: Context) {
|
||||||
|
val openGroupID = "$server.$room"
|
||||||
|
var threadID = GroupManager.getOpenGroupThreadID(openGroupID, context)
|
||||||
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
val threadDB = DatabaseFactory.getLokiThreadDatabase(context)
|
||||||
|
// Check it it's added already
|
||||||
|
val existingOpenGroup = threadDB.getOpenGroupChat(threadID)
|
||||||
|
if (existingOpenGroup != null) { return }
|
||||||
|
// Clear any existing data if needed
|
||||||
|
storage.removeLastDeletionServerId(room, server)
|
||||||
|
storage.removeLastMessageServerId(room, server)
|
||||||
|
// Store the public key
|
||||||
|
storage.setOpenGroupPublicKey(server,publicKey)
|
||||||
|
// Get an auth token
|
||||||
|
OpenGroupAPIV2.getAuthToken(room, server).get()
|
||||||
|
// Get group info
|
||||||
|
val info = OpenGroupAPIV2.getInfo(room, server).get()
|
||||||
|
// Download the group image
|
||||||
|
// FIXME: Don't wait for the image to download
|
||||||
|
val image: Bitmap?
|
||||||
|
if (threadID < 0) {
|
||||||
|
val profilePictureAsByteArray = try {
|
||||||
|
OpenGroupAPIV2.downloadOpenGroupProfilePicture(info.id, server).get()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
image = BitmapUtil.fromByteArray(profilePictureAsByteArray)
|
||||||
|
// Create the group locally
|
||||||
|
threadID = GroupManager.createOpenGroup(openGroupID, context, image, info.name).threadId
|
||||||
|
}
|
||||||
|
val openGroup = OpenGroupV2(server, room, info.name, publicKey)
|
||||||
|
threadDB.setOpenGroupChat(openGroup, threadID)
|
||||||
|
// Start the poller if needed
|
||||||
|
if (pollers[server] == null) {
|
||||||
|
val poller = OpenGroupPollerV2(server, executorService)
|
||||||
|
Util.runOnMain { poller.startIfNeeded() }
|
||||||
|
pollers[server] = poller
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun delete(server: String, room: String, context: Context) {
|
||||||
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
val threadDB = DatabaseFactory.getThreadDatabase(context)
|
||||||
|
val openGroupID = "$server.$room"
|
||||||
|
val threadID = GroupManager.getOpenGroupThreadID(openGroupID, context)
|
||||||
|
val groupID = threadDB.getRecipientForThreadId(threadID)!!.address.serialize()
|
||||||
|
// Stop the poller if needed
|
||||||
|
val openGroups = storage.getAllV2OpenGroups().filter { it.value.server == server }
|
||||||
|
if (openGroups.count() == 1) {
|
||||||
|
val poller = pollers[server]
|
||||||
|
poller?.stop()
|
||||||
|
pollers.remove(server)
|
||||||
|
}
|
||||||
|
// Delete
|
||||||
|
ThreadUtils.queue {
|
||||||
|
storage.removeLastDeletionServerId(room, server)
|
||||||
|
storage.removeLastMessageServerId(room, server)
|
||||||
|
GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,212 +0,0 @@
|
|||||||
package org.thoughtcrime.securesms.loki.api
|
|
||||||
|
|
||||||
import android.content.Context
|
|
||||||
import android.database.ContentObserver
|
|
||||||
import android.graphics.Bitmap
|
|
||||||
import android.text.TextUtils
|
|
||||||
import androidx.annotation.WorkerThread
|
|
||||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
|
||||||
import org.session.libsession.messaging.open_groups.*
|
|
||||||
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller
|
|
||||||
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupV2Poller
|
|
||||||
import org.session.libsession.utilities.TextSecurePreferences
|
|
||||||
import org.session.libsession.utilities.Util
|
|
||||||
import org.session.libsignal.utilities.ThreadUtils
|
|
||||||
import org.thoughtcrime.securesms.database.DatabaseContentProviders
|
|
||||||
import org.thoughtcrime.securesms.database.DatabaseFactory
|
|
||||||
import org.thoughtcrime.securesms.groups.GroupManager
|
|
||||||
import org.thoughtcrime.securesms.util.BitmapUtil
|
|
||||||
import java.util.concurrent.Executors
|
|
||||||
|
|
||||||
class PublicChatManager(private val context: Context) {
|
|
||||||
private var chats = mutableMapOf<Long, OpenGroup>()
|
|
||||||
private var v2Chats = mutableMapOf<Long, OpenGroupV2>()
|
|
||||||
private val pollers = mutableMapOf<Long, OpenGroupPoller>()
|
|
||||||
private val v2Pollers = mutableMapOf<String, OpenGroupV2Poller>()
|
|
||||||
private val observers = mutableMapOf<Long, ContentObserver>()
|
|
||||||
private var isPolling = false
|
|
||||||
private val executorService = Executors.newScheduledThreadPool(4)
|
|
||||||
|
|
||||||
public fun areAllCaughtUp(): Boolean {
|
|
||||||
var areAllCaughtUp = true
|
|
||||||
refreshChatsAndPollers()
|
|
||||||
for ((threadID, _) in chats) {
|
|
||||||
val poller = pollers[threadID]
|
|
||||||
areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else areAllCaughtUp
|
|
||||||
}
|
|
||||||
return areAllCaughtUp
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun markAllAsNotCaughtUp() {
|
|
||||||
refreshChatsAndPollers()
|
|
||||||
for ((threadID, chat) in chats) {
|
|
||||||
val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService)
|
|
||||||
poller.isCaughtUp = false
|
|
||||||
}
|
|
||||||
for ((_,poller) in v2Pollers) {
|
|
||||||
poller.isCaughtUp = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun startPollersIfNeeded() {
|
|
||||||
refreshChatsAndPollers()
|
|
||||||
|
|
||||||
for ((threadId, chat) in chats) {
|
|
||||||
val poller = pollers[threadId] ?: OpenGroupPoller(chat, executorService)
|
|
||||||
poller.startIfNeeded()
|
|
||||||
listenToThreadDeletion(threadId)
|
|
||||||
if (!pollers.containsKey(threadId)) { pollers[threadId] = poller }
|
|
||||||
}
|
|
||||||
v2Pollers.values.forEach { it.stop() }
|
|
||||||
v2Pollers.clear()
|
|
||||||
v2Chats.entries.groupBy { (_, group) -> group.server }.forEach { (server, threadedRooms) ->
|
|
||||||
val poller = OpenGroupV2Poller(threadedRooms.map { it.value }, executorService)
|
|
||||||
poller.startIfNeeded()
|
|
||||||
threadedRooms.forEach { (thread, _) ->
|
|
||||||
listenToThreadDeletion(thread)
|
|
||||||
}
|
|
||||||
v2Pollers[server] = poller
|
|
||||||
}
|
|
||||||
|
|
||||||
isPolling = true
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun stopPollers() {
|
|
||||||
pollers.values.forEach { it.stop() }
|
|
||||||
isPolling = false
|
|
||||||
executorService.shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO Declare a specific type of checked exception instead of "Exception".
|
|
||||||
@WorkerThread
|
|
||||||
@Throws(java.lang.Exception::class)
|
|
||||||
public fun addChat(server: String, channel: Long): OpenGroup {
|
|
||||||
// Ensure the auth token is acquired.
|
|
||||||
OpenGroupAPI.getAuthToken(server).get()
|
|
||||||
|
|
||||||
val channelInfo = OpenGroupAPI.getChannelInfo(channel, server).get()
|
|
||||||
return addChat(server, channel, channelInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
@WorkerThread
|
|
||||||
public fun addChat(server: String, channel: Long, info: OpenGroupInfo): OpenGroup {
|
|
||||||
val chat = OpenGroup(channel, server, info.displayName, true)
|
|
||||||
var threadID = GroupManager.getOpenGroupThreadID(chat.id, context)
|
|
||||||
var profilePicture: Bitmap? = null
|
|
||||||
// Create the group if we don't have one
|
|
||||||
if (threadID < 0) {
|
|
||||||
if (info.profilePictureURL.isNotEmpty()) {
|
|
||||||
val profilePictureAsByteArray = OpenGroupAPI.downloadOpenGroupProfilePicture(server, info.profilePictureURL)
|
|
||||||
profilePicture = BitmapUtil.fromByteArray(profilePictureAsByteArray)
|
|
||||||
}
|
|
||||||
val result = GroupManager.createOpenGroup(chat.id, context, profilePicture, chat.displayName)
|
|
||||||
threadID = result.threadId
|
|
||||||
}
|
|
||||||
DatabaseFactory.getLokiThreadDatabase(context).setPublicChat(chat, threadID)
|
|
||||||
// Set our name on the server
|
|
||||||
val displayName = TextSecurePreferences.getProfileName(context)
|
|
||||||
if (!TextUtils.isEmpty(displayName)) {
|
|
||||||
OpenGroupAPI.setDisplayName(displayName, server)
|
|
||||||
}
|
|
||||||
// Start polling
|
|
||||||
Util.runOnMain { startPollersIfNeeded() }
|
|
||||||
|
|
||||||
return chat
|
|
||||||
}
|
|
||||||
|
|
||||||
@WorkerThread
|
|
||||||
fun addChat(server: String, room: String, info: OpenGroupAPIV2.Info, publicKey: String): OpenGroupV2 {
|
|
||||||
val chat = OpenGroupV2(server, room, info.name, publicKey)
|
|
||||||
var threadID = GroupManager.getOpenGroupThreadID(chat.id, context)
|
|
||||||
val profilePicture: Bitmap?
|
|
||||||
if (threadID < 0) {
|
|
||||||
val profilePictureAsByteArray = try {
|
|
||||||
OpenGroupAPIV2.downloadOpenGroupProfilePicture(info.id,server).get()
|
|
||||||
} catch (e: Exception) {
|
|
||||||
null
|
|
||||||
}
|
|
||||||
profilePicture = BitmapUtil.fromByteArray(profilePictureAsByteArray)
|
|
||||||
val result = GroupManager.createOpenGroup(chat.id, context, profilePicture, info.name)
|
|
||||||
threadID = result.threadId
|
|
||||||
}
|
|
||||||
DatabaseFactory.getLokiThreadDatabase(context).setOpenGroupChat(chat, threadID)
|
|
||||||
Util.runOnMain { startPollersIfNeeded() }
|
|
||||||
return chat
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun removeChat(server: String, channel: Long) {
|
|
||||||
val threadDB = DatabaseFactory.getThreadDatabase(context)
|
|
||||||
val groupId = OpenGroup.getId(channel, server)
|
|
||||||
val threadId = GroupManager.getOpenGroupThreadID(groupId, context)
|
|
||||||
val groupAddress = threadDB.getRecipientForThreadId(threadId)!!.address.serialize()
|
|
||||||
ThreadUtils.queue {
|
|
||||||
GroupManager.deleteGroup(groupAddress, context) // Must be invoked on a background thread
|
|
||||||
Util.runOnMain { startPollersIfNeeded() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun removeChat(server: String, room: String) {
|
|
||||||
val threadDB = DatabaseFactory.getThreadDatabase(context)
|
|
||||||
val groupId = "$server.$room"
|
|
||||||
val threadId = GroupManager.getOpenGroupThreadID(groupId, context)
|
|
||||||
val groupAddress = threadDB.getRecipientForThreadId(threadId)!!.address.serialize()
|
|
||||||
ThreadUtils.queue {
|
|
||||||
GroupManager.deleteGroup(groupAddress, context) // Must be invoked on a background thread
|
|
||||||
Util.runOnMain { startPollersIfNeeded() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun refreshChatsAndPollers() {
|
|
||||||
val storage = MessagingModuleConfiguration.shared.storage
|
|
||||||
val chatsInDB = storage.getAllOpenGroups()
|
|
||||||
val v2ChatsInDB = storage.getAllV2OpenGroups()
|
|
||||||
val removedChatThreadIds = chats.keys.filter { !chatsInDB.keys.contains(it) }
|
|
||||||
removedChatThreadIds.forEach { pollers.remove(it)?.stop() }
|
|
||||||
|
|
||||||
// Only append to chats if we have a thread for the chat
|
|
||||||
chats = chatsInDB.filter { GroupManager.getOpenGroupThreadID(it.value.id, context) > -1 }.toMutableMap()
|
|
||||||
v2Chats = v2ChatsInDB.filter { GroupManager.getOpenGroupThreadID(it.value.id, context) > -1 }.toMutableMap()
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun listenToThreadDeletion(threadID: Long) {
|
|
||||||
if (threadID < 0 || observers[threadID] != null) { return }
|
|
||||||
val observer = createDeletionObserver(threadID) {
|
|
||||||
val chat = chats[threadID]
|
|
||||||
|
|
||||||
// Reset last message cache
|
|
||||||
if (chat != null) {
|
|
||||||
val apiDatabase = DatabaseFactory.getLokiAPIDatabase(context)
|
|
||||||
apiDatabase.removeLastDeletionServerID(chat.channel, chat.server)
|
|
||||||
apiDatabase.removeLastMessageServerID(chat.channel, chat.server)
|
|
||||||
}
|
|
||||||
|
|
||||||
DatabaseFactory.getLokiThreadDatabase(context).removePublicChat(threadID)
|
|
||||||
pollers.remove(threadID)?.stop()
|
|
||||||
v2Pollers.values.forEach { it.stop() }
|
|
||||||
v2Pollers.clear()
|
|
||||||
observers.remove(threadID)
|
|
||||||
startPollersIfNeeded()
|
|
||||||
}
|
|
||||||
observers[threadID] = observer
|
|
||||||
|
|
||||||
context.applicationContext.contentResolver.registerContentObserver(DatabaseContentProviders.Conversation.getUriForThread(threadID), true, observer)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun createDeletionObserver(threadID: Long, onDelete: Runnable): ContentObserver {
|
|
||||||
return object : ContentObserver(null) {
|
|
||||||
|
|
||||||
override fun onChange(selfChange: Boolean) {
|
|
||||||
super.onChange(selfChange)
|
|
||||||
// Stop the poller if thread is deleted
|
|
||||||
try {
|
|
||||||
if (!DatabaseFactory.getThreadDatabase(context).hasThread(threadID)) {
|
|
||||||
onDelete.run()
|
|
||||||
context.applicationContext.contentResolver.unregisterContentObserver(this)
|
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
|
||||||
// TODO: Handle
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,92 @@
|
|||||||
|
package org.session.libsession.messaging.sending_receiving.pollers
|
||||||
|
|
||||||
|
import nl.komponents.kovenant.Promise
|
||||||
|
import nl.komponents.kovenant.functional.map
|
||||||
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
|
import org.session.libsession.messaging.jobs.JobQueue
|
||||||
|
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||||
|
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
||||||
|
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
|
||||||
|
import org.session.libsession.utilities.Address
|
||||||
|
import org.session.libsession.utilities.GroupUtil
|
||||||
|
import org.session.libsignal.protos.SignalServiceProtos
|
||||||
|
import org.session.libsignal.utilities.Log
|
||||||
|
import org.session.libsignal.utilities.successBackground
|
||||||
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
|
import java.util.concurrent.ScheduledFuture
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
|
||||||
|
var hasStarted = false
|
||||||
|
private var future: ScheduledFuture<*>? = null
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
private val pollInterval: Long = 4 * 1000
|
||||||
|
}
|
||||||
|
|
||||||
|
fun startIfNeeded() {
|
||||||
|
if (hasStarted) { return }
|
||||||
|
hasStarted = true
|
||||||
|
future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun stop() {
|
||||||
|
future?.cancel(false)
|
||||||
|
hasStarted = false
|
||||||
|
}
|
||||||
|
|
||||||
|
fun poll(isBackgroundPoll: Boolean = false): Promise<Unit, Exception> {
|
||||||
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
val rooms = storage.getAllV2OpenGroups().values.filter { it.server == server }.map { it.room }
|
||||||
|
return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses ->
|
||||||
|
responses.forEach { (room, response) ->
|
||||||
|
val openGroupID = "$server.$room"
|
||||||
|
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
|
||||||
|
handleDeletedMessages(openGroupID, response.deletions)
|
||||||
|
}
|
||||||
|
}.always {
|
||||||
|
executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS)
|
||||||
|
}.map { }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
||||||
|
if (!hasStarted) { return }
|
||||||
|
messages.sortedBy { it.serverID!! }.forEach { message ->
|
||||||
|
try {
|
||||||
|
val senderPublicKey = message.sender!!
|
||||||
|
val builder = SignalServiceProtos.Envelope.newBuilder()
|
||||||
|
builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
|
||||||
|
builder.source = senderPublicKey
|
||||||
|
builder.sourceDevice = 1
|
||||||
|
builder.content = message.toProto().toByteString()
|
||||||
|
builder.timestamp = message.sentTimestamp
|
||||||
|
val envelope = builder.build()
|
||||||
|
val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, openGroupID)
|
||||||
|
if (isBackgroundPoll) {
|
||||||
|
job.executeAsync()
|
||||||
|
} else {
|
||||||
|
JobQueue.shared.add(job)
|
||||||
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
Log.e("Loki", "Exception parsing message", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
|
||||||
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
|
||||||
|
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
|
||||||
|
val threadID = storage.getThreadIdFor(Address.fromSerialized(groupID)) ?: return
|
||||||
|
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverID ->
|
||||||
|
val messageID = dataProvider.getMessageID(serverID, threadID)
|
||||||
|
if (messageID == null) {
|
||||||
|
Log.d("Loki", "Couldn't find message ID for message with serverID: $serverID.")
|
||||||
|
}
|
||||||
|
messageID
|
||||||
|
}
|
||||||
|
deletedMessageIDs.forEach { (messageId, isSms) ->
|
||||||
|
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,134 +0,0 @@
|
|||||||
package org.session.libsession.messaging.sending_receiving.pollers
|
|
||||||
|
|
||||||
import nl.komponents.kovenant.Promise
|
|
||||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
|
||||||
import org.session.libsession.messaging.jobs.JobQueue
|
|
||||||
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
|
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
|
|
||||||
import org.session.libsession.messaging.open_groups.OpenGroupV2
|
|
||||||
import org.session.libsession.utilities.Address
|
|
||||||
import org.session.libsession.utilities.GroupUtil
|
|
||||||
import org.session.libsignal.protos.SignalServiceProtos
|
|
||||||
import org.session.libsignal.utilities.Log
|
|
||||||
import org.session.libsignal.utilities.successBackground
|
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
|
||||||
import java.util.concurrent.ScheduledFuture
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
class OpenGroupV2Poller(private val openGroups: List<OpenGroupV2>, private val executorService: ScheduledExecutorService? = null) {
|
|
||||||
|
|
||||||
private var hasStarted = false
|
|
||||||
@Volatile private var isPollOngoing = false
|
|
||||||
var isCaughtUp = false
|
|
||||||
|
|
||||||
private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()
|
|
||||||
|
|
||||||
// use this as a receive time-based window to calculate re-poll interval
|
|
||||||
private val receivedQueue = ArrayDeque<Long>(50)
|
|
||||||
|
|
||||||
private fun calculatePollInterval(): Long {
|
|
||||||
// sample last default poll time * 2
|
|
||||||
while (receivedQueue.size > 50) {
|
|
||||||
receivedQueue.removeLast()
|
|
||||||
}
|
|
||||||
val sampleWindow = System.currentTimeMillis() - pollForNewMessagesInterval * 2
|
|
||||||
val numberInSample = receivedQueue.toList().filter { it > sampleWindow }.size.coerceAtLeast(1)
|
|
||||||
return ((2 + (50 / numberInSample / 20)*5) * 1000).toLong()
|
|
||||||
}
|
|
||||||
|
|
||||||
// region Settings
|
|
||||||
companion object {
|
|
||||||
private val pollForNewMessagesInterval: Long = 10 * 1000
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Lifecycle
|
|
||||||
fun startIfNeeded() {
|
|
||||||
if (hasStarted || executorService == null) return
|
|
||||||
cancellableFutures += executorService.schedule(::compactPoll, 0, TimeUnit.MILLISECONDS)
|
|
||||||
hasStarted = true
|
|
||||||
}
|
|
||||||
|
|
||||||
fun stop() {
|
|
||||||
cancellableFutures.forEach { future ->
|
|
||||||
future.cancel(false)
|
|
||||||
}
|
|
||||||
cancellableFutures.clear()
|
|
||||||
hasStarted = false
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Polling
|
|
||||||
|
|
||||||
private fun compactPoll(): Promise<Any, Exception> {
|
|
||||||
return compactPoll(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun compactPoll(isBackgroundPoll: Boolean): Promise<Any, Exception> {
|
|
||||||
if (isPollOngoing || !hasStarted) return Promise.of(Unit)
|
|
||||||
isPollOngoing = true
|
|
||||||
val server = openGroups.first().server // assume all the same server
|
|
||||||
val rooms = openGroups.map { it.room }
|
|
||||||
return OpenGroupAPIV2.compactPoll(rooms = rooms, server).successBackground { results ->
|
|
||||||
results.forEach { (room, results) ->
|
|
||||||
val serverRoomId = "$server.$room"
|
|
||||||
handleNewMessages(serverRoomId, results.messages.sortedBy { it.serverID }, isBackgroundPoll)
|
|
||||||
handleDeletedMessages(serverRoomId,results.deletions)
|
|
||||||
}
|
|
||||||
}.always {
|
|
||||||
isPollOngoing = false
|
|
||||||
if (!isBackgroundPoll) {
|
|
||||||
val delay = calculatePollInterval()
|
|
||||||
executorService?.schedule(this@OpenGroupV2Poller::compactPoll, delay, TimeUnit.MILLISECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun handleNewMessages(serverRoomId: String, newMessages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
|
||||||
if (!hasStarted) return
|
|
||||||
newMessages.forEach { message ->
|
|
||||||
try {
|
|
||||||
val senderPublicKey = message.sender!!
|
|
||||||
// Main message
|
|
||||||
// Envelope
|
|
||||||
val builder = SignalServiceProtos.Envelope.newBuilder()
|
|
||||||
builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
|
|
||||||
builder.source = senderPublicKey
|
|
||||||
builder.sourceDevice = 1
|
|
||||||
builder.content = message.toProto().toByteString()
|
|
||||||
builder.timestamp = message.sentTimestamp
|
|
||||||
val envelope = builder.build()
|
|
||||||
val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, serverRoomId)
|
|
||||||
Log.d("Loki", "Scheduling Job $job")
|
|
||||||
if (isBackgroundPoll) {
|
|
||||||
job.executeAsync()
|
|
||||||
// The promise is just used to keep track of when we're done
|
|
||||||
} else {
|
|
||||||
JobQueue.shared.add(job)
|
|
||||||
}
|
|
||||||
receivedQueue.addFirst(message.sentTimestamp)
|
|
||||||
} catch (e: Exception) {
|
|
||||||
Log.e("Loki", "Exception parsing message", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun handleDeletedMessages(serverRoomId: String, deletedMessageServerIDs: List<Long>) {
|
|
||||||
val messagingModule = MessagingModuleConfiguration.shared
|
|
||||||
val address = GroupUtil.getEncodedOpenGroupID(serverRoomId.toByteArray())
|
|
||||||
val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return
|
|
||||||
|
|
||||||
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId ->
|
|
||||||
val id = messagingModule.messageDataProvider.getMessageID(serverId, threadId)
|
|
||||||
if (id == null) {
|
|
||||||
Log.d("Loki", "Couldn't find server ID $serverId")
|
|
||||||
}
|
|
||||||
id
|
|
||||||
}
|
|
||||||
deletedMessageIDs.forEach { (messageId, isSms) ->
|
|
||||||
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
}
|
|
Loading…
Reference in New Issue