Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const config = require('./lib/config/configProvider.js');
const { buildPublicConfig } = require('./lib/config/publicConfigProvider.js');

// Services
const alimonitorServices = require('./lib/alimonitor-services');
const { syncManager } = require('./lib/alimonitor-services/SyncManager.js');
// Database
const { databaseManager } = require('./lib/database/DatabaseManager.js');
const { databaseService } = require('./lib/database/DatabaseService.js');
Expand All @@ -42,7 +42,7 @@ class RunConditionTableApplication {
this.webUiServer = webUiServer;
this.databaseService = databaseService;
this.databaseManager = databaseManager;
this.syncManager = alimonitorServices.syncManager;
this.syncManager = syncManager;
this.defineEndpoints();

buildPublicConfig(config);
Expand Down
27 changes: 10 additions & 17 deletions app/config/services.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
*/

const { ResProvider } = require('../lib/utils');
const path = require('path');

const services = {
bookkeeping: {
url: {
rct: 'http://rct-bookkeeping.cern.ch:4000/api/runs',
ali: ResProvider.getServiceEndpoint('BK_RUNS'),
},
},
Expand All @@ -36,22 +36,15 @@ const services = {
batchSize: {
ML: ResProvider.envOrDef('RCT_ML_BATCH_SIZE', 1),
},
};

/*
* Endpoints bases
* ali: 'https://ali-bookkeeping.cern.ch/api/runs?filter[definitions]=PHYSICS',
* dataPassesRaw: 'https://alimonitor.cern.ch/production/raw.jsp?res_path=json',
* dataPassesDetailed: 'https://alimonitor.cern.ch/raw/raw_details.jsp?timesel=0&res_path=json',
* mcRaw: 'https://alimonitor.cern.ch/MC/?res_path=json',
* mcDetailed: 'https://alimonitor.cern.ch/job_events.jsp?timesel=0&res_path=json',
* mcDetTag: 'https://alimonitor.cern.ch/MC/prodDetails.jsp?res_path=json',
*/

// LHC21i3f3
// eslint-disable-next-line max-len
//E rawDataDetalied: 'https://alimonitor.cern.ch/production/raw_details.jsp?timesel=0&filter_jobtype=OCT+-+async+production+for+pilot+beam+pass+3%2C+O2-2763&res_path=json',
// eslint-disable-next-line max-len
//E mcRawDataDetailed: 'https://alimonitor.cern.ch/job_events.jsp?timesel=0&owner=aliprod&filter_jobtype=Pb-Pb%2C+5.02+TeV+-+HIJING+%2B+nuclei+Geant4+with+modified+material+budget+%2B4.5%+(Pb-Pb+Pass3)%2C+50-90%+centrality%2C+ALIROOT-8784&res_path=json',
rawJsonCachePath: process.env.RAW_JSON_CACHE_PATH ? path.resolve(process.env.RAW_JSON_CACHE_PATH) : path.join(
__dirname,
'..',
'..',
'database',
'cache',
'rawJson',
),
};

module.exports = services;
170 changes: 83 additions & 87 deletions app/lib/alimonitor-services/AbstractServiceSynchronizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,32 @@
* or submit itself to any jurisdiction.
*/

const { Client } = require('pg');
const { SocksProxyAgent } = require('socks-proxy-agent');
const { Log } = require('@aliceo2/web-ui');
const config = require('../config/configProvider.js');
const { ResProvider, makeHttpRequestForJSON, arrayToChunks, applyOptsToObj, throwNotImplemented } = require('../utils');
const Cacher = require('./Cacher.js');
const PassCorrectnessMonitor = require('./PassCorrectnessMonitor.js');
const { Cacher, PassCorrectnessMonitor, ProgressMonitor } = require('./helpers');

const defaultServiceSynchronizerOptions = {
forceStop: false,
rawCacheUse: process.env['RCT_DEV_USE_CACHE'] === 'false' ? false : true,
cacheRawResponse: process.env['RCT_DEV_USE_CACHE'] === 'false' ? false : true,
useCacheJsonInsteadIfPresent: process.env['RCT_DEV_USE_CACHE_INSTEAD'] === 'true' ? true : false,
omitWhenCached: process.env['RCT_DEV_OMIT_WHEN_CACHED'] === 'true' ? true : false,
batchSize: 4,
};

class ProgressMonitor {
constructor({ total, percentageStep, logger }) {
this.total = total;
this.percentageStep = percentageStep;
this.progress = 0;
this.lastLogAt = 0;
this.logger = logger;
}

update(progress) {
this.progress += progress;
}

setTotal(total) {
this.total = total;
}

tryLog() {
const potentialLogProgress = this.lastLogAt + this.percentageStep * this.total;
if (this.progress >= potentialLogProgress || this.progress === this.total) {
this.lastLogAt = this.progress;
this.logger(`progress of ${this.progress} / ${this.total}`);
}
}
}

/**
* AbstractServiceSynchronizer
* The class provides schema for excecuting process of data synchronization with external service (fetching from it)
* Its behaviour can be customized with overriding abstract methods
*/
class AbstractServiceSynchronizer {
constructor() {
this.name = this.constructor.name;
this.logger = new Log(this.name);

this.opts = this.createHttpOpts();

this.metaStore = {};
this.metaStore = { perUrl: {} };

this.errorsLoggingDepth = config.defaultErrorsLogginDepth;
applyOptsToObj(this, defaultServiceSynchronizerOptions);
Expand All @@ -74,7 +50,7 @@ class AbstractServiceSynchronizer {
createHttpOpts() {
let opts = this.getHttpOptsBasic();
opts = this.setSLLForHttpOpts(opts);
opts = this.setHttpSocket(opts);
opts = this.setHttpSocksProxy(opts);
return opts;
}

Expand Down Expand Up @@ -114,7 +90,7 @@ class AbstractServiceSynchronizer {
return opts;
}

setHttpSocket(opts) {
setHttpSocksProxy(opts) {
const proxy = ResProvider.socksProvider();
if (proxy?.length > 0) {
this.logger.info(`using proxy/socks '${proxy}' to CERN network`);
Expand All @@ -135,72 +111,60 @@ class AbstractServiceSynchronizer {
}

/**
* Combine logic of fetching data from service
* like bookkeeping and processing
* and inserting to local database
* @param {URL} endpoint endpoint to fetch data
* @param {CallableFunction} responsePreprocess used to preprocess response to objects list
* @param {CallableFunction} dataAdjuster logic for processing data
* before inserting to database (also adjusting data to sql foramt) - should returns null if error occured
* @param {CallableFunction} filterer filter rows
* @param {CallableFunction} dbAction logic for inserting data to database
* @param {CallableFunction} metaDataHandler used to handle logic of hanling data
* Combine logic of fetching data from service like bookkeeping, processing and inserting to database
* @param {URL} endpoint endpoint to fetch data from
* @param {CallableFunction} metaDataHandler used if synchronization requires handling some meta data.
* like total pages to see etc., on the whole might be used to any custom logic
* @returns {*} void
* Besides given arguemnts the method depends on three abstract methods to be overriden
* @see AbstractServiceSynchronizer.processRawResponse
* @see AbstractServiceSynchronizer.isDataUnitValid
* @see AbstractServiceSynchronizer.executeDbAction
*
* @returns {boolean} - true if process was finalized without major errors and with/without minor errors, otherwise false,
* Major errors are understood as ones indicating that further synchronization is purposeless: e.g. due to networ error, invalid certificate.
* Minor errors are understood as e.g. managable ambiguities in data.
*/
async syncPerEndpoint(
endpoint,
responsePreprocess,
dataAdjuster,
filterer,
dbAction,
metaDataHandler = null,
) {
if (this.omitWhenCached && Cacher.isCached(this.name, endpoint)) {
this.logger.info(`omitting cached json at :: ${Cacher.cachedFilePath(this.name, endpoint)}`);
return;
}

try {
await this.dbConnect();

this.dbAction = dbAction; //TODO
this.monitor = new PassCorrectnessMonitor(this.logger, this.errorsLoggingDepth);

const rawResponse = await this.getRawResponse(endpoint);
if (metaDataHandler) {
metaDataHandler(rawResponse);
await metaDataHandler(rawResponse);
}
const data = responsePreprocess(rawResponse)
.map((r) => dataAdjuster(r))

const data = this.processRawResponse(rawResponse)
.filter((r) => {
const f = r && filterer(r);
const f = r && this.isDataUnitValid(r);
if (!f) {
this.monitor.handleOmitted();
}
return f;
});

await this.makeBatchedRequest(data);
await this.makeBatchedRequest(data, endpoint);

this.monitor.logResults();
return true;
} catch (fatalError) {
this.logger.error(fatalError.stack);
throw fatalError;
} finally {
await this.dbDisconnect();
this.logger.error(fatalError.message + fatalError.stack);
await this.interrtuptSyncTask();
return false;
}
}

async makeBatchedRequest(data) {
async makeBatchedRequest(data, endpoint) {
const rowsChunks = arrayToChunks(data, this.batchSize);
const total = this.metaStore.totalCount || data.length;
this.progressMonitor.setTotal(total);
for (const chunk of rowsChunks) {
if (this.forceStop) {
return;
}
const promises = chunk.map((dataUnit) => this.dbAction(this.dbClient, dataUnit)
const promises = chunk.map((dataUnit) => this.executeDbAction(dataUnit, this.metaStore.perUrl[endpoint])
.then(() => this.monitor.handleCorrect())
.catch((e) => this.monitor.handleIncorrect(e, { dataUnit: dataUnit })));

Expand All @@ -212,48 +176,80 @@ class AbstractServiceSynchronizer {

async getRawResponse(endpoint) {
if (this.useCacheJsonInsteadIfPresent && Cacher.isCached(this.name, endpoint)) {
// eslint-disable-next-line capitalized-comments
this.logger.info(`using cached json :: ${Cacher.cachedFilePath(this.name, endpoint)}`);
return Cacher.getJsonSync(this.name, endpoint);
}
const onSucces = (endpoint, data) => {
if (this.rawCacheUse) {
if (this.cacheRawResponse) {
Cacher.cache(this.name, endpoint, data);
}
};
return await makeHttpRequestForJSON(endpoint, this.opts, this.logger, onSucces);
}

async dbConnect() {
this.dbClient = new Client(config.database);
this.dbClient.on('error', (e) => this.logger.error(e));

return await this.dbClient.connect()
.catch((e) => this.logger.error(e));
}

async dbDisconnect() {
return await this.dbClient.end()
.catch((e) => this.logger.error(e));
}

/**
* Start process of synchroniztion with particular external system,
* it depends on custom configuration of class inheriting from this class
* @param {Object} options - customize sync procedure,
* e.g. some custom class may required some context to start process, e.g. some data unit,
* @return {boolean} - true if process was finalized without major errors and with/without minor errors, otherwise false,
* Major errors are understood as ones indicating that further synchronization is purposeless: e.g. due to networ error, invalid certificate.
* Minor errors are understood as e.g. managable ambiguities in data.
*/
async setSyncTask(options) {
this.progressMonitor = new ProgressMonitor({ logger: this.logger.info.bind(this.logger), percentageStep: 0.25 });
this.forceStop = false;
await this.sync(options)
return await this.sync(options)
.then(() => {
if (this.forceStop) {
this.logger.info(`${this.name} forced to stop`);
}
});
}

async clearSyncTask() {
/**
* Interrupt sync task, so dbAction or syncPerEndpoint methods,
* which is subsequent towards one being executed, will not be called.
* It will NOT interrupt any sequelize call being executed.
* @return {void}
*/
async interrtuptSyncTask() {
this.forceStop = true;
}

isConnected() {
return this.dbClient?._connected;
isStopped() {
return this.forceStop;
}

/**
* ProcessRawResponse - used to preprocess response to custom format
* @abstractMethod
* @param {*} _rawResponse - raw data acquired from external service
* @return {*} adjusted data
*/
async processRawResponse(_rawResponse) {
throwNotImplemented();
}

/**
* Check if data unit is valid; should be filterd out or not, may handle reason for rejecting some data unit
* @abstractMethod
* @param {*} _dataUnit - data portion to be filterd out or left in set of valid data
* @return {boolean} - true if dataUnit is valid
*/
async isDataUnitValid(_dataUnit) {
throwNotImplemented();
}

/**
* Implements logic for inserting/updating database data
* @abstractMethod
* @param {*} _dataUnit - data unit some db action is to be performed on
* @param {*|undefined} _options - some meta data, e.g. some context required execute db action
* @return {void}
*/
async executeDbAction(_dataUnit, _options) {
throwNotImplemented();
}
}

Expand Down
Loading