WIP partial sync requests

This commit is contained in:
zadam
2021-01-10 21:56:40 +01:00
parent cd653b9f0c
commit 7f8b19aee4
6 changed files with 111 additions and 47 deletions

View File

@@ -138,14 +138,54 @@ function getChanged(req) {
return ret;
}
// In-flight multi-page sync requests, keyed by requestId.
// Each entry buffers raw page payload text until the final page arrives.
const partialRequests = {};

/**
 * Handles a (possibly paged) sync update request.
 *
 * Large payloads are split by the client into pages, signalled via the
 * `pageCount` / `pageIndex` / `requestId` request headers. Pages are
 * concatenated into partialRequests[requestId].payload; when the last page
 * arrives the accumulated text is JSON-parsed and applied. A single-page
 * request (pageCount === 1) is applied directly from req.body.
 *
 * @param {object} req - express request
 * @throws {Error} when a non-first page arrives without a matching
 *                 first-page record (e.g. after eviction or a lost page 0)
 */
function update(req) {
    let {body} = req;

    // NOTE(review): headers arrive as strings; radix 10 made explicit
    const pageCount = parseInt(req.get('pageCount'), 10);
    const pageIndex = parseInt(req.get('pageIndex'), 10);

    if (pageCount !== 1) {
        const requestId = req.get('requestId');

        if (pageIndex === 0) {
            partialRequests[requestId] = {
                createdAt: Date.now(),
                payload: ''
            };
        }

        if (!partialRequests[requestId]) {
            throw new Error(`Partial request ${requestId}, index ${pageIndex} of ${pageCount} of pages does not have expected record.`);
        }

        partialRequests[requestId].payload += req.body;

        if (pageIndex !== pageCount - 1) {
            // more pages are coming; nothing to apply yet
            return;
        }
        else {
            body = JSON.parse(partialRequests[requestId].payload);

            delete partialRequests[requestId];
        }
    }

    const {sourceId, entities} = body;

    for (const {entityChange, entity} of entities) {
        syncUpdateService.updateEntity(entityChange, entity, sourceId);
    }
}
// Periodically evict abandoned partial requests so the buffer map cannot
// grow without bound when a client never sends its final page.
setInterval(() => {
    const now = Date.now();

    for (const key in partialRequests) {
        // BUG FIX: age is now - createdAt; the original computed
        // createdAt - Date.now(), which is always <= 0 and therefore
        // never exceeded the limit, so stale entries were never evicted.
        if (now - partialRequests[key].createdAt > 5 * 60 * 1000) {
            delete partialRequests[key];
        }
    }
}, 60 * 1000);
function syncFinished() {
// after first sync finishes, the application is ready to be used
// this is meaningless but at the same time harmless (idempotent) for further syncs

View File

@@ -5,7 +5,7 @@ const packageJson = require('../../package');
const {TRILIUM_DATA_DIR} = require('./data_dir');
// Database schema version.
const APP_DB_VERSION = 178;
// Sync protocol version; bumped to 19 because the paged (partial) sync
// request protocol is incompatible with older clients/servers.
// (The stale duplicate `const SYNC_VERSION = 18;` line from the unmarked
// diff was removed — two const declarations of one name is a SyntaxError.)
const SYNC_VERSION = 19;
const CLIPPER_PROTOCOL_VERSION = "1.0";
module.exports = {

View File

@@ -16,15 +16,25 @@ function exec(opts) {
opts.proxy = null;
}
if (!opts.paging) {
opts.paging = {
pageCount: 1,
pageIndex: 0
};
}
const proxyAgent = getProxyAgent(opts);
const parsedTargetUrl = url.parse(opts.url);
return new Promise((resolve, reject) => {
try {
const headers = {
const headers = Object.assign({
Cookie: (opts.cookieJar && opts.cookieJar.header) || "",
'Content-Type': 'application/json'
};
'Content-Type': opts.paging.pageCount === 1 ? 'application/json' : 'text/plain',
pageCount: opts.pageCount,
pageIndex: opts.pageIndex,
requestId: opts.requestId
}, opts.headers || {});
if (opts.auth) {
const token = Buffer.from(opts.auth.user + ":" + opts.auth.pass).toString('base64');

View File

@@ -253,19 +253,33 @@ async function checkContentHash(syncContext) {
return failedChecks.length > 0;
}
/**
 * Sends a sync request to the sync server, transparently splitting large
 * bodies into pages of at most one million characters. All pages share a
 * random requestId so the server can reassemble them; the response of the
 * final page is returned.
 *
 * Fixes over the WIP draft:
 *  - the whole body was re-sent on every page instead of a per-page slice
 *  - pageCount was 0 for an empty body, so no request was sent at all
 *  - an object body broke `body.length`; it is now serialized up front
 *    (multi-page payloads are reassembled and JSON.parsed server-side)
 *
 * @param {object} syncContext - carries the cookie jar for this sync session
 * @param {string} method - HTTP method
 * @param {string} requestPath - path appended to the sync server host
 * @param {object|string} [body] - request payload; optional
 * @returns {Promise<*>} response of the last (or only) page request
 */
async function syncRequest(syncContext, method, requestPath, body = '') {
    const timeout = syncOptions.getSyncTimeout();

    // serialize once so paging operates on a plain string
    body = body ? JSON.stringify(body) : '';

    const requestId = utils.randomString(10);
    const PAGE_SIZE = 1000000;
    // even an empty body is sent as a single (empty) page
    const pageCount = Math.max(1, Math.ceil(body.length / PAGE_SIZE));

    let response;

    for (let pageIndex = 0; pageIndex < pageCount; pageIndex++) {
        const opts = {
            method,
            url: syncOptions.getSyncServerHost() + requestPath,
            cookieJar: syncContext.cookieJar,
            timeout: timeout,
            paging: {
                pageIndex,
                pageCount,
                requestId
            },
            // only this page's slice of the serialized body
            body: body.slice(pageIndex * PAGE_SIZE, (pageIndex + 1) * PAGE_SIZE),
            proxy: proxyToggle ? syncOptions.getSyncProxy() : null
        };

        response = await utils.timeLimit(request.exec(opts), timeout);
    }

    return response;
}
function getEntityChangeRow(entityName, entityId) {