sync fixes and refactorings

This commit is contained in:
zadam
2023-07-29 21:59:20 +02:00
parent 2a7fe85020
commit 04b125afc0
14 changed files with 109 additions and 190 deletions

View File

@@ -9,10 +9,8 @@ const optionService = require('../../services/options');
const contentHashService = require('../../services/content_hash');
const log = require('../../services/log');
const syncOptions = require('../../services/sync_options');
const dateUtils = require('../../services/date_utils');
const utils = require('../../services/utils');
const ws = require('../../services/ws');
const becca = require("../../becca/becca");
async function testSync() {
try {
@@ -84,54 +82,14 @@ function forceFullSync() {
syncService.sync();
}
function forceNoteSync(req) {
const noteId = req.params.noteId;
const note = becca.getNote(noteId);
const now = dateUtils.utcNowDateTime();
sql.execute(`UPDATE notes SET utcDateModified = ? WHERE noteId = ?`, [now, noteId]);
entityChangesService.moveEntityChangeToTop('notes', noteId);
sql.execute(`UPDATE blobs SET utcDateModified = ? WHERE blobId = ?`, [now, note.blobId]);
entityChangesService.moveEntityChangeToTop('blobs', note.blobId);
for (const branchId of sql.getColumn("SELECT branchId FROM branches WHERE noteId = ?", [noteId])) {
sql.execute(`UPDATE branches SET utcDateModified = ? WHERE branchId = ?`, [now, branchId]);
entityChangesService.moveEntityChangeToTop('branches', branchId);
}
for (const attributeId of sql.getColumn("SELECT attributeId FROM attributes WHERE noteId = ?", [noteId])) {
sql.execute(`UPDATE attributes SET utcDateModified = ? WHERE attributeId = ?`, [now, attributeId]);
entityChangesService.moveEntityChangeToTop('attributes', attributeId);
}
for (const revisionId of sql.getColumn("SELECT revisionId FROM revisions WHERE noteId = ?", [noteId])) {
sql.execute(`UPDATE revisions SET utcDateModified = ? WHERE revisionId = ?`, [now, revisionId]);
entityChangesService.moveEntityChangeToTop('revisions', revisionId);
}
for (const attachmentId of sql.getColumn("SELECT attachmentId FROM attachments WHERE noteId = ?", [noteId])) {
sql.execute(`UPDATE attachments SET utcDateModified = ? WHERE attachmentId = ?`, [now, attachmentId]);
entityChangesService.moveEntityChangeToTop('attachments', attachmentId);
}
log.info(`Forcing note sync for ${noteId}`);
// not awaiting for the job to finish (will probably take a long time)
syncService.sync();
}
function getChanged(req) {
const startTime = Date.now();
let lastEntityChangeId = parseInt(req.query.lastEntityChangeId);
const clientinstanceId = req.query.instanceId;
const clientInstanceId = req.query.instanceId;
let filteredEntityChanges = [];
while (filteredEntityChanges.length === 0) {
do {
const entityChanges = sql.getRows(`
SELECT *
FROM entity_changes
@@ -144,20 +102,22 @@ function getChanged(req) {
break;
}
filteredEntityChanges = entityChanges.filter(ec => ec.instanceId !== clientinstanceId);
filteredEntityChanges = entityChanges.filter(ec => ec.instanceId !== clientInstanceId);
if (filteredEntityChanges.length === 0) {
lastEntityChangeId = entityChanges[entityChanges.length - 1].id;
}
}
} while (filteredEntityChanges.length === 0);
const entityChangeRecords = syncService.getEntityChangeRecords(filteredEntityChanges);
if (entityChangeRecords.length > 0) {
lastEntityChangeId = entityChangeRecords[entityChangeRecords.length - 1].entityChange.id;
log.info(`Returning ${entityChangeRecords.length} entity changes in ${Date.now() - startTime}ms`);
}
const ret = {
return {
entityChanges: entityChangeRecords,
lastEntityChangeId,
outstandingPullCount: sql.getValue(`
@@ -165,14 +125,8 @@ function getChanged(req) {
FROM entity_changes
WHERE isSynced = 1
AND instanceId != ?
AND id > ?`, [clientinstanceId, lastEntityChangeId])
AND id > ?`, [clientInstanceId, lastEntityChangeId])
};
if (ret.entityChanges.length > 0) {
log.info(`Returning ${ret.entityChanges.length} entity changes in ${Date.now() - startTime}ms`);
}
return ret;
}
const partialRequests = {};
@@ -194,12 +148,12 @@ function update(req) {
}
if (!partialRequests[requestId]) {
throw new Error(`Partial request ${requestId}, index ${pageIndex} of ${pageCount} of pages does not have expected record.`);
throw new Error(`Partial request ${requestId}, page ${pageIndex + 1} of ${pageCount} of pages does not have expected record.`);
}
partialRequests[requestId].payload += req.body;
log.info(`Receiving partial request ${requestId}, page index ${pageIndex} out of ${pageCount} pages.`);
log.info(`Receiving a partial request ${requestId}, page ${pageIndex + 1} out of ${pageCount} pages.`);
if (pageIndex !== pageCount - 1) {
return;
@@ -212,9 +166,11 @@ function update(req) {
const {entities, instanceId} = body;
for (const {entityChange, entity} of entities) {
syncUpdateService.updateEntity(entityChange, entity, instanceId);
}
sql.transactional(() => {
for (const {entityChange, entity} of entities) {
syncUpdateService.updateEntity(entityChange, entity, instanceId);
}
});
}
setInterval(() => {
@@ -241,8 +197,7 @@ function queueSector(req) {
}
function checkEntityChanges() {
const consistencyChecks = require("../../services/consistency_checks");
consistencyChecks.runEntityChangesChecks();
require("../../services/consistency_checks").runEntityChangesChecks();
}
module.exports = {
@@ -251,7 +206,6 @@ module.exports = {
syncNow,
fillEntityChanges,
forceFullSync,
forceNoteSync,
getChanged,
update,
getStats,