Skip to content

Instantly share code, notes, and snippets.

@CoderYellow
Last active February 3, 2024 14:36
Show Gist options
  • Save CoderYellow/c9f789111eff8a1df0c7177571114885 to your computer and use it in GitHub Desktop.
Save CoderYellow/c9f789111eff8a1df0c7177571114885 to your computer and use it in GitHub Desktop.
RpcImpl
import * as grpc from '@grpc/grpc-js';
import { Observable } from 'rxjs';
import { first, toArray } from 'rxjs/operators';
/**
 * `Rpc` implementation that forwards already-encoded messages over gRPC
 * using a generic `grpc.Client`.
 *
 * Every method addresses the remote procedure as `/${service}/${method}` and
 * treats payloads as opaque bytes: no protobuf encoding or decoding happens
 * here.
 *
 * The channel is created with insecure (plaintext) credentials.
 */
class GrpcRpcImpl implements Rpc {
  private readonly client: grpc.Client;

  /**
   * grpc-js types serializers as returning a `Buffer`, so a plain
   * `Uint8Array` is copied into one; an existing `Buffer` is passed through.
   */
  private static readonly serialize = (value: Uint8Array): Buffer =>
    Buffer.isBuffer(value) ? value : Buffer.from(value);

  /** Responses stay as raw bytes; `Buffer` is a `Uint8Array` subclass. */
  private static readonly deserialize = (value: Buffer): Uint8Array => value;

  /**
   * @param serviceAddress Target passed to `grpc.Client` (for example
   *   `host:port`).
   */
  constructor(serviceAddress: string) {
    this.client = new grpc.Client(serviceAddress, grpc.credentials.createInsecure());
  }

  /** Builds the gRPC method path from a service and method name. */
  private static methodPath(service: string, method: string): string {
    return `/${service}/${method}`;
  }

  /**
   * Adapts a promise's `resolve`/`reject` pair to a grpc-js unary callback.
   * A call that reports no error but also delivers no message is rejected,
   * so the promise never resolves with `undefined`.
   */
  private static settle(
    resolve: (value: Uint8Array) => void,
    reject: (reason: unknown) => void,
  ): (err: grpc.ServiceError | null, response?: Uint8Array) => void {
    return (err, response) => {
      if (err) {
        reject(err);
      } else if (response === undefined) {
        reject(new Error('gRPC call completed without a response message'));
      } else {
        resolve(response);
      }
    };
  }

  /**
   * Performs a unary call.
   *
   * @param service Fully-qualified service name.
   * @param method Method name within the service.
   * @param data Encoded request message.
   * @returns The encoded response message; rejects with the gRPC error on
   *   failure.
   */
  request(service: string, method: string, data: Uint8Array): Promise<Uint8Array> {
    const path = GrpcRpcImpl.methodPath(service, method);
    return new Promise<Uint8Array>((resolve, reject) => {
      // Use the overload without metadata/options: `null` is not a valid
      // `Metadata` or `CallOptions` value.
      this.client.makeUnaryRequest<Uint8Array, Uint8Array>(
        path,
        GrpcRpcImpl.serialize,
        GrpcRpcImpl.deserialize,
        data,
        GrpcRpcImpl.settle(resolve, reject),
      );
    });
  }

  /**
   * Performs a client-streaming call: every value emitted by `data` is
   * written to the call, and completion of `data` half-closes it.
   *
   * If `data` errors, the promise rejects with that error and the call is
   * cancelled so it is not left open on the server.
   *
   * @param service Fully-qualified service name.
   * @param method Method name within the service.
   * @param data Stream of encoded request messages.
   * @returns The single encoded response message.
   */
  clientStreamingRequest(service: string, method: string, data: Observable<Uint8Array>): Promise<Uint8Array> {
    const path = GrpcRpcImpl.methodPath(service, method);
    return new Promise<Uint8Array>((resolve, reject) => {
      let input: { unsubscribe(): void } | undefined;
      const settle = GrpcRpcImpl.settle(resolve, reject);
      const call = this.client.makeClientStreamRequest<Uint8Array, Uint8Array>(
        path,
        GrpcRpcImpl.serialize,
        GrpcRpcImpl.deserialize,
        (err, response) => {
          // The call is finished either way; stop consuming the input.
          if (input !== undefined) {
            input.unsubscribe();
          }
          settle(err, response);
        },
      );
      input = data.subscribe({
        // NOTE(review): the return value of `write` (backpressure signal) is ignored.
        next: (chunk) => call.write(chunk),
        error: (err: unknown) => {
          // Reject first so the caller sees the input error rather than the
          // cancellation status reported by the call afterwards.
          reject(err);
          call.cancel();
        },
        complete: () => call.end(),
      });
    });
  }

  /**
   * Performs a server-streaming call.
   *
   * The call is started when the returned Observable is subscribed to (one
   * call per subscription) and cancelled when the subscription is torn down.
   *
   * @param service Fully-qualified service name.
   * @param method Method name within the service.
   * @param data Encoded request message.
   * @returns Stream of encoded response messages; errors with the gRPC error
   *   on failure.
   */
  serverStreamingRequest(service: string, method: string, data: Uint8Array): Observable<Uint8Array> {
    const path = GrpcRpcImpl.methodPath(service, method);
    return new Observable<Uint8Array>((subscriber) => {
      const call = this.client.makeServerStreamRequest<Uint8Array, Uint8Array>(
        path,
        GrpcRpcImpl.serialize,
        GrpcRpcImpl.deserialize,
        data,
      );
      call.on('data', (chunk: Uint8Array) => subscriber.next(chunk));
      call.on('error', (err: Error) => subscriber.error(err));
      call.on('end', () => subscriber.complete());
      // Teardown: stop the RPC when the consumer unsubscribes.
      return () => call.cancel();
    });
  }

  /**
   * Performs a bidirectional-streaming call.
   *
   * The call is started, and `data` subscribed to, when the returned
   * Observable is subscribed to. Values from `data` are written to the call
   * and its completion half-closes the call. An error from `data` cancels the
   * call and is forwarded to the subscriber. Tearing down the subscription
   * unsubscribes from `data` and cancels the call.
   *
   * @param service Fully-qualified service name.
   * @param method Method name within the service.
   * @param data Stream of encoded request messages.
   * @returns Stream of encoded response messages.
   */
  bidirectionalStreamingRequest(service: string, method: string, data: Observable<Uint8Array>): Observable<Uint8Array> {
    const path = GrpcRpcImpl.methodPath(service, method);
    return new Observable<Uint8Array>((subscriber) => {
      const call = this.client.makeBidiStreamRequest<Uint8Array, Uint8Array>(
        path,
        GrpcRpcImpl.serialize,
        GrpcRpcImpl.deserialize,
      );
      // Attach response listeners before feeding the input so no event is missed.
      call.on('data', (chunk: Uint8Array) => subscriber.next(chunk));
      call.on('error', (err: Error) => subscriber.error(err));
      call.on('end', () => subscriber.complete());

      const input = data.subscribe({
        // NOTE(review): the return value of `write` (backpressure signal) is ignored.
        next: (chunk) => call.write(chunk),
        error: (err: unknown) => {
          // Surface the input failure instead of leaving the call open forever.
          call.cancel();
          subscriber.error(err);
        },
        complete: () => call.end(),
      });

      return () => {
        input.unsubscribe();
        call.cancel();
      };
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment