feat: add first try to build SharedConfigMessages

pull/2620/head
Audric Ackermann 2 years ago
parent d1cefd4729
commit 9cf1419ca5

@ -98,6 +98,7 @@
"filesize": "3.6.1",
"firstline": "1.2.1",
"fs-extra": "9.0.0",
"git": "^0.1.5",
"glob": "7.1.2",
"image-type": "^4.1.0",
"ip2country": "1.0.1",
@ -139,7 +140,7 @@
"rimraf": "2.6.2",
"sanitize.css": "^12.0.1",
"semver": "5.4.1",
"session_util_wrapper": "https://github.com/oxen-io/libsession-util-nodejs",
"session_util_wrapper": "/home/audric/pro/contribs/libsession-util-nodejs",
"styled-components": "5.1.1",
"uuid": "8.3.2"
},
@ -289,7 +290,11 @@
"StartupWMClass": "Session"
},
"asarUnpack": "node_modules/spellchecker/vendor/hunspell_dictionaries",
"target": ["deb", "rpm", "freebsd"],
"target": [
"deb",
"rpm",
"freebsd"
],
"icon": "build/icon-linux.icns"
},
"asarUnpack": [

@ -21,6 +21,14 @@ import { sanitizeSessionUsername } from '../../session/utils/String';
import { setLastProfileUpdateTimestamp } from '../../util/storage';
import { ConversationTypeEnum } from '../../models/conversationAttributes';
import { MAX_USERNAME_BYTES } from '../../session/constants';
import { SharedConfigMessage } from '../../session/messages/outgoing/controlMessage/SharedConfigMessage';
import { callLibSessionWorker } from '../../webworker/workers/browser/libsession_worker_interface';
import { SignalService } from '../../protobuf';
import Long from 'long';
import { GetNetworkTime } from '../../session/apis/snode_api/getNetworkTime';
import { getMessageQueue } from '../../session/sending';
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
import { from_string } from 'libsodium-wrappers-sumo';
interface State {
profileName: string;
@ -337,8 +345,35 @@ async function commitProfileEdits(newName: string, scaledAvatarUrl: string | nul
}
// do not update the avatar if it did not change
conversation.setSessionDisplayNameNoCommit(newName);
// might be good to not trigger a sync if the name did not change
await conversation.commit();
await setLastProfileUpdateTimestamp(Date.now());
await SyncUtils.forceSyncConfigurationNowIfNeeded(true);
if (window.sessionFeatureFlags.useSharedUtilForUserConfig) {
await callLibSessionWorker(['UserConfig', 'setName', newName]);
const pointer = conversation.get('avatarPointer');
const profileKey = conversation.get('profileKey');
if (profileKey && pointer) {
await callLibSessionWorker([
'UserConfig',
'setProfilePicture',
pointer,
from_string(profileKey),
]);
} else {
await callLibSessionWorker(['UserConfig', 'setProfilePicture', '', new Uint8Array()]);
}
const message = new SharedConfigMessage({
data: (await callLibSessionWorker(['UserConfig', 'dump'])) as Uint8Array,
kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE,
seqno: Long.fromNumber(0),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
});
await getMessageQueue().sendSyncMessage({ message, namespace: SnodeNamespaces.UserProfile });
} else {
await setLastProfileUpdateTimestamp(Date.now());
await SyncUtils.forceSyncConfigurationNowIfNeeded(true);
}
}

@ -2,57 +2,52 @@ import React, { useEffect, useState } from 'react';
import { getConversationController } from '../../session/conversations';
import { syncConfigurationIfNeeded } from '../../session/utils/sync/syncUtils';
import { useDispatch, useSelector } from 'react-redux';
import {
Data,
hasSyncedInitialConfigurationItem,
lastAvatarUploadTimestamp,
} from '../../data/data';
import { getMessageQueue } from '../../session/sending';
import { useDispatch, useSelector } from 'react-redux';
// tslint:disable: no-submodule-imports
import useInterval from 'react-use/lib/useInterval';
import useTimeoutFn from 'react-use/lib/useTimeoutFn';
import { getOurNumber } from '../../state/selectors/user';
import { clearSearch } from '../../state/ducks/search';
import { resetOverlayMode, SectionType, showLeftPaneSection } from '../../state/ducks/section';
import {
getOurPrimaryConversation,
getUnreadMessageCount,
} from '../../state/selectors/conversations';
import { getFocusedSection } from '../../state/selectors/section';
import { clearSearch } from '../../state/ducks/search';
import { resetOverlayMode, SectionType, showLeftPaneSection } from '../../state/ducks/section';
import { getOurNumber } from '../../state/selectors/user';
import { cleanUpOldDecryptedMedias } from '../../session/crypto/DecryptedAttachmentsManager';
import { DURATION } from '../../session/constants';
import { onionPathModal } from '../../state/ducks/modalDialog';
import { uploadOurAvatar } from '../../interactions/conversationInteractions';
import { debounce, isEmpty, isString } from 'lodash';
import { uploadOurAvatar } from '../../interactions/conversationInteractions';
import { editProfileModal, onionPathModal } from '../../state/ducks/modalDialog';
// tslint:disable-next-line: no-import-side-effect no-submodule-imports
import { ActionPanelOnionStatusLight } from '../dialog/OnionStatusPathDialog';
import { ipcRenderer } from 'electron';
import { loadDefaultRooms } from '../../session/apis/open_group_api/opengroupV2/ApiUtil';
import { getOpenGroupManager } from '../../session/apis/open_group_api/opengroupV2/OpenGroupManagerV2';
import { getSwarmPollingInstance } from '../../session/apis/snode_api';
import { UserUtils } from '../../session/utils';
import { Avatar, AvatarSize } from '../avatar/Avatar';
import { ActionPanelOnionStatusLight } from '../dialog/OnionStatusPathDialog';
import { SessionIconButton } from '../icon';
import { LeftPaneSectionContainer } from './LeftPaneSectionContainer';
import { ipcRenderer } from 'electron';
import { UserUtils } from '../../session/utils';
import { getLatestReleaseFromFileServer } from '../../session/apis/file_server_api/FileServerApi';
import { switchThemeTo } from '../../themes/switchTheme';
import { ThemeStateType } from '../../themes/constants/colors';
import { isDarkTheme } from '../../state/selectors/theme';
import { forceRefreshRandomSnodePool } from '../../session/apis/snode_api/snodePool';
import { SharedConfigMessage } from '../../session/messages/outgoing/controlMessage/SharedConfigMessage';
import { SignalService } from '../../protobuf';
import { GetNetworkTime } from '../../session/apis/snode_api/getNetworkTime';
import Long from 'long';
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
import { initializeLibSessionUtilWrappers } from '../../session/utils/libsession/libsession_utils';
import { isDarkTheme } from '../../state/selectors/theme';
import { ThemeStateType } from '../../themes/constants/colors';
import { switchThemeTo } from '../../themes/switchTheme';
const Section = (props: { type: SectionType }) => {
const ourNumber = useSelector(getOurNumber);
@ -67,15 +62,7 @@ const Section = (props: { type: SectionType }) => {
const handleClick = async () => {
/* tslint:disable:no-void-expression */
if (type === SectionType.Profile) {
const message = new SharedConfigMessage({
data: new Uint8Array([1, 2, 3]),
kind: SignalService.SharedConfigMessage.Kind.USER_PROFILE,
seqno: Long.fromNumber(0),
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
});
await getMessageQueue().sendSyncMessage({ message, namespace: SnodeNamespaces.UserProfile });
console.warn('FIXME');
// dispatch(editProfileModal({}));
dispatch(editProfileModal({}));
} else if (type === SectionType.ColorMode) {
const currentTheme = String(window.Events.getThemeSetting());
const newTheme = (isDarkMode
@ -220,10 +207,6 @@ const doAppStartUp = async () => {
// TODO make this a job of the JobRunner
debounce(triggerAvatarReUploadIfNeeded, 200);
// init the messageQueue. In the constructor, we add all not send messages
// this call does nothing except calling the constructor, which will continue sending message in the pipeline
void getMessageQueue().processAllPending();
/* Postpone a little bit of the polling of sogs messages to let the swarm messages come in first. */
global.setTimeout(() => {
void getOpenGroupManager().startPolling();
@ -311,7 +294,6 @@ export const ActionsPanel = () => {
<Section type={SectionType.Profile} />
<Section type={SectionType.Message} />
<Section type={SectionType.Settings} />
<Section type={SectionType.PathIndicator} />
<Section type={SectionType.ColorMode} />
</LeftPaneSectionContainer>

@ -11,7 +11,12 @@ import { HexKeyPair } from '../receiver/keypairs';
import { getConversationController } from '../session/conversations';
import { getSodiumRenderer } from '../session/crypto';
import { PubKey } from '../session/types';
import { MsgDuplicateSearchOpenGroup, UpdateLastHashType } from '../types/sqlSharedTypes';
import {
AsyncWrapper,
MsgDuplicateSearchOpenGroup,
UnprocessedDataNode,
UpdateLastHashType,
} from '../types/sqlSharedTypes';
import { ExpirationTimerOptions } from '../util/expiringMessages';
import { Storage } from '../util/storage';
import { channels } from './channels';
@ -101,96 +106,6 @@ function _cleanData(data: any): any {
return data;
}
// we export them like this instead of directly with the `export function` cause this is helping a lot for testing
export const Data = {
shutdown,
close,
removeDB,
getPasswordHash,
// items table logic
createOrUpdateItem,
getItemById,
getAllItems,
removeItemById,
// guard nodes
getGuardNodes,
updateGuardNodes,
generateAttachmentKeyIfEmpty,
getSwarmNodesForPubkey,
updateSwarmNodesForPubkey,
getAllEncryptionKeyPairsForGroup,
getLatestClosedGroupEncryptionKeyPair,
addClosedGroupEncryptionKeyPair,
removeAllClosedGroupEncryptionKeyPairs,
saveConversation,
getConversationById,
removeConversation,
getAllConversations,
getPubkeysInPublicConversation,
searchConversations,
searchMessages,
searchMessagesInConversation,
cleanSeenMessages,
cleanLastHashes,
saveSeenMessageHashes,
updateLastHash,
saveMessage,
saveMessages,
removeMessage,
removeMessagesByIds,
getMessageIdsFromServerIds,
getMessageById,
getMessageBySenderAndSentAt,
getMessageByServerId,
filterAlreadyFetchedOpengroupMessage,
getMessageBySenderAndTimestamp,
getUnreadByConversation,
getUnreadCountByConversation,
markAllAsReadByConversationNoExpiration,
getMessageCountByType,
getMessagesByConversation,
getLastMessagesByConversation,
getLastMessageIdInConversation,
getLastMessageInConversation,
getOldestMessageInConversation,
getMessageCount,
getFirstUnreadMessageIdInConversation,
getFirstUnreadMessageWithMention,
hasConversationOutgoingMessage,
getLastHashBySnode,
getSeenMessagesByHashList,
removeAllMessagesInConversation,
getMessagesBySentAt,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
getUnprocessedCount,
getAllUnprocessed,
getUnprocessedById,
saveUnprocessed,
updateUnprocessedAttempts,
updateUnprocessedWithData,
removeUnprocessed,
removeAllUnprocessed,
getNextAttachmentDownloadJobs,
saveAttachmentDownloadJob,
setAttachmentDownloadJobPending,
resetAttachmentDownloadPending,
removeAttachmentDownloadJob,
removeAllAttachmentDownloadJobs,
removeAll,
removeAllConversations,
cleanupOrphanedAttachments,
removeOtherData,
getMessagesWithVisualMediaAttachments,
getMessagesWithFileAttachments,
getSnodePoolFromDb,
updateSnodePoolOnDb,
fillWithTestData,
};
// Basic
async function shutdown(): Promise<void> {
// Stop accepting new SQL jobs, flush outstanding queue
@ -674,49 +589,42 @@ async function getNextExpiringMessage(): Promise<MessageCollection> {
// Unprocessed
async function getUnprocessedCount(): Promise<number> {
const getUnprocessedCount: AsyncWrapper<UnprocessedDataNode['getUnprocessedCount']> = () => {
return channels.getUnprocessedCount();
}
};
async function getAllUnprocessed(): Promise<Array<UnprocessedParameter>> {
const getAllUnprocessed: AsyncWrapper<UnprocessedDataNode['getAllUnprocessed']> = () => {
return channels.getAllUnprocessed();
}
};
async function getUnprocessedById(id: string): Promise<UnprocessedParameter | undefined> {
const getUnprocessedById: AsyncWrapper<UnprocessedDataNode['getUnprocessedById']> = id => {
return channels.getUnprocessedById(id);
}
export type UnprocessedParameter = {
id: string;
version: number;
envelope: string;
timestamp: number;
attempts: number;
messageHash: string;
senderIdentity?: string;
decrypted?: string; // added once the envelopes's content is decrypted with updateCache
source?: string; // added once the envelopes's content is decrypted with updateCache
};
async function saveUnprocessed(data: UnprocessedParameter): Promise<string> {
const id = await channels.saveUnprocessed(_cleanData(data));
return id;
}
const saveUnprocessed: AsyncWrapper<UnprocessedDataNode['saveUnprocessed']> = data => {
return channels.saveUnprocessed(_cleanData(data));
};
async function updateUnprocessedAttempts(id: string, attempts: number): Promise<void> {
await channels.updateUnprocessedAttempts(id, attempts);
}
async function updateUnprocessedWithData(id: string, data: any): Promise<void> {
await channels.updateUnprocessedWithData(id, data);
}
const updateUnprocessedAttempts: AsyncWrapper<UnprocessedDataNode['updateUnprocessedAttempts']> = (
id,
attempts
) => {
return channels.updateUnprocessedAttempts(id, attempts);
};
const updateUnprocessedWithData: AsyncWrapper<UnprocessedDataNode['updateUnprocessedWithData']> = (
id,
data
) => {
return channels.updateUnprocessedWithData(id, _cleanData(data));
};
async function removeUnprocessed(id: string): Promise<void> {
await channels.removeUnprocessed(id);
}
const removeUnprocessed: AsyncWrapper<UnprocessedDataNode['removeUnprocessed']> = id => {
return channels.removeUnprocessed(id);
};
async function removeAllUnprocessed(): Promise<void> {
await channels.removeAllUnprocessed();
}
const removeAllUnprocessed: AsyncWrapper<UnprocessedDataNode['removeAllUnprocessed']> = () => {
return channels.removeAllUnprocessed();
};
// Attachment downloads
@ -908,3 +816,97 @@ export async function getAllItems(): Promise<Array<StorageItem>> {
export async function removeItemById(id: string): Promise<void> {
await channels.removeItemById(id);
}
// we export them like this instead of directly with the `export function` cause this is helping a lot for testing
export const Data = {
shutdown,
close,
removeDB,
getPasswordHash,
// items table logic
createOrUpdateItem,
getItemById,
getAllItems,
removeItemById,
// guard nodes
getGuardNodes,
updateGuardNodes,
generateAttachmentKeyIfEmpty,
getSwarmNodesForPubkey,
updateSwarmNodesForPubkey,
getAllEncryptionKeyPairsForGroup,
getLatestClosedGroupEncryptionKeyPair,
addClosedGroupEncryptionKeyPair,
removeAllClosedGroupEncryptionKeyPairs,
saveConversation,
getConversationById,
removeConversation,
getAllConversations,
getPubkeysInPublicConversation,
searchConversations,
searchMessages,
searchMessagesInConversation,
cleanSeenMessages,
cleanLastHashes,
saveSeenMessageHashes,
updateLastHash,
saveMessage,
saveMessages,
removeMessage,
removeMessagesByIds,
getMessageIdsFromServerIds,
getMessageById,
getMessageBySenderAndSentAt,
getMessageByServerId,
filterAlreadyFetchedOpengroupMessage,
getMessageBySenderAndTimestamp,
getUnreadByConversation,
getUnreadCountByConversation,
markAllAsReadByConversationNoExpiration,
getMessageCountByType,
getMessagesByConversation,
getLastMessagesByConversation,
getLastMessageIdInConversation,
getLastMessageInConversation,
getOldestMessageInConversation,
getMessageCount,
getFirstUnreadMessageIdInConversation,
getFirstUnreadMessageWithMention,
hasConversationOutgoingMessage,
getLastHashBySnode,
getSeenMessagesByHashList,
removeAllMessagesInConversation,
getMessagesBySentAt,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
// Unprocessed messages data
getUnprocessedCount,
getAllUnprocessed,
getUnprocessedById,
saveUnprocessed,
updateUnprocessedAttempts,
updateUnprocessedWithData,
removeUnprocessed,
removeAllUnprocessed,
// attachments download jobs
getNextAttachmentDownloadJobs,
saveAttachmentDownloadJob,
setAttachmentDownloadJobPending,
resetAttachmentDownloadPending,
removeAttachmentDownloadJob,
removeAllAttachmentDownloadJobs,
removeAll,
removeAllConversations,
cleanupOrphanedAttachments,
removeOtherData,
getMessagesWithVisualMediaAttachments,
getMessagesWithFileAttachments,
getSnodePoolFromDb,
updateSnodePoolOnDb,
fillWithTestData,
};

@ -455,6 +455,10 @@ export async function uploadOurAvatar(newAvatarDecrypted?: ArrayBuffer) {
`Reuploading avatar finished at ${newTimestampReupload}, newAttachmentPointer ${fileUrl}`
);
}
return {
avatarPointer: ourConvo.get('avatarPointer'),
profileKey: ourConvo.get('profileKey'),
};
}
export async function replyToMessage(messageId: string) {

@ -440,7 +440,9 @@ async function connect() {
Notifications.enable();
}, 10 * 1000); // 10 sec
await queueAllCached();
setTimeout(() => {
void queueAllCached();
}, 10 * 1000); // 10 sec
await AttachmentDownloads.start({
logger: window.log,
});

@ -91,7 +91,7 @@ export interface ConversationAttributes {
/**
* When we create a closed group v3 or get promoted to admim, we need to save the private key of that closed group.
*/
identityPrivateKey?: string;
// identityPrivateKey?: string;
}
/**

@ -169,10 +169,10 @@ export function formatRowOfConversation(row?: Record<string, any>): Conversation
convo.active_at = 0;
}
convo.identityPrivateKey = row.identityPrivateKey;
if (!convo.identityPrivateKey) {
convo.identityPrivateKey = undefined;
}
// convo.identityPrivateKey = row.identityPrivateKey;
// if (!convo.identityPrivateKey) {
// convo.identityPrivateKey = undefined;
// }
return convo;
}

@ -1242,13 +1242,15 @@ function updateToSessionSchemaVersion31(currentVersion: number, db: BetterSqlite
combinedMessageHashes TEXT);
`);
db.exec(`ALTER TABLE conversations
ADD COLUMN lastReadTimestampMs INTEGER;
;
`);
// db.exec(`ALTER TABLE conversations
// ADD COLUMN lastReadTimestampMs INTEGER;
// ;
// `);
db.exec(`ALTER TABLE unprocessed DROP COLUMN serverTimestamp;`);
// we need to populate those fields with the current state of the conversation so let's throw null until this is done
throw null;
// throw new Error('update me');
writeSessionSchemaVersion(targetVersion, db);
})();

@ -17,6 +17,7 @@ import {
isString,
last,
map,
omit,
} from 'lodash';
import { redactAll } from '../util/privacy'; // checked - only node
import { LocaleMessagesType } from './locale'; // checked - only node
@ -46,7 +47,14 @@ import {
toSqliteBoolean,
} from './database_utility';
import { ConfigDumpDataNode, ConfigDumpRow, UpdateLastHashType } from '../types/sqlSharedTypes';
import {
ConfigDumpDataNode,
ConfigDumpRow,
MsgDuplicateSearchOpenGroup,
UnprocessedDataNode,
UnprocessedParameter,
UpdateLastHashType,
} from '../types/sqlSharedTypes';
import { OpenGroupV2Room } from '../data/opengroups';
import {
@ -437,9 +445,16 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3
avatarInProfile,
displayNameInProfile,
conversationIdOrigin,
identityPrivateKey,
// identityPrivateKey,
} = formatted;
//FIXME
console.warn('FIXME omit(formatted, identityPrivateKey);');
const omited = omit(formatted, 'identityPrivateKey');
const keys = Object.keys(omited);
const columnsCommaSeparated = keys.join(', ');
const valuesArgs = keys.map(k => `$${k}`).join(', ');
const maxLength = 300;
// shorten the last message as we never need more than `maxLength` chars (and it bloats the redux/ipc calls uselessly.
@ -450,73 +465,9 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3
assertGlobalInstanceOrInstance(instance)
.prepare(
`INSERT OR REPLACE INTO ${CONVERSATIONS_TABLE} (
id,
active_at,
type,
members,
nickname,
profileKey,
zombies,
left,
expireTimer,
mentionedUs,
unreadCount,
lastMessageStatus,
lastMessage,
lastJoinedTimestamp,
groupAdmins,
groupModerators,
isKickedFromGroup,
subscriberCount,
readCapability,
writeCapability,
uploadCapability,
is_medium_group,
avatarPointer,
avatarImageId,
triggerNotificationsFor,
isTrustedForAttachmentDownload,
isPinned,
isApproved,
didApproveMe,
avatarInProfile,
displayNameInProfile,
conversationIdOrigin,
identityPrivateKey
${columnsCommaSeparated}
) values (
$id,
$active_at,
$type,
$members,
$nickname,
$profileKey,
$zombies,
$left,
$expireTimer,
$mentionedUs,
$unreadCount,
$lastMessageStatus,
$lastMessage,
$lastJoinedTimestamp,
$groupAdmins,
$groupModerators,
$isKickedFromGroup,
$subscriberCount,
$readCapability,
$writeCapability,
$uploadCapability,
$is_medium_group,
$avatarPointer,
$avatarImageId,
$triggerNotificationsFor,
$isTrustedForAttachmentDownload,
$isPinned,
$isApproved,
$didApproveMe,
$avatarInProfile,
$displayNameInProfile,
$conversationIdOrigin,
$identityPrivateKey
${valuesArgs}
)`
)
.run({
@ -555,7 +506,7 @@ function saveConversation(data: ConversationAttributes, instance?: BetterSqlite3
avatarInProfile,
displayNameInProfile,
conversationIdOrigin,
identityPrivateKey,
// identityPrivateKey,
});
}
@ -1076,8 +1027,8 @@ function getMessageBySenderAndTimestamp({
}
function filterAlreadyFetchedOpengroupMessage(
msgDetails: Array<{ sender: string; serverTimestamp: number }> // MsgDuplicateSearchOpenGroup
): Array<{ sender: string; serverTimestamp: number }> {
msgDetails: MsgDuplicateSearchOpenGroup
): MsgDuplicateSearchOpenGroup {
const filteredNonBlinded = msgDetails.filter(msg => {
const rows = assertGlobalInstance()
.prepare(
@ -1478,127 +1429,119 @@ function getNextExpiringMessage() {
}
/* Unproccessed a received messages not yet processed */
function saveUnprocessed(data: any) {
const { id, timestamp, version, attempts, envelope, senderIdentity, messageHash } = data;
if (!id) {
throw new Error(`saveUnprocessed: id was falsey: ${id}`);
}
const unprocessed: UnprocessedDataNode = {
saveUnprocessed: (data: UnprocessedParameter) => {
const { id, timestamp, version, attempts, envelope, senderIdentity, messageHash } = data;
if (!id) {
throw new Error(`saveUnprocessed: id was falsey: ${id}`);
}
assertGlobalInstance()
.prepare(
`INSERT OR REPLACE INTO unprocessed (
id,
timestamp,
version,
attempts,
envelope,
senderIdentity,
serverHash
) values (
$id,
$timestamp,
$version,
$attempts,
$envelope,
$senderIdentity,
$messageHash
);`
)
.run({
id,
timestamp,
version,
attempts,
envelope,
senderIdentity,
messageHash,
});
assertGlobalInstance()
.prepare(
`INSERT OR REPLACE INTO unprocessed (
id,
timestamp,
version,
attempts,
envelope,
senderIdentity,
serverHash
) values (
$id,
$timestamp,
$version,
$attempts,
$envelope,
$senderIdentity,
$messageHash
);`
)
.run({
id,
timestamp,
version,
attempts,
envelope,
senderIdentity,
messageHash,
});
},
return id;
}
updateUnprocessedAttempts: (id: string, attempts: number) => {
assertGlobalInstance()
.prepare('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;')
.run({
id,
attempts,
});
},
function updateUnprocessedAttempts(id: string, attempts: number) {
assertGlobalInstance()
.prepare('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;')
.run({
id,
attempts,
});
}
function updateUnprocessedWithData(id: string, data: any = {}) {
const { source, serverTimestamp, decrypted, senderIdentity } = data;
updateUnprocessedWithData: (id: string, data: UnprocessedParameter) => {
const { source, decrypted, senderIdentity } = data;
assertGlobalInstance()
.prepare(
`UPDATE unprocessed SET
source = $source,
serverTimestamp = $serverTimestamp,
decrypted = $decrypted,
senderIdentity = $senderIdentity
WHERE id = $id;`
)
.run({
id,
source,
serverTimestamp,
decrypted,
senderIdentity,
});
}
assertGlobalInstance()
.prepare(
`UPDATE unprocessed SET
source = $source,
decrypted = $decrypted,
senderIdentity = $senderIdentity
WHERE id = $id;`
)
.run({
id,
source,
decrypted,
senderIdentity,
});
},
function getUnprocessedById(id: string) {
const row = assertGlobalInstance()
.prepare('SELECT * FROM unprocessed WHERE id = $id;')
.get({
id,
});
getUnprocessedById: (id: string) => {
const row = assertGlobalInstance()
.prepare('SELECT * FROM unprocessed WHERE id = $id;')
.get({
id,
});
return row;
}
return row;
},
function getUnprocessedCount() {
const row = assertGlobalInstance()
.prepare('SELECT count(*) from unprocessed;')
.get();
getUnprocessedCount: () => {
const row = assertGlobalInstance()
.prepare('SELECT count(*) from unprocessed;')
.get();
if (!row) {
throw new Error('getMessageCount: Unable to get count of unprocessed');
}
if (!row) {
throw new Error('getMessageCount: Unable to get count of unprocessed');
}
return row['count(*)'];
}
return row['count(*)'];
},
function getAllUnprocessed() {
const rows = assertGlobalInstance()
.prepare('SELECT * FROM unprocessed ORDER BY timestamp ASC;')
.all();
getAllUnprocessed: () => {
const rows = assertGlobalInstance()
.prepare('SELECT * FROM unprocessed ORDER BY timestamp ASC;')
.all();
return rows;
}
return rows;
},
function removeUnprocessed(id: string) {
if (!Array.isArray(id)) {
removeUnprocessed: (id: string): void => {
if (Array.isArray(id)) {
console.warn('removeUnprocessed only supports single ids at a time');
throw new Error('removeUnprocessed only supports single ids at a time');
}
assertGlobalInstance()
.prepare('DELETE FROM unprocessed WHERE id = $id;')
.run({ id });
return;
}
if (!id.length) {
throw new Error('removeUnprocessed: No ids to delete!');
}
// Our node interface doesn't seem to allow you to replace one single ? with an array
assertGlobalInstance()
.prepare(`DELETE FROM unprocessed WHERE id IN ( ${id.map(() => '?').join(', ')} );`)
.run(id);
}
},
function removeAllUnprocessed() {
assertGlobalInstance()
.prepare('DELETE FROM unprocessed;')
.run();
}
removeAllUnprocessed: () => {
assertGlobalInstance()
.prepare('DELETE FROM unprocessed;')
.run();
},
};
function getNextAttachmentDownloadJobs(limit: number) {
const timestamp = Date.now();
@ -2565,14 +2508,8 @@ export const sqlNode = {
hasConversationOutgoingMessage,
fillWithTestData,
getUnprocessedCount,
getAllUnprocessed,
saveUnprocessed,
updateUnprocessedAttempts,
updateUnprocessedWithData,
getUnprocessedById,
removeUnprocessed,
removeAllUnprocessed,
// add all the calls related to the unprocessed cache of incoming messages
...unprocessed,
getNextAttachmentDownloadJobs,
saveAttachmentDownloadJob,

@ -1,12 +1,11 @@
import { EnvelopePlus } from './types';
import { StringUtils } from '../session/utils';
import _ from 'lodash';
import { Data, UnprocessedParameter } from '../data/data';
import { Data } from '../data/data';
import { UnprocessedParameter } from '../types/sqlSharedTypes';
export async function removeFromCache(envelope: EnvelopePlus) {
const { id } = envelope;
// window?.log?.info(`removing from cache envelope: ${id}`);
return Data.removeUnprocessed(id);
return Data.removeUnprocessed(envelope.id);
}
export async function addToCache(
@ -15,7 +14,6 @@ export async function addToCache(
messageHash: string
) {
const { id } = envelope;
// window?.log?.info(`adding to cache envelope: ${id}`);
const encodedEnvelope = StringUtils.decode(plaintext, 'base64');
const data: UnprocessedParameter = {
@ -30,10 +28,10 @@ export async function addToCache(
if (envelope.senderIdentity) {
data.senderIdentity = envelope.senderIdentity;
}
return Data.saveUnprocessed(data);
await Data.saveUnprocessed(data);
}
async function fetchAllFromCache(): Promise<Array<any>> {
async function fetchAllFromCache(): Promise<Array<UnprocessedParameter>> {
const count = await Data.getUnprocessedCount();
if (count > 1500) {
@ -42,30 +40,26 @@ async function fetchAllFromCache(): Promise<Array<any>> {
return [];
}
const items = await Data.getAllUnprocessed();
return items;
return Data.getAllUnprocessed();
}
export async function getAllFromCache() {
window?.log?.info('getAllFromCache');
const items = await fetchAllFromCache();
window?.log?.info('getAllFromCache loaded', items.length, 'saved envelopes');
async function increaseAttemptsOrRemove(
items: Array<UnprocessedParameter>
): Promise<Array<UnprocessedParameter>> {
return Promise.all(
_.map(items, async (item: any) => {
_.map(items, async item => {
const attempts = _.toNumber(item.attempts || 0) + 1;
try {
if (attempts >= 10) {
window?.log?.warn('getAllFromCache final attempt for envelope', item.id);
window?.log?.warn('increaseAttemptsOrRemove final attempt for envelope', item.id);
await Data.removeUnprocessed(item.id);
} else {
await Data.updateUnprocessedAttempts(item.id, attempts);
}
} catch (error) {
window?.log?.error(
'getAllFromCache error updating item after load:',
'increaseAttemptsOrRemove error updating item after load:',
error && error.stack ? error.stack : error
);
}
@ -75,6 +69,14 @@ export async function getAllFromCache() {
);
}
export async function getAllFromCache() {
window?.log?.info('getAllFromCache');
const items = await fetchAllFromCache();
window?.log?.info('getAllFromCache loaded', items.length, 'saved envelopes');
return increaseAttemptsOrRemove(items);
}
export async function getAllFromCacheForSource(source: string) {
const items = await fetchAllFromCache();
@ -85,34 +87,19 @@ export async function getAllFromCacheForSource(source: string) {
window?.log?.info('getAllFromCacheForSource loaded', itemsFromSource.length, 'saved envelopes');
return Promise.all(
_.map(items, async (item: any) => {
const attempts = _.toNumber(item.attempts || 0) + 1;
try {
if (attempts >= 10) {
window?.log?.warn('getAllFromCache final attempt for envelope', item.id);
await Data.removeUnprocessed(item.id);
} else {
await Data.updateUnprocessedAttempts(item.id, attempts);
}
} catch (error) {
window?.log?.error(
'getAllFromCache error updating item after load:',
error && error.stack ? error.stack : error
);
}
return item;
})
);
return increaseAttemptsOrRemove(itemsFromSource);
}
export async function updateCache(envelope: EnvelopePlus, plaintext: ArrayBuffer): Promise<void> {
export async function updateCacheWithDecryptedContent(
envelope: EnvelopePlus,
plaintext: ArrayBuffer
): Promise<void> {
const { id } = envelope;
const item = await Data.getUnprocessedById(id);
if (!item) {
window?.log?.error(`updateCache: Didn't find item ${id} in cache to update`);
window?.log?.error(
`updateCacheWithDecryptedContent: Didn't find item ${id} in cache to update`
);
return;
}
@ -125,5 +112,5 @@ export async function updateCache(envelope: EnvelopePlus, plaintext: ArrayBuffer
item.decrypted = StringUtils.decode(plaintext, 'base64');
return Data.updateUnprocessedWithData(item.id, item);
await Data.updateUnprocessedWithData(item.id, item);
}

@ -1,28 +1,27 @@
import _, { groupBy, isArray, isEmpty } from 'lodash';
import _, { isEmpty } from 'lodash';
import { ContactInfo, ProfilePicture } from 'session_util_wrapper';
import { Data, hasSyncedInitialConfigurationItem } from '../data/data';
import { ConversationInteraction } from '../interactions';
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { SignalService } from '../protobuf';
import {
joinOpenGroupV2WithUIEvents,
parseOpenGroupV2,
} from '../session/apis/open_group_api/opengroupV2/JoinOpenGroupV2';
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
import { SignalService } from '../protobuf';
import { getConversationController } from '../session/conversations';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { UserUtils } from '../session/utils';
import { toHex } from '../session/utils/String';
import { configurationMessageReceived, trigger } from '../shims/events';
import { BlockedNumberController } from '../util';
import { getLastProfileUpdateTimestamp, setLastProfileUpdateTimestamp } from '../util/storage';
import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions';
import { callLibSessionWorker } from '../webworker/workers/browser/libsession_worker_interface';
import { removeFromCache } from './cache';
import { handleNewClosedGroup } from './closedGroups';
import { EnvelopePlus } from './types';
import { ConversationInteraction } from '../interactions';
import { getLastProfileUpdateTimestamp, setLastProfileUpdateTimestamp } from '../util/storage';
import { appendFetchAvatarAndProfileJob, updateOurProfileSync } from './userProfileImageUpdates';
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { callLibSessionWorker } from '../webworker/workers/browser/libsession_worker_interface';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions';
import { Dictionary } from '@reduxjs/toolkit';
import { ContactInfo, ProfilePicture } from 'session_util_wrapper';
type IncomingConfResult = {
needsPush: boolean;
@ -45,26 +44,22 @@ function protobufSharedConfigTypeToWrapper(
}
async function mergeConfigsWithIncomingUpdates(
groupedByKind: Dictionary<Array<IncomingMessage<SignalService.SharedConfigMessage>>>
incomingConfig: IncomingMessage<SignalService.ISharedConfigMessage>
) {
const kindMessageMap: Map<SignalService.SharedConfigMessage.Kind, IncomingConfResult> = new Map();
// do the merging on all wrappers sequentially instead of with a promise.all()
const allKinds = (Object.keys(groupedByKind) as unknown) as Array<
SignalService.SharedConfigMessage.Kind
>;
const allKinds = [incomingConfig.message.kind];
for (let index = 0; index < allKinds.length; index++) {
const kind = allKinds[index];
// see comment above "groupedByKind = groupBy" about why this is needed
const castedKind = (kind as unknown) as SignalService.SharedConfigMessage.Kind;
const currentKindMessages = groupedByKind[castedKind];
const currentKindMessages = [incomingConfig];
if (!currentKindMessages) {
continue;
}
const toMerge = currentKindMessages.map(m => m.message.data);
const wrapperId = protobufSharedConfigTypeToWrapper(castedKind);
const wrapperId = protobufSharedConfigTypeToWrapper(kind);
if (!wrapperId) {
throw new Error(`Invalid castedKind: ${castedKind}`);
throw new Error(`Invalid kind: ${kind}`);
}
await callLibSessionWorker([wrapperId, 'merge', toMerge]);
@ -181,6 +176,7 @@ async function handleContactsUpdate(result: IncomingConfResult) {
}
async function processMergingResults(
envelope: EnvelopePlus,
results: Map<SignalService.SharedConfigMessage.Kind, IncomingConfResult>
) {
const keys = [...results.keys()];
@ -206,32 +202,30 @@ async function processMergingResults(
throw e;
}
}
await removeFromCache(envelope);
}
async function handleConfigMessagesViaLibSession(
configMessages: Array<IncomingMessage<SignalService.SharedConfigMessage>>
async function handleConfigMessageViaLibSession(
envelope: EnvelopePlus,
configMessage: IncomingMessage<SignalService.ISharedConfigMessage>
) {
// FIXME: Remove this once `useSharedUtilForUserConfig` is permanent
if (
!window.sessionFeatureFlags.useSharedUtilForUserConfig ||
!configMessages ||
!isArray(configMessages) ||
configMessages.length === 0
) {
if (!window.sessionFeatureFlags.useSharedUtilForUserConfig) {
await removeFromCache(envelope);
return;
}
window?.log?.info(
`Handling our profileUdpates via libsession_util. count: ${configMessages.length}`
);
if (!configMessage) {
await removeFromCache(envelope);
return;
}
// lodash does not have a way to give the type of the keys as generic parameter so this can only be a string: Array<>
const groupedByKind = groupBy(configMessages, m => m.message.kind);
window?.log?.info(`Handling our profileUdpates via libsession_util.`);
const kindMessagesMap = await mergeConfigsWithIncomingUpdates(groupedByKind);
const kindMessagesMap = await mergeConfigsWithIncomingUpdates(configMessage);
await processMergingResults(kindMessagesMap);
await processMergingResults(envelope, kindMessagesMap);
}
async function handleOurProfileUpdate(
@ -426,6 +420,7 @@ async function handleConfigurationMessage(
window?.log?.info(
'useSharedUtilForUserConfig is set, not handling config messages with "handleConfigurationMessage()"'
);
await removeFromCache(envelope);
return;
}
@ -449,5 +444,5 @@ async function handleConfigurationMessage(
export const ConfigMessageHandler = {
handleConfigurationMessage,
handleConfigMessagesViaLibSession,
handleConfigMessageViaLibSession,
};

@ -1,7 +1,7 @@
import { EnvelopePlus } from './types';
import { handleSwarmDataMessage } from './dataMessage';
import { removeFromCache, updateCache } from './cache';
import { removeFromCache, updateCacheWithDecryptedContent } from './cache';
import { SignalService } from '../protobuf';
import { compact, flatten, identity, isEmpty, pickBy, toNumber } from 'lodash';
import { KeyPrefixType, PubKey } from '../session/types';
@ -28,6 +28,7 @@ import {
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { findCachedBlindedMatchOrLookupOnAllServers } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys';
import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) {
try {
@ -39,7 +40,7 @@ export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageH
return;
}
const sentAtTimestamp = toNumber(envelope.timestamp);
// swarm messages already comes with a timestamp is milliseconds, so this sentAtTimestamp is correct.
// swarm messages already comes with a timestamp in milliseconds, so this sentAtTimestamp is correct.
// the sogs messages do not come as milliseconds but just seconds, so we override it
await innerHandleSwarmContentMessage(envelope, sentAtTimestamp, plaintext, messageHash);
} catch (e) {
@ -270,15 +271,15 @@ async function decrypt(envelope: EnvelopePlus, ciphertext: ArrayBuffer): Promise
return null;
}
perfStart(`updateCache-${envelope.id}`);
perfStart(`updateCacheWithDecryptedContent-${envelope.id}`);
await updateCache(envelope, plaintext).catch((error: any) => {
await updateCacheWithDecryptedContent(envelope, plaintext).catch((error: any) => {
window?.log?.error(
'decrypt failed to save decrypted message contents to cache:',
error && error.stack ? error.stack : error
);
});
perfEnd(`updateCache-${envelope.id}`, 'updateCache');
perfEnd(`updateCacheWithDecryptedContent-${envelope.id}`, 'updateCacheWithDecryptedContent');
return plaintext;
} catch (error) {
@ -433,6 +434,18 @@ export async function innerHandleSwarmContentMessage(
);
return;
}
if (content.sharedConfigMessage) {
if (window.sessionFeatureFlags.useSharedUtilForUserConfig) {
const asIncomingMsg: IncomingMessage<SignalService.ISharedConfigMessage> = {
envelopeTimestamp: sentAtTimestamp,
message: content.sharedConfigMessage,
messageHash: messageHash,
authorOrGroupPubkey: envelope.source,
authorInGroup: envelope.senderIdentity,
};
await ConfigMessageHandler.handleConfigMessageViaLibSession(envelope, asIncomingMsg);
}
}
if (content.dataExtractionNotification) {
perfStart(`handleDataExtractionNotification-${envelope.id}`);

@ -14,12 +14,11 @@ import { SignalService } from '../protobuf';
import { Data } from '../data/data';
import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout';
import { perfEnd, perfStart } from '../session/utils/Performance';
import { UnprocessedParameter } from '../types/sqlSharedTypes';
// TODO: check if some of these exports no longer needed
interface ReqOptions {
export type ReqOptions = {
conversationId: string;
}
};
const incomingMessagePromises: Array<Promise<any>> = [];
@ -29,7 +28,7 @@ async function handleSwarmEnvelope(envelope: EnvelopePlus, messageHash: string)
}
await removeFromCache(envelope);
throw new Error('Received message with no content and no legacyMessage');
throw new Error('Received message with no content');
}
class EnvelopeQueue {
@ -57,8 +56,6 @@ const envelopeQueue = new EnvelopeQueue();
function queueSwarmEnvelope(envelope: EnvelopePlus, messageHash: string) {
const id = getEnvelopeId(envelope);
// window?.log?.info('queueing envelope', id);
const task = handleSwarmEnvelope.bind(null, envelope, messageHash);
const taskWithTimeout = createTaskWithTimeout(task, `queueSwarmEnvelope ${id}`);
@ -102,7 +99,7 @@ async function handleRequestDetail(
envelope.senderIdentity = senderIdentity;
}
envelope.id = envelope.serverGuid || uuidv4();
envelope.id = uuidv4();
envelope.serverTimestamp = envelope.serverTimestamp ? envelope.serverTimestamp.toNumber() : null;
envelope.messageHash = messageHash;
@ -115,11 +112,7 @@ async function handleRequestDetail(
await addToCache(envelope, plaintext, messageHash);
perfEnd(`addToCache-${envelope.id}`, 'addToCache');
// TODO: This is the glue between the first and the last part of the
// receiving pipeline refactor. It is to be implemented in the next PR.
// To ensure that we queue in the same order we receive messages
await lastPromise;
queueSwarmEnvelope(envelope, messageHash);
@ -131,7 +124,11 @@ async function handleRequestDetail(
}
}
export function handleRequest(plaintext: any, options: ReqOptions, messageHash: string): void {
export function handleRequest(
plaintext: Uint8Array,
options: ReqOptions,
messageHash: string
): void {
// tslint:disable-next-line no-promise-as-boolean
const lastPromise = _.last(incomingMessagePromises) || Promise.resolve();
@ -163,7 +160,7 @@ export async function queueAllCachedFromSource(source: string) {
}, Promise.resolve());
}
async function queueCached(item: any) {
async function queueCached(item: UnprocessedParameter) {
try {
const envelopePlaintext = StringUtils.encode(item.envelope, 'base64');
const envelopeArray = new Uint8Array(envelopePlaintext);
@ -174,7 +171,7 @@ async function queueCached(item: any) {
// Why do we need to do this???
envelope.senderIdentity = envelope.senderIdentity || item.senderIdentity;
envelope.serverTimestamp = envelope.serverTimestamp || item.serverTimestamp;
envelope.serverTimestamp = envelope.serverTimestamp;
const { decrypted } = item;
@ -194,8 +191,7 @@ async function queueCached(item: any) {
);
try {
const { id } = item;
await Data.removeUnprocessed(id);
await Data.removeUnprocessed(item.id);
} catch (deleteError) {
window?.log?.error(
'queueCached error deleting item',

@ -57,6 +57,7 @@ export async function updateOurProfileSync(
window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid');
return;
}
const oneAtaTimeStr = `appendFetchAvatarAndProfileJob:${ourConvo.id}`;
return allowOnlyOneAtATime(oneAtaTimeStr, async () => {
return createOrUpdateProfile(ourConvo.id, profileInDataMessage, profileKey);

@ -110,7 +110,6 @@ async function retrieveNextMessages(
const firstResult = results[0];
// TODO we should probably check for status code of all the results (when polling for a few namespaces at a time)
console.warn('what should we do if we dont get a 200 on any of those fetches?');
if (firstResult.code !== 200) {
window?.log?.warn(`retrieveNextMessages result is not 200 but ${firstResult.code}`);

@ -17,23 +17,33 @@ import pRetry from 'p-retry';
import { SnodeAPIRetrieve } from './retrieveRequest';
import { SnodeNamespace, SnodeNamespaces } from './namespaces';
import { RetrieveMessageItem, RetrieveMessagesResultsBatched } from './types';
import { ConfigMessageHandler } from '../../../receiver/configMessage';
import { IncomingMessage } from '../../messages/incoming/IncomingMessage';
// Some websocket nonsense
export function processMessage(message: string, options: any = {}, messageHash: string) {
export function extractWebSocketContent(
message: string,
options: any = {},
messageHash: string
): null | {
body: Uint8Array;
messageHash: string;
options: any;
} {
try {
const dataPlaintext = new Uint8Array(StringUtils.encode(message, 'base64'));
const messageBuf = SignalService.WebSocketMessage.decode(dataPlaintext);
if (messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST) {
Receiver.handleRequest(messageBuf.request?.body, options, messageHash);
if (
messageBuf.type === SignalService.WebSocketMessage.Type.REQUEST &&
messageBuf.request?.body?.length
) {
return {
body: messageBuf.request.body,
messageHash,
options,
};
}
return null;
} catch (error) {
const info = {
message,
error: error.message,
};
window?.log?.warn('HTTP-Resources Failed to handle message:', info);
window?.log?.warn('processMessage Failed to handle message:', error.message);
return null;
}
}
@ -276,27 +286,27 @@ export class SwarmPolling {
const newMessages = await this.handleSeenMessages(messages);
perfEnd(`handleSeenMessages-${pkStr}`, 'handleSeenMessages');
// try {
// if (
// window.sessionFeatureFlags.useSharedUtilForUserConfig &&
// userConfigMessagesMerged.length
// ) {
// const asIncomingMessages = userConfigMessagesMerged.map(msg => {
// const incomingMessage: IncomingMessage<SignalService.SharedConfigMessage> = {
// envelopeTimestamp: msg.timestamp,
// message: msg.data,
// messageHash: msg.hash,
// };
// });
// await ConfigMessageHandler.handleConfigMessagesViaLibSession();
// }
// } catch (e) {
// console.error('shared util lib process messages failed with: ', e);
// }
newMessages.forEach((m: RetrieveMessageItem) => {
const options = isGroup ? { conversationId: pkStr } : {};
processMessage(m.data, options, m.hash);
if (window.sessionFeatureFlags.useSharedUtilForUserConfig) {
const extractedUserConfigMessage = compact(
userConfigMessagesMerged.map((m: RetrieveMessageItem) => {
return extractWebSocketContent(m.data, {}, m.hash);
})
);
extractedUserConfigMessage.forEach(m => {
Receiver.handleRequest(m.body, m.options, m.messageHash);
});
}
const extractedContentMessage = compact(
newMessages.map((m: RetrieveMessageItem) => {
const options = isGroup ? { conversationId: pkStr } : {};
return extractWebSocketContent(m.data, options, m.hash);
})
);
extractedContentMessage.forEach(m => {
Receiver.handleRequest(m.body, m.options, m.messageHash);
});
}

@ -245,7 +245,7 @@ export async function updateOrCreateClosedGroup(details: GroupInfo | GroupInfoV3
const updates: Pick<
ConversationAttributes,
| 'type'
| 'identityPrivateKey'
// | 'identityPrivateKey'
| 'members'
| 'displayNameInProfile'
| 'is_medium_group'
@ -260,7 +260,7 @@ export async function updateOrCreateClosedGroup(details: GroupInfo | GroupInfoV3
active_at: details.activeAt ? details.activeAt : 0,
left: details.activeAt ? false : true,
lastJoinedTimestamp: details.activeAt && weWereJustAdded ? Date.now() : details.activeAt || 0,
identityPrivateKey: isV3(details) ? details.identityPrivateKey : undefined,
// identityPrivateKey: isV3(details) ? details.identityPrivateKey : undefined,
};
console.warn('updates', updates);

@ -10,7 +10,7 @@ type IncomingMessageAvailableTypes =
| SignalService.DataExtractionNotification
| SignalService.Unsend
| SignalService.MessageRequestResponse
| SignalService.SharedConfigMessage;
| SignalService.ISharedConfigMessage;
export class IncomingMessage<T extends IncomingMessageAvailableTypes> {
public readonly envelopeTimestamp: number;

@ -333,7 +333,7 @@ export class MessageQueue {
message instanceof SharedConfigMessage ||
(message as any).syncTarget?.length > 0
) {
window?.log?.warn('Processing sync message');
window?.log?.warn('OutgoingMessageQueue: Processing sync message');
isSyncMessage = true;
} else {
window?.log?.warn('Dropping message in process() to be sent to ourself');

@ -9,6 +9,7 @@ import { downloadAttachment, downloadAttachmentSogsV3 } from '../../receiver/att
import { initializeAttachmentLogic, processNewAttachment } from '../../types/MessageAttachment';
import { getAttachmentMetadata } from '../../types/message/initializeAttachmentMetadata';
import { was404Error } from '../apis/snode_api/onions';
import { AttachmentDownloadMessageDetails } from '../../types/sqlSharedTypes';
// this may cause issues if we increment that value to > 1, but only having one job will block the whole queue while one attachment is downloading
const MAX_ATTACHMENT_JOB_PARALLELISM = 3;
@ -49,7 +50,7 @@ export function stop() {
}
}
export async function addJob(attachment: any, job: any = {}) {
export async function addJob(attachment: any, job: AttachmentDownloadMessageDetails) {
if (!attachment) {
throw new Error('attachments_download/addJob: attachment is required');
}

@ -43,7 +43,7 @@ export class PersistedJobRunner {
constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) {
this.jobRunnerType = jobRunnerType;
this.jobEventsListener = jobEventsListener;
console.warn('new runner');
window.log.warn('new runner');
}
public async loadJobsFromDb() {
@ -157,7 +157,7 @@ export class PersistedJobRunner {
private async writeJobsToDB() {
const serialized = this.getSerializedJobs();
console.warn('writing to db', serialized);
window.log.warn('writing to db', serialized);
await Data.createOrUpdateItem({
id: this.getJobRunnerItemId(),
value: JSON.stringify(serialized),
@ -171,7 +171,6 @@ export class PersistedJobRunner {
// a new job was added. trigger it if we can/have to start it
const result = this.planNextJob();
console.warn('addJobUnchecked: ', result);
if (result === 'no_job') {
throw new Error('We just pushed a job, there cannot be no job');
}
@ -258,7 +257,7 @@ export class PersistedJobRunner {
// if the time is 101, and that task is to be run at t=101, we need to start it right away.
if (nextJob.nextAttemptTimestamp > Date.now()) {
console.warn(
window.log.warn(
'next job is not due to be run just yet. Going idle.',
nextJob.nextAttemptTimestamp - Date.now()
);

@ -28,10 +28,10 @@ export class ConfigurationSyncJob extends Persistedjob {
public async run() {
// blablha do everything from the notion page, and if success, return true.
console.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
await sleepFor(5000);
console.warn(
window.log.warn(
`running job ${this.jobType} with id:"${this.identifier}" done and returning failed `
);

@ -42,11 +42,11 @@ export class FakeSleepForMultiJob extends Persistedjob {
}
public async run() {
console.warn(
window.log.warn(
`running job ${this.jobType} with id:"${this.identifier}". sleeping for ${this.sleepDuration} & returning ${this.returnResult} `
);
await sleepFor(this.sleepDuration);
console.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `);
window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `);
return this.returnResult;
}
@ -85,9 +85,9 @@ export class FakeSleepForJob extends Persistedjob {
}
public async run() {
console.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
await sleepFor(5000);
console.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `);
window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `);
return false;
}

@ -58,16 +58,16 @@ describe('JobRunner', () => {
clock = Sinon.useFakeTimers({ shouldAdvanceTime: true });
jobEventsListener = {
onJobDeferred: (_job: SerializedPersistedJob) => {
// console.warn('listener got deferred for job ', job);
// window.log.warn('listener got deferred for job ', job);
},
onJobSuccess: (_job: SerializedPersistedJob) => {
// console.warn('listener got success for job ', job);
// window.log.warn('listener got success for job ', job);
},
onJobError: (_job: SerializedPersistedJob) => {
// console.warn('listener got error for job ', job);
// window.log.warn('listener got error for job ', job);
},
onJobStarted: (_job: SerializedPersistedJob) => {
// console.warn('listener got started for job ', job);
// window.log.warn('listener got started for job ', job);
},
};
runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener);

@ -1,3 +1,4 @@
import { OpenGroupRequestCommonType } from '../session/apis/open_group_api/opengroupV2/ApiUtil';
import { ConfigWrapperObjectTypes } from '../webworker/workers/browser/libsession_worker_functions';
/**
@ -30,6 +31,8 @@ export type ConfigDumpRow = {
// we might need to add a `seqno` field here.
};
// ========== configdump
export type GetByVariantAndPubkeyConfigDump = (
variant: ConfigWrapperObjectTypes,
pubkey: string
@ -45,3 +48,39 @@ export type ConfigDumpDataNode = {
getAllDumpsWithData: GetAllDumps;
getAllDumpsWithoutData: GetAllDumps;
};
// ========== unprocessed
export type UnprocessedParameter = {
id: string;
version: number;
envelope: string;
timestamp: number;
// serverTimestamp: number;
attempts: number;
messageHash: string;
senderIdentity?: string;
decrypted?: string; // added once the envelopes's content is decrypted with updateCacheWithDecryptedContent
source?: string; // added once the envelopes's content is decrypted with updateCacheWithDecryptedContent
};
export type UnprocessedDataNode = {
saveUnprocessed: (data: UnprocessedParameter) => void;
updateUnprocessedAttempts: (id: string, attempts: number) => void;
updateUnprocessedWithData: (id: string, data: UnprocessedParameter) => void;
getUnprocessedById: (id: string) => UnprocessedParameter | undefined;
getUnprocessedCount: () => number;
getAllUnprocessed: () => Array<UnprocessedParameter>;
removeUnprocessed: (id: string) => void;
removeAllUnprocessed: () => void;
};
// ======== attachment downloads
export type AttachmentDownloadMessageDetails = {
messageId: string;
type: 'preview' | 'quote' | 'attachment';
index: number;
isOpenGroupV2: boolean;
openGroupV2Details: OpenGroupRequestCommonType | undefined;
};

@ -4965,6 +4965,13 @@ getobject@^1.0.0, getobject@~0.1.0, getobject@~1.0.0:
resolved "https://registry.yarnpkg.com/getobject/-/getobject-1.1.1.tgz#29f7858609fee7ef1c58d062f1b2335e425bdb45"
integrity sha512-Rftr+NsUMxFcCmFopFmyCCfsJPaqUmf7TW61CtKMu0aE93ir62I6VjXt2koiCQgcunGgVog/U6g24tBPq67rlg==
git@^0.1.5:
version "0.1.5"
resolved "https://registry.yarnpkg.com/git/-/git-0.1.5.tgz#9ef62df93f851c27542143bf52d1c68b1017ca15"
integrity sha512-N+bfOrXyKMU/fQtCj6D/U9MQOEN0DAA8TLHSLdUQRSWBOkeRvsjJHdrdkvcq05xO7GSDKWc3nDEGoTZ4DfCCSg==
dependencies:
mime "1.2.9"
glob-parent@^6.0.1, glob-parent@~5.1.2:
version "6.0.2"
resolved "https://registry.yarnpkg.com/glob-parent/-/glob-parent-6.0.2.tgz#6d237d99083950c79290f24c7642a3de9a28f9e3"
@ -6494,6 +6501,11 @@ mime-types@^2.1.12, mime-types@^2.1.27:
dependencies:
mime-db "1.52.0"
mime@1.2.9:
version "1.2.9"
resolved "https://registry.yarnpkg.com/mime/-/mime-1.2.9.tgz#009cd40867bd35de521b3b966f04e2f8d4d13d09"
integrity sha512-WiLgbHTIq5AYUvU/Luli4mZ1bUcHpGNHyCsbl+KPMg4zt+XUDpQehWjuBjdLaEvDTinvKj/FgfQt3fPoT7j08g==
mime@^2.4.6:
version "2.6.0"
resolved "https://registry.yarnpkg.com/mime/-/mime-2.6.0.tgz#a2a682a95cd4d0cb1d6257e28f83da7e35800367"
@ -6810,9 +6822,9 @@ node-gyp@9.0.0:
which "^2.0.2"
node-gyp@^9.3.0:
version "9.3.0"
resolved "https://registry.yarnpkg.com/node-gyp/-/node-gyp-9.3.0.tgz#f8eefe77f0ad8edb3b3b898409b53e697642b319"
integrity sha512-A6rJWfXFz7TQNjpldJ915WFb1LnhO4lIve3ANPbWreuEoLoKlFT3sxIepPBkLhM27crW8YmN+pjlgbasH6cH/Q==
version "9.3.1"
resolved "https://registry.yarnpkg.com/node-gyp/-/node-gyp-9.3.1.tgz#1e19f5f290afcc9c46973d68700cbd21a96192e4"
integrity sha512-4Q16ZCqq3g8awk6UplT7AuxQ35XN4R/yf/+wSAwcBUAjg7l58RTactWaP8fIDTi0FzI7YcVLujwExakZlfWkXg==
dependencies:
env-paths "^2.2.0"
glob "^7.1.4"
@ -8392,15 +8404,14 @@ serialize-javascript@6.0.0, serialize-javascript@^6.0.0:
dependencies:
randombytes "^2.1.0"
"session_util_wrapper@file:../libsession-util-nodejs":
session_util_wrapper@/home/audric/pro/contribs/libsession-util-nodejs:
version "0.1.0"
dependencies:
nan "^2.17.0"
node-gyp "^9.3.0"
"session_util_wrapper@https://github.com/oxen-io/libsession-util-nodejs":
"session_util_wrapper@file:../libsession-util-nodejs":
version "0.1.0"
resolved "https://github.com/oxen-io/libsession-util-nodejs#0951d0138a289795257198d556409b2e73068fc1"
dependencies:
nan "^2.17.0"
node-gyp "^9.3.0"

Loading…
Cancel
Save