add memberId to entity_changes to avoid having to resend all changes second time

This commit is contained in:
zadam
2022-01-09 20:16:39 +01:00
parent c448d34a38
commit 7159b13c9d
25 changed files with 11520 additions and 266 deletions

View File

@@ -4,7 +4,7 @@ const log = require('./log');
const sql = require('./sql');
const optionService = require('./options');
const utils = require('./utils');
const sourceIdService = require('./source_id');
const memberId = require('./member_id');
const dateUtils = require('./date_utils');
const syncUpdateService = require('./sync_update');
const contentHashService = require('./content_hash');
@@ -107,11 +107,11 @@ async function doLogin() {
hash: hash
});
if (sourceIdService.isLocalSourceId(resp.sourceId)) {
throw new Error(`Sync server has source ID ${resp.sourceId} which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`);
if (resp.memberId === memberId) {
throw new Error(`Sync server has member ID ${resp.memberId} which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`);
}
syncContext.sourceId = resp.sourceId;
syncContext.memberId = resp.memberId;
const lastSyncedPull = getLastSyncedPull();
@@ -131,7 +131,7 @@ async function pullChanges(syncContext) {
while (true) {
const lastSyncedPull = getLastSyncedPull();
const changesUri = '/api/sync/changed?lastEntityChangeId=' + lastSyncedPull;
const changesUri = `/api/sync/changed?memberId=${memberId}&lastEntityChangeId=${lastSyncedPull}`;
const startDate = Date.now();
@@ -141,36 +141,38 @@ async function pullChanges(syncContext) {
outstandingPullCount = Math.max(0, resp.maxEntityChangeId - lastSyncedPull);
const {entityChanges} = resp;
if (entityChanges.length === 0) {
break;
}
const {entityChanges, lastEntityChangeId} = resp;
sql.transactional(() => {
for (const {entityChange, entity} of entityChanges) {
const changeAppliedAlready = !entityChange.changeId
|| !!sql.getValue("SELECT id FROM entity_changes WHERE changeId = ?", [entityChange.changeId]);
if (!changeAppliedAlready && !sourceIdService.isLocalSourceId(entityChange.sourceId)) {
if (!changeAppliedAlready) {
if (!atLeastOnePullApplied) { // send only for first
ws.syncPullInProgress();
atLeastOnePullApplied = true;
}
syncUpdateService.updateEntity(entityChange, entity);
syncUpdateService.updateEntity(entityChange, entity, syncContext.memberId);
}
outstandingPullCount = Math.max(0, resp.maxEntityChangeId - entityChange.id);
}
setLastSyncedPull(entityChanges[entityChanges.length - 1].entityChange.id);
if (lastSyncedPull !== lastEntityChangeId) {
setLastSyncedPull(lastEntityChangeId);
}
});
const sizeInKb = Math.round(JSON.stringify(resp).length / 1024);
if (entityChanges.length === 0) {
break;
} else {
const sizeInKb = Math.round(JSON.stringify(resp).length / 1024);
log.info(`Pulled ${entityChanges.length} changes in ${sizeInKb} KB, starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${outstandingPullCount} outstanding pulls`);
log.info(`Pulled ${entityChanges.length} changes in ${sizeInKb} KB, starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${outstandingPullCount} outstanding pulls`);
}
}
log.info("Finished pull");
@@ -189,10 +191,9 @@ async function pushChanges(syncContext) {
}
const filteredEntityChanges = entityChanges.filter(entityChange => {
if (entityChange.sourceId === syncContext.sourceId) {
if (entityChange.memberId === syncContext.memberId) {
// this may set lastSyncedPush beyond what's actually sent (because of size limit)
// so this is applied to the database only if there's no actual update
// TODO: it would be better to simplify this somehow
lastSyncedPush = entityChange.id;
return false;
@@ -214,8 +215,8 @@ async function pushChanges(syncContext) {
const startDate = new Date();
await syncRequest(syncContext, 'PUT', '/api/sync/update', {
sourceId: sourceIdService.getCurrentSourceId(),
entities: entityChangesRecords
entities: entityChangesRecords,
memberId
});
ws.syncPushInProgress();