Last active
June 16, 2017 02:31
-
-
Save axefrog/83f22161d0ffb39b112f837a0f13d56e to your computer and use it in GitHub Desktop.
Hot-swappable pipelines for Most.js streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Stream, Sink, Scheduler, Disposable} from '@most/types'; | |
import {disposeOnce, disposeAll} from '@most/disposable'; | |
export type ActivateExtension<A = any> = (source: Stream<A>) => Stream<A>; | |
export type DeactivateExtension = null; | |
export type ExtensionAction<A = any> = ActivateExtension<A>|DeactivateExtension; | |
export type ExtensionLifecycle<A = any> = Stream<ExtensionAction<A>>; | |
export function extensible<A>(source: Stream<A>, extensions: Stream<ExtensionLifecycle<A>>): Stream<A> { | |
return new ExtensiblePipeline(extensions, source); | |
} | |
export class ExtensiblePipeline<A> implements Stream<A> { | |
constructor( | |
public readonly extensions: Stream<ExtensionLifecycle<A>>, | |
public readonly source: Stream<A> | |
) {} | |
run(sink: Sink<A>, scheduler: Scheduler): Disposable { | |
const head = new Boundary<A>(); | |
const tail = new Boundary<A>(); | |
head.next = tail; | |
head.sink = tail; | |
tail.previous = head; | |
tail.sink = sink; | |
const extender = new PipelineExtender(head, tail, scheduler); | |
return disposeOnce(disposeAll([ | |
extender, | |
this.extensions.run(extender, scheduler), | |
this.source.run(head, scheduler) | |
])); | |
} | |
} | |
class PipelineExtender<A> implements Sink<ExtensionLifecycle<A>> { | |
constructor( | |
private _head: Boundary<A>, | |
private _tail: Boundary<A>, | |
private _scheduler: Scheduler | |
) {} | |
event(time: number, stream: ExtensionLifecycle<A>): void { | |
const segment = new LoadableSegment(this._tail); | |
segment.run(stream, this._scheduler); | |
} | |
end(time: number): void {} | |
error(time: number, error: Error): void { | |
this._tail.error(time, error); | |
} | |
dispose() { | |
this._head.sink = this._tail.sink; | |
this._tail.dispose(); | |
} | |
} | |
interface PipelineSegment<A> extends Sink<A> { | |
previous?: PipelineSegment<A>; | |
next?: PipelineSegment<A>; | |
sink: Sink<A>; | |
active: boolean; | |
} | |
class Boundary<A> implements PipelineSegment<A> { | |
public previous?: PipelineSegment<A>; | |
public next?: PipelineSegment<A> = void 0; | |
public sink: Sink<A>; | |
public readonly active = true; | |
event(time: number, value: A): void { | |
this.sink.event(time, value); | |
} | |
end(time: number): void { | |
this.sink.end(time); | |
} | |
error(time: number, error: Error): void { | |
this.sink.error(time, error); | |
} | |
dispose() { | |
let previous = this.previous; | |
while(previous instanceof LoadableSegment) { | |
previous.dispose(); | |
previous = previous.previous; | |
} | |
} | |
} | |
class LoadableSegment<A> implements PipelineSegment<A> { | |
private _loader: ExtensionLoader<A> = <any>void 0; | |
private _disposable: Disposable; | |
private _disposed = false; | |
public previous: PipelineSegment<A>; | |
public next: PipelineSegment<A>; | |
public sink: Sink<A>; | |
public active = false; | |
constructor(public tail: Boundary<A>) { | |
this.previous = tail.previous!; | |
this.previous.next = this; | |
tail.previous = this; | |
this.next = tail; | |
this.sink = tail; | |
} | |
run(stream: ExtensionLifecycle<A>, scheduler: Scheduler): void { | |
this._loader = new ExtensionLoader<A>(this, scheduler); | |
this._disposable = stream.run(this._loader, scheduler); | |
} | |
private _getPreviousActive() { | |
let previous = this.previous!; | |
let segment: PipelineSegment<A> = this; | |
while(!previous.active) { | |
segment = previous; | |
previous = segment.previous!; | |
} | |
return previous; | |
} | |
load(sink: Sink<A>): void { | |
this.active = true; | |
const previous = this._getPreviousActive(); | |
this.sink = previous.sink; | |
previous.sink = sink; | |
} | |
unload(): void { | |
if(!this.active) return; | |
this.active = false; | |
const previous = this._getPreviousActive(); | |
previous.sink = this.sink; | |
} | |
event(time: number, value: A): void { | |
if(this._disposed) return; | |
this.sink.event(time, value); | |
} | |
end(time: number): void { | |
if(this._disposed) return; | |
this.sink.end(time); | |
this.dispose(); | |
} | |
error(time: number, error: Error): void { | |
if(this._disposed) return; | |
this.sink.error(time, error); | |
this.dispose(); | |
} | |
dispose() { | |
if(this._disposed) return; | |
this._disposed = true; | |
this.unload(); | |
this._disposable.dispose(); | |
this._loader.dispose(); | |
const previous = this.previous; | |
const next = this.next; | |
previous.next = next; | |
next.previous = previous; | |
} | |
} | |
class ExtensionLoader<A> implements Sink<ExtensionAction<A>>, Disposable { | |
private _current?: Disposable = void 0; | |
private _disposed = false; | |
constructor( | |
private _segment: LoadableSegment<A>, | |
private _scheduler: Scheduler | |
) {} | |
private _disposeCurrent(): void { | |
if(this._current !== void 0) { | |
this._current.dispose(); | |
} | |
} | |
private _mount(extend: ActivateExtension<A>): void { | |
const source = new DisconnectableSource(this._segment); | |
const stream = extend(source); | |
const sink = new DisconnectableSink(this._segment); | |
this._current = disposeAll([source, sink, stream.run(sink, this._scheduler)]); | |
} | |
event(time: number, upgrade: ExtensionAction<A>): void { | |
this._disposeCurrent(); | |
if(upgrade !== null) { | |
this._mount(upgrade); | |
} | |
} | |
end(time: number): void { | |
// end signal signifies that this pipeline segment is no longer required | |
this._segment.dispose(); | |
} | |
error(time: number, error: Error): void { | |
this._segment.error(time, error); | |
} | |
dispose(): void { | |
if(!this._disposed) { | |
this._disposed = true; | |
this._disposeCurrent(); | |
} | |
} | |
} | |
class DisconnectableSource<A> implements Stream<A>, Disposable { | |
constructor(private _segment: LoadableSegment<A>) {} | |
run(sink: Sink<A>, scheduler: Scheduler): Disposable { | |
this._segment.load(sink); | |
return disposeOnce(this); | |
} | |
dispose() { | |
this._segment.unload(); | |
} | |
} | |
class DisconnectableSink<A> implements Sink<A>, Disposable { | |
private _segment: LoadableSegment<A>; | |
private _disposed = false; | |
constructor(segment: LoadableSegment<A>) { | |
this._segment = segment; | |
} | |
event(time: number, value: A): void { | |
if(this._disposed) return; | |
this._segment.event(time, value); | |
} | |
end(time: number): void { | |
if(this._disposed) return; | |
this._segment.end(time); | |
} | |
error(time: number, error: Error): void { | |
if(this._disposed) return; | |
this._segment.error(time, error); | |
} | |
dispose() { | |
this._disposed = true; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {runEffects, propagateEventTask, propagateEndTask, scan, skip, map, periodic, take, tap} from '@most/core'; | |
import {newDefaultScheduler} from '@most/scheduler'; | |
const periodicCount = (interval: number) => scan(n => n + 1, 0, skip(1, periodic(interval))); | |
const count$ = take(50, periodicCount(150)); | |
const logNow = (msg: string) => () => console.log(`${msg}: ${(new Date()).toISOString()}`); | |
const times100 = (stream: Stream<number>) => (console.log('[MULTIPLY x 100]'), map((n: number) => n * 100, stream)); | |
const times1337 = (stream: Stream<number>) => (console.log('[MULTIPLY x 1337]'), map((n: number) => n * 1337, stream)); | |
const sqrt = (stream: Stream<number>) => (console.log('[SQUARE ROOT]'), map((n: number) => Math.sqrt(n), stream)); | |
const subtract1 = (stream: Stream<number>) => (console.log('[SUBTRACT 1]'), map((n: number) => n - 1, stream)); | |
function makeEmitter<A>(lifespan: number, emissions: [number, A][]) { | |
let disposed = false; | |
const stream: Stream<A> = { | |
run(sink: Sink<A>, scheduler: Scheduler): Disposable { | |
if(disposed) throw new Error(); | |
disposed = true; | |
emissions.forEach(([delay, f]) => scheduler.delay(delay, propagateEventTask(f, sink))); | |
scheduler.delay(lifespan, propagateEndTask(sink)); | |
return {dispose() {}}; | |
} | |
}; | |
return stream; | |
} | |
const applyTimes100 = (event$: Stream<number>) => times100(event$); | |
const applyTimes1337 = (event$: Stream<number>) => times1337(event$); | |
const applySubtract1 = (event$: Stream<number>) => subtract1(event$); | |
const applySqrt = (event$: Stream<number>) => sqrt(event$); | |
const a$ = makeEmitter<ExtensionAction<number>>(3500, [ | |
[750, applyTimes100], | |
[1750, null], | |
[2500, applySqrt] | |
]); | |
const b$ = makeEmitter<ExtensionAction<number>>(1250, [ | |
[250, applySubtract1], | |
[750, null] | |
]); | |
const c$ = makeEmitter<ExtensionAction<number>>(3750, [ | |
[3000, applyTimes1337] | |
]); | |
const d$ = makeEmitter<ExtensionAction<number>>(1000, [ | |
[500, applySqrt] | |
]); | |
const extensions$ = makeEmitter(2000, [[750, a$], [750, b$], [1, c$], [5000, d$]]); | |
const number$ = extensible(count$, extensions$); | |
logNow('start')(); | |
runEffects(tap(n => console.log(n), number$), newDefaultScheduler()).then(logNow('end')).catch(e => console.error(e)); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
start: 2017-06-16T02:00:23.937Z | |
0 | |
1 | |
2 | |
3 | |
4 | |
5 | |
6 | |
[SUBTRACT 1] | |
6 | |
7 | |
8 | |
[MULTIPLY x 100] | |
1000 | |
1100 | |
1200 | |
1300 | |
1400 | |
1500 | |
1600 | |
17 | |
18 | |
19 | |
20 | |
[MULTIPLY x 1337] | |
28077 | |
[SQUARE ROOT] | |
171.50510196492698 | |
175.3596304740632 | |
179.13123680698462 | |
182.8250529878222 | |
5.0990195135927845 | |
5.196152422706632 | |
5.291502622129181 | |
29 | |
30 | |
31 | |
32 | |
33 | |
34 | |
35 | |
36 | |
[SQUARE ROOT] | |
6.082762530298219 | |
6.164414002968976 | |
6.244997998398398 | |
6.324555320336759 | |
41 | |
42 | |
43 | |
44 | |
45 | |
46 | |
47 | |
48 | |
49 | |
end: 2017-06-16T02:00:31.291Z |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment