Skip to content

Instantly share code, notes, and snippets.

@tengla
Created October 28, 2024 21:20
Show Gist options
  • Save tengla/b6fc1e485461b194964f55c2a622ab42 to your computer and use it in GitHub Desktop.
Save tengla/b6fc1e485461b194964f55c2a622ab42 to your computer and use it in GitHub Desktop.
ws message observable
import {
Observable, retry, map, throwError,
catchError, of, mergeMap, filter
} from "rxjs";
/**
 * Envelope emitted for every socket event.
 *
 * `status` carries the socket's `readyState` at the moment of the event;
 * `data` is the decoded payload for "message" events and `null` for
 * pure state notifications.
 */
type Message<T> = {
  status: number;
  data: T | null;
};
/**
 * Wraps a WebSocket connection in a cold Observable.
 *
 * A new socket is opened for every subscription. The observable emits:
 *  - one state notification (`data: null`) immediately after the socket is
 *    constructed, and one each on "open" and "close";
 *  - one message per incoming frame, with `data` set to the JSON-decoded
 *    frame contents.
 *
 * In every emission `status` is the socket's `readyState` at that moment.
 *
 * The observable errors when the socket fires an "error" event, or when an
 * incoming frame is not valid JSON. It never completes on its own; the
 * "close" event is reported as a regular emission.
 *
 * @param url Endpoint passed straight to the `WebSocket` constructor.
 * @returns Observable of status/payload envelopes. Unsubscribing closes the socket.
 */
export function socketObservable<T>(url: string | URL): Observable<Message<T>> {
  return new Observable<Message<T>>((observer) => {
    const socket = new WebSocket(url);
    console.log('open socket');

    // Reports the current readyState without a payload.
    const emitState = () => {
      observer.next({
        status: socket.readyState,
        data: null,
      });
    };

    // Initial notification, sent before any socket event has fired.
    emitState();

    socket.addEventListener("error", (event) => {
      observer.error(event);
    });
    socket.addEventListener("open", emitState);
    socket.addEventListener("message", (event: MessageEvent<string>) => {
      let data: T;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        // A malformed frame must surface through the stream; letting it
        // throw here would escape the event listener as an uncaught
        // exception that no subscriber (or retry operator) can observe.
        observer.error(err);
        return;
      }
      observer.next({
        status: socket.readyState,
        data,
      });
    });
    socket.addEventListener("close", emitState);

    // Teardown: runs on unsubscribe and after the observable errors.
    return () => {
      console.log("socket.close()");
      socket.close();
    };
  });
}
/**
 * Streams decoded payloads from a WebSocket, reconnecting on failure.
 *
 * Each envelope from `socketObservable` is translated by its `status`:
 *  - 1: the envelope's `data` is forwarded;
 *  - 3: an error is raised, which hands control to `retry`;
 *  - anything else: mapped to `null`.
 *
 * `null` values (state notifications and non-open states) are dropped
 * before they reach the subscriber.
 *
 * @param url   Endpoint to connect to.
 * @param count Passed to `retry` as the retry limit; when omitted, RxJS's
 *              own default for `retry` applies.
 * @param delay Milliseconds to wait before each retry (default 1000).
 */
export function messageObserver<T>({
  url,
  count,
  delay = 1000,
}: {
  url: string | URL;
  count?: number;
  delay?: number;
}) {
  // `status` mirrors WebSocket.readyState; these are the two values acted on.
  const OPEN = 1;
  const CLOSED = 3;

  // Converts an envelope into its payload, `null`, or a thrown error.
  const toPayload = (message: Message<T>): T | null => {
    switch (message.status) {
      case OPEN:
        return message.data;
      case CLOSED:
        throw new Error("WebSocket is closed");
      default:
        return null;
    }
  };

  const envelopes = of(url).pipe(
    mergeMap((target) => socketObservable<T>(target)),
  );

  return envelopes.pipe(
    map(toPayload),
    // Pass-through: re-raises the same error so `retry` below handles it.
    catchError((error) => throwError(() => error)),
    retry({ delay, count }),
    filter((data) => data !== null),
  );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment