Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions app/config/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ module.exports = Object.freeze({
errorsLoggingDepths: {
no: () => null,
message: (logger, er) => logger.error(er.message),
stack: (logger, er) => logger.error(er.stack),
object: (logger, er) => logger.error(JSON.stringify(er, null, 2)),
stack: (logger, er) => logger.error(JSON.stringify({ message: er.message, stack: er.stack }, null, 2)),
full: (logger, er) => logger.error(JSON.stringify({
message: er.message,
stack: er.stack,
...er,
cause: er.cause,
}, null, 2)),
},
defaultErrorsLogginDepth: ResProvider.envOrDef('RCT_ERR_DEPTH', 'object'),
defaultErrorsLogginDepth: ResProvider.envOrDef('RCT_ERR_DEPTH', 'full'),
});
39 changes: 33 additions & 6 deletions app/lib/alimonitor-services/AbstractServiceSynchronizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,40 @@ const defaultServiceSynchronizerOptions = {
batchSize: 4,
};

class ProgressMonitor {
constructor({ total, percentageStep, logger }) {
this.total = total;
this.percentageStep = percentageStep;
this.progress = 0;
this.lastLogAt = 0;
this.logger = logger;
}

update(progress) {
this.progress += progress;
}

setTotal(total) {
this.total = total;
}

tryLog() {
const potentialLogProgress = this.lastLogAt + this.percentageStep * this.total;
if (this.progress >= potentialLogProgress || this.progress === this.total) {
this.lastLogAt = this.progress;
this.logger(`progress of ${this.progress} / ${this.total}`);
}
}
}

class AbstractServiceSynchronizer {
constructor() {
this.name = this.constructor.name;
this.logger = new Log(this.name);

this.opts = this.createHttpOpts();

this.metaStore = { processedCtr: 0 };
this.metaStore = {};

this.errorsLoggingDepth = config.defaultErrorsLogginDepth;
applyOptsToObj(this, defaultServiceSynchronizerOptions);
Expand Down Expand Up @@ -169,6 +195,7 @@ class AbstractServiceSynchronizer {
async makeBatchedRequest(data) {
const rowsChunks = arrayToChunks(data, this.batchSize);
const total = this.metaStore.totalCount || data.length;
this.progressMonitor.setTotal(total);
for (const chunk of rowsChunks) {
if (this.forceStop) {
return;
Expand All @@ -178,9 +205,8 @@ class AbstractServiceSynchronizer {
.catch((e) => this.monitor.handleIncorrect(e, { dataUnit: dataUnit })));

await Promise.all(promises);

this.metaStore['processedCtr'] += chunk.length;
this.logger.info(`progress of ${this.metaStore['processedCtr']} / ${total}`);
this.progressMonitor.update(chunk.length);
this.progressMonitor.tryLog();
}
}

Expand Down Expand Up @@ -211,9 +237,10 @@ class AbstractServiceSynchronizer {
.catch((e) => this.logger.error(e));
}

async setSyncTask() {
async setSyncTask(options) {
this.progressMonitor = new ProgressMonitor({ logger: this.logger.info.bind(this.logger), percentageStep: 0.25 });
this.forceStop = false;
await this.sync()
await this.sync(options)
.then(() => {
if (this.forceStop) {
this.logger.info(`${this.name} forced to stop`);
Expand Down
62 changes: 40 additions & 22 deletions app/lib/alimonitor-services/BookkeepingService.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BookkeepingService extends AbstractServiceSynchronizer {
}

async sync() {
DetectorSubsystemRepository.findAll({ raw: true }).then((r) => {
await DetectorSubsystemRepository.findAll({ raw: true }).then((r) => {
this.detectorsNameToId = r?.length > 0 ? r :
Utils.throwWrapper(new Error('Incorrect setup of database, no detector subsystems data in it'));
this.detectorsNameToId = Object.fromEntries(this.detectorsNameToId.map(({ id, name }) => [name, id]));
Expand Down Expand Up @@ -133,27 +133,45 @@ class BookkeepingService extends AbstractServiceSynchronizer {
where: {
name: beamType,
},
}).then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: periodName,
year,
BeamTypeId: beamType.id,
},
})).then(async ([period, _]) => await RunRepository.T.findOrCreate({
where: {
runNumber: run.runNumber,
},
defaults: { PeriodId: period.id, ...run },
})).then(async ([run, _]) => {
const d = detectorNames?.map((detectorName, i) => ({
run_number: run.runNumber,
detector_id: detectorsNameToId[detectorName],
quality: detectorQualities[i] }));

await RunDetectorsRepository.T.bulkCreate(
d, { updateOnDublicate: ['quality'] },
);
});
})
.then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: periodName,
year,
BeamTypeId: beamType.id,
},
}))
.catch((e) => {
throw new Error('Find or create period failed', {
cause: {
error: e.message,
meta: {
explicitValues: {
name: periodName,
year,
BeamTypeId: beamType.id,
},
implicitValues: {
BeamType: beamType,
},
},
},
});
})
.then(async ([period, _]) => await RunRepository.T.upsert({
PeriodId: period.id,
...run,
}))
.then(async ([run, _]) => {
const d = detectorNames?.map((detectorName, i) => ({
run_number: run.runNumber,
detector_id: detectorsNameToId[detectorName],
quality: detectorQualities[i] }));

await RunDetectorsRepository.T.bulkCreate(
d, { updateOnDublicate: ['quality'] },
);
});
}

metaDataHandler(requestJsonResult) {
Expand Down
89 changes: 62 additions & 27 deletions app/lib/alimonitor-services/MonalisaService.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,54 @@ const EndpointsFormatter = require('./ServicesEndpointsFormatter.js');
const MonalisaServiceDetails = require('./MonalisaServiceDetails.js');
const config = require('../config/configProvider.js');

const { databaseManager: {
repositories: {
BeamTypeRepository,
PeriodRepository,
DataPassRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');

class MonalisaService extends AbstractServiceSynchronizer {
constructor() {
super();
this.batchSize = config.services.batchSize.ML;

this.ketpFields = {
name: 'name',
reconstructed_events: 'number_of_events',
reconstructed_events: 'reconstructedEvents',
description: 'description',
output_size: 'size',
output_size: 'outputSize',
interaction_type: 'beam_type',
last_run: 'last_run',
last_run: 'lastRun',
};

this.monalisaServiceDetails = new MonalisaServiceDetails();
}

async sync() {
await this.dbConnect();
const last_runs_res = await this.dbClient.query('SELECT name, last_run from data_passes;');
this.last_runs = Object.fromEntries(last_runs_res.rows.map((r) => Object.values(r)));
await this.dbDisconnect();
const last_runs_res = await sequelize.query(
'SELECT name, last_run, max(run_number) as last_run_in_details \
FROM data_passes AS dp \
LEFT JOIN data_passes_runs AS dpr \
ON dpr.data_pass_id = dp.id \
GROUP BY name, last_run;',
);
this.last_runs = Object.fromEntries(last_runs_res[0].map((r) => {
const { name, last_run, last_run_in_details } = r;
return [name, { last_run, last_run_in_details }];
}));

return await this.syncPerEndpoint(
EndpointsFormatter.dataPassesRaw(),
this.responsePreprocess.bind(this),
this.dataAdjuster.bind(this),
(r) => r.period.year >= config.dataFromYearIncluding && r.last_run != this.last_runs[r.name],
(dataPass) => {
const { last_run, last_run_in_details } = this.last_runs[dataPass.name] ?? {};
return dataPass.period.year >= config.dataFromYearIncluding &&
(dataPass.lastRun !== last_run || last_run !== last_run_in_details);
},
this.dbAction.bind(this),
);
}
Expand All @@ -63,29 +83,44 @@ class MonalisaService extends AbstractServiceSynchronizer {

dataAdjuster(dp) {
dp = Utils.filterObject(dp, this.ketpFields);
dp.size = Number(dp.size);
dp.outputSize = dp.outputSize ? Number(dp.outputSize) : null;
dp.period = ServicesDataCommons.mapBeamTypeToCommonFormat(this.extractPeriod(dp));
return dp;
}

async dbAction(dbClient, d) {
const { description } = d;
d = Utils.adjusetObjValuesToSql(d);
d.rawDes = description;
const { period } = d;
const period_insert =
d?.period?.name ? `call insert_period(${period.name}, ${period.year}, ${period.beamType});` : '';
const pgCommand = `${period_insert}; call insert_prod(
${d.name},
${d.period.name},
${d.description},
${d.number_of_events},
${d.size},
${d.last_run}
);`;
const q1 = await dbClient.query(pgCommand);
const q2 = await this.monalisaServiceDetails.sync(d);
return Promise.all([q1, q2]);
async dbAction(dbClient, dataPass) {
const { period } = dataPass;

return await BeamTypeRepository.T.findOrCreate({
where: {
name: period.beamType,
},
})
.then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
}))
.catch((e) => {
throw new Error('Find or create period failed', {
cause: {
error: e.message,
meta: {
explicitValues: {
name: period.name,
year: period.year,
},
},
},
});
})
.then(async ([period, _]) => await DataPassRepository.T.upsert({
PeriodId: period.id,
...dataPass,
}))
.then(async ([dataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ dataUnit: dataPass }));
}

extractPeriod(rowData) {
Expand Down
58 changes: 48 additions & 10 deletions app/lib/alimonitor-services/MonalisaServiceDetails.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,67 @@ const AbstractServiceSynchronizer = require('./AbstractServiceSynchronizer.js');
const Utils = require('../utils');
const EndpointsFormatter = require('./ServicesEndpointsFormatter.js');

const { databaseManager: {
repositories: {
RunRepository,
PeriodRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');

class MonalisaServiceDetails extends AbstractServiceSynchronizer {
constructor() {
super();
this.batchSize = 5;

this.ketpFields = {
run_no: 'run_number',
run_no: 'runNumber',
raw_partition: 'period',
};
}

async sync(d) {
async sync({ dataUnit: dataPass }) {
return await this.syncPerEndpoint(
EndpointsFormatter.dataPassesDetailed(d.rawDes),
EndpointsFormatter.dataPassesDetailed(dataPass.description),
(raw) => this.responsePreprocess(raw),
(v) => Utils.adjusetObjValuesToSql(Utils.filterObject(v, this.ketpFields)),
(v) => Utils.filterObject(v, this.ketpFields),
() => true,
async (dbClient, v) => {
v.parentDataUnit = d;
const detailsSql = v ?
`call insert_prod_details(${d.name}, ${v.run_number}, ${v.period});`
: '';
return await dbClient.query(detailsSql);
v.parentDataUnit = dataPass;
return await PeriodRepository.T.findOrCreate({
where: {
name: v.period,
},
})
.then(async ([period, _]) => {
v.PeriodId = period.id;
return await RunRepository.T.findOrCreate({
where: {
runNumber: v.runNumber,
PeriodId: period.id,
},
});
})
.catch(async (e) => {
throw new Error('Find or create run failed', {
cause: {
error: e.message,
meta: {
actualValueInDB: await RunRepository.findOne({ where: { runNumber: v.runNumber } }, { raw: true }),
inQueryValues: {
runNumber: v.runNumber,
PeriodId: v.PeriodId,
},
sourceValues: {
runNumber: v.runNumber,
periodName: v.period,
},
},
},
});
})
// eslint-disable-next-line no-unused-vars
.then(async ([run, _]) => await sequelize.transaction((t) => run.addDataPasses(dataPass.id)));
},
);
}
Expand All @@ -49,7 +87,7 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer {
const res = entries.map(([hid, vObj]) => {
vObj['hid'] = hid.trim();
return vObj;
});
}).sort((a, b) => a.run_no - b.run_no);
return res;
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/lib/alimonitor-services/PassCorrectnessMonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PassCorrectnessMonitor {

if (incorrect > 0) {
const logFunc = switchCase(errorsLoggingDepth, config.errorsLoggingDepths);
errors.forEach((e) => logFunc(logger, e.stack));
errors.forEach((e) => logFunc(logger, e));
logger.warn(`sync unseccessful for ${incorrect}/${dataSize}`);
}
if (omitted > 0) {
Expand Down
Loading