feat: move avatar downloading to JobRunner

pull/2620/head
Audric Ackermann 2 years ago
parent 9cf1419ca5
commit 08a15b210a

@ -48,6 +48,8 @@ import { initializeLibSessionUtilWrappers } from '../../session/utils/libsession
import { isDarkTheme } from '../../state/selectors/theme';
import { ThemeStateType } from '../../themes/constants/colors';
import { switchThemeTo } from '../../themes/switchTheme';
import { AvatarDownloadJob } from '../../session/utils/job_runners/jobs/AvatarDownloadJob';
import { runners } from '../../session/utils/job_runners/JobRunner';
const Section = (props: { type: SectionType }) => {
const ourNumber = useSelector(getOurNumber);
@ -62,6 +64,20 @@ const Section = (props: { type: SectionType }) => {
const handleClick = async () => {
/* tslint:disable:no-void-expression */
if (type === SectionType.Profile) {
const us = UserUtils.getOurPubKeyStrFromCache();
const ourConvo = getConversationController().get(us);
const job = new AvatarDownloadJob({
conversationId: us,
currentRetry: 0,
delayBetweenRetries: 3000,
maxAttempts: 3,
profileKeyHex: ourConvo.get('profileKey') || null,
profilePictureUrl: ourConvo.get('avatarPointer') || null,
});
await runners.avatarDownloadRunner.loadJobsFromDb();
runners.avatarDownloadRunner.startProcessing();
await runners.avatarDownloadRunner.addJob(job);
dispatch(editProfileModal({}));
} else if (type === SectionType.ColorMode) {
const currentTheme = String(window.Events.getThemeSetting());

@ -11,6 +11,7 @@ import {
import { getOpenGroupV2ConversationId } from '../session/apis/open_group_api/utils/OpenGroupUtils';
import { getConversationController } from '../session/conversations';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { ProfileManager } from '../session/profile_manager/ProfileManager';
import { UserUtils } from '../session/utils';
import { toHex } from '../session/utils/String';
import { configurationMessageReceived, trigger } from '../shims/events';
@ -21,7 +22,6 @@ import { callLibSessionWorker } from '../webworker/workers/browser/libsession_wo
import { removeFromCache } from './cache';
import { handleNewClosedGroup } from './closedGroups';
import { EnvelopePlus } from './types';
import { appendFetchAvatarAndProfileJob, updateOurProfileSync } from './userProfileImageUpdates';
type IncomingConfResult = {
needsPush: boolean;
@ -101,10 +101,9 @@ async function handleUserProfileUpdate(result: IncomingConfResult) {
const picUpdate = !isEmpty(updatedProfilePicture.key) && !isEmpty(updatedProfilePicture.url);
// trigger an update of our profileName and picture if there is one.
// this call checks for differences between updating anything
void updateOurProfileSync(
{ displayName: updatedUserName, profilePicture: picUpdate ? updatedProfilePicture.url : null },
await ProfileManager.updateOurProfileSync(
updatedUserName,
picUpdate ? updatedProfilePicture.url : null,
picUpdate ? updatedProfilePicture.key : null
);
}
@ -157,18 +156,16 @@ async function handleContactsUpdate(result: IncomingConfResult) {
await existingConvo.setNickname(wrapperConvo.nickname || null, false);
changes = true;
}
// make sure to write the changes to the database now as the `appendFetchAvatarAndProfileJob` call below might take some time before getting run
// make sure to write the changes to the database now as the `AvatarDownloadJob` below might take some time before getting run
if (changes) {
await existingConvo.commit();
}
// we still need to handle the the `name` and the `profilePicture` but those are currently made asynchronously
void appendFetchAvatarAndProfileJob(
// we still need to handle the `name` (sync) and the `profilePicture` (asynchronous)
await ProfileManager.updateProfileOfContact(
existingConvo.id,
{
displayName: wrapperConvo.name,
profilePicture: wrapperConvo.profilePicture?.url || null,
},
wrapperConvo.name,
wrapperConvo.profilePicture?.url || null,
wrapperConvo.profilePicture?.key || null
);
}
@ -243,11 +240,8 @@ async function handleOurProfileUpdate(
);
const { profileKey, profilePicture, displayName } = configMessage;
const lokiProfile = {
displayName,
profilePicture,
};
await updateOurProfileSync(lokiProfile, profileKey);
await ProfileManager.updateOurProfileSync(displayName, profilePicture, profileKey);
await setLastProfileUpdateTimestamp(_.toNumber(sentAt));
// do not trigger a signin by linking if the display name is empty
if (displayName) {
@ -398,10 +392,11 @@ const handleContactFromConfig = async (
await BlockedNumberController.unblock(contactConvo.id);
}
void appendFetchAvatarAndProfileJob(
await ProfileManager.updateProfileOfContact(
contactConvo.id,
profileInDataMessage,
contactReceived.profileKey
profileInDataMessage.displayName || undefined,
profileInDataMessage.profilePicture || null,
contactReceived.profileKey || null
);
} catch (e) {
window?.log?.warn('failed to handle a new closed group from configuration message');

@ -27,8 +27,8 @@ import {
} from '../interactions/conversations/unsendingInteractions';
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { findCachedBlindedMatchOrLookupOnAllServers } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys';
import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates';
import { IncomingMessage } from '../session/messages/incoming/IncomingMessage';
import { ProfileManager } from '../session/profile_manager/ProfileManager';
export async function handleSwarmContentMessage(envelope: EnvelopePlus, messageHash: string) {
try {
@ -700,9 +700,10 @@ async function handleMessageRequestResponse(
}
if (messageRequestResponse.profile && !isEmpty(messageRequestResponse.profile)) {
void appendFetchAvatarAndProfileJob(
await ProfileManager.updateProfileOfContact(
conversationToApprove.id,
messageRequestResponse.profile,
messageRequestResponse.profile.displayName,
messageRequestResponse.profile.profilePicture,
messageRequestResponse.profileKey
);
}

@ -18,11 +18,11 @@ import {
} from '../models/messageFactory';
import { MessageModel } from '../models/message';
import { isUsFromCache } from '../session/utils/User';
import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates';
import { toLogFormat } from '../types/attachments/Errors';
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { Reactions } from '../util/reactions';
import { Action, Reaction } from '../types/Reaction';
import { ProfileManager } from '../session/profile_manager/ProfileManager';
function cleanAttachment(attachment: any) {
return {
@ -216,10 +216,10 @@ export async function handleSwarmDataMessage(
cleanDataMessage.profile &&
cleanDataMessage.profileKey?.length
) {
// do not await this
void appendFetchAvatarAndProfileJob(
await ProfileManager.updateProfileOfContact(
senderConversationModel.id,
cleanDataMessage.profile,
cleanDataMessage.profile.displayName,
cleanDataMessage.profile.profilePicture,
cleanDataMessage.profileKey
);
}

@ -13,9 +13,9 @@ import { showMessageRequestBanner } from '../state/ducks/userConfig';
import { MessageDirection } from '../models/messageType';
import { LinkPreviews } from '../util/linkPreviews';
import { GoogleChrome } from '../util';
import { appendFetchAvatarAndProfileJob } from './userProfileImageUpdates';
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { getUsBlindedInThatServer } from '../session/apis/open_group_api/sogsv3/knownBlindedkeys';
import { ProfileManager } from '../session/profile_manager/ProfileManager';
function contentTypeSupported(type: string): boolean {
const Chrome = GoogleChrome;
@ -393,9 +393,10 @@ export async function handleMessageJob(
// the only profile we don't update with what is coming here is ours,
// as our profile is shared across our devices with a ConfigurationMessage
if (messageModel.isIncoming() && regularDataMessage.profile) {
void appendFetchAvatarAndProfileJob(
await ProfileManager.updateProfileOfContact(
sendingDeviceConversation.id,
regularDataMessage.profile,
regularDataMessage.profile.displayName,
regularDataMessage.profile.profilePicture,
regularDataMessage.profileKey
);
}

@ -1,164 +0,0 @@
import Queue from 'queue-promise';
import ByteBuffer from 'bytebuffer';
import _ from 'lodash';
import { downloadAttachment } from './attachments';
import { allowOnlyOneAtATime, hasAlreadyOneAtaTimeMatching } from '../session/utils/Promise';
import { toHex } from '../session/utils/String';
import { processNewAttachment } from '../types/MessageAttachment';
import { MIME } from '../types';
import { autoScaleForIncomingAvatar } from '../util/attachmentsUtil';
import { decryptProfile } from '../util/crypto/profileEncrypter';
import { SignalService } from '../protobuf';
import { getConversationController } from '../session/conversations';
import { UserUtils } from '../session/utils';
// Serial work queue for profile/avatar update tasks: one task at a time,
// with a 500ms pause between tasks to avoid hammering the file server.
const queue = new Queue({
  concurrent: 1,
  interval: 500,
});
// Rejected (thrown) tasks are only logged; the queue keeps processing the rest.
queue.on('reject', error => {
  window.log.warn('[profileupdate] task profile image update failed with', error);
});
/**
 * Enqueues a background task to update the given conversation's profile
 * (display name and, if a key/url is provided, its avatar image).
 *
 * Deduplicated per conversation: if a task for the same conversationId is
 * already pending or running, this call is a no-op.
 *
 * @param conversationId - id of the private conversation to update
 * @param profileInDataMessage - the LokiProfile received in the data message
 * @param profileKey - key used to decrypt the avatar, if any
 */
export async function appendFetchAvatarAndProfileJob(
  conversationId: string,
  profileInDataMessage: SignalService.DataMessage.ILokiProfile,
  profileKey?: Uint8Array | null
) {
  if (!conversationId) {
    window?.log?.warn('[profileupdate] Cannot update profile with empty convoid');
    return;
  }
  // one-at-a-time key: collapses duplicate requests for the same conversation
  const oneAtaTimeStr = `appendFetchAvatarAndProfileJob:${conversationId}`;
  if (hasAlreadyOneAtaTimeMatching(oneAtaTimeStr)) {
    return;
  }
  const task = allowOnlyOneAtATime(oneAtaTimeStr, async () => {
    return createOrUpdateProfile(conversationId, profileInDataMessage, profileKey);
  });
  // run through the serial queue; errors surface via the queue's 'reject' handler
  queue.enqueue(async () => task);
}
/**
 * This function should be used only when we have to do a sync update to our conversation with a new profile/avatar image or display name
 * It tries to fetch the profile image, scale it, save it, and update the conversationModel
 *
 * Unlike appendFetchAvatarAndProfileJob, this bypasses the queue and awaits
 * the update directly (still guarded by the same one-at-a-time key).
 *
 * @param profileInDataMessage - our LokiProfile (display name + picture url)
 * @param profileKey - key used to decrypt our avatar, if any
 */
export async function updateOurProfileSync(
  profileInDataMessage: SignalService.DataMessage.ILokiProfile,
  profileKey?: Uint8Array | null
) {
  const ourConvo = getConversationController().get(UserUtils.getOurPubKeyStrFromCache());
  if (!ourConvo?.id) {
    window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid');
    return;
  }
  // same lock key as appendFetchAvatarAndProfileJob so queued and sync updates
  // for our own conversation cannot run concurrently
  const oneAtaTimeStr = `appendFetchAvatarAndProfileJob:${ourConvo.id}`;
  return allowOnlyOneAtATime(oneAtaTimeStr, async () => {
    return createOrUpdateProfile(ourConvo.id, profileInDataMessage, profileKey);
  });
}
/**
 * Creates a new profile from the profile provided. Creates the profile if it doesn't exist.
 *
 * Applies the incoming display name and, when both a picture url and a key are
 * present, downloads, decrypts, scales and stores the avatar. Only commits the
 * conversation model if something actually changed.
 *
 * @param conversationId - id of the (private) conversation to update
 * @param profileInDataMessage - incoming LokiProfile (display name + picture url)
 * @param profileKey - key used to decrypt the avatar, if any
 */
async function createOrUpdateProfile(
  conversationId: string,
  profileInDataMessage: SignalService.DataMessage.ILokiProfile,
  profileKey?: Uint8Array | null
) {
  const conversation = getConversationController().get(conversationId);
  if (!conversation) {
    return;
  }
  if (!conversation.isPrivate()) {
    window.log.warn('createOrUpdateProfile can only be used for private convos');
    return;
  }
  const existingDisplayName = conversation.get('displayNameInProfile');
  const newDisplayName = profileInDataMessage.displayName;
  let changes = false;
  if (existingDisplayName !== newDisplayName) {
    changes = true;
    conversation.set('displayNameInProfile', newDisplayName || undefined);
  }
  if (profileInDataMessage.profilePicture && profileKey) {
    // only re-download when the avatar pointer actually changed
    const prevPointer = conversation.get('avatarPointer');
    const needsUpdate =
      !prevPointer || !_.isEqual(prevPointer, profileInDataMessage.profilePicture);
    if (needsUpdate) {
      try {
        window.log.debug(`[profileupdate] starting downloading task for ${conversation.id}`);
        const downloaded = await downloadAttachment({
          url: profileInDataMessage.profilePicture,
          isRaw: true,
        });
        // null => use placeholder with color and first letter
        let path = null;
        if (profileKey) {
          // Convert profileKey to ArrayBuffer, if needed
          const encoding = typeof profileKey === 'string' ? 'base64' : undefined;
          try {
            const profileKeyArrayBuffer = ByteBuffer.wrap(profileKey, encoding).toArrayBuffer();
            const decryptedData = await decryptProfile(downloaded.data, profileKeyArrayBuffer);
            window.log.info(
              `[profileupdate] about to auto scale avatar for convo ${conversation.id}`
            );
            const scaledData = await autoScaleForIncomingAvatar(decryptedData);
            const upgraded = await processNewAttachment({
              data: await scaledData.blob.arrayBuffer(),
              contentType: MIME.IMAGE_UNKNOWN, // contentType is mostly used to generate previews and screenshot. We do not care for those in this case.
            });
            // Only update the convo if the download and decrypt is a success
            conversation.set('avatarPointer', profileInDataMessage.profilePicture);
            conversation.set('profileKey', toHex(profileKey));
            ({ path } = upgraded);
          } catch (e) {
            window?.log?.error(`[profileupdate] Could not decrypt profile image: ${e}`);
          }
        }
        // path stays null on decrypt failure => placeholder avatar is shown
        conversation.set({ avatarInProfile: path || undefined });
        changes = true;
      } catch (e) {
        window.log.warn(
          `[profileupdate] Failed to download attachment at ${profileInDataMessage.profilePicture}. Maybe it expired? ${e.message}`
        );
        // do not return here, we still want to update the display name even if the avatar failed to download
      }
    }
  } else if (profileKey) {
    // a key without a picture url means the avatar was removed
    conversation.set({ avatarInProfile: undefined });
  }
  if (conversation.id === UserUtils.getOurPubKeyStrFromCache()) {
    // make sure the settings which should already set to `true` are
    if (
      !conversation.get('isTrustedForAttachmentDownload') ||
      !conversation.get('isApproved') ||
      !conversation.get('didApproveMe')
    ) {
      conversation.set({
        isTrustedForAttachmentDownload: true,
        isApproved: true,
        didApproveMe: true,
      });
      changes = true;
    }
  }
  if (changes) {
    await conversation.commit();
  }
}

@ -0,0 +1,73 @@
import { to_hex } from 'libsodium-wrappers-sumo';
import { isEmpty } from 'lodash';
import { getConversationController } from '../conversations';
import { UserUtils } from '../utils';
import { runners } from '../utils/job_runners/JobRunner';
import {
AvatarDownloadJob,
shouldAddAvatarDownloadJob,
} from '../utils/job_runners/jobs/AvatarDownloadJob';
/**
 * Updates our own conversation's display name with the given name right away,
 * and plans an AvatarDownloadJob to retrieve the new avatar when one is needed.
 */
async function updateOurProfileSync(
  displayName: string | undefined,
  profileUrl: string | null,
  profileKey: Uint8Array | null
) {
  const us = UserUtils.getOurPubKeyStrFromCache();
  const ourConvo = getConversationController().get(us);

  // without our own conversation there is nothing to update
  if (!ourConvo?.id) {
    window?.log?.warn('[profileupdate] Cannot update our profile with empty convoid');
    return;
  }

  return updateProfileOfContact(us, displayName, profileUrl, profileKey);
}
/**
* This can be used to update the display name of the given pubkey right away, and plan an AvatarDownloadJob to retrieve the new avatar if needed to download it.
*/
async function updateProfileOfContact(
pubkey: string,
displayName: string | null | undefined,
profileUrl: string | null | undefined,
profileKey: Uint8Array | null | undefined
) {
const conversation = getConversationController().get(pubkey);
if (!conversation || !conversation.isPrivate()) {
window.log.warn('updateProfileOfContact can only be used for existing and private convos');
return;
}
const existingDisplayName = conversation.get('displayNameInProfile');
// avoid setting the display name to an invalid value
if (existingDisplayName !== displayName && !isEmpty(displayName)) {
conversation.set('displayNameInProfile', displayName || undefined);
await conversation.commit();
}
// add an avatar download job only if needed
const profileKeyHex = !profileKey || isEmpty(profileKey) ? null : to_hex(profileKey);
if (shouldAddAvatarDownloadJob({ pubkey, profileUrl, profileKeyHex })) {
const avatarDownloadJob = new AvatarDownloadJob({
conversationId: pubkey,
profileKeyHex,
profilePictureUrl: profileUrl || null,
});
await runners.avatarDownloadRunner.addJob(avatarDownloadJob);
}
}
// Single entry point for profile updates (ours and our contacts'); callers
// should go through this object rather than the individual functions.
export const ProfileManager = {
  updateOurProfileSync,
  updateProfileOfContact,
};

@ -3,40 +3,29 @@ import {
FakeSleepForJob,
FakeSleepForMultiJob,
} from '../../../test/session/unit/utils/job_runner/FakeSleepForJob';
import { AvatarDownloadJob } from './jobs/AvatarDownloadJob';
import { ConfigurationSyncJob } from './jobs/ConfigurationSyncJob';
import { Persistedjob, PersistedJobType, SerializedPersistedJob } from './PersistedJob';
import { PersistedJob, TypeOfPersistedData } from './PersistedJob';
export function persistedJobFromData(data: SerializedPersistedJob): Persistedjob | null {
export function persistedJobFromData<T extends TypeOfPersistedData>(
data: T
): PersistedJob<T> | null {
if (!data || isEmpty(data.jobType) || !isString(data?.jobType)) {
return null;
}
const jobType: PersistedJobType = data.jobType as PersistedJobType;
switch (jobType) {
switch (data.jobType) {
case 'ConfigurationSyncJobType':
return new ConfigurationSyncJob({
maxAttempts: data.maxAttempts,
identifier: data.identifier,
nextAttemptTimestamp: data.nextAttemptTimestamp,
currentRetry: data.currentRetry,
});
return (new ConfigurationSyncJob(data) as unknown) as PersistedJob<T>;
case 'AvatarDownloadJobType':
return (new AvatarDownloadJob(data) as unknown) as PersistedJob<T>;
case 'FakeSleepForJobType':
return new FakeSleepForJob({
maxAttempts: data.maxAttempts,
identifier: data.identifier,
nextAttemptTimestamp: data.nextAttemptTimestamp,
currentRetry: data.currentRetry,
});
return (new FakeSleepForJob(data) as unknown) as PersistedJob<T>;
case 'FakeSleepForJobMultiType':
return new FakeSleepForMultiJob({
maxAttempts: data.maxAttempts,
identifier: data.identifier,
nextAttemptTimestamp: data.nextAttemptTimestamp,
currentRetry: data.currentRetry,
returnResult: data.returnResult,
sleepDuration: data.sleepDuration,
});
return (new FakeSleepForMultiJob(data) as unknown) as PersistedJob<T>;
default:
console.warn('unknown persisted job type:', jobType);
console.warn('unknown persisted job type:', (data as any).jobType);
return null;
}
}

@ -2,7 +2,12 @@ import { cloneDeep, compact, isArray, isString } from 'lodash';
import { Data } from '../../../data/data';
import { persistedJobFromData } from './JobDeserialization';
import { JobRunnerType } from './jobs/JobRunnerType';
import { Persistedjob, SerializedPersistedJob } from './PersistedJob';
import {
AvatarDownloadPersistedData,
ConfigurationSyncPersistedData,
PersistedJob,
TypeOfPersistedData,
} from './PersistedJob';
/**
* 'job_in_progress' if there is already a job in progress
@ -15,10 +20,10 @@ export type StartProcessingResult = 'job_in_progress' | 'job_deferred' | 'job_st
export type AddJobResult = 'job_deferred' | 'job_started';
export type JobEventListener = {
onJobSuccess: (job: SerializedPersistedJob) => void;
onJobDeferred: (job: SerializedPersistedJob) => void;
onJobError: (job: SerializedPersistedJob) => void;
onJobStarted: (job: SerializedPersistedJob) => void;
onJobSuccess: (job: TypeOfPersistedData) => void;
onJobDeferred: (job: TypeOfPersistedData) => void;
onJobError: (job: TypeOfPersistedData) => void;
onJobStarted: (job: TypeOfPersistedData) => void;
};
/**
@ -31,26 +36,26 @@ export type JobEventListener = {
*
*
*/
export class PersistedJobRunner {
export class PersistedJobRunner<T extends TypeOfPersistedData> {
private isInit = false;
private jobsScheduled: Array<Persistedjob> = [];
private jobsScheduled: Array<PersistedJob<T>> = [];
private isStarted = false;
private readonly jobRunnerType: JobRunnerType;
private nextJobStartTimer: NodeJS.Timeout | null = null;
private currentJob: Persistedjob | null = null;
private currentJob: PersistedJob<T> | null = null;
private readonly jobEventsListener: JobEventListener | null;
constructor(jobRunnerType: JobRunnerType, jobEventsListener: JobEventListener | null) {
this.jobRunnerType = jobRunnerType;
this.jobEventsListener = jobEventsListener;
window.log.warn('new runner');
window?.log?.warn(`new runner of type ${jobRunnerType} built`);
}
public async loadJobsFromDb() {
if (this.isInit) {
throw new Error('job runner already init');
return;
}
let jobsArray: Array<SerializedPersistedJob> = [];
let jobsArray: Array<T> = [];
const found = await Data.getItemById(this.getJobRunnerItemId());
if (found && found.value && isString(found.value)) {
const asStr = found.value;
@ -67,7 +72,7 @@ export class PersistedJobRunner {
jobsArray = [];
}
}
const jobs: Array<Persistedjob> = compact(jobsArray.map(persistedJobFromData));
const jobs: Array<PersistedJob<T>> = compact(jobsArray.map(persistedJobFromData));
this.jobsScheduled = cloneDeep(jobs);
// make sure the list is sorted
this.sortJobsList();
@ -75,29 +80,42 @@ export class PersistedJobRunner {
}
public async addJob(
job: Persistedjob
job: PersistedJob<T>
): Promise<'type_exists' | 'identifier_exists' | AddJobResult> {
this.assertIsInitialized();
if (job.singleJobInQueue) {
// make sure there is no job with that same type already scheduled.
if (this.jobsScheduled.find(j => j.jobType === job.jobType)) {
console.info(
`job runner has already a job with type:"${job.jobType}" planned so not adding another one`
);
return 'type_exists';
}
return this.addJobUnchecked(job);
if (this.jobsScheduled.find(j => j.persistedData.identifier === job.persistedData.identifier)) {
window.log.info(
`job runner has already a job with id:"${job.persistedData.identifier}" planned so not adding another one`
);
return 'identifier_exists';
}
// make sure there is no job with that same identifier already .
if (this.jobsScheduled.find(j => j.identifier === job.identifier)) {
console.info(
`job runner has already a job with id:"${job.identifier}" planned so not adding another one`
const serializedNonRunningJobs = this.jobsScheduled
.filter(j => j !== this.currentJob)
.map(k => k.serializeJob());
const addJobChecks = job.addJobCheck(serializedNonRunningJobs);
if (addJobChecks === 'skipAsJobTypeAlreadyPresent') {
window.log.warn(
`job runner has already a job with type:"${job.persistedData.jobType}" planned so not adding another one`
);
return 'identifier_exists';
return 'type_exists';
}
// if addJobCheck returned 'removeJobsFromQueue', the job's logic determined that some jobs must be removed before adding this one.
// so let's grab the jobs to remove, remove them, and then add that new job nevertheless
if (addJobChecks === 'removeJobsFromQueue') {
// fetch all the jobs which we should remove and remove them
const toRemove = job.nonRunningJobsToRemove(serializedNonRunningJobs);
this.deleteJobsByIdentifier(toRemove.map(m => m.identifier));
this.sortJobsList();
await this.writeJobsToDB();
}
console.info(`job runner adding type :"${job.jobType}" `);
// make sure there is no job with that same identifier already .
window.log.info(`job runner adding type :"${job.persistedData.jobType}" `);
return this.addJobUnchecked(job);
}
@ -145,14 +163,16 @@ export class PersistedJobRunner {
public startProcessing(): StartProcessingResult {
if (this.isStarted) {
throw new Error('startProcessing already called');
return this.planNextJob();
}
this.isStarted = true;
return this.planNextJob();
}
private sortJobsList() {
this.jobsScheduled.sort((a, b) => a.nextAttemptTimestamp - b.nextAttemptTimestamp);
this.jobsScheduled.sort(
(a, b) => a.persistedData.nextAttemptTimestamp - b.persistedData.nextAttemptTimestamp
);
}
private async writeJobsToDB() {
@ -164,7 +184,8 @@ export class PersistedJobRunner {
});
}
private async addJobUnchecked(job: Persistedjob) {
private async addJobUnchecked(job: PersistedJob<T>) {
console.warn('job', job);
this.jobsScheduled.push(cloneDeep(job));
this.sortJobsList();
await this.writeJobsToDB();
@ -202,6 +223,7 @@ export class PersistedJobRunner {
return 'no_job';
}
}
if (this.currentJob) {
return 'job_in_progress';
}
@ -211,7 +233,7 @@ export class PersistedJobRunner {
return 'no_job';
}
if (nextJob.nextAttemptTimestamp <= Date.now()) {
if (nextJob.persistedData.nextAttemptTimestamp <= Date.now()) {
if (this.nextJobStartTimer) {
global.clearTimeout(this.nextJobStartTimer);
this.nextJobStartTimer = null;
@ -233,18 +255,20 @@ export class PersistedJobRunner {
this.nextJobStartTimer = null;
}
void this.runNextJob();
}, Math.max(nextJob.nextAttemptTimestamp - Date.now(), 1));
}, Math.max(nextJob.persistedData.nextAttemptTimestamp - Date.now(), 1));
return 'job_deferred';
}
private deleteJobByIdentifier(identifier: string) {
const jobIndex = this.jobsScheduled.findIndex(f => f.identifier === identifier);
console.info('deleteJobByIdentifier job', identifier, ' index', jobIndex);
private deleteJobsByIdentifier(identifiers: Array<string>) {
identifiers.forEach(identifier => {
const jobIndex = this.jobsScheduled.findIndex(f => f.persistedData.identifier === identifier);
window.log.info('deleteJobsByIdentifier job', identifier, ' index', jobIndex);
if (jobIndex >= 0) {
this.jobsScheduled.splice(jobIndex, 1);
}
if (jobIndex >= 0) {
this.jobsScheduled.splice(jobIndex, 1);
}
});
}
private async runNextJob() {
@ -256,10 +280,10 @@ export class PersistedJobRunner {
const nextJob = this.jobsScheduled[0];
// if the time is 101, and that task is to be run at t=101, we need to start it right away.
if (nextJob.nextAttemptTimestamp > Date.now()) {
if (nextJob.persistedData.nextAttemptTimestamp > Date.now()) {
window.log.warn(
'next job is not due to be run just yet. Going idle.',
nextJob.nextAttemptTimestamp - Date.now()
nextJob.persistedData.nextAttemptTimestamp - Date.now()
);
this.planNextJob();
return;
@ -274,25 +298,26 @@ export class PersistedJobRunner {
this.jobEventsListener?.onJobStarted(this.currentJob.serializeJob());
const success = await this.currentJob.runJob();
if (!success) {
throw new Error(`job ${nextJob.identifier} failed`);
throw new Error(`job ${nextJob.persistedData.identifier} failed`);
}
// here the job did not throw and didn't return false. Consider it OK then and remove it from the list of jobs to run.
this.deleteJobByIdentifier(this.currentJob.identifier);
this.deleteJobsByIdentifier([this.currentJob.persistedData.identifier]);
await this.writeJobsToDB();
} catch (e) {
// either the job throw or didn't return 'OK'
if (nextJob.currentRetry >= nextJob.maxAttempts - 1) {
if (nextJob.persistedData.currentRetry >= nextJob.persistedData.maxAttempts - 1) {
// we cannot restart this job anymore. Remove the entry completely
this.deleteJobByIdentifier(nextJob.identifier);
this.deleteJobsByIdentifier([nextJob.persistedData.identifier]);
if (this.jobEventsListener && this.currentJob) {
this.jobEventsListener.onJobError(this.currentJob.serializeJob());
}
} else {
nextJob.currentRetry = nextJob.currentRetry + 1;
nextJob.persistedData.currentRetry = nextJob.persistedData.currentRetry + 1;
// that job can be restarted. Plan a retry later with the already defined retry
nextJob.nextAttemptTimestamp = Date.now() + nextJob.delayBetweenRetries;
nextJob.persistedData.nextAttemptTimestamp =
Date.now() + nextJob.persistedData.delayBetweenRetries;
if (this.jobEventsListener && this.currentJob) {
this.jobEventsListener.onJobDeferred(this.currentJob.serializeJob());
}
@ -322,8 +347,16 @@ export class PersistedJobRunner {
}
}
const configurationSyncRunner = new PersistedJobRunner('ConfigurationSyncJob', null);
const configurationSyncRunner = new PersistedJobRunner<ConfigurationSyncPersistedData>(
'ConfigurationSyncJob',
null
);
const avatarDownloadRunner = new PersistedJobRunner<AvatarDownloadPersistedData>(
'AvatarDownloadJob',
null
);
export const runners = {
configurationSyncRunner,
avatarDownloadRunner,
};

@ -1,75 +1,84 @@
import { isEmpty } from 'lodash';
import { cloneDeep, isEmpty } from 'lodash';
export type PersistedJobType =
| 'ConfigurationSyncJobType'
| 'AvatarDownloadJobType'
| 'FakeSleepForJobType'
| 'FakeSleepForJobMultiType';
export type SerializedPersistedJob = {
// we need at least those as they are needed to do lookups of the list of jobs.
jobType: string;
interface PersistedJobData {
jobType: PersistedJobType;
identifier: string;
nextAttemptTimestamp: number;
maxAttempts: number; // to run try to run it twice, set this to 2.
currentRetry: number; //
// then we can have other details on a specific type of job case
[key: string]: any;
};
export abstract class Persistedjob {
public readonly identifier: string;
public readonly singleJobInQueue: boolean;
public readonly delayBetweenRetries: number;
public readonly maxAttempts: number;
public readonly jobType: PersistedJobType;
public currentRetry: number;
public nextAttemptTimestamp: number;
delayBetweenRetries: number;
maxAttempts: number; // to try to run this job twice, set this to 2.
currentRetry: number;
}
export interface FakeSleepJobData extends PersistedJobData {
jobType: 'FakeSleepForJobType';
returnResult: boolean;
sleepDuration: number;
}
export interface FakeSleepForMultiJobData extends PersistedJobData {
jobType: 'FakeSleepForJobMultiType';
returnResult: boolean;
sleepDuration: number;
}
export interface AvatarDownloadPersistedData extends PersistedJobData {
jobType: 'AvatarDownloadJobType';
conversationId: string;
profileKeyHex: string | null;
profilePictureUrl: string | null;
}
export interface ConfigurationSyncPersistedData extends PersistedJobData {
jobType: 'ConfigurationSyncJobType';
}
export type TypeOfPersistedData =
| ConfigurationSyncPersistedData
| AvatarDownloadPersistedData
| FakeSleepJobData
| FakeSleepForMultiJobData;
/**
* This class can be used to save and run jobs from the database.
* Every child class must take the minimum amount of arguments, and make sure they are unlikely to change.
* For instance, don't have the attachments to download as arguments, just the messageId and the index.
* Don't have the new profileImage url for an avatar download job, just the conversationId.
*
* It is the role of the job to fetch the latest data, and decide if a process is needed or not
* If the job throws or returns false, it will be retried by the corresponding job runner.
*/
export abstract class PersistedJob<T extends PersistedJobData> {
public persistedData: T;
private runningPromise: Promise<boolean> | null = null;
public constructor({
maxAttempts,
delayBetweenRetries,
identifier,
singleJobInQueue,
jobType,
nextAttemptTimestamp,
}: {
identifier: string;
maxAttempts: number;
delayBetweenRetries: number;
singleJobInQueue: boolean;
jobType: PersistedJobType;
nextAttemptTimestamp: number;
currentRetry: number;
}) {
this.identifier = identifier;
this.jobType = jobType;
this.delayBetweenRetries = delayBetweenRetries;
this.maxAttempts = maxAttempts;
this.currentRetry = 0;
this.singleJobInQueue = singleJobInQueue;
this.nextAttemptTimestamp = nextAttemptTimestamp;
if (maxAttempts < 1) {
public constructor(data: T) {
if (data.maxAttempts < 1) {
throw new Error('maxAttempts must be >= 1');
}
if (isEmpty(identifier)) {
if (isEmpty(data.identifier)) {
throw new Error('identifier must be not empty');
}
if (isEmpty(jobType)) {
throw new Error('identifier must be not empty');
if (isEmpty(data.jobType)) {
throw new Error('jobType must be not empty');
}
if (delayBetweenRetries <= 0) {
if (data.delayBetweenRetries <= 0) {
throw new Error('delayBetweenRetries must be at least > 0');
}
if (nextAttemptTimestamp <= 0) {
if (data.nextAttemptTimestamp <= 0) {
throw new Error('nextAttemptTimestamp must be set and > 0');
}
this.persistedData = data;
}
public async runJob() {
@ -86,30 +95,40 @@ export abstract class Persistedjob {
public async waitForCurrentTry() {
try {
// tslint:disable-next-line: no-promise-as-boolean
return this.runningPromise || Promise.resolve();
return this.runningPromise || Promise.resolve(true);
} catch (e) {
window.log.warn('waitForCurrentTry got an error: ', e.message);
return Promise.resolve();
return Promise.resolve(true);
}
}
/**
* This one must be reimplemented in the child class, and must first call `super.serializeBase()`
*/
public abstract serializeJob(): SerializedPersistedJob;
protected abstract run(): Promise<boolean>; // must return true if that job is a success and doesn't need to be retried
protected serializeBase(): SerializedPersistedJob {
return {
// those are mandatory
jobType: this.jobType,
identifier: this.identifier,
nextAttemptTimestamp: this.nextAttemptTimestamp,
maxAttempts: this.maxAttempts,
currentRetry: this.currentRetry,
delayBetweenRetries: this.delayBetweenRetries,
singleJobInQueue: this.singleJobInQueue,
};
public abstract serializeJob(): T;
public abstract nonRunningJobsToRemove(jobs: Array<T>): Array<T>;
public abstract addJobCheck(
jobs: Array<T>
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null;
public addJobCheckSameTypePresent(jobs: Array<T>): 'skipAsJobTypeAlreadyPresent' | null {
return jobs.some(j => j.jobType === this.persistedData.jobType)
? 'skipAsJobTypeAlreadyPresent'
: null;
}
/**
* This function will be called by the runner do run the logic of that job.
* It **must** return true if that job is a success and doesn't need to be retried.
* If it returns false, or throws, it will be retried (if not reach the retries limit yet).
*
* Note: you should check the this.isAborted() to know if you should cancel the current processing of your logic.
*/
protected abstract run(): Promise<boolean>;
protected serializeBase(): T {
return cloneDeep(this.persistedData);
}
}

@ -0,0 +1,233 @@
import { isEmpty, isEqual, isNumber } from 'lodash';
import { v4 } from 'uuid';
import { UserUtils } from '../..';
import { downloadAttachment } from '../../../../receiver/attachments';
import { MIME } from '../../../../types';
import { processNewAttachment } from '../../../../types/MessageAttachment';
import { autoScaleForIncomingAvatar } from '../../../../util/attachmentsUtil';
import { decryptProfile } from '../../../../util/crypto/profileEncrypter';
import { getConversationController } from '../../../conversations';
import { fromHexToArray } from '../../String';
import { AvatarDownloadPersistedData, PersistedJob } from '../PersistedJob';
const defaultMsBetweenRetries = 10000;
const defaultMaxAttemps = 3;
/**
* Returns true if given those details we should add an Avatar Download Job to the list of jobs to run
*/
export function shouldAddAvatarDownloadJob({
  profileKeyHex,
  profileUrl,
  pubkey,
}: {
  pubkey: string;
  profileUrl: string | null | undefined;
  profileKeyHex: string | null | undefined;
}) {
  const conversation = getConversationController().get(pubkey);
  if (!conversation) {
    // no corresponding conversation: nothing to download an avatar for
    window.log.warn('shouldAddAvatarDownloadJob did not find a corresponding conversation');
    return false;
  }
  if (!conversation.isPrivate()) {
    window.log.warn('shouldAddAvatarDownloadJob can only be used for private convos currently');
    return false;
  }
  // only schedule a download when we have both a pointer and a key to decrypt it,
  // and the pointer actually changed since the last downloaded avatar
  if (profileUrl && !isEmpty(profileKeyHex)) {
    const prevPointer = conversation.get('avatarPointer');
    const needsUpdate = !prevPointer || !isEqual(prevPointer, profileUrl);

    return needsUpdate;
  }
  return false;
}
/**
* This job can be used to add the downloading of the avatar of a conversation to the list of jobs to be run.
* The conversationId is used as identifier so we can only have a single job per conversation.
* When the jobRunners starts this job, the job first checks if a download is required or not (avatarPointer changed and wasn't already downloaded).
* If yes, it downloads the new avatar, decrypt it and store it before updating the conversation with the new url,profilekey and local file storage.
*/
export class AvatarDownloadJob extends PersistedJob<AvatarDownloadPersistedData> {
constructor({
conversationId,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
profileKeyHex,
profilePictureUrl,
identifier,
}: Pick<AvatarDownloadPersistedData, 'profileKeyHex' | 'profilePictureUrl'> & {
conversationId: string;
} & Partial<
Pick<
AvatarDownloadPersistedData,
| 'nextAttemptTimestamp'
| 'identifier'
| 'maxAttempts'
| 'delayBetweenRetries'
| 'currentRetry'
>
>) {
super({
jobType: 'AvatarDownloadJobType',
identifier: identifier || v4(),
conversationId,
delayBetweenRetries: defaultMsBetweenRetries,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : defaultMaxAttemps,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + defaultMsBetweenRetries,
currentRetry: isNumber(currentRetry) ? currentRetry : 0,
profileKeyHex,
profilePictureUrl,
});
}
// tslint:disable-next-line: cyclomatic-complexity
public async run(): Promise<boolean> {
const convoId = this.persistedData.conversationId;
window.log.warn(
`running job ${this.persistedData.jobType} with conversationId:"${convoId}" id:"${this.persistedData.identifier}" `
);
if (!this.persistedData.identifier || !convoId) {
// return true so we do not retry this task.
return true;
}
let conversation = getConversationController().get(convoId);
if (!conversation) {
// return true so we do not retry this task.
window.log.warn('AvatarDownloadJob did not corresponding conversation');
return true;
}
if (!conversation.isPrivate()) {
window.log.warn('AvatarDownloadJob can only be used for private convos currently');
return true;
}
let changes = false;
const shouldRunJob = shouldAddAvatarDownloadJob({
pubkey: convoId,
profileKeyHex: this.persistedData.profileKeyHex,
profileUrl: this.persistedData.profilePictureUrl,
});
if (!shouldRunJob) {
// return true so we do not retry this task.
window.log.warn('AvatarDownloadJob shouldAddAvatarDownloadJob said no');
return true;
}
if (this.persistedData.profilePictureUrl && this.persistedData.profileKeyHex) {
const prevPointer = conversation.get('avatarPointer');
const needsUpdate =
!prevPointer || !isEqual(prevPointer, this.persistedData.profilePictureUrl);
if (needsUpdate) {
try {
window.log.debug(`[profileupdate] starting downloading task for ${conversation.id}`);
const downloaded = await downloadAttachment({
url: this.persistedData.profilePictureUrl,
isRaw: true,
});
conversation = getConversationController().getOrThrow(convoId);
// null => use placeholder with color and first letter
let path = null;
try {
const profileKeyArrayBuffer = fromHexToArray(this.persistedData.profileKeyHex);
const decryptedData = await decryptProfile(downloaded.data, profileKeyArrayBuffer);
window.log.info(
`[profileupdate] about to auto scale avatar for convo ${conversation.id}`
);
const scaledData = await autoScaleForIncomingAvatar(decryptedData);
const upgraded = await processNewAttachment({
data: await scaledData.blob.arrayBuffer(),
contentType: MIME.IMAGE_UNKNOWN, // contentType is mostly used to generate previews and screenshot. We do not care for those in this case.
});
conversation = getConversationController().getOrThrow(convoId);
// Only update the convo if the download and decrypt is a success
conversation.set('avatarPointer', this.persistedData.profilePictureUrl);
conversation.set('profileKey', this.persistedData.profileKeyHex || undefined);
({ path } = upgraded);
} catch (e) {
window?.log?.error(`[profileupdate] Could not decrypt profile image: ${e}`);
}
conversation.set({ avatarInProfile: path || undefined });
changes = true;
} catch (e) {
window.log.warn(
`[profileupdate] Failed to download attachment at ${this.persistedData.profilePictureUrl}. Maybe it expired? ${e.message}`
);
// do not return here, we still want to update the display name even if the avatar failed to download
}
}
} else {
if (
conversation.get('avatarInProfile') ||
conversation.get('avatarPointer') ||
conversation.get('profileKey')
) {
changes = true;
conversation.set({
avatarInProfile: undefined,
avatarPointer: undefined,
profileKey: undefined,
});
}
}
if (conversation.id === UserUtils.getOurPubKeyStrFromCache()) {
// make sure the settings which should already set to `true` are
if (
!conversation.get('isTrustedForAttachmentDownload') ||
!conversation.get('isApproved') ||
!conversation.get('didApproveMe')
) {
conversation.set({
isTrustedForAttachmentDownload: true,
isApproved: true,
didApproveMe: true,
});
changes = true;
}
}
if (changes) {
await conversation.commit();
}
// return true so this job is marked as a success
return true;
}
public serializeJob(): AvatarDownloadPersistedData {
return super.serializeBase();
}
public nonRunningJobsToRemove(jobs: Array<AvatarDownloadPersistedData>) {
// for an avatar download job, we want to remove any job matching the same conversationID.
return jobs.filter(j => j.conversationId === this.persistedData.conversationId);
}
public addJobCheck(
jobs: Array<AvatarDownloadPersistedData>
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
if (this.nonRunningJobsToRemove(jobs).length) {
return 'removeJobsFromQueue';
}
return null;
}
}

@ -1,45 +1,58 @@
import { isNumber } from 'lodash';
import { v4 } from 'uuid';
import { sleepFor } from '../../Promise';
import { Persistedjob, SerializedPersistedJob } from '../PersistedJob';
import { ConfigurationSyncPersistedData, PersistedJob } from '../PersistedJob';
export class ConfigurationSyncJob extends Persistedjob {
const defaultMsBetweenRetries = 3000;
export class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData> {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
}: {
identifier: string | null;
nextAttemptTimestamp: number | null;
maxAttempts: number | null;
currentRetry: number;
}) {
}: Pick<ConfigurationSyncPersistedData, 'identifier' | 'currentRetry' | 'maxAttempts'> &
Partial<Pick<ConfigurationSyncPersistedData, 'nextAttemptTimestamp'>>) {
super({
jobType: 'ConfigurationSyncJobType',
identifier: identifier || v4(),
delayBetweenRetries: 3000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000,
singleJobInQueue: true,
delayBetweenRetries: defaultMsBetweenRetries,
maxAttempts: maxAttempts,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + defaultMsBetweenRetries,
currentRetry,
});
}
public async run() {
// blablha do everything from the notion page, and if success, return true.
window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
window.log.warn(
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" `
);
await sleepFor(5000);
window.log.warn(
`running job ${this.jobType} with id:"${this.identifier}" done and returning failed `
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done and returning failed `
);
return false;
}
public serializeJob(): SerializedPersistedJob {
public serializeJob(): ConfigurationSyncPersistedData {
const fromParent = super.serializeBase();
return fromParent;
}
public addJobCheck(
jobs: Array<ConfigurationSyncPersistedData>
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
return this.addJobCheckSameTypePresent(jobs);
}
/**
* For the SharedConfig job, we do not care about the jobs already in the list.
* We never want to add a new sync configuration job if there is already one in the queue.
* This is done by the `addJobCheck` method above
*/
public nonRunningJobsToRemove(_jobs: Array<ConfigurationSyncPersistedData>) {
return [];
}
}

@ -1 +1,5 @@
export type JobRunnerType = 'ConfigurationSyncJob' | 'FakeSleepForJob';
export type JobRunnerType =
| 'ConfigurationSyncJob'
| 'FakeSleepForJob'
| 'FakeSleepForMultiJob'
| 'AvatarDownloadJob';

@ -2,14 +2,12 @@ import { isNumber } from 'lodash';
import { v4 } from 'uuid';
import { sleepFor } from '../../../../../session/utils/Promise';
import {
Persistedjob,
SerializedPersistedJob,
FakeSleepForMultiJobData,
FakeSleepJobData,
PersistedJob,
} from '../../../../../session/utils/job_runners/PersistedJob';
export class FakeSleepForMultiJob extends Persistedjob {
private readonly sleepDuration: number;
private readonly returnResult: boolean;
export class FakeSleepForMultiJob extends PersistedJob<FakeSleepForMultiJobData> {
constructor({
identifier,
nextAttemptTimestamp,
@ -17,25 +15,20 @@ export class FakeSleepForMultiJob extends Persistedjob {
currentRetry,
returnResult,
sleepDuration,
}: {
identifier: string | null;
nextAttemptTimestamp: number | null;
maxAttempts: number | null;
currentRetry: number;
sleepDuration: number;
returnResult: boolean;
}) {
}: Pick<FakeSleepForMultiJobData, 'currentRetry' | 'returnResult' | 'sleepDuration'> &
Partial<
Pick<FakeSleepForMultiJobData, 'nextAttemptTimestamp' | 'maxAttempts' | 'identifier'>
>) {
super({
jobType: 'FakeSleepForJobMultiType',
identifier: identifier || v4(),
delayBetweenRetries: 10000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000,
singleJobInQueue: false,
currentRetry,
returnResult,
sleepDuration,
});
this.returnResult = returnResult;
this.sleepDuration = sleepDuration;
if (process.env.NODE_APP_INSTANCE !== undefined) {
throw new Error('FakeSleepForJobMultiType are only meant for testing purposes');
}
@ -43,41 +36,53 @@ export class FakeSleepForMultiJob extends Persistedjob {
public async run() {
window.log.warn(
`running job ${this.jobType} with id:"${this.identifier}". sleeping for ${this.sleepDuration} & returning ${this.returnResult} `
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}". sleeping for ${this.persistedData.sleepDuration} & returning ${this.persistedData.returnResult} `
);
await sleepFor(this.persistedData.sleepDuration);
window.log.warn(
`${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning success `
);
await sleepFor(this.sleepDuration);
window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning success `);
return this.returnResult;
return this.persistedData.returnResult;
}
public serializeJob(): FakeSleepForMultiJobData {
return super.serializeBase();
}
/**
* For the fakesleep for multi, we want to allow as many job as we want, so this returns null
*/
public addJobCheck(
_jobs: Array<FakeSleepForMultiJobData>
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
return null;
}
public serializeJob(): SerializedPersistedJob {
const fromParent = super.serializeBase();
fromParent.sleepDuration = this.sleepDuration;
fromParent.returnResult = this.returnResult;
return fromParent;
/**
* For the MultiFakeSleep job, there are no jobs to remove if we try to add a new one of the same type.
*/
public nonRunningJobsToRemove(_jobs: Array<FakeSleepForMultiJobData>) {
return [];
}
}
export class FakeSleepForJob extends Persistedjob {
export class FakeSleepForJob extends PersistedJob<FakeSleepJobData> {
constructor({
identifier,
nextAttemptTimestamp,
maxAttempts,
currentRetry,
}: {
identifier: string | null;
nextAttemptTimestamp: number | null;
maxAttempts: number | null;
currentRetry: number;
}) {
}: Pick<FakeSleepJobData, 'currentRetry' | 'maxAttempts'> &
Partial<Pick<FakeSleepJobData, 'nextAttemptTimestamp' | 'identifier'>>) {
super({
jobType: 'FakeSleepForJobType',
identifier: identifier || v4(),
delayBetweenRetries: 10000,
maxAttempts: isNumber(maxAttempts) ? maxAttempts : 3,
maxAttempts,
nextAttemptTimestamp: nextAttemptTimestamp || Date.now() + 3000,
singleJobInQueue: true,
currentRetry,
returnResult: false,
sleepDuration: 5000,
});
if (process.env.NODE_APP_INSTANCE !== undefined) {
throw new Error('FakeSleepForJob are only meant for testing purposes');
@ -85,14 +90,32 @@ export class FakeSleepForJob extends Persistedjob {
}
public async run() {
window.log.warn(`running job ${this.jobType} with id:"${this.identifier}" `);
await sleepFor(5000);
window.log.warn(`${this.jobType} with id:"${this.identifier}" done. returning failed `);
window.log.warn(
`running job ${this.persistedData.jobType} with id:"${this.persistedData.identifier}" `
);
await sleepFor(this.persistedData.sleepDuration);
window.log.warn(
`${this.persistedData.jobType} with id:"${this.persistedData.identifier}" done. returning failed `
);
return false;
}
public serializeJob(): SerializedPersistedJob {
const fromParent = super.serializeBase();
return fromParent;
public serializeJob(): FakeSleepJobData {
return super.serializeBase();
}
public addJobCheck(
jobs: Array<FakeSleepJobData>
): 'skipAsJobTypeAlreadyPresent' | 'removeJobsFromQueue' | null {
return this.addJobCheckSameTypePresent(jobs);
}
/**
* For the FakeSleep job, we do not care about the jobs already in the list.
* We just never want to add a new job of that type if there is already one in the queue.
* This is done by the `addJobCheck` method above
*/
public nonRunningJobsToRemove(_jobs: Array<FakeSleepJobData>) {
return [];
}
}

@ -7,7 +7,11 @@ import {
PersistedJobRunner,
} from '../../../../../session/utils/job_runners/JobRunner';
import { FakeSleepForJob, FakeSleepForMultiJob } from './FakeSleepForJob';
import { SerializedPersistedJob } from '../../../../../session/utils/job_runners/PersistedJob';
import {
FakeSleepForMultiJobData,
FakeSleepJobData,
TypeOfPersistedData,
} from '../../../../../session/utils/job_runners/PersistedJob';
import { sleepFor } from '../../../../../session/utils/Promise';
import { stubData } from '../../../../test-utils/utils';
@ -21,7 +25,7 @@ function getFakeSleepForJob(timestamp: number): FakeSleepForJob {
return job;
}
function getFakeSleepForJobPersisted(timestamp: number): SerializedPersistedJob {
function getFakeSleepForJobPersisted(timestamp: number): FakeSleepJobData {
return getFakeSleepForJob(timestamp).serializeJob();
}
@ -49,7 +53,8 @@ function getFakeSleepForMultiJob({
describe('JobRunner', () => {
let getItemById: Sinon.SinonStub;
let clock: Sinon.SinonFakeTimers;
let runner: PersistedJobRunner;
let runner: PersistedJobRunner<FakeSleepJobData>;
let runnerMulti: PersistedJobRunner<FakeSleepForMultiJobData>;
let jobEventsListener: JobEventListener;
beforeEach(() => {
@ -57,31 +62,40 @@ describe('JobRunner', () => {
stubData('createOrUpdateItem');
clock = Sinon.useFakeTimers({ shouldAdvanceTime: true });
jobEventsListener = {
onJobDeferred: (_job: SerializedPersistedJob) => {
onJobDeferred: (_job: TypeOfPersistedData) => {
// window.log.warn('listener got deferred for job ', job);
},
onJobSuccess: (_job: SerializedPersistedJob) => {
onJobSuccess: (_job: TypeOfPersistedData) => {
// window.log.warn('listener got success for job ', job);
},
onJobError: (_job: SerializedPersistedJob) => {
onJobError: (_job: TypeOfPersistedData) => {
// window.log.warn('listener got error for job ', job);
},
onJobStarted: (_job: SerializedPersistedJob) => {
onJobStarted: (_job: TypeOfPersistedData) => {
// window.log.warn('listener got started for job ', job);
},
};
runner = new PersistedJobRunner('FakeSleepForJob', jobEventsListener);
runner = new PersistedJobRunner<FakeSleepJobData>('FakeSleepForJob', jobEventsListener);
runnerMulti = new PersistedJobRunner<FakeSleepForMultiJobData>(
'FakeSleepForMultiJob',
jobEventsListener
);
});
afterEach(() => {
Sinon.restore();
runner.resetForTesting();
runnerMulti.resetForTesting();
});
describe('loadJobsFromDb', () => {
it('throw if already loaded', async () => {
await runner.loadJobsFromDb();
it('throw if not loaded', async () => {
try {
getItemById.resolves({
id: '',
value: JSON.stringify([]),
});
await runner.loadJobsFromDb();
throw new Error('PLOP'); // the line above should throw something else
} catch (e) {
@ -154,21 +168,21 @@ describe('JobRunner', () => {
});
it('can add a FakeSleepForJobMulti (sorted) even if one is already there', async () => {
await runner.loadJobsFromDb();
await runnerMulti.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 1234 });
const job2 = getFakeSleepForMultiJob({ timestamp: 123 });
const job3 = getFakeSleepForMultiJob({ timestamp: 1 });
let result = await runner.addJob(job);
let result = await runnerMulti.addJob(job);
expect(result).to.eq('job_deferred');
result = await runner.addJob(job2);
result = await runnerMulti.addJob(job2);
expect(result).to.eq('job_deferred');
result = await runner.addJob(job3);
result = await runnerMulti.addJob(job3);
expect(result).to.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([
expect(runnerMulti.getJobList()).to.deep.eq([
job3.serializeJob(),
job2.serializeJob(),
job.serializeJob(),
@ -176,87 +190,90 @@ describe('JobRunner', () => {
});
it('cannot add a FakeSleepForJobMulti with an id already existing', async () => {
await runner.loadJobsFromDb();
await runnerMulti.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 1234 });
const job2 = getFakeSleepForMultiJob({ timestamp: 123, identifier: job.identifier });
let result = await runner.addJob(job);
const job2 = getFakeSleepForMultiJob({
timestamp: 123,
identifier: job.persistedData.identifier,
});
let result = await runnerMulti.addJob(job);
expect(result).to.be.eq('job_deferred');
result = await runner.addJob(job2);
result = await runnerMulti.addJob(job2);
expect(result).to.be.eq('identifier_exists');
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]);
});
it('two jobs are running sequentially', async () => {
await runner.loadJobsFromDb();
await runnerMulti.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100 });
const job2 = getFakeSleepForMultiJob({ timestamp: 200 });
runner.startProcessing();
runnerMulti.startProcessing();
clock.tick(110);
// job should be started right away
let result = await runner.addJob(job);
let result = await runnerMulti.addJob(job);
expect(result).to.eq('job_started');
result = await runner.addJob(job2);
result = await runnerMulti.addJob(job2);
expect(result).to.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]);
expect(runner.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]);
expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]);
expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob(), job2.serializeJob()]);
// each job takes 5s to finish, so let's tick once the first one should be done
clock.tick(5010);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
clock.tick(5010);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]);
expect(runnerMulti.getJobList()).to.deep.eq([job2.serializeJob()]);
clock.tick(5000);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
expect(runnerMulti.getJobList()).to.deep.eq([]);
});
it('adding one job after the first is done starts it', async () => {
await runner.loadJobsFromDb();
await runnerMulti.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100 });
const job2 = getFakeSleepForMultiJob({ timestamp: 120 });
runner.startProcessing();
runnerMulti.startProcessing();
clock.tick(110);
// job should be started right away
let result = await runner.addJob(job);
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
let result = await runnerMulti.addJob(job);
expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]);
expect(result).to.eq('job_started');
clock.tick(5010);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
clock.tick(5010);
// just give some time for the runner to pick up a new job
// just give some time for the runnerMulti to pick up a new job
await sleepFor(100);
// the first job should already be finished now
result = await runner.addJob(job2);
result = await runnerMulti.addJob(job2);
expect(result).to.eq('job_started');
expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]);
expect(runnerMulti.getJobList()).to.deep.eq([job2.serializeJob()]);
// each job takes 5s to finish, so let's tick once the first one should be done
clock.tick(5010);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
expect(runnerMulti.getJobList()).to.deep.eq([]);
});
it('adding one job after the first is done schedules it', async () => {
await runner.loadJobsFromDb();
await runnerMulti.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100 });
runner.startProcessing();
runnerMulti.startProcessing();
clock.tick(110);
// job should be started right away
let result = await runner.addJob(job);
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
let result = await runnerMulti.addJob(job);
expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]);
expect(result).to.eq('job_started');
clock.tick(5010);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
clock.tick(5010);
// just give some time for the runner to pick up a new job
@ -265,10 +282,10 @@ describe('JobRunner', () => {
const job2 = getFakeSleepForMultiJob({ timestamp: clock.now + 100 });
// job should already be finished now
result = await runner.addJob(job2);
result = await runnerMulti.addJob(job2);
// new job should be deferred as timestamp is not in the past
expect(result).to.eq('job_deferred');
expect(runner.getJobList()).to.deep.eq([job2.serializeJob()]);
expect(runnerMulti.getJobList()).to.deep.eq([job2.serializeJob()]);
// tick enough for the job to need to be started
clock.tick(100);
@ -278,9 +295,9 @@ describe('JobRunner', () => {
clock.tick(5000);
await job2.waitForCurrentTry();
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
expect(runnerMulti.getJobList()).to.deep.eq([]);
});
});
@ -352,27 +369,27 @@ describe('JobRunner', () => {
});
it('does await if there are jobs and one is started', async () => {
await runner.loadJobsFromDb();
await runnerMulti.loadJobsFromDb();
const job = getFakeSleepForMultiJob({ timestamp: 100, returnResult: false }); // this job keeps failing
runner.startProcessing();
runnerMulti.startProcessing();
clock.tick(110);
// job should be started right away
const result = await runner.addJob(job);
expect(runner.getJobList()).to.deep.eq([job.serializeJob()]);
const result = await runnerMulti.addJob(job);
expect(runnerMulti.getJobList()).to.deep.eq([job.serializeJob()]);
expect(result).to.eq('job_started');
clock.tick(5010);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
const jobUpdated = {
...job.serializeJob(),
nextAttemptTimestamp: clock.now + 10000,
currentRetry: 1,
};
// just give time for the runner to pick up a new job
// just give time for the runnerMulti to pick up a new job
await sleepFor(10);
// the job failed, so the job should still be there
expect(runner.getJobList()).to.deep.eq([jobUpdated]);
expect(runnerMulti.getJobList()).to.deep.eq([jobUpdated]);
// that job should be retried now
clock.tick(11000);
@ -384,16 +401,16 @@ describe('JobRunner', () => {
};
await sleepFor(10);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([jobUpdated2]);
await runnerMulti.waitCurrentJob();
expect(runnerMulti.getJobList()).to.deep.eq([jobUpdated2]);
// that job should be retried one more time and then removed from the list of jobs to be run
clock.tick(11000);
await runner.waitCurrentJob();
await runnerMulti.waitCurrentJob();
await sleepFor(10);
await runner.waitCurrentJob();
expect(runner.getJobList()).to.deep.eq([]);
await runnerMulti.waitCurrentJob();
expect(runnerMulti.getJobList()).to.deep.eq([]);
});
});
});

Loading…
Cancel
Save