Last active
May 23, 2022 10:50
-
-
Save repiatx/43eb059e22ad81160545d4e7e7434604 to your computer and use it in GitHub Desktop.
RabbitMQService With logger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const pino = require('pino') | |
const transport = pino.transport({ | |
target: 'pino-mongodb', | |
level: 'info', | |
options: { | |
uri: process.env.MONGO_URI, | |
collection: process.env.PREFIX + 'logs' | |
} | |
}) | |
const streams = [ | |
{stream: transport}, | |
{stream: process.stdout} | |
] | |
const hlog = pino( | |
{ | |
level: 'info' | |
}, | |
pino.multistream(streams) | |
) | |
module.exports = hlog |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const amqp = require('amqp-connection-manager') | |
const hlog = require('./Logger') | |
const Runner = require('../jobs/RabbitRunner') | |
class RabbitMQConnection { | |
constructor() { | |
this.channelWrapper = null | |
} | |
consumeFunction = async (msg) => { | |
Runner.Start(msg, this.channelWrapper) | |
} | |
async connect() { | |
hlog.warn('RabbitMQ Connnectiing...') | |
const rabbitConn = await amqp.connect([process.env.RABBITMQ_URI], { | |
heartbeatIntervalInSeconds: 60 | |
}) | |
rabbitConn.on('connect', () => { | |
hlog.info('RabbitMQ connected') | |
}) | |
rabbitConn.on('disconnect', (err) => { | |
hlog.error(err, 'RabbitMQ Disconnected') | |
process.exit() | |
}) | |
this.channelWrapper = rabbitConn.createChannel({ | |
json: true | |
}) | |
await this.channelWrapper.addSetup((channel) => { | |
const productQueueName = process.env.PREFIX + process.env.RABBITMQ_PRODUCT_QUEUE | |
const tokenQueueName = process.env.PREFIX + process.env.RABBITMQ_TOKEN_QUEUE | |
const concurrentJobCount = 10 | |
return Promise.all([ | |
channel.assertQueue(productQueueName), | |
channel.assertQueue(tokenQueueName), | |
channel.prefetch(concurrentJobCount), | |
channel.consume(productQueueName, this.consumeFunction), | |
channel.consume(tokenQueueName, this.consumeFunction) | |
]) | |
}) | |
await this.channelWrapper.waitForConnect() | |
} | |
} | |
module.exports = new RabbitMQConnection() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment