diff --git a/CommonServer/Utils/StartServer.ts b/CommonServer/Utils/StartServer.ts index cc597791c8..9a89acee68 100644 --- a/CommonServer/Utils/StartServer.ts +++ b/CommonServer/Utils/StartServer.ts @@ -33,7 +33,7 @@ import { DashboardApiRoute } from 'Common/ServiceRoute'; import HTTPResponse from 'Common/Types/API/HTTPResponse'; import HTTPErrorResponse from 'Common/Types/API/HTTPErrorResponse'; import ServerException from 'Common/Types/Exception/ServerException'; -// import zlib from 'zlib'; +import zlib from 'zlib'; // import OpenTelemetrySDK from "./OpenTelemetry"; const app: ExpressApplication = Express.getExpressApp(); @@ -99,25 +99,32 @@ app.use(setDefaultHeaders); app.use(function (req, res, next) { if (req.headers['content-encoding'] === 'gzip') { - // var gunzip = zlib.createGunzip(); - // req.pipe(gunzip); - // var buffer: any = []; - // gunzip.on('data', function (data) { - // buffer.push(data.toString()); - // }).on('end', function () { - // req.body = buffer.join(''); - // next(); - // }).on('error', function (e) { - // next(e); - // }); - next(); + let buffers: any = []; + + req.on('data', (chunk) => { + buffers.push(chunk); + }); + + req.on('end', () => { + let buffer = Buffer.concat(buffers); + zlib.gunzip(buffer, (err, decoded) => { + if (err) { + logger.error(err); + return Response.sendErrorResponse(req, res, new ServerException("Error decompressing data")); + } + + req.body = decoded; + + next(); + }); + }); } else { jsonBodyParserMiddleware(req, res, next); } }); app.use(function (req, res, next) { - + if (req.headers['content-encoding'] === 'gzip') { next(); } else { diff --git a/Ingestor/API/OTelIngest.ts b/Ingestor/API/OTelIngest.ts index 6f2a347cb8..320f1b9a7b 100644 --- a/Ingestor/API/OTelIngest.ts +++ b/Ingestor/API/OTelIngest.ts @@ -7,7 +7,7 @@ import Express, { import Response from 'CommonServer/Utils/Response'; import logger from 'CommonServer/Utils/Logger'; import protobuf from 'protobufjs'; -import zlib from 'zlib'; + // Load proto file for OTel // Create a root namespace @@ -23,6 +23,33 @@ const MetricsData = MetricsProto.lookupType('MetricsData'); const router: ExpressRouter = Express.getRouter(); +/** + * + * Otel Middleware + * + */ +router.use('/otel/*', (req: ExpressRequest, _res: ExpressResponse, next: NextFunction) => { + try { + + if (req.baseUrl === '/otel/v1/traces') { + req.body = TracesData.decode(req.body); + } + + if (req.baseUrl === '/otel/v1/logs') { + req.body = LogsData.decode(req.body); + } + + if (req.baseUrl === '/otel/v1/metrics') { + req.body = MetricsData.decode(req.body); + } + + next(); + + } catch (err) { + return next(err); + } +}); + router.post( '/otel/*', async ( @@ -31,52 +58,11 @@ router.post( next: NextFunction ): Promise => { try { + logger.info('OTel Ingestor API called'); - let buffers: any = []; - req.on('data', (chunk) => { - buffers.push(chunk); - }); + logger.info(req.body); - - req.on('end', () => { - let buffer = Buffer.concat(buffers); - zlib.gunzip(buffer, (err, decoded) => { - if (err) { - console.error(err); - res.status(500).send('Error decompressing data'); - return; - } - - - if (req.url === '/otel/v1/traces') { - const traces = TracesData.decode(decoded); - - logger.info('Traces: ', traces); - } - - if (req.url === '/otel/v1/logs') { - const logs = LogsData.decode(decoded); - - logger.info('Logs: ', logs); - } - - if (req.url === '/otel/v1/metrics') { - const metrics = MetricsData.decode(decoded); - - logger.info('Metrics: ', metrics); - } - - // middleware marks the probe as alive. - // so we don't need to do anything here. - return Response.sendEmptyResponse(req, res); - - - }); - }); - - logger.info('OTelIngest URL: ', req.url); - - + return Response.sendEmptyResponse(req, res); } catch (err) { return next(err); }