Onion paths as a StateFlow (#901)

pull/1709/head
SessionHero01 3 months ago committed by GitHub
parent 0dc8aa1410
commit e5e00c4548
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -183,7 +183,6 @@ import org.thoughtcrime.securesms.mms.MediaConstraints
import org.thoughtcrime.securesms.mms.Slide import org.thoughtcrime.securesms.mms.Slide
import org.thoughtcrime.securesms.mms.SlideDeck import org.thoughtcrime.securesms.mms.SlideDeck
import org.thoughtcrime.securesms.mms.VideoSlide import org.thoughtcrime.securesms.mms.VideoSlide
import org.thoughtcrime.securesms.openUrl
import org.thoughtcrime.securesms.permissions.Permissions import org.thoughtcrime.securesms.permissions.Permissions
import org.thoughtcrime.securesms.reactions.ReactionsDialogFragment import org.thoughtcrime.securesms.reactions.ReactionsDialogFragment
import org.thoughtcrime.securesms.reactions.any.ReactWithAnyEmojiDialogFragment import org.thoughtcrime.securesms.reactions.any.ReactWithAnyEmojiDialogFragment
@ -2071,28 +2070,6 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
} }
} }
private fun informUserIfNetworkOrSessionNodePathIsInvalid() {
// Check that we have a valid network network connection & inform the user if not
val connectedToInternet = NetworkUtils.haveValidNetworkConnection(applicationContext)
if (!connectedToInternet)
{
// TODO: Adjust to display error to user with official localised string when SES-2319 is addressed
Log.e(TAG, "Cannot sent voice message - no network connection.")
}
// Check that we have a suite of Session Nodes to route through.
// Note: We can have the entry node plus the 2 Session Nodes and the data _still_ might not
// send due to any node flakiness - but without doing some manner of test-ping through
// there's no way to test our client -> destination connectivity (unless we abuse the typing
// indicators?)
val paths = OnionRequestAPI.paths
if (paths.isNullOrEmpty() || paths.count() != 2) {
// TODO: Adjust to display error to user with official localised string when SES-2319 is addressed
Log.e(TAG, "Cannot send voice message - bad Session Node path.")
}
}
override fun sendVoiceMessage() { override fun sendVoiceMessage() {
// When the record voice message button is released we always need to reset the UI and cancel // When the record voice message button is released we always need to reset the UI and cancel
// any further recording operation.. // any further recording operation..
@ -2119,7 +2096,6 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
return return
} }
informUserIfNetworkOrSessionNodePathIsInvalid()
// Note: We could return here if there was a network or node path issue, but instead we'll try // Note: We could return here if there was a network or node path issue, but instead we'll try
// our best to send the voice message even if it might fail - because in that case it'll get put // our best to send the voice message even if it might fail - because in that case it'll get put
// into the draft database and can be retried when we regain network connectivity and a working // into the draft database and can be retried when we regain network connectivity and a working

@ -16,11 +16,17 @@ import android.widget.TextView
import android.widget.Toast import android.widget.Toast
import androidx.annotation.ColorRes import androidx.annotation.ColorRes
import androidx.core.content.ContextCompat import androidx.core.content.ContextCompat
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import androidx.localbroadcastmanager.content.LocalBroadcastManager import androidx.localbroadcastmanager.content.LocalBroadcastManager
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@ -64,25 +70,21 @@ class PathActivity : PassphraseRequiredActionBarActivity() {
registerObservers() registerObservers()
IP2Country.configureIfNeeded(this) IP2Country.configureIfNeeded(this)
}
private fun registerObservers() {
val buildingPathsReceiver: BroadcastReceiver = object : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) { lifecycleScope.launch {
handleBuildingPathsEvent() // Check if the
repeatOnLifecycle(Lifecycle.State.STARTED) {
OnionRequestAPI.paths
.map { it.isEmpty() }
.distinctUntilChanged()
.collectLatest {
update(true)
}
} }
} }
broadcastReceivers.add(buildingPathsReceiver) }
LocalBroadcastManager.getInstance(this).registerReceiver(buildingPathsReceiver, IntentFilter("buildingPaths"))
val pathsBuiltReceiver: BroadcastReceiver = object : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) { private fun registerObservers() {
handlePathsBuiltEvent()
}
}
broadcastReceivers.add(pathsBuiltReceiver)
LocalBroadcastManager.getInstance(this).registerReceiver(pathsBuiltReceiver, IntentFilter("pathsBuilt"))
val onionRequestPathCountriesLoadedReceiver: BroadcastReceiver = object : BroadcastReceiver() { val onionRequestPathCountriesLoadedReceiver: BroadcastReceiver = object : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) { override fun onReceive(context: Context, intent: Intent) {
@ -102,15 +104,15 @@ class PathActivity : PassphraseRequiredActionBarActivity() {
// endregion // endregion
// region Updating // region Updating
private fun handleBuildingPathsEvent() { update(false) }
private fun handlePathsBuiltEvent() { update(false) }
private fun handleOnionRequestPathCountriesLoaded() { update(false) } private fun handleOnionRequestPathCountriesLoaded() { update(false) }
private fun update(isAnimated: Boolean) { private fun update(isAnimated: Boolean) {
binding.pathRowsContainer.removeAllViews() binding.pathRowsContainer.removeAllViews()
if (OnionRequestAPI.paths.isNotEmpty()) { val paths = OnionRequestAPI.paths.value
val path = OnionRequestAPI.paths.firstOrNull() ?: return finish() if (paths.isNotEmpty()) {
val path = paths.firstOrNull() ?: return finish()
val dotAnimationRepeatInterval = path.count().toLong() * 1000 + 1000 val dotAnimationRepeatInterval = path.count().toLong() * 1000 + 1000
val pathRows = path.mapIndexed { index, snode -> val pathRows = path.mapIndexed { index, snode ->
val isGuardSnode = (OnionRequestAPI.guardSnodes.contains(snode)) val isGuardSnode = (OnionRequestAPI.guardSnodes.contains(snode))

@ -1,27 +1,23 @@
package org.thoughtcrime.securesms.home package org.thoughtcrime.securesms.home
import android.content.BroadcastReceiver
import android.content.Context import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import android.graphics.Canvas import android.graphics.Canvas
import android.graphics.Paint import android.graphics.Paint
import android.util.AttributeSet import android.util.AttributeSet
import android.view.View import android.view.View
import androidx.annotation.ColorInt import androidx.annotation.ColorInt
import androidx.core.content.ContextCompat import androidx.core.content.ContextCompat
import androidx.lifecycle.coroutineScope
import androidx.localbroadcastmanager.content.LocalBroadcastManager
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.withContext import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import network.loki.messenger.R import network.loki.messenger.R
import org.session.libsession.snode.OnionRequestAPI import org.session.libsession.snode.OnionRequestAPI
import org.thoughtcrime.securesms.conversation.v2.ViewUtil
import org.thoughtcrime.securesms.util.toPx import org.thoughtcrime.securesms.util.toPx
class PathStatusView : View { class PathStatusView : View {
private val broadcastReceivers = mutableListOf<BroadcastReceiver>()
@ColorInt var mainColor: Int = 0 @ColorInt var mainColor: Int = 0
set(newValue) { field = newValue; paint.color = newValue } set(newValue) { field = newValue; paint.color = newValue }
@ColorInt var sessionShadowColor: Int = 0 @ColorInt var sessionShadowColor: Int = 0
@ -53,65 +49,36 @@ class PathStatusView : View {
} }
private fun initialize() { private fun initialize() {
if (!isInEditMode) {
update()
}
setWillNotDraw(false) setWillNotDraw(false)
} }
override fun onAttachedToWindow() { override fun onAttachedToWindow() {
super.onAttachedToWindow() super.onAttachedToWindow()
registerObservers()
}
private fun registerObservers() { updateJob = GlobalScope.launch(Dispatchers.Main) {
val buildingPathsReceiver: BroadcastReceiver = object : BroadcastReceiver() { OnionRequestAPI.hasPath
.collectLatest { pathsBuilt ->
override fun onReceive(context: Context, intent: Intent) { if (pathsBuilt) {
handleBuildingPathsEvent() setBackgroundResource(R.drawable.accent_dot)
} val hasPathsColor = context.getColor(R.color.accent_green)
} mainColor = hasPathsColor
broadcastReceivers.add(buildingPathsReceiver) sessionShadowColor = hasPathsColor
LocalBroadcastManager.getInstance(context).registerReceiver(buildingPathsReceiver, IntentFilter("buildingPaths")) } else {
val pathsBuiltReceiver: BroadcastReceiver = object : BroadcastReceiver() { setBackgroundResource(R.drawable.paths_building_dot)
val pathsBuildingColor = ContextCompat.getColor(context, R.color.paths_building)
override fun onReceive(context: Context, intent: Intent) { mainColor = pathsBuildingColor
handlePathsBuiltEvent() sessionShadowColor = pathsBuildingColor
} }
}
} }
broadcastReceivers.add(pathsBuiltReceiver)
LocalBroadcastManager.getInstance(context).registerReceiver(pathsBuiltReceiver, IntentFilter("pathsBuilt"))
} }
override fun onDetachedFromWindow() { override fun onDetachedFromWindow() {
for (receiver in broadcastReceivers) { updateJob?.cancel()
LocalBroadcastManager.getInstance(context).unregisterReceiver(receiver)
}
super.onDetachedFromWindow() super.onDetachedFromWindow()
} }
private fun handleBuildingPathsEvent() { update() }
private fun handlePathsBuiltEvent() { update() }
private fun update() {
if (updateJob?.isActive != true) { // false or null
updateJob = ViewUtil.getActivityLifecycle(this)?.coroutineScope?.launchWhenStarted {
val paths = withContext(Dispatchers.IO) { OnionRequestAPI.paths }
if (paths.isNotEmpty()) {
setBackgroundResource(R.drawable.accent_dot)
val hasPathsColor = context.getColor(R.color.accent_green)
mainColor = hasPathsColor
sessionShadowColor = hasPathsColor
} else {
setBackgroundResource(R.drawable.paths_building_dot)
val pathsBuildingColor = ContextCompat.getColor(context, R.color.paths_building)
mainColor = pathsBuildingColor
sessionShadowColor = pathsBuildingColor
}
}
}
}
override fun onDraw(c: Canvas) { override fun onDraw(c: Canvas) {
val w = width.toFloat() val w = width.toFloat()
val h = height.toFloat() val h = height.toFloat()

@ -103,7 +103,6 @@ import org.thoughtcrime.securesms.ui.theme.PreviewTheme
import org.thoughtcrime.securesms.ui.theme.SessionColorsParameterProvider import org.thoughtcrime.securesms.ui.theme.SessionColorsParameterProvider
import org.thoughtcrime.securesms.ui.theme.ThemeColors import org.thoughtcrime.securesms.ui.theme.ThemeColors
import org.thoughtcrime.securesms.ui.theme.dangerButtonColors import org.thoughtcrime.securesms.ui.theme.dangerButtonColors
import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities
import org.thoughtcrime.securesms.util.NetworkUtils import org.thoughtcrime.securesms.util.NetworkUtils
import org.thoughtcrime.securesms.util.push import org.thoughtcrime.securesms.util.push
import java.io.File import java.io.File
@ -429,7 +428,7 @@ class SettingsActivity : PassphraseRequiredActionBarActivity() {
Spacer(modifier = Modifier.height(LocalDimensions.current.spacing)) Spacer(modifier = Modifier.height(LocalDimensions.current.spacing))
val hasPaths by hasPaths().collectAsState(initial = false) val hasPaths by OnionRequestAPI.hasPath.collectAsState()
Cell { Cell {
Column { Column {
@ -620,19 +619,3 @@ class SettingsActivity : PassphraseRequiredActionBarActivity() {
} }
} }
} }
private fun Context.hasPaths(): Flow<Boolean> = LocalBroadcastManager.getInstance(this).hasPaths()
private fun LocalBroadcastManager.hasPaths(): Flow<Boolean> = callbackFlow {
val receiver: BroadcastReceiver = object : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) { trySend(Unit) }
}
registerReceiver(receiver, IntentFilter("buildingPaths"))
registerReceiver(receiver, IntentFilter("pathsBuilt"))
awaitClose { unregisterReceiver(receiver) }
}.onStart { emit(Unit) }.map {
withContext(Dispatchers.Default) {
OnionRequestAPI.paths.isNotEmpty()
}
}

@ -7,6 +7,8 @@ import android.content.IntentFilter
import androidx.localbroadcastmanager.content.LocalBroadcastManager import androidx.localbroadcastmanager.content.LocalBroadcastManager
import com.opencsv.CSVReader import com.opencsv.CSVReader
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import org.session.libsession.snode.OnionRequestAPI import org.session.libsession.snode.OnionRequestAPI
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
@ -72,19 +74,16 @@ class IP2Country internal constructor(
if (isInitialized) { return; } if (isInitialized) { return; }
shared = IP2Country(context.applicationContext) shared = IP2Country(context.applicationContext)
val pathsBuiltEventReceiver = object : BroadcastReceiver() { GlobalScope.launch {
override fun onReceive(context: Context, intent: Intent) { OnionRequestAPI.paths
shared.populateCacheIfNeeded() .filter { it.isNotEmpty() }
} .collectLatest {
shared.populateCacheIfNeeded()
}
} }
LocalBroadcastManager.getInstance(context).registerReceiver(pathsBuiltEventReceiver, IntentFilter("pathsBuilt"))
} }
} }
init {
populateCacheIfNeeded()
}
// TODO: Deinit? // TODO: Deinit?
// endregion // endregion
@ -105,16 +104,14 @@ class IP2Country internal constructor(
} }
private fun populateCacheIfNeeded() { private fun populateCacheIfNeeded() {
GlobalScope.launch { val start = System.currentTimeMillis()
val start = System.currentTimeMillis() OnionRequestAPI.paths.value.iterator().forEach { path ->
OnionRequestAPI.paths.iterator().forEach { path -> path.iterator().forEach { snode ->
path.iterator().forEach { snode -> cacheCountryForIP(snode.ip) // Preload if needed
cacheCountryForIP(snode.ip) // Preload if needed
}
} }
Log.d("Loki","Cache populated in ${System.currentTimeMillis() - start}ms")
Broadcaster(context).broadcast("onionRequestPathCountriesLoaded")
} }
Log.d("Loki","IP2Country cache populated in ${System.currentTimeMillis() - start}ms")
Broadcaster(context).broadcast("onionRequestPathCountriesLoaded")
} }
// endregion // endregion
} }

@ -2,6 +2,13 @@ package org.session.libsession.snode
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import nl.komponents.kovenant.Deferred import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
@ -40,37 +47,33 @@ object OnionRequestAPI {
private var buildPathsPromise: Promise<List<Path>, Exception>? = null private var buildPathsPromise: Promise<List<Path>, Exception>? = null
private val database: LokiAPIDatabaseProtocol private val database: LokiAPIDatabaseProtocol
get() = SnodeModule.shared.storage get() = SnodeModule.shared.storage
private val broadcaster: Broadcaster
get() = SnodeModule.shared.broadcaster
private val pathFailureCount = mutableMapOf<Path, Int>() private val pathFailureCount = mutableMapOf<Path, Int>()
private val snodeFailureCount = mutableMapOf<Snode, Int>() private val snodeFailureCount = mutableMapOf<Snode, Int>()
var guardSnodes = setOf<Snode>() var guardSnodes = setOf<Snode>()
var _paths: AtomicReference<List<Path>?> = AtomicReference(null)
var paths: List<Path> // Not a Set to ensure we consistently show the same path to the user private val mutablePaths = MutableStateFlow(database.getOnionRequestPaths())
@Synchronized
get() { val paths: StateFlow<List<Path>> get() = mutablePaths
val paths = _paths.get() val hasPath: StateFlow<Boolean> = mutablePaths
.drop(1)
if (paths != null) { return paths } .map { it.isNotEmpty() }
.stateIn(GlobalScope, SharingStarted.Eagerly, paths.value.isNotEmpty())
// Storing this in an atomic variable as it was causing a number of background
// ANRs when this value was accessed via the main thread after tapping on init {
// a notification) // Listen for the changes in paths and persist it to the db
val result = database.getOnionRequestPaths() GlobalScope.launch {
_paths.set(result) mutablePaths
return result .drop(1) // Drop the first result where it just comes from the db
} .collectLatest {
@Synchronized if (it.isEmpty()) {
set(newValue) { database.clearOnionRequestPaths()
if (newValue.isEmpty()) { } else {
database.clearOnionRequestPaths() database.setOnionRequestPaths(it)
_paths.set(null) }
} else {
database.setOnionRequestPaths(newValue)
_paths.set(newValue)
} }
} }
}
// region Settings // region Settings
/** /**
@ -172,7 +175,6 @@ object OnionRequestAPI {
val existingBuildPathsPromise = buildPathsPromise val existingBuildPathsPromise = buildPathsPromise
if (existingBuildPathsPromise != null) { return existingBuildPathsPromise } if (existingBuildPathsPromise != null) { return existingBuildPathsPromise }
Log.d("Loki", "Building onion request paths.") Log.d("Loki", "Building onion request paths.")
broadcaster.broadcast("buildingPaths")
val promise = SnodeAPI.getRandomSnode().bind { // Just used to populate the snode pool val promise = SnodeAPI.getRandomSnode().bind { // Just used to populate the snode pool
val reusableGuardSnodes = reusablePaths.map { it[0] } val reusableGuardSnodes = reusablePaths.map { it[0] }
getGuardSnodes(reusableGuardSnodes).map { guardSnodes -> getGuardSnodes(reusableGuardSnodes).map { guardSnodes ->
@ -193,8 +195,7 @@ object OnionRequestAPI {
result result
} }
}.map { paths -> }.map { paths ->
OnionRequestAPI.paths = paths + reusablePaths mutablePaths.value = paths + reusablePaths
broadcaster.broadcast("pathsBuilt")
paths paths
} }
} }
@ -209,7 +210,7 @@ object OnionRequestAPI {
*/ */
private fun getPath(snodeToExclude: Snode?): Promise<Path, Exception> { private fun getPath(snodeToExclude: Snode?): Promise<Path, Exception> {
if (pathSize < 1) { throw Exception("Can't build path of size zero.") } if (pathSize < 1) { throw Exception("Can't build path of size zero.") }
val paths = this.paths val paths = this.paths.value
val guardSnodes = mutableSetOf<Snode>() val guardSnodes = mutableSetOf<Snode>()
if (paths.isNotEmpty()) { if (paths.isNotEmpty()) {
guardSnodes.add(paths[0][0]) guardSnodes.add(paths[0][0])
@ -256,7 +257,7 @@ object OnionRequestAPI {
// path we leave the re-building up to getPath() because re-building the path in that case // path we leave the re-building up to getPath() because re-building the path in that case
// is async. // is async.
snodeFailureCount[snode] = 0 snodeFailureCount[snode] = 0
val oldPaths = paths.toMutableList() val oldPaths = mutablePaths.value.toMutableList()
val pathIndex = oldPaths.indexOfFirst { it.contains(snode) } val pathIndex = oldPaths.indexOfFirst { it.contains(snode) }
if (pathIndex == -1) { return } if (pathIndex == -1) { return }
val path = oldPaths[pathIndex].toMutableList() val path = oldPaths[pathIndex].toMutableList()
@ -269,16 +270,16 @@ object OnionRequestAPI {
// Don't test the new snode as this would reveal the user's IP // Don't test the new snode as this would reveal the user's IP
oldPaths.removeAt(pathIndex) oldPaths.removeAt(pathIndex)
val newPaths = oldPaths + listOf( path ) val newPaths = oldPaths + listOf( path )
paths = newPaths mutablePaths.value = newPaths
} }
private fun dropPath(path: Path) { private fun dropPath(path: Path) {
pathFailureCount[path] = 0 pathFailureCount[path] = 0
val paths = OnionRequestAPI.paths.toMutableList() val paths = mutablePaths.value.toMutableList()
val pathIndex = paths.indexOf(path) val pathIndex = paths.indexOf(path)
if (pathIndex == -1) { return } if (pathIndex == -1) { return }
paths.removeAt(pathIndex) paths.removeAt(pathIndex)
OnionRequestAPI.paths = paths mutablePaths.value = paths
} }
/** /**
@ -369,7 +370,7 @@ object OnionRequestAPI {
val checkedGuardSnode = guardSnode val checkedGuardSnode = guardSnode
val path = val path =
if (checkedGuardSnode == null) null if (checkedGuardSnode == null) null
else paths.firstOrNull { it.contains(checkedGuardSnode) } else paths.value.firstOrNull { it.contains(checkedGuardSnode) }
fun handleUnspecificError() { fun handleUnspecificError() {
if (path == null) { return } if (path == null) { return }

Loading…
Cancel
Save