Created
December 17, 2019 18:49
-
-
Save TechnotronicOz/f9b56c8bd49678fb5db925fdabc2b498 to your computer and use it in GitHub Desktop.
amqp.service.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Injectable, Logger } from '@nestjs/common';
import * as amqp from 'amqplib';
import { ConfigService } from '../config/config.service';
/**
 * The subset of channel operations that AmqpService relies on.
 *
 * Return types are intentionally left unannotated (implicitly `any`), exactly
 * as callers already see them; AmqpService awaits the results of
 * `assertQueue` and `consume`.
 * NOTE(review): presumably satisfied by the channel object of the AMQP client
 * library — confirm before tightening any of these signatures.
 */
export interface AmqpChannel {
  /** Acknowledge a previously delivered message. */
  ack(message: AmqpMessage);
  /** Assert that the named queue exists. */
  assertQueue(queueName: string);
  /** Register `cb` as the consumer callback for the named queue. */
  consume(queueName: string, cb: any);
  /** Send `message` to the named queue. */
  sendToQueue(queueName: string, message: any);
}
/**
 * The subset of connection operations that AmqpService relies on.
 */
export interface AmqpConnection {
  /**
   * Open a new channel on this connection. AmqpService awaits the result and
   * treats it as an AmqpChannel; the return type is left unannotated
   * (implicitly `any`) as callers already see it.
   */
  createChannel();
}
/**
 * Minimal shape of a message handled by AmqpService.
 */
export interface AmqpMessage {
  /**
   * Message payload. AmqpService only ever calls `content.toString()` on it
   * (for logging).
   * NOTE(review): declared as `string`, but the AMQP client may actually
   * deliver a Buffer here — confirm against the library's message type.
   */
  content: string;
}
/**
 * Injectable service that opens one AMQP connection at construction time and
 * hands out channels cached by caller-chosen name, with helpers to
 * acknowledge, consume, and publish messages on those channels.
 */
@Injectable()
export class AmqpService {
  private readonly logger: Logger = new Logger(AmqpService.name);
  /** The established connection; set once the connect promise resolves. */
  private amqpConnection: AmqpConnection;
  /** Settles when the connection attempt started in the constructor finishes. */
  private readonly connectionReady: Promise<AmqpConnection>;
  /**
   * Channels keyed by name. The *promise* is cached (not the resolved channel)
   * so that concurrent requests for the same name share one creation.
   */
  private readonly channelMap: Map<string, Promise<AmqpChannel>> = new Map();

  /**
   * Starts connecting using the connection string supplied by the config
   * service. A constructor cannot await, so the pending connection is kept
   * and awaited by the first operation that needs it.
   */
  constructor(
    private readonly configService: ConfigService,
  ) {
    this.logger.log('connection to amqp...');
    // Promise.resolve normalises whatever thenable the client library returns
    // into a native promise.
    this.connectionReady = Promise.resolve(
      amqp.connect(this.configService.amqpConfigConnectionString()),
    ).then(connection => {
      this.logger.log('connected!');
      this.amqpConnection = connection;
      return connection;
    });
    // Log connection failures here so they never surface as an unhandled
    // rejection; anything awaiting connectionReady still receives the error.
    this.connectionReady.catch(err => {
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.error(`amqp connection failed: ${reason}`);
    });
  }

  /**
   * Returns the channel cached under `channelName`, creating it on first use.
   * Waits for the connection if it is still being established.
   *
   * @param channelName cache key identifying the channel
   * @returns the shared channel for that name
   * @throws whatever the connection attempt or channel creation rejected
   *   with; a failed creation is evicted from the cache so a later call can
   *   retry.
   */
  async getOrCreateChannel(channelName: string): Promise<AmqpChannel> {
    const cached = this.channelMap.get(channelName);
    if (cached) {
      this.logger.debug(`getOrCreateChannel exists in cache [channelName=${channelName}]`);
      return cached;
    }

    this.logger.log(`getOrCreateChannel channelMap does not have channelName [${channelName}]`);
    const pending = this.openChannel();
    // Register before awaiting so a concurrent caller reuses this creation.
    this.channelMap.set(channelName, pending);
    try {
      return await pending;
    } catch (err) {
      // Do not keep a permanently rejected promise in the cache.
      if (this.channelMap.get(channelName) === pending) {
        this.channelMap.delete(channelName);
      }
      throw err;
    }
  }

  /**
   * Acknowledges `message` on the named channel.
   *
   * @returns whatever the channel's `ack` returns
   */
  async ackMessage(channelName: string, message: AmqpMessage) {
    const ch = await this.getOrCreateChannel(channelName);
    this.logger.debug(`ackMessage [channelName=${channelName}, message=${message.content.toString()}]`);
    return ch.ack(message);
  }

  /**
   * Asserts `queueName` on the named channel, then registers `cbFn` as its
   * consumer callback.
   */
  async consume(channelName: string, queueName: string, cbFn): Promise<void> {
    this.logger.log(`consume [channelName=${channelName}, queue=${queueName}]`);
    const ch = await this.getOrCreateChannel(channelName);
    await ch.assertQueue(queueName);
    await ch.consume(queueName, cbFn);
  }

  /**
   * Asserts `queueName` on the named channel, then sends `message` to it as a
   * Buffer.
   */
  async publish(channelName: string, queueName: string, message: string): Promise<void> {
    this.logger.log(`publish [channelName=${channelName}, queueName=${queueName}]`);
    const ch = await this.getOrCreateChannel(channelName);
    // Wait for the assertion so the send cannot race ahead of it and an
    // assertion failure is reported to the caller.
    await ch.assertQueue(queueName);
    ch.sendToQueue(queueName, Buffer.from(message));
  }

  /** Opens a fresh channel once the connection is available. */
  private async openChannel(): Promise<AmqpChannel> {
    const connection = this.amqpConnection || (await this.connectionReady);
    return connection.createChannel();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment