Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 23 additions & 48 deletions app/lib/alimonitor-services/BookkeepingService.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const { databaseManager: {
repositories: {
RunRepository,
DetectorSubsystemRepository,
PeriodRepository,
BeamTypeRepository,
RunDetectorsRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { createOrForceUpdate } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

/**
* BookkeepingService used to synchronize runs
Expand Down Expand Up @@ -127,52 +127,27 @@ class BookkeepingService extends AbstractServiceSynchronizer {
const period = extractPeriod(periodName, beamType);
const { detectorsNameToId } = this;

return await BeamTypeRepository.T.findOrCreate({
where: {
name: beamType,
},
})
.then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: period.name,
},
defaults: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
}))
.catch((e) => {
throw new Error('Find or create period failed', {
cause: {
error: e.message,
meta: {
explicitValues: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
implicitValues: {
BeamType: beamType,
},
},
},
});
})
.then(async ([period, _]) => await RunRepository.T.upsert({
PeriodId: period.id,
...run,
}))
.then(async ([run, _]) => {
const d = detectorNames?.map((detectorName, i) => ({
run_number: run.runNumber,
detector_id: detectorsNameToId[detectorName],
quality: detectorQualities[i] }));

await RunDetectorsRepository.T.bulkCreate(
d, { updateOnDuplicate: ['quality'] },
);
});
const upsertRun = async ([dbPeriod, _]) => await RunRepository.upsert({
PeriodId: dbPeriod.id,
...run,
});

const bulkCreateRunDetectors = async ([run, _]) => {
const d = detectorNames?.map((detectorName, i) => ({
run_number: run.runNumber,
detector_id: detectorsNameToId[detectorName],
quality: detectorQualities[i] }));

await RunDetectorsRepository.bulkCreate(
d, { updateOnDuplicate: ['quality'] },
);
};

const pipeline = async () => await createOrForceUpdate(period)
.then(upsertRun)
.then(bulkCreateRunDetectors);

return await sequelize.transaction(async () => await pipeline());
}

/**
Expand Down
39 changes: 6 additions & 33 deletions app/lib/alimonitor-services/MonalisaService.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ const config = require('../config/configProvider.js');

const { databaseManager: {
repositories: {
BeamTypeRepository,
PeriodRepository,
DataPassRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

class MonalisaService extends AbstractServiceSynchronizer {
constructor() {
Expand Down Expand Up @@ -93,40 +92,14 @@ class MonalisaService extends AbstractServiceSynchronizer {

async executeDbAction(dataPass) {
const { period } = dataPass;

return await BeamTypeRepository.T.findOrCreate({
where: {
name: period.beamType,
},
})
.then(async ([beamType, _]) => await PeriodRepository.T.findOrCreate({
where: {
name: period.name,
},
defaults: {
name: period.name,
year: period.year,
BeamTypeId: beamType.id,
},
}))
.catch((e) => {
throw new Error('Find or create period failed', {
cause: {
error: e.message,
meta: {
explicitValues: {
name: period.name,
year: period.year,
},
},
},
});
})
.then(async ([period, _]) => await DataPassRepository.T.upsert({
const act = async () => findOrCreatePeriod(period)
.then(async ([period, _]) => await DataPassRepository.upsert({
PeriodId: period.id,
...dataPass,
}))
.then(async ([dataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ parentDataUnit: dataPass }));
.then(async ([dbDataPass, _]) => await this.monalisaServiceDetails.setSyncTask({ parentDataUnit: dbDataPass }));

return await sequelize.transaction(async (_t1) => await act());
}
}

Expand Down
99 changes: 57 additions & 42 deletions app/lib/alimonitor-services/MonalisaServiceDetails.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

const AbstractServiceSynchronizer = require('./AbstractServiceSynchronizer.js');
const Utils = require('../utils');
const { ServicesEndpointsFormatter } = require('./helpers');
const { ServicesEndpointsFormatter, ServicesDataCommons: { PERIOD_NAME_REGEX } } = require('./helpers');
const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

const { databaseManager: {
repositories: {
RunRepository,
PeriodRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { extractPeriod } = require('./helpers/ServicesDataCommons.js');

class MonalisaServiceDetails extends AbstractServiceSynchronizer {
constructor() {
Expand All @@ -32,7 +32,7 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer {

this.ketpFields = {
run_no: 'runNumber',
raw_partition: 'period',
raw_partition: 'periodName',
};
}

Expand All @@ -52,59 +52,74 @@ class MonalisaServiceDetails extends AbstractServiceSynchronizer {
}

adjustDataUnit(dataPassDetails) {
return Utils.filterObject(dataPassDetails, this.ketpFields);
dataPassDetails = Utils.filterObject(dataPassDetails, this.ketpFields);
const { periodName } = dataPassDetails;
dataPassDetails.period = PERIOD_NAME_REGEX.test(periodName) ? extractPeriod(periodName) : undefined;
return dataPassDetails;
}

isDataUnitValid() {
return true;
}

async executeDbAction(dataPassDetails, forUrlMetaStore) {
const { parentDataUnit: dataPass } = forUrlMetaStore;
return (async () => {
if (/LHC[0-9]{2}[a-z]+/.test(dataPassDetails.period)) {
return await PeriodRepository.T.findOrCreate({
where: {
name: dataPassDetails.period,
},
});
const { parentDataUnit: dbDataPass } = forUrlMetaStore;

const getPresumedPeriod = async () => {
if (dataPassDetails.period) {
return await findOrCreatePeriod(dataPassDetails.period);
} else {
// eslint-disable-next-line max-len
this.logger.warn(`Incorrect period from monalisa ${dataPassDetails.period} for run ${dataPassDetails.runNumber} in data pass ${dataPass.name}`);
this.logger.warn(`Incorrect period name from monalisa ${dataPassDetails.periodName}
for run ${dataPassDetails.runNumber} in details of data pass ${dbDataPass.name}`);
return [undefined, undefined];
}
})()
.then(async ([period, _]) => {
dataPassDetails.PeriodId = period?.id;
return await RunRepository.T.findOrCreate({
where: {
runNumber: dataPassDetails.runNumber,
},
defualt: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
},
});
};

const findOrCreateRun = async ([dbPeriod, _]) => {
dataPassDetails.PeriodId = dbPeriod?.id;
return await RunRepository.findOrCreate({
where: {
runNumber: dataPassDetails.runNumber,
},
defualt: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
},
})
.catch(async (e) => {
throw new Error('Find or create run failed', {
cause: {
error: e.message,
meta: {
actualValueInDB: await RunRepository.findOne({ where: { runNumber: dataPassDetails.runNumber } }, { raw: true }),
inQueryValues: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
.catch(async (e) => {
throw new Error('Find or create run failed', {
cause: {
error: {
error: e.message,
cause: e.cause,
},
sourceValues: {
runNumber: dataPassDetails.runNumber,
periodName: dataPassDetails.period,
meta: {
actualValueInDB: await RunRepository.findOne(
{ where: { runNumber: dataPassDetails.runNumber } },
{ raw: true },
).catch((error) => `ERROR RETRIVING ADDITIONAL INFO FROM DB: ${error.message}`),

inQueryValues: {
runNumber: dataPassDetails.runNumber,
PeriodId: dataPassDetails.PeriodId,
},
sourceValues: {
runNumber: dataPassDetails.runNumber,
periodName: dataPassDetails.period,
},
},
},
},
});
});
})
.then(async ([run, _]) => await sequelize.transaction(() => run.addDataPasses(dataPass.id, { ignoreDuplicates: true })));
};

const addRunToDataPass = async ([dbRun, _]) => await dbRun.addDataPasses(dbDataPass.id, { ignoreDuplicates: true });

const pipeline = async () => await getPresumedPeriod()
.then(findOrCreateRun)
.then(addRunToDataPass);

return await pipeline();
}
}

Expand Down
38 changes: 10 additions & 28 deletions app/lib/alimonitor-services/MonalisaServiceMC.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ const config = require('../config/configProvider.js');

const { databaseManager: {
repositories: {
BeamTypeRepository,
PeriodRepository,
SimulationPassRepository,
DataPassRepository,
RunRepository,
},
sequelize,
} } = require('../database/DatabaseManager.js');
const { findOrCreatePeriod } = require('../services/periods/findOrUpdateOrCreatePeriod.js');

class MonalisaServiceMC extends AbstractServiceSynchronizer {
constructor() {
Expand Down Expand Up @@ -114,37 +113,20 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer {
requestedEvents: simulationPass.requestedEvents,
outputSize: simulationPass.outputSize,
})
.then(async ([simulationPassDBInstance, _]) => {
.then(async ([dbSimulationPass, _]) => {
await Promise.all(simulationPass.anchoredPeriods.map(async (period) =>
this.findOrCreatePeriod(period)
findOrCreatePeriod(period)
.then(async ([period, _]) => {
const periodAddPromise = simulationPassDBInstance.addPeriod(period.id, { ignoreDuplicates: true });
const dataPassPipelinePromises = this.findOrCreateAndAddDataPasses(simulationPass, simulationPassDBInstance, period);
const runsAddPipeline = this.findOrCreateAndAddRuns(simulationPass, simulationPassDBInstance, period);
const periodAddPromise = dbSimulationPass.addPeriod(period.id, { ignoreDuplicates: true });
const dataPassPipelinePromises = this.findOrCreateAndAddDataPasses(simulationPass, dbSimulationPass, period);
const runsAddPipeline = this.findOrCreateAndAddRuns(simulationPass, dbSimulationPass, period);

await Promise.all([periodAddPromise, dataPassPipelinePromises, runsAddPipeline]);
})));
});
}

async findOrCreatePeriod({ name: periodName, year: periodYear, beamType }) {
return await sequelize.transaction(async () => PeriodRepository.findOrCreate({
where: {
name: periodName,
},
defaults: {
name: periodName,
year: periodYear,
BeamTypeId: !beamType ? undefined : (await BeamTypeRepository.findOrCreate({
where: {
name: beamType,
},
}))[0]?.id,
},
}));
}

async findOrCreateAndAddDataPasses(simulationPass, simulationPassDBInstance, period) {
async findOrCreateAndAddDataPasses(simulationPass, dbSimulationPass, period) {
const promises = simulationPass.anchoredPasses
.map((passSuffix) => sequelize.transaction(
() => DataPassRepository.findOrCreate({
Expand All @@ -155,13 +137,13 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer {
name: `${period.name}_${passSuffix}`,
PeriodId: period.id,
},
}).then(([dataPass, _]) => simulationPassDBInstance.addDataPass(dataPass.id,
}).then(([dataPass, _]) => dbSimulationPass.addDataPass(dataPass.id,
{ ignoreDuplicates: true })),
));
return await Promise.all(promises);
}

async findOrCreateAndAddRuns(simulationPass, simulationPassDBInstance, period) {
async findOrCreateAndAddRuns(simulationPass, dbSimulationPass, period) {
const promises = simulationPass.runs.map((runNumber) => sequelize.transaction(async () => {
const insertWithoutPeriod = simulationPass.anchoredPeriods.length > 1;
await RunRepository.findOrCreate({
Expand All @@ -174,7 +156,7 @@ class MonalisaServiceMC extends AbstractServiceSynchronizer {
},
});

return await simulationPassDBInstance.addRun(runNumber, { ignoreDuplicates: true });
return await dbSimulationPass.addRun(runNumber, { ignoreDuplicates: true });
}));

return await Promise.all(promises);
Expand Down
4 changes: 2 additions & 2 deletions app/lib/database/DatabaseManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class DatabaseManager {
constructor() {
this.logger = new Log(DatabaseManager.name);
this.schema = 'public';
const o2rct_namespace = cls.createNamespace('o2rct-namespace');
Sequelize.useCLS(o2rct_namespace);
this.o2rct_namespace = cls.createNamespace('o2rct-namespace');
Sequelize.useCLS(this.o2rct_namespace);

this.sequelize = new Sequelize({
...config.database,
Expand Down
2 changes: 1 addition & 1 deletion app/lib/database/repositories/Repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Repository {
* @return {Promise<boolean>} promise that resolves when the patch has been applied
*/
async updateOne(dbOject, patch) {
return dbOject.update(patch);
return await dbOject.update(patch);
}

/**
Expand Down
Loading