Created
January 14, 2016 07:23
-
-
Save anonymous/ec446730ad02efad1dea to your computer and use it in GitHub Desktop.
RxJS cold behaviour subject RxJS cold behaviour subject // source http://jsbin.com/golani
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html> | |
<head> | |
<meta name="description" content="RxJS cold behaviour subject"> | |
<meta charset="utf-8"> | |
<title>RxJS cold behaviour subject</title> | |
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.js"></script> | |
<script src="https://cdnjs.cloudflare.com/ajax/libs/q.js/1.0.1/q.js"></script> | |
</head> | |
<body> | |
<ui-view></ui-view> | |
<script id="jsbin-javascript"> | |
console.clear(); | |
var start = Date.now(); | |
function now() { | |
return ((Date.now() - start) / 1000 + '0000').slice(0, 5); | |
} | |
function timestamped(...message) { | |
console.log([now()].concat(message).join(' ')); | |
} | |
function banner(...message) { | |
const WIDTH = 40; | |
var text = ' ' + message.join(' ') + ' '; | |
while (text.length < WIDTH) { | |
text = '-' + text + '-'; | |
} | |
timestamped(text.slice(0, WIDTH)); | |
} | |
function getSource(){ | |
var result = Rx.Observable.defer(() => { | |
banner('SOURCE'); | |
return Rx.Observable.interval(300).take(10) | |
.do((x) => { | |
timestamped('SOURCE >', x); | |
}); | |
}) | |
.publish() | |
.refCount(); | |
return hookSubscribe( | |
result, | |
(subscription) => { | |
timestamped('SOURCE subscribe', '(refcount', result.refCount-1, '->', result.refCount, ')'); | |
return subscription; | |
}, | |
() => { | |
timestamped('SOURCE unsubscribe', '(refcount', result.refCount, '->', result.refCount-1, ')'); | |
}); | |
} | |
function hookSubscribe(target, subscribeFn, unsubscribeFn) { | |
var existingSubscribeFn = target._subscribe; | |
target._subscribe = (...args) => { | |
var subscription = subscribeFn.call(target, existingSubscribeFn.apply(target, args)); | |
var existingUnsubscribeFn = subscription._unsubscribe; | |
subscription._unsubscribe = (...args) => { | |
return unsubscribeFn.call(target, existingUnsubscribeFn.apply(target, args)); | |
}; | |
return subscription; | |
}; | |
return target; | |
}; | |
function refCountSubject(observable) { | |
var isDisposed; | |
var disposedNext; | |
var disposed = Rx.Observable.create((instance) => { | |
disposedNext = instance.next.bind(instance); | |
return ()=> disposeNext = null; | |
}) | |
.publish(); | |
disposed | |
.connect(); | |
var countNext; | |
var count = Rx.Observable.create((instance) => { | |
countNext = instance.next.bind(instance); | |
return ()=> countNext = null; | |
}) | |
.takeUntil(disposed); | |
var counted = observable | |
.publish() | |
.refCount(); | |
hookSubscribe( | |
counted, | |
(subscription) => { | |
countNext && countNext(counted.refCount); | |
return subscription; | |
}, | |
() => { | |
countNext && countNext(counted.refCount-1); | |
}); | |
return Object.assign(counted.takeUntil(disposed), { | |
refCount: count, | |
dispose : dispose | |
}); | |
function dispose() { | |
timestamped('COUNT dispose()'); | |
isDisposed = true; | |
disposedNext && disposedNext(); | |
} | |
} | |
function coldBehaviourSubject(observable, initial) { | |
var isDisposed; | |
var disposedNext; | |
var disposed = Rx.Observable.create((instance) => { | |
disposedNext = instance.next.bind(instance); | |
return ()=> disposeNext = null; | |
}) | |
.publish(); | |
disposed | |
.connect(); | |
var observed, current, isValid, clear; | |
var value = Rx.Observable.defer(() => { | |
banner('SUBJECT'); | |
observed = observed || observable | |
.do(storeCurrent, undefined, dispose); | |
return isDisposed ? Rx.Observable.empty() : Rx.Observable.merge( | |
Rx.Observable.create((instance) => { | |
timestamped('SUBJECT ~', current || initial); | |
instance.next(current || initial); | |
clear = ()=> { | |
current = undefined; | |
isValid = false; | |
instance.next(initial); | |
}; | |
}), | |
observed | |
) | |
.takeUntil(disposed); | |
function storeCurrent(x) { | |
timestamped('SUBJECT >', x); | |
current = x; | |
isValid = true; | |
} | |
}); | |
return { | |
value : value, | |
dispose : dispose, | |
isDisposed: isDisposed, | |
clear : clear, | |
isValid : isValid | |
}; | |
function dispose() { | |
timestamped('SUBJECT dispose()'); | |
isDisposed = true; | |
disposedNext && disposedNext(); | |
} | |
} | |
function getItem(source) { | |
return source | |
.map((x) => x) | |
.publish() | |
.refCount(); | |
} | |
// ------------- | |
var refCounted = refCountSubject(getSource()); | |
var service = coldBehaviourSubject(refCounted, 'null'); | |
refCounted.refCount | |
.subscribe((count)=> { | |
timestamped('COUNT ', count); | |
}, null, ()=> { | |
timestamped('COUNT lifecycle complete'); | |
}); | |
var subscriptions = []; | |
banner('START'); | |
setTimeout(add('a'), 500); | |
setTimeout(add('b'), 1500); | |
setTimeout(() => { | |
banner('CLEANUP'); | |
// either of the following should work | |
refCounted.dispose(); | |
add('after')(); | |
}, 2500); | |
function add(label) { | |
banner(label); | |
return function delayed() { | |
banner('BEGIN', label); | |
timestamped('subscribe', label, ' item x2, item x1'); | |
var item1 = getItem(service.value); | |
var item2 = getItem(service.value); | |
subscriptions.push( | |
item1.subscribe((x) => timestamped(' ', label, '>', x)), | |
item1.subscribe(() => {}), | |
item2.subscribe(null, null, () => timestamped('COMPLETE', label)) | |
); | |
banner('BEGUN', label); | |
}; | |
} | |
</script> | |
<script id="jsbin-source-javascript" type="text/javascript">console.clear(); | |
var start = Date.now(); | |
function now() { | |
return ((Date.now() - start) / 1000 + '0000').slice(0, 5); | |
} | |
function timestamped(...message) { | |
console.log([now()].concat(message).join(' ')); | |
} | |
function banner(...message) { | |
const WIDTH = 40; | |
var text = ' ' + message.join(' ') + ' '; | |
while (text.length < WIDTH) { | |
text = '-' + text + '-'; | |
} | |
timestamped(text.slice(0, WIDTH)); | |
} | |
function getSource(){ | |
var result = Rx.Observable.defer(() => { | |
banner('SOURCE'); | |
return Rx.Observable.interval(300).take(10) | |
.do((x) => { | |
timestamped('SOURCE >', x); | |
}); | |
}) | |
.publish() | |
.refCount(); | |
return hookSubscribe( | |
result, | |
(subscription) => { | |
timestamped('SOURCE subscribe', '(refcount', result.refCount-1, '->', result.refCount, ')'); | |
return subscription; | |
}, | |
() => { | |
timestamped('SOURCE unsubscribe', '(refcount', result.refCount, '->', result.refCount-1, ')'); | |
}); | |
} | |
function hookSubscribe(target, subscribeFn, unsubscribeFn) { | |
var existingSubscribeFn = target._subscribe; | |
target._subscribe = (...args) => { | |
var subscription = subscribeFn.call(target, existingSubscribeFn.apply(target, args)); | |
var existingUnsubscribeFn = subscription._unsubscribe; | |
subscription._unsubscribe = (...args) => { | |
return unsubscribeFn.call(target, existingUnsubscribeFn.apply(target, args)); | |
}; | |
return subscription; | |
}; | |
return target; | |
}; | |
function refCountSubject(observable) { | |
var isDisposed; | |
var disposedNext; | |
var disposed = Rx.Observable.create((instance) => { | |
disposedNext = instance.next.bind(instance); | |
return ()=> disposeNext = null; | |
}) | |
.publish(); | |
disposed | |
.connect(); | |
var countNext; | |
var count = Rx.Observable.create((instance) => { | |
countNext = instance.next.bind(instance); | |
return ()=> countNext = null; | |
}) | |
.takeUntil(disposed); | |
var counted = observable | |
.publish() | |
.refCount(); | |
hookSubscribe( | |
counted, | |
(subscription) => { | |
countNext && countNext(counted.refCount); | |
return subscription; | |
}, | |
() => { | |
countNext && countNext(counted.refCount-1); | |
}); | |
return Object.assign(counted.takeUntil(disposed), { | |
refCount: count, | |
dispose : dispose | |
}); | |
function dispose() { | |
timestamped('COUNT dispose()'); | |
isDisposed = true; | |
disposedNext && disposedNext(); | |
} | |
} | |
function coldBehaviourSubject(observable, initial) { | |
var isDisposed; | |
var disposedNext; | |
var disposed = Rx.Observable.create((instance) => { | |
disposedNext = instance.next.bind(instance); | |
return ()=> disposeNext = null; | |
}) | |
.publish(); | |
disposed | |
.connect(); | |
var observed, current, isValid, clear; | |
var value = Rx.Observable.defer(() => { | |
banner('SUBJECT'); | |
observed = observed || observable | |
.do(storeCurrent, undefined, dispose); | |
return isDisposed ? Rx.Observable.empty() : Rx.Observable.merge( | |
Rx.Observable.create((instance) => { | |
timestamped('SUBJECT ~', current || initial); | |
instance.next(current || initial); | |
clear = ()=> { | |
current = undefined; | |
isValid = false; | |
instance.next(initial); | |
}; | |
}), | |
observed | |
) | |
.takeUntil(disposed); | |
function storeCurrent(x) { | |
timestamped('SUBJECT >', x); | |
current = x; | |
isValid = true; | |
} | |
}); | |
return { | |
value : value, | |
dispose : dispose, | |
isDisposed: isDisposed, | |
clear : clear, | |
isValid : isValid | |
}; | |
function dispose() { | |
timestamped('SUBJECT dispose()'); | |
isDisposed = true; | |
disposedNext && disposedNext(); | |
} | |
} | |
function getItem(source) { | |
return source | |
.map((x) => x) | |
.publish() | |
.refCount(); | |
} | |
// ------------- | |
var refCounted = refCountSubject(getSource()); | |
var service = coldBehaviourSubject(refCounted, 'null'); | |
refCounted.refCount | |
.subscribe((count)=> { | |
timestamped('COUNT ', count); | |
}, null, ()=> { | |
timestamped('COUNT lifecycle complete'); | |
}); | |
var subscriptions = []; | |
banner('START'); | |
setTimeout(add('a'), 500); | |
setTimeout(add('b'), 1500); | |
setTimeout(() => { | |
banner('CLEANUP'); | |
// either of the following should work | |
refCounted.dispose(); | |
add('after')(); | |
}, 2500); | |
function add(label) { | |
banner(label); | |
return function delayed() { | |
banner('BEGIN', label); | |
timestamped('subscribe', label, ' item x2, item x1'); | |
var item1 = getItem(service.value); | |
var item2 = getItem(service.value); | |
subscriptions.push( | |
item1.subscribe((x) => timestamped(' ', label, '>', x)), | |
item1.subscribe(() => {}), | |
item2.subscribe(null, null, () => timestamped('COMPLETE', label)) | |
); | |
banner('BEGUN', label); | |
}; | |
}</script></body> | |
</html> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
console.clear(); | |
var start = Date.now(); | |
function now() { | |
return ((Date.now() - start) / 1000 + '0000').slice(0, 5); | |
} | |
function timestamped(...message) { | |
console.log([now()].concat(message).join(' ')); | |
} | |
function banner(...message) { | |
const WIDTH = 40; | |
var text = ' ' + message.join(' ') + ' '; | |
while (text.length < WIDTH) { | |
text = '-' + text + '-'; | |
} | |
timestamped(text.slice(0, WIDTH)); | |
} | |
function getSource(){ | |
var result = Rx.Observable.defer(() => { | |
banner('SOURCE'); | |
return Rx.Observable.interval(300).take(10) | |
.do((x) => { | |
timestamped('SOURCE >', x); | |
}); | |
}) | |
.publish() | |
.refCount(); | |
return hookSubscribe( | |
result, | |
(subscription) => { | |
timestamped('SOURCE subscribe', '(refcount', result.refCount-1, '->', result.refCount, ')'); | |
return subscription; | |
}, | |
() => { | |
timestamped('SOURCE unsubscribe', '(refcount', result.refCount, '->', result.refCount-1, ')'); | |
}); | |
} | |
function hookSubscribe(target, subscribeFn, unsubscribeFn) { | |
var existingSubscribeFn = target._subscribe; | |
target._subscribe = (...args) => { | |
var subscription = subscribeFn.call(target, existingSubscribeFn.apply(target, args)); | |
var existingUnsubscribeFn = subscription._unsubscribe; | |
subscription._unsubscribe = (...args) => { | |
return unsubscribeFn.call(target, existingUnsubscribeFn.apply(target, args)); | |
}; | |
return subscription; | |
}; | |
return target; | |
}; | |
function refCountSubject(observable) { | |
var isDisposed; | |
var disposedNext; | |
var disposed = Rx.Observable.create((instance) => { | |
disposedNext = instance.next.bind(instance); | |
return ()=> disposeNext = null; | |
}) | |
.publish(); | |
disposed | |
.connect(); | |
var countNext; | |
var count = Rx.Observable.create((instance) => { | |
countNext = instance.next.bind(instance); | |
return ()=> countNext = null; | |
}) | |
.takeUntil(disposed); | |
var counted = observable | |
.publish() | |
.refCount(); | |
hookSubscribe( | |
counted, | |
(subscription) => { | |
countNext && countNext(counted.refCount); | |
return subscription; | |
}, | |
() => { | |
countNext && countNext(counted.refCount-1); | |
}); | |
return Object.assign(counted.takeUntil(disposed), { | |
refCount: count, | |
dispose : dispose | |
}); | |
function dispose() { | |
timestamped('COUNT dispose()'); | |
isDisposed = true; | |
disposedNext && disposedNext(); | |
} | |
} | |
function coldBehaviourSubject(observable, initial) { | |
var isDisposed; | |
var disposedNext; | |
var disposed = Rx.Observable.create((instance) => { | |
disposedNext = instance.next.bind(instance); | |
return ()=> disposeNext = null; | |
}) | |
.publish(); | |
disposed | |
.connect(); | |
var observed, current, isValid, clear; | |
var value = Rx.Observable.defer(() => { | |
banner('SUBJECT'); | |
observed = observed || observable | |
.do(storeCurrent, undefined, dispose); | |
return isDisposed ? Rx.Observable.empty() : Rx.Observable.merge( | |
Rx.Observable.create((instance) => { | |
timestamped('SUBJECT ~', current || initial); | |
instance.next(current || initial); | |
clear = ()=> { | |
current = undefined; | |
isValid = false; | |
instance.next(initial); | |
}; | |
}), | |
observed | |
) | |
.takeUntil(disposed); | |
function storeCurrent(x) { | |
timestamped('SUBJECT >', x); | |
current = x; | |
isValid = true; | |
} | |
}); | |
return { | |
value : value, | |
dispose : dispose, | |
isDisposed: isDisposed, | |
clear : clear, | |
isValid : isValid | |
}; | |
function dispose() { | |
timestamped('SUBJECT dispose()'); | |
isDisposed = true; | |
disposedNext && disposedNext(); | |
} | |
} | |
function getItem(source) { | |
return source | |
.map((x) => x) | |
.publish() | |
.refCount(); | |
} | |
// ------------- | |
var refCounted = refCountSubject(getSource()); | |
var service = coldBehaviourSubject(refCounted, 'null'); | |
refCounted.refCount | |
.subscribe((count)=> { | |
timestamped('COUNT ', count); | |
}, null, ()=> { | |
timestamped('COUNT lifecycle complete'); | |
}); | |
var subscriptions = []; | |
banner('START'); | |
setTimeout(add('a'), 500); | |
setTimeout(add('b'), 1500); | |
setTimeout(() => { | |
banner('CLEANUP'); | |
// either of the following should work | |
refCounted.dispose(); | |
add('after')(); | |
}, 2500); | |
function add(label) { | |
banner(label); | |
return function delayed() { | |
banner('BEGIN', label); | |
timestamped('subscribe', label, ' item x2, item x1'); | |
var item1 = getItem(service.value); | |
var item2 = getItem(service.value); | |
subscriptions.push( | |
item1.subscribe((x) => timestamped(' ', label, '>', x)), | |
item1.subscribe(() => {}), | |
item2.subscribe(null, null, () => timestamped('COMPLETE', label)) | |
); | |
banner('BEGUN', label); | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment