@ -30,6 +30,7 @@ import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
import java.util.UUID
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
@ -39,96 +40,35 @@ class OpenGroupPoller(private val server: String, private val executorService: S
var isCaughtUp = false
var secondToLastJob : MessageReceiveJob ? = null
private var future : ScheduledFuture < * > ? = null
private var runId : UUID = UUID . randomUUID ( )
companion object {
private const val pollInterval : Long = 4000L
const val maxInactivityPeriod = 14 * 24 * 60 * 60 * 1000
}
fun startIfNeeded ( ) {
if ( hasStarted ) { return }
hasStarted = true
future = executorService ?. schedule ( :: poll , 0 , TimeUnit . MILLISECONDS )
}
fun stop ( ) {
future ?. cancel ( false )
hasStarted = false
}
fun poll ( isPostCapabilitiesRetry : Boolean = false ) : Promise < Unit , Exception > {
val storage = MessagingModuleConfiguration . shared . storage
val rooms = storage . getAllOpenGroups ( ) . values . filter { it . server == server } . map { it . room }
return OpenGroupApi . poll ( rooms , server ) . successBackground { responses ->
responses . filterNot { it . body == null } . forEach { response ->
when ( response . endpoint ) {
is Endpoint . Capabilities -> {
handleCapabilities ( server , response . body as OpenGroupApi . Capabilities )
}
is Endpoint . RoomPollInfo -> {
handleRoomPollInfo ( server , response . endpoint . roomToken , response . body as OpenGroupApi . RoomPollInfo )
}
is Endpoint . RoomMessagesRecent -> {
handleMessages ( server , response . endpoint . roomToken , response . body as List < OpenGroupApi . Message > )
}
is Endpoint . RoomMessagesSince -> {
handleMessages ( server , response . endpoint . roomToken , response . body as List < OpenGroupApi . Message > )
}
is Endpoint . Inbox , is Endpoint . InboxSince -> {
handleDirectMessages ( server , false , response . body as List < OpenGroupApi . DirectMessage > )
}
is Endpoint . Outbox , is Endpoint . OutboxSince -> {
handleDirectMessages ( server , true , response . body as List < OpenGroupApi . DirectMessage > )
}
}
if ( secondToLastJob == null && !is CaughtUp ) {
isCaughtUp = true
}
}
executorService ?. schedule ( this @OpenGroupPoller :: poll , pollInterval , TimeUnit . MILLISECONDS )
} . fail {
updateCapabilitiesIfNeeded ( isPostCapabilitiesRetry , it )
} . map { }
}
private fun updateCapabilitiesIfNeeded ( isPostCapabilitiesRetry : Boolean , exception : Exception ) {
if ( exception is OnionRequestAPI . HTTPRequestFailedBlindingRequiredException ) {
if ( !is PostCapabilitiesRetry ) {
OpenGroupApi . getCapabilities ( server ) . map {
handleCapabilities ( server , it )
}
executorService ?. schedule ( { poll ( isPostCapabilitiesRetry = true ) } , pollInterval , TimeUnit . MILLISECONDS )
}
} else {
executorService ?. schedule ( this @OpenGroupPoller :: poll , pollInterval , TimeUnit . MILLISECONDS )
}
}
private fun handleCapabilities ( server : String , capabilities : OpenGroupApi . Capabilities ) {
val storage = MessagingModuleConfiguration . shared . storage
storage . setServerCapabilities ( server , capabilities . capabilities )
}
private fun handleRoomPollInfo (
public fun handleRoomPollInfo (
server : String ,
roomToken : String ,
pollInfo : OpenGroupApi . RoomPollInfo
pollInfo : OpenGroupApi . RoomPollInfo ,
createGroupIfMissingWithPublicKey : String ? = null
) {
val storage = MessagingModuleConfiguration . shared . storage
val groupId = " $server . $roomToken "
val dbGroupId = GroupUtil . getEncodedOpenGroupID ( groupId . toByteArray ( ) )
val existingOpenGroup = storage . getOpenGroup ( roomToken , server )
val publicKey = existingOpenGroup ?. publicKey ?: return
// If we don't have an existing group and don't have a 'createGroupIfMissingWithPublicKey'
// value then don't process the poll info
val publicKey = ( ( existingOpenGroup ?. publicKey ?: createGroupIfMissingWithPublicKey ) ?: return )
val openGroup = OpenGroup (
server = server ,
room = pollInfo . token ,
name = if ( pollInfo . details != null ) { pollInfo . details . name } else { existingOpenGroup . name } ,
name = ( ( pollInfo . details ?. name ?: existingOpenGroup ?. name ) ?: " " ) ,
publicKey = publicKey ,
imageId = if ( pollInfo . details != null ) { pollInfo . details . imageId } else { existingOpenGroup . imageId } ,
imageId = ( pollInfo . details ?. imageId ?: existingOpenGroup ?. imageId ) ,
canWrite = pollInfo . write ,
infoUpdates = if ( pollInfo . details != null ) { pollInfo . details . infoUpdates } else { existingOpenGroup . infoUpdates }
infoUpdates = ( ( pollInfo . details ?. infoUpdates ?: existingOpenGroup ?. infoUpdates ) ?: 0 )
)
// - Open Group changes
storage . updateOpenGroup ( openGroup )
@ -164,27 +104,103 @@ class OpenGroupPoller(private val server: String, private val executorService: S
(
pollInfo . details != null &&
pollInfo . details . imageId != null && (
pollInfo . details . imageId != existingOpenGroup .imageId ||
pollInfo . details . imageId != existingOpenGroup ? .imageId ||
! storage . hasDownloadedProfilePicture ( dbGroupId )
) &&
storage . getGroupAvatarDownloadJob ( openGroup . server , openGroup . room , pollInfo . details . imageId ) == null
) || (
pollInfo . details == null &&
existingOpenGroup .imageId != null &&
existingOpenGroup ? .imageId != null &&
! storage . hasDownloadedProfilePicture ( dbGroupId ) &&
storage . getGroupAvatarDownloadJob ( openGroup . server , openGroup . room , existingOpenGroup . imageId ) == null
)
) {
JobQueue . shared . add ( GroupAvatarDownloadJob ( server , roomToken , existingO penGroup. imageId ) )
JobQueue . shared . add ( GroupAvatarDownloadJob ( server , roomToken , o penGroup. imageId ) )
}
else if (
pollInfo . details != null &&
pollInfo . details . imageId == null &&
existingOpenGroup .imageId != null
existingOpenGroup ? .imageId != null
) {
storage . removeProfilePicture ( dbGroupId )
}
}
}
fun startIfNeeded ( ) {
if ( hasStarted ) { return }
hasStarted = true
runId = UUID . randomUUID ( )
future = executorService ?. schedule ( :: poll , 0 , TimeUnit . MILLISECONDS )
}
fun stop ( ) {
future ?. cancel ( false )
hasStarted = false
}
fun poll ( isPostCapabilitiesRetry : Boolean = false ) : Promise < Unit , Exception > {
val currentRunId = runId
val storage = MessagingModuleConfiguration . shared . storage
val rooms = storage . getAllOpenGroups ( ) . values . filter { it . server == server } . map { it . room }
return OpenGroupApi . poll ( rooms , server ) . successBackground { responses ->
responses . filterNot { it . body == null } . forEach { response ->
when ( response . endpoint ) {
is Endpoint . Capabilities -> {
handleCapabilities ( server , response . body as OpenGroupApi . Capabilities )
}
is Endpoint . RoomPollInfo -> {
handleRoomPollInfo ( server , response . endpoint . roomToken , response . body as OpenGroupApi . RoomPollInfo )
}
is Endpoint . RoomMessagesRecent -> {
handleMessages ( server , response . endpoint . roomToken , response . body as List < OpenGroupApi . Message > )
}
is Endpoint . RoomMessagesSince -> {
handleMessages ( server , response . endpoint . roomToken , response . body as List < OpenGroupApi . Message > )
}
is Endpoint . Inbox , is Endpoint . InboxSince -> {
handleDirectMessages ( server , false , response . body as List < OpenGroupApi . DirectMessage > )
}
is Endpoint . Outbox , is Endpoint . OutboxSince -> {
handleDirectMessages ( server , true , response . body as List < OpenGroupApi . DirectMessage > )
}
}
if ( secondToLastJob == null && !is CaughtUp ) {
isCaughtUp = true
}
}
// Only poll again if it's the same poller run
if ( currentRunId == runId ) {
future = executorService ?. schedule ( this @OpenGroupPoller :: poll , pollInterval , TimeUnit . MILLISECONDS )
}
} . fail {
updateCapabilitiesIfNeeded ( isPostCapabilitiesRetry , currentRunId , it )
} . map { }
}
private fun updateCapabilitiesIfNeeded ( isPostCapabilitiesRetry : Boolean , currentRunId : UUID , exception : Exception ) {
if ( exception is OnionRequestAPI . HTTPRequestFailedBlindingRequiredException ) {
if ( !is PostCapabilitiesRetry ) {
OpenGroupApi . getCapabilities ( server ) . map {
handleCapabilities ( server , it )
}
// Only poll again if it's the same poller run
if ( currentRunId == runId ) {
future = executorService ?. schedule ( { poll ( isPostCapabilitiesRetry = true ) } , pollInterval , TimeUnit . MILLISECONDS )
}
}
} else if ( currentRunId == runId ) {
future = executorService ?. schedule ( this @OpenGroupPoller :: poll , pollInterval , TimeUnit . MILLISECONDS )
}
}
private fun handleCapabilities ( server : String , capabilities : OpenGroupApi . Capabilities ) {
val storage = MessagingModuleConfiguration . shared . storage
storage . setServerCapabilities ( server , capabilities . capabilities )
}
private fun handleMessages (
server : String ,