Introduce mandatary migration on startup, to minimum version

pull/1/head
Scott Nonnenberg 7 years ago
parent f38647dfa5
commit 02675312c5

@ -165,11 +165,64 @@
window.log.info('Storage fetch'); window.log.info('Storage fetch');
storage.fetch(); storage.fetch();
const MINIMUM_VERSION = 7;
async function upgradeMessages() {
const NUM_MESSAGES_PER_BATCH = 10;
window.log.info(
'upgradeMessages: Mandatory message schema upgrade started.',
`Target version: ${MINIMUM_VERSION}`
);
let isMigrationWithoutIndexComplete = false;
while (!isMigrationWithoutIndexComplete) {
const database = Migrations0DatabaseWithAttachmentData.getDatabase();
// eslint-disable-next-line no-await-in-loop
const batchWithoutIndex = await MessageDataMigrator.processNextBatchWithoutIndex(
{
databaseName: database.name,
minDatabaseVersion: database.version,
numMessagesPerBatch: NUM_MESSAGES_PER_BATCH,
upgradeMessageSchema,
maxVersion: MINIMUM_VERSION,
BackboneMessage: Whisper.Message,
saveMessage: window.Signal.Data.saveMessage,
}
);
window.log.info(
'upgradeMessages: upgrade without index',
batchWithoutIndex
);
isMigrationWithoutIndexComplete = batchWithoutIndex.done;
}
window.log.info('upgradeMessages: upgrade without index complete!');
let isMigrationWithIndexComplete = false;
while (!isMigrationWithIndexComplete) {
// eslint-disable-next-line no-await-in-loop
const batchWithIndex = await MessageDataMigrator.processNext({
BackboneMessage: Whisper.Message,
BackboneMessageCollection: Whisper.MessageCollection,
numMessagesPerBatch: NUM_MESSAGES_PER_BATCH,
upgradeMessageSchema,
getMessagesNeedingUpgrade: window.Signal.Data.getMessagesNeedingUpgrade,
saveMessage: window.Signal.Data.saveMessage,
maxVersion: MINIMUM_VERSION,
});
window.log.info('upgradeMessages: upgrade with index', batchWithIndex);
isMigrationWithIndexComplete = batchWithIndex.done;
}
window.log.info('upgradeMessages: upgrade with index complete!');
window.log.info('upgradeMessages: Message schema upgrade complete');
}
await upgradeMessages();
const idleDetector = new IdleDetector(); const idleDetector = new IdleDetector();
let isMigrationWithIndexComplete = false; let isMigrationWithIndexComplete = false;
let isMigrationWithoutIndexComplete = false; window.log.info('Starting background data migration. Target version: latest');
idleDetector.on('idle', async () => { idleDetector.on('idle', async () => {
window.log.info('Idle processing started');
const NUM_MESSAGES_PER_BATCH = 1; const NUM_MESSAGES_PER_BATCH = 1;
if (!isMigrationWithIndexComplete) { if (!isMigrationWithIndexComplete) {
@ -185,27 +238,8 @@
isMigrationWithIndexComplete = batchWithIndex.done; isMigrationWithIndexComplete = batchWithIndex.done;
} }
if (!isMigrationWithoutIndexComplete) { if (isMigrationWithIndexComplete) {
const database = Migrations0DatabaseWithAttachmentData.getDatabase(); window.log.info('Background migration complete. Stopping idle detector.');
const batchWithoutIndex = await MessageDataMigrator.processNextBatchWithoutIndex(
{
databaseName: database.name,
minDatabaseVersion: database.version,
numMessagesPerBatch: NUM_MESSAGES_PER_BATCH,
upgradeMessageSchema,
}
);
window.log.info(
'Upgrade message schema (without index):',
batchWithoutIndex
);
isMigrationWithoutIndexComplete = batchWithoutIndex.done;
}
const areAllMigrationsComplete =
isMigrationWithIndexComplete && isMigrationWithoutIndexComplete;
if (areAllMigrationsComplete) {
window.log.info('All migrations are complete. Stopping idle detector.');
idleDetector.stop(); idleDetector.stop();
} }
}); });

@ -256,7 +256,10 @@ async function removeAll() {
// erase everything in the database // erase everything in the database
} }
async function getMessagesNeedingUpgrade(limit, { MessageCollection }) { async function getMessagesNeedingUpgrade(
limit,
{ MessageCollection, maxVersion = MessageType.CURRENT_SCHEMA_VERSION }
) {
const messages = new MessageCollection(); const messages = new MessageCollection();
await deferredToPromise( await deferredToPromise(
@ -264,7 +267,7 @@ async function getMessagesNeedingUpgrade(limit, { MessageCollection }) {
limit, limit,
index: { index: {
name: 'schemaVersion', name: 'schemaVersion',
upper: MessageType.CURRENT_SCHEMA_VERSION, upper: maxVersion,
excludeUpper: true, excludeUpper: true,
order: 'desc', order: 'desc',
}, },

@ -21,6 +21,7 @@ exports.processNext = async ({
upgradeMessageSchema, upgradeMessageSchema,
getMessagesNeedingUpgrade, getMessagesNeedingUpgrade,
saveMessage, saveMessage,
maxVersion = Message.CURRENT_SCHEMA_VERSION,
} = {}) => { } = {}) => {
if (!isFunction(BackboneMessage)) { if (!isFunction(BackboneMessage)) {
throw new TypeError( throw new TypeError(
@ -49,6 +50,7 @@ exports.processNext = async ({
const messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade( const messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade(
numMessagesPerBatch, numMessagesPerBatch,
{ {
maxVersion,
MessageCollection: BackboneMessageCollection, MessageCollection: BackboneMessageCollection,
} }
); );
@ -56,7 +58,9 @@ exports.processNext = async ({
const upgradeStartTime = Date.now(); const upgradeStartTime = Date.now();
const upgradedMessages = await Promise.all( const upgradedMessages = await Promise.all(
messagesRequiringSchemaUpgrade.map(upgradeMessageSchema) messagesRequiringSchemaUpgrade.map(message =>
upgradeMessageSchema(message, { maxVersion })
)
); );
const upgradeDuration = Date.now() - upgradeStartTime; const upgradeDuration = Date.now() - upgradeStartTime;
@ -87,6 +91,9 @@ exports.dangerouslyProcessAllWithoutIndex = async ({
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
logger, logger,
maxVersion = Message.CURRENT_SCHEMA_VERSION,
saveMessage,
BackboneMessage,
} = {}) => { } = {}) => {
if (!isString(databaseName)) { if (!isString(databaseName)) {
throw new TypeError("'databaseName' must be a string"); throw new TypeError("'databaseName' must be a string");
@ -99,10 +106,15 @@ exports.dangerouslyProcessAllWithoutIndex = async ({
if (!isNumber(numMessagesPerBatch)) { if (!isNumber(numMessagesPerBatch)) {
throw new TypeError("'numMessagesPerBatch' must be a number"); throw new TypeError("'numMessagesPerBatch' must be a number");
} }
if (!isFunction(upgradeMessageSchema)) { if (!isFunction(upgradeMessageSchema)) {
throw new TypeError("'upgradeMessageSchema' is required"); throw new TypeError("'upgradeMessageSchema' is required");
} }
if (!isFunction(BackboneMessage)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
if (!isFunction(saveMessage)) {
throw new TypeError("'upgradeMessageSchema' is required");
}
const connection = await database.open(databaseName); const connection = await database.open(databaseName);
const databaseVersion = connection.version; const databaseVersion = connection.version;
@ -133,6 +145,9 @@ exports.dangerouslyProcessAllWithoutIndex = async ({
connection, connection,
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
maxVersion,
saveMessage,
BackboneMessage,
}); });
if (status.done) { if (status.done) {
break; break;
@ -162,6 +177,9 @@ exports.processNextBatchWithoutIndex = async ({
minDatabaseVersion, minDatabaseVersion,
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
maxVersion,
BackboneMessage,
saveMessage,
} = {}) => { } = {}) => {
if (!isFunction(upgradeMessageSchema)) { if (!isFunction(upgradeMessageSchema)) {
throw new TypeError("'upgradeMessageSchema' is required"); throw new TypeError("'upgradeMessageSchema' is required");
@ -172,6 +190,9 @@ exports.processNextBatchWithoutIndex = async ({
connection, connection,
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
maxVersion,
BackboneMessage,
saveMessage,
}); });
return batch; return batch;
}; };
@ -203,17 +224,29 @@ const _processBatch = async ({
connection, connection,
numMessagesPerBatch, numMessagesPerBatch,
upgradeMessageSchema, upgradeMessageSchema,
maxVersion,
BackboneMessage,
saveMessage,
} = {}) => { } = {}) => {
if (!isObject(connection)) { if (!isObject(connection)) {
throw new TypeError("'connection' must be a string"); throw new TypeError('_processBatch: connection must be a string');
} }
if (!isFunction(upgradeMessageSchema)) { if (!isFunction(upgradeMessageSchema)) {
throw new TypeError("'upgradeMessageSchema' is required"); throw new TypeError('_processBatch: upgradeMessageSchema is required');
} }
if (!isNumber(numMessagesPerBatch)) { if (!isNumber(numMessagesPerBatch)) {
throw new TypeError("'numMessagesPerBatch' is required"); throw new TypeError('_processBatch: numMessagesPerBatch is required');
}
if (!isNumber(maxVersion)) {
throw new TypeError('_processBatch: maxVersion is required');
}
if (!isFunction(BackboneMessage)) {
throw new TypeError('_processBatch: BackboneMessage is required');
}
if (!isFunction(saveMessage)) {
throw new TypeError('_processBatch: saveMessage is required');
} }
const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete( const isAttachmentMigrationComplete = await settings.isAttachmentMigrationComplete(
@ -241,14 +274,20 @@ const _processBatch = async ({
const upgradeStartTime = Date.now(); const upgradeStartTime = Date.now();
const upgradedMessages = await Promise.all( const upgradedMessages = await Promise.all(
unprocessedMessages.map(upgradeMessageSchema) unprocessedMessages.map(message =>
upgradeMessageSchema(message, { maxVersion })
)
); );
const upgradeDuration = Date.now() - upgradeStartTime; const upgradeDuration = Date.now() - upgradeStartTime;
const saveMessagesStartTime = Date.now(); const saveMessagesStartTime = Date.now();
const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite'); const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite');
const transactionCompletion = database.completeTransaction(transaction); const transactionCompletion = database.completeTransaction(transaction);
await Promise.all(upgradedMessages.map(_saveMessage({ transaction }))); await Promise.all(
upgradedMessages.map(message =>
saveMessage(message, { Message: BackboneMessage })
)
);
await transactionCompletion; await transactionCompletion;
const saveDuration = Date.now() - saveMessagesStartTime; const saveDuration = Date.now() - saveMessagesStartTime;
@ -281,19 +320,6 @@ const _processBatch = async ({
}; };
}; };
const _saveMessage = ({ transaction } = {}) => message => {
if (!isObject(transaction)) {
throw new TypeError("'transaction' is required");
}
const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME);
const request = messagesStore.put(message, message.id);
return new Promise((resolve, reject) => {
request.onsuccess = () => resolve();
request.onerror = event => reject(event.target.error);
});
};
// NOTE: Named dangerous because it is not as efficient as using our // NOTE: Named dangerous because it is not as efficient as using our
// `messages` `schemaVersion` index: // `messages` `schemaVersion` index:
const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({ const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = ({

@ -125,8 +125,10 @@ function initializeMigrations({
loadMessage: MessageType.createAttachmentLoader(loadAttachmentData), loadMessage: MessageType.createAttachmentLoader(loadAttachmentData),
Migrations0DatabaseWithAttachmentData, Migrations0DatabaseWithAttachmentData,
Migrations1DatabaseWithoutAttachmentData, Migrations1DatabaseWithoutAttachmentData,
upgradeMessageSchema: message => upgradeMessageSchema: (message, options = {}) => {
MessageType.upgradeSchema(message, { const { maxVersion } = options;
return MessageType.upgradeSchema(message, {
writeNewAttachmentData: createWriterForNew(attachmentsPath), writeNewAttachmentData: createWriterForNew(attachmentsPath),
getRegionCode, getRegionCode,
getAbsoluteAttachmentPath, getAbsoluteAttachmentPath,
@ -136,7 +138,9 @@ function initializeMigrations({
makeImageThumbnail, makeImageThumbnail,
makeVideoScreenshot, makeVideoScreenshot,
logger, logger,
}), maxVersion,
});
},
writeMessageAttachments: MessageType.createAttachmentDataWriter({ writeMessageAttachments: MessageType.createAttachmentDataWriter({
writeExistingAttachmentData: createWriterForExisting(attachmentsPath), writeExistingAttachmentData: createWriterForExisting(attachmentsPath),
logger, logger,

@ -296,6 +296,7 @@ exports.upgradeSchema = async (
makeImageThumbnail, makeImageThumbnail,
makeVideoScreenshot, makeVideoScreenshot,
logger, logger,
maxVersion = exports.CURRENT_SCHEMA_VERSION,
} = {} } = {}
) => { ) => {
if (!isFunction(writeNewAttachmentData)) { if (!isFunction(writeNewAttachmentData)) {
@ -328,7 +329,12 @@ exports.upgradeSchema = async (
let message = rawMessage; let message = rawMessage;
// eslint-disable-next-line no-restricted-syntax // eslint-disable-next-line no-restricted-syntax
for (const currentVersion of VERSIONS) { for (let index = 0, max = VERSIONS.length; index < max; index += 1) {
if (maxVersion < index) {
break;
}
const currentVersion = VERSIONS[index];
// We really do want this intra-loop await because this is a chained async action, // We really do want this intra-loop await because this is a chained async action,
// each step dependent on the previous // each step dependent on the previous
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop

Loading…
Cancel
Save