refactor otel middleware

This commit is contained in:
Simon Larsen 2023-10-10 14:51:59 +01:00
parent 1892d06cec
commit 8fc2f93e94
No known key found for this signature in database
GPG Key ID: AB45983AA9C81CDE
2 changed files with 52 additions and 59 deletions

View File

@ -33,7 +33,7 @@ import { DashboardApiRoute } from 'Common/ServiceRoute';
import HTTPResponse from 'Common/Types/API/HTTPResponse'; import HTTPResponse from 'Common/Types/API/HTTPResponse';
import HTTPErrorResponse from 'Common/Types/API/HTTPErrorResponse'; import HTTPErrorResponse from 'Common/Types/API/HTTPErrorResponse';
import ServerException from 'Common/Types/Exception/ServerException'; import ServerException from 'Common/Types/Exception/ServerException';
// import zlib from 'zlib'; import zlib from 'zlib';
// import OpenTelemetrySDK from "./OpenTelemetry"; // import OpenTelemetrySDK from "./OpenTelemetry";
const app: ExpressApplication = Express.getExpressApp(); const app: ExpressApplication = Express.getExpressApp();
@ -99,18 +99,25 @@ app.use(setDefaultHeaders);
app.use(function (req, res, next) { app.use(function (req, res, next) {
if (req.headers['content-encoding'] === 'gzip') { if (req.headers['content-encoding'] === 'gzip') {
// var gunzip = zlib.createGunzip(); let buffers: any = [];
// req.pipe(gunzip);
// var buffer: any = []; req.on('data', (chunk) => {
// gunzip.on('data', function (data) { buffers.push(chunk);
// buffer.push(data.toString()); });
// }).on('end', function () {
// req.body = buffer.join(''); req.on('end', () => {
// next(); let buffer = Buffer.concat(buffers);
// }).on('error', function (e) { zlib.gunzip(buffer, (err, decoded) => {
// next(e); if (err) {
// }); logger.error(err);
return Response.sendErrorResponse(req, res, new ServerException("Error decompressing data"));
}
req.body = decoded;
next(); next();
});
});
} else { } else {
jsonBodyParserMiddleware(req, res, next); jsonBodyParserMiddleware(req, res, next);
} }

View File

@ -7,7 +7,7 @@ import Express, {
import Response from 'CommonServer/Utils/Response'; import Response from 'CommonServer/Utils/Response';
import logger from 'CommonServer/Utils/Logger'; import logger from 'CommonServer/Utils/Logger';
import protobuf from 'protobufjs'; import protobuf from 'protobufjs';
import zlib from 'zlib';
// Load proto file for OTel // Load proto file for OTel
// Create a root namespace // Create a root namespace
@ -23,6 +23,33 @@ const MetricsData = MetricsProto.lookupType('MetricsData');
const router: ExpressRouter = Express.getRouter(); const router: ExpressRouter = Express.getRouter();
/**
*
* Otel Middleware
*
*/
router.use('/otel/*', (req: ExpressRequest, _res: ExpressResponse, next: NextFunction) => {
try {
if (req.baseUrl === '/otel/v1/traces') {
req.body = TracesData.decode(req.body);
}
if (req.baseUrl === '/otel/v1/logs') {
req.body = LogsData.decode(req.body);
}
if (req.baseUrl === '/otel/v1/metrics') {
req.body = MetricsData.decode(req.body);
}
next();
} catch (err) {
return next(err);
}
});
router.post( router.post(
'/otel/*', '/otel/*',
async ( async (
@ -31,52 +58,11 @@ router.post(
next: NextFunction next: NextFunction
): Promise<void> => { ): Promise<void> => {
try { try {
logger.info('OTel Ingestor API called');
let buffers: any = []; logger.info(req.body);
req.on('data', (chunk) => {
buffers.push(chunk);
});
req.on('end', () => {
let buffer = Buffer.concat(buffers);
zlib.gunzip(buffer, (err, decoded) => {
if (err) {
console.error(err);
res.status(500).send('Error decompressing data');
return;
}
if (req.url === '/otel/v1/traces') {
const traces = TracesData.decode(decoded);
logger.info('Traces: ', traces);
}
if (req.url === '/otel/v1/logs') {
const logs = LogsData.decode(decoded);
logger.info('Logs: ', logs);
}
if (req.url === '/otel/v1/metrics') {
const metrics = MetricsData.decode(decoded);
logger.info('Metrics: ', metrics);
}
// middleware marks the probe as alive.
// so we don't need to do anything here.
return Response.sendEmptyResponse(req, res); return Response.sendEmptyResponse(req, res);
});
});
logger.info('OTelIngest URL: ', req.url);
} catch (err) { } catch (err) {
return next(err); return next(err);
} }