Skip to content

Instantly share code, notes, and snippets.

@bholloway
Forked from anonymous/index.html
Last active January 14, 2016 07:23
Show Gist options
  • Save bholloway/0a9ece7d8ff48bfc7893 to your computer and use it in GitHub Desktop.
Save bholloway/0a9ece7d8ff48bfc7893 to your computer and use it in GitHub Desktop.
RxJS cold behaviour subject RxJS cold behaviour subject // source http://jsbin.com/golani
<!DOCTYPE html>
<html>
<head>
<meta name="description" content="RxJS cold behaviour subject">
<meta charset="utf-8">
<title>RxJS cold behaviour subject</title>
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/q.js/1.0.1/q.js"></script>
</head>
<body>
<ui-view></ui-view>
<script id="jsbin-javascript">
console.clear();
// Kept as `var`: this listing is loaded by more than one <script> on the page,
// and a repeated `const`/`let` at top level would be a redeclaration error.
var start = Date.now();

/**
 * Seconds elapsed since `start`, formatted as a short fixed-width label.
 *
 * @returns {string} elapsed seconds with three decimals, cut to five characters
 *   (for example `0.000`, `0.500`, `12.34`)
 */
function now() {
  const elapsedSeconds = (Date.now() - start) / 1000;
  // toFixed keeps the decimal point for whole-second values; appending '0000'
  // to the number turned 1 into "10000" and 0 into "00000".
  return elapsedSeconds.toFixed(3).slice(0, 5);
}
/**
 * Writes one console line made of the elapsed-time label followed by the
 * given values, all separated by single spaces.
 *
 * @param {...*} message values to print after the time label
 */
function timestamped(...message) {
  const parts = [now(), ...message];
  console.log(parts.join(' '));
}
/**
 * Logs a heading: the message surrounded by spaces, padded with dashes on both
 * sides and cut to 40 characters.
 *
 * @param {...*} message words of the heading, joined with single spaces
 */
function banner(...message) {
  const WIDTH = 40;
  const label = ` ${message.join(' ')} `;
  // Dashes are added in pairs (one per side), so an odd gap overshoots by one
  // character and the final slice trims the trailing dash.
  const gap = Math.max(0, WIDTH - label.length);
  const dashes = '-'.repeat(Math.ceil(gap / 2));
  timestamped(`${dashes}${label}${dashes}`.slice(0, WIDTH));
}
/**
 * Creates the demo source: a deferred `interval(300).take(10)` stream that logs
 * every value, wrapped with `publish().refCount()` and instrumented through
 * `hookSubscribe` so subscribe/unsubscribe calls are logged with the
 * `refCount` property read from the resulting observable.
 *
 * @returns {Object} the instrumented observable (the same object `hookSubscribe` returns)
 */
function getSource() {
  const logTick = (x) => {
    timestamped('SOURCE >', x);
  };

  // Runs each time the deferred observable is subscribed to.
  const createTicks = () => {
    banner('SOURCE');
    return Rx.Observable.interval(300).take(10).do(logTick);
  };

  const result = Rx.Observable.defer(createTicks).publish().refCount();

  const onSubscribe = (subscription) => {
    timestamped('SOURCE subscribe', '(refcount', result.refCount - 1, '->', result.refCount, ')');
    return subscription;
  };

  const onUnsubscribe = () => {
    timestamped('SOURCE unsubscribe', '(refcount', result.refCount, '->', result.refCount - 1, ')');
  };

  return hookSubscribe(result, onSubscribe, onUnsubscribe);
}
/**
 * Instruments `target._subscribe` so callers can observe subscribe and
 * unsubscribe activity.
 *
 * @param {Object} target object whose `_subscribe` method is replaced in place
 * @param {Function} subscribeFn called with `this === target` and the value
 *   returned by the original `_subscribe`; must return the subscription to hand back
 * @param {Function} unsubscribeFn called with `this === target` and the result of
 *   the subscription's original `_unsubscribe` (or `undefined` when it had none)
 * @returns {Object} `target`, for chaining
 */
function hookSubscribe(target, subscribeFn, unsubscribeFn) {
  const existingSubscribeFn = target._subscribe;

  target._subscribe = (...subscribeArgs) => {
    const subscription = subscribeFn.call(target, existingSubscribeFn.apply(target, subscribeArgs));
    const existingUnsubscribeFn = subscription._unsubscribe;

    subscription._unsubscribe = (...unsubscribeArgs) => {
      // The original teardown is a method of the subscription, so it must run
      // with the subscription (not the observable) as its receiver; it may
      // also be absent, in which case there is nothing to delegate to.
      const result = typeof existingUnsubscribeFn === 'function'
        ? existingUnsubscribeFn.apply(subscription, unsubscribeArgs)
        : undefined;
      return unsubscribeFn.call(target, result);
    };

    return subscription;
  };

  return target;
}
/**
 * Wraps `observable` with `publish().refCount()` and exposes a companion
 * stream that reports the wrapper's `refCount` property whenever a
 * subscription is added or removed.
 *
 * @param {Object} observable source exposing `.publish()`; presumably an Rx Observable
 * @returns {Object} `counted.takeUntil(disposed)` with two properties assigned on it:
 *   `refCount` (the count stream) and `dispose()` (emits on `disposed`, which
 *   both returned streams are chained to via `takeUntil`)
 */
function refCountSubject(observable) {
  // Emitter of the "disposed" signal; populated while `disposed` has a producer.
  let disposedNext = null;
  const disposed = Rx.Observable.create((instance) => {
    disposedNext = instance.next.bind(instance);
    return () => {
      // Was `disposeNext = null`, which leaked a global and left the emitter set.
      disposedNext = null;
    };
  }).publish();
  disposed.connect();

  // Emitter for the count stream; only the most recent subscriber is retained.
  let countNext = null;
  const count = Rx.Observable.create((instance) => {
    countNext = instance.next.bind(instance);
    return () => {
      countNext = null;
    };
  }).takeUntil(disposed);

  const counted = observable.publish().refCount();

  hookSubscribe(
    counted,
    (subscription) => {
      if (countNext) {
        countNext(counted.refCount);
      }
      return subscription;
    },
    () => {
      if (countNext) {
        countNext(counted.refCount - 1);
      }
    }
  );

  return Object.assign(counted.takeUntil(disposed), {
    refCount: count,
    dispose: dispose,
  });

  function dispose() {
    timestamped('COUNT dispose()');
    if (disposedNext) {
      disposedNext();
    }
  }
}
/**
 * Builds a "cold behaviour subject" around `observable`.
 *
 * Every subscription to the returned `value` first receives the last value
 * stored from `observable` (or `initial` while nothing is stored) and then the
 * emissions of `observable`, until `dispose()` is called. `dispose` is also
 * passed to `.do()` as the completion callback of `observable`.
 *
 * @param {Object} observable source exposing `.do()`; presumably an Rx Observable
 * @param {*} initial value delivered while no source value is stored
 * @returns {{value: Object, dispose: Function, clear: Function, isDisposed: boolean, isValid: boolean}}
 *   `isDisposed` and `isValid` are live accessors; `clear()` forgets the stored
 *   value and sends `initial` to every active subscriber
 */
function coldBehaviourSubject(observable, initial) {
  let isDisposed = false;
  let isValid = false;
  let current;
  let observed;
  // Observers of the seed stream that are still subscribed; used by clear().
  const seedObservers = [];

  // Emitter of the "disposed" signal; populated while `disposed` has a producer.
  let disposedNext = null;
  const disposed = Rx.Observable.create((instance) => {
    disposedNext = instance.next.bind(instance);
    return () => {
      // Was `disposeNext = null`, which leaked a global and left the emitter set.
      disposedNext = null;
    };
  }).publish();
  disposed.connect();

  const value = Rx.Observable.defer(() => {
    banner('SUBJECT');
    if (isDisposed) {
      return Rx.Observable.empty();
    }
    observed = observed || observable.do(storeCurrent, undefined, dispose);
    return Rx.Observable.merge(
      Rx.Observable.create((instance) => {
        // Decide by validity, not truthiness: stored values such as 0 or ''
        // must be replayed rather than replaced by `initial`.
        const seed = isValid ? current : initial;
        timestamped('SUBJECT ~', seed);
        seedObservers.push(instance);
        instance.next(seed);
        return () => {
          const index = seedObservers.indexOf(instance);
          if (index !== -1) {
            seedObservers.splice(index, 1);
          }
        };
      }),
      observed
    ).takeUntil(disposed);
  });

  // Accessors instead of plain properties: the object is built once, so
  // copying the variables would freeze them at their initial values.
  return {
    value: value,
    dispose: dispose,
    clear: clear,
    get isDisposed() {
      return isDisposed;
    },
    get isValid() {
      return isValid;
    },
  };

  function storeCurrent(x) {
    timestamped('SUBJECT >', x);
    current = x;
    isValid = true;
  }

  function clear() {
    current = undefined;
    isValid = false;
    // Iterate over a copy in case an observer unsubscribes while being notified.
    seedObservers.slice().forEach((instance) => instance.next(initial));
  }

  function dispose() {
    timestamped('SUBJECT dispose()');
    isDisposed = true;
    if (disposedNext) {
      disposedNext();
    }
  }
}
/**
 * Derives an item stream from `source`: an identity `map` followed by
 * `publish()` and `refCount()`.
 *
 * @param {Object} source object exposing `.map()`; presumably an Rx Observable
 * @returns {Object} whatever the final `refCount()` call returns
 */
function getItem(source) {
  const identity = (x) => x;
  const mapped = source.map(identity);
  const published = mapped.publish();
  return published.refCount();
}
// -------------
// Demo driver: wraps the source with refCountSubject, then with
// coldBehaviourSubject using the string 'null' as the initial value.
var refCounted = refCountSubject(getSource());
var service = coldBehaviourSubject(refCounted, 'null');
// Log every value of the count stream and its completion.
refCounted.refCount
.subscribe((count)=> {
timestamped('COUNT ', count);
}, null, ()=> {
timestamped('COUNT lifecycle complete');
});
// Filled by add(); nothing in this listing reads or unsubscribes these.
var subscriptions = [];
banner('START');
// add(label) prints its banner immediately and returns the function that
// setTimeout runs later, so banners 'a' and 'b' appear right after 'START'.
setTimeout(add('a'), 500);
setTimeout(add('b'), 1500);
setTimeout(() => {
banner('CLEANUP');
// either of the following should work
// NOTE(review): only refCounted.dispose() is called here; the other option is
// presumably service.dispose() — confirm.
refCounted.dispose();
// Subscribes once more after disposal.
add('after')();
}, 2500);
/**
 * Announces `label` right away and returns a function that, when run, creates
 * two item streams from `service.value` and subscribes three times (twice to
 * the first item, once to the second), storing the subscriptions in
 * `subscriptions`.
 *
 * @param {string} label name used in the log output
 * @returns {Function} the function that performs the subscriptions
 */
function add(label) {
  banner(label);

  return function delayed() {
    banner('BEGIN', label);
    timestamped('subscribe', label, ' item x2, item x1');

    const firstItem = getItem(service.value);
    const secondItem = getItem(service.value);

    const logValue = (x) => timestamped(' ', label, '>', x);
    const ignoreValue = () => {};
    const logComplete = () => timestamped('COMPLETE', label);

    const valueSubscription = firstItem.subscribe(logValue);
    const silentSubscription = firstItem.subscribe(ignoreValue);
    const completeSubscription = secondItem.subscribe(null, null, logComplete);
    subscriptions.push(valueSubscription, silentSubscription, completeSubscription);

    banner('BEGUN', label);
  };
}
</script>
<script id="jsbin-source-javascript" type="text/javascript">console.clear();
// Kept as `var`: this listing is loaded by more than one <script> on the page,
// and a repeated `const`/`let` at top level would be a redeclaration error.
var start = Date.now();

/**
 * Seconds elapsed since `start`, formatted as a short fixed-width label.
 *
 * @returns {string} elapsed seconds with three decimals, cut to five characters
 *   (for example `0.000`, `0.500`, `12.34`)
 */
function now() {
  const elapsedSeconds = (Date.now() - start) / 1000;
  // toFixed keeps the decimal point for whole-second values; appending '0000'
  // to the number turned 1 into "10000" and 0 into "00000".
  return elapsedSeconds.toFixed(3).slice(0, 5);
}
/**
 * Writes one console line made of the elapsed-time label followed by the
 * given values, all separated by single spaces.
 *
 * @param {...*} message values to print after the time label
 */
function timestamped(...message) {
  const parts = [now(), ...message];
  console.log(parts.join(' '));
}
/**
 * Logs a heading: the message surrounded by spaces, padded with dashes on both
 * sides and cut to 40 characters.
 *
 * @param {...*} message words of the heading, joined with single spaces
 */
function banner(...message) {
  const WIDTH = 40;
  const label = ` ${message.join(' ')} `;
  // Dashes are added in pairs (one per side), so an odd gap overshoots by one
  // character and the final slice trims the trailing dash.
  const gap = Math.max(0, WIDTH - label.length);
  const dashes = '-'.repeat(Math.ceil(gap / 2));
  timestamped(`${dashes}${label}${dashes}`.slice(0, WIDTH));
}
/**
 * Creates the demo source: a deferred `interval(300).take(10)` stream that logs
 * every value, wrapped with `publish().refCount()` and instrumented through
 * `hookSubscribe` so subscribe/unsubscribe calls are logged with the
 * `refCount` property read from the resulting observable.
 *
 * @returns {Object} the instrumented observable (the same object `hookSubscribe` returns)
 */
function getSource() {
  const logTick = (x) => {
    timestamped('SOURCE >', x);
  };

  // Runs each time the deferred observable is subscribed to.
  const createTicks = () => {
    banner('SOURCE');
    return Rx.Observable.interval(300).take(10).do(logTick);
  };

  const result = Rx.Observable.defer(createTicks).publish().refCount();

  const onSubscribe = (subscription) => {
    timestamped('SOURCE subscribe', '(refcount', result.refCount - 1, '->', result.refCount, ')');
    return subscription;
  };

  const onUnsubscribe = () => {
    timestamped('SOURCE unsubscribe', '(refcount', result.refCount, '->', result.refCount - 1, ')');
  };

  return hookSubscribe(result, onSubscribe, onUnsubscribe);
}
/**
 * Instruments `target._subscribe` so callers can observe subscribe and
 * unsubscribe activity.
 *
 * @param {Object} target object whose `_subscribe` method is replaced in place
 * @param {Function} subscribeFn called with `this === target` and the value
 *   returned by the original `_subscribe`; must return the subscription to hand back
 * @param {Function} unsubscribeFn called with `this === target` and the result of
 *   the subscription's original `_unsubscribe` (or `undefined` when it had none)
 * @returns {Object} `target`, for chaining
 */
function hookSubscribe(target, subscribeFn, unsubscribeFn) {
  const existingSubscribeFn = target._subscribe;

  target._subscribe = (...subscribeArgs) => {
    const subscription = subscribeFn.call(target, existingSubscribeFn.apply(target, subscribeArgs));
    const existingUnsubscribeFn = subscription._unsubscribe;

    subscription._unsubscribe = (...unsubscribeArgs) => {
      // The original teardown is a method of the subscription, so it must run
      // with the subscription (not the observable) as its receiver; it may
      // also be absent, in which case there is nothing to delegate to.
      const result = typeof existingUnsubscribeFn === 'function'
        ? existingUnsubscribeFn.apply(subscription, unsubscribeArgs)
        : undefined;
      return unsubscribeFn.call(target, result);
    };

    return subscription;
  };

  return target;
}
/**
 * Wraps `observable` with `publish().refCount()` and exposes a companion
 * stream that reports the wrapper's `refCount` property whenever a
 * subscription is added or removed.
 *
 * @param {Object} observable source exposing `.publish()`; presumably an Rx Observable
 * @returns {Object} `counted.takeUntil(disposed)` with two properties assigned on it:
 *   `refCount` (the count stream) and `dispose()` (emits on `disposed`, which
 *   both returned streams are chained to via `takeUntil`)
 */
function refCountSubject(observable) {
  // Emitter of the "disposed" signal; populated while `disposed` has a producer.
  let disposedNext = null;
  const disposed = Rx.Observable.create((instance) => {
    disposedNext = instance.next.bind(instance);
    return () => {
      // Was `disposeNext = null`, which leaked a global and left the emitter set.
      disposedNext = null;
    };
  }).publish();
  disposed.connect();

  // Emitter for the count stream; only the most recent subscriber is retained.
  let countNext = null;
  const count = Rx.Observable.create((instance) => {
    countNext = instance.next.bind(instance);
    return () => {
      countNext = null;
    };
  }).takeUntil(disposed);

  const counted = observable.publish().refCount();

  hookSubscribe(
    counted,
    (subscription) => {
      if (countNext) {
        countNext(counted.refCount);
      }
      return subscription;
    },
    () => {
      if (countNext) {
        countNext(counted.refCount - 1);
      }
    }
  );

  return Object.assign(counted.takeUntil(disposed), {
    refCount: count,
    dispose: dispose,
  });

  function dispose() {
    timestamped('COUNT dispose()');
    if (disposedNext) {
      disposedNext();
    }
  }
}
/**
 * Builds a "cold behaviour subject" around `observable`.
 *
 * Every subscription to the returned `value` first receives the last value
 * stored from `observable` (or `initial` while nothing is stored) and then the
 * emissions of `observable`, until `dispose()` is called. `dispose` is also
 * passed to `.do()` as the completion callback of `observable`.
 *
 * @param {Object} observable source exposing `.do()`; presumably an Rx Observable
 * @param {*} initial value delivered while no source value is stored
 * @returns {{value: Object, dispose: Function, clear: Function, isDisposed: boolean, isValid: boolean}}
 *   `isDisposed` and `isValid` are live accessors; `clear()` forgets the stored
 *   value and sends `initial` to every active subscriber
 */
function coldBehaviourSubject(observable, initial) {
  let isDisposed = false;
  let isValid = false;
  let current;
  let observed;
  // Observers of the seed stream that are still subscribed; used by clear().
  const seedObservers = [];

  // Emitter of the "disposed" signal; populated while `disposed` has a producer.
  let disposedNext = null;
  const disposed = Rx.Observable.create((instance) => {
    disposedNext = instance.next.bind(instance);
    return () => {
      // Was `disposeNext = null`, which leaked a global and left the emitter set.
      disposedNext = null;
    };
  }).publish();
  disposed.connect();

  const value = Rx.Observable.defer(() => {
    banner('SUBJECT');
    if (isDisposed) {
      return Rx.Observable.empty();
    }
    observed = observed || observable.do(storeCurrent, undefined, dispose);
    return Rx.Observable.merge(
      Rx.Observable.create((instance) => {
        // Decide by validity, not truthiness: stored values such as 0 or ''
        // must be replayed rather than replaced by `initial`.
        const seed = isValid ? current : initial;
        timestamped('SUBJECT ~', seed);
        seedObservers.push(instance);
        instance.next(seed);
        return () => {
          const index = seedObservers.indexOf(instance);
          if (index !== -1) {
            seedObservers.splice(index, 1);
          }
        };
      }),
      observed
    ).takeUntil(disposed);
  });

  // Accessors instead of plain properties: the object is built once, so
  // copying the variables would freeze them at their initial values.
  return {
    value: value,
    dispose: dispose,
    clear: clear,
    get isDisposed() {
      return isDisposed;
    },
    get isValid() {
      return isValid;
    },
  };

  function storeCurrent(x) {
    timestamped('SUBJECT >', x);
    current = x;
    isValid = true;
  }

  function clear() {
    current = undefined;
    isValid = false;
    // Iterate over a copy in case an observer unsubscribes while being notified.
    seedObservers.slice().forEach((instance) => instance.next(initial));
  }

  function dispose() {
    timestamped('SUBJECT dispose()');
    isDisposed = true;
    if (disposedNext) {
      disposedNext();
    }
  }
}
/**
 * Derives an item stream from `source`: an identity `map` followed by
 * `publish()` and `refCount()`.
 *
 * @param {Object} source object exposing `.map()`; presumably an Rx Observable
 * @returns {Object} whatever the final `refCount()` call returns
 */
function getItem(source) {
  const identity = (x) => x;
  const mapped = source.map(identity);
  const published = mapped.publish();
  return published.refCount();
}
// -------------
// Demo driver: wraps the source with refCountSubject, then with
// coldBehaviourSubject using the string 'null' as the initial value.
var refCounted = refCountSubject(getSource());
var service = coldBehaviourSubject(refCounted, 'null');
// Log every value of the count stream and its completion.
refCounted.refCount
.subscribe((count)=> {
timestamped('COUNT ', count);
}, null, ()=> {
timestamped('COUNT lifecycle complete');
});
// Filled by add(); nothing in this listing reads or unsubscribes these.
var subscriptions = [];
banner('START');
// add(label) prints its banner immediately and returns the function that
// setTimeout runs later, so banners 'a' and 'b' appear right after 'START'.
setTimeout(add('a'), 500);
setTimeout(add('b'), 1500);
setTimeout(() => {
banner('CLEANUP');
// either of the following should work
// NOTE(review): only refCounted.dispose() is called here; the other option is
// presumably service.dispose() — confirm.
refCounted.dispose();
// Subscribes once more after disposal.
add('after')();
}, 2500);
function add(label) {
banner(label);
return function delayed() {
banner('BEGIN', label);
timestamped('subscribe', label, ' item x2, item x1');
var item1 = getItem(service.value);
var item2 = getItem(service.value);
subscriptions.push(
item1.subscribe((x) => timestamped(' ', label, '>', x)),
item1.subscribe(() => {}),
item2.subscribe(null, null, () => timestamped('COMPLETE', label))
);
banner('BEGUN', label);
};
}</script></body>
</html>
console.clear();
var start = Date.now();

/**
 * Seconds elapsed since `start`, formatted as a short fixed-width label.
 *
 * @returns {string} elapsed seconds with three decimals, cut to five characters
 *   (for example `0.000`, `0.500`, `12.34`)
 */
function now() {
  const elapsedSeconds = (Date.now() - start) / 1000;
  // toFixed keeps the decimal point for whole-second values; appending '0000'
  // to the number turned 1 into "10000" and 0 into "00000".
  return elapsedSeconds.toFixed(3).slice(0, 5);
}
/**
 * Writes one console line made of the elapsed-time label followed by the
 * given values, all separated by single spaces.
 *
 * @param {...*} message values to print after the time label
 */
function timestamped(...message) {
  const parts = [now(), ...message];
  console.log(parts.join(' '));
}
/**
 * Logs a heading: the message surrounded by spaces, padded with dashes on both
 * sides and cut to 40 characters.
 *
 * @param {...*} message words of the heading, joined with single spaces
 */
function banner(...message) {
  const WIDTH = 40;
  const label = ` ${message.join(' ')} `;
  // Dashes are added in pairs (one per side), so an odd gap overshoots by one
  // character and the final slice trims the trailing dash.
  const gap = Math.max(0, WIDTH - label.length);
  const dashes = '-'.repeat(Math.ceil(gap / 2));
  timestamped(`${dashes}${label}${dashes}`.slice(0, WIDTH));
}
/**
 * Creates the demo source: a deferred `interval(300).take(10)` stream that logs
 * every value, wrapped with `publish().refCount()` and instrumented through
 * `hookSubscribe` so subscribe/unsubscribe calls are logged with the
 * `refCount` property read from the resulting observable.
 *
 * @returns {Object} the instrumented observable (the same object `hookSubscribe` returns)
 */
function getSource() {
  const logTick = (x) => {
    timestamped('SOURCE >', x);
  };

  // Runs each time the deferred observable is subscribed to.
  const createTicks = () => {
    banner('SOURCE');
    return Rx.Observable.interval(300).take(10).do(logTick);
  };

  const result = Rx.Observable.defer(createTicks).publish().refCount();

  const onSubscribe = (subscription) => {
    timestamped('SOURCE subscribe', '(refcount', result.refCount - 1, '->', result.refCount, ')');
    return subscription;
  };

  const onUnsubscribe = () => {
    timestamped('SOURCE unsubscribe', '(refcount', result.refCount, '->', result.refCount - 1, ')');
  };

  return hookSubscribe(result, onSubscribe, onUnsubscribe);
}
/**
 * Instruments `target._subscribe` so callers can observe subscribe and
 * unsubscribe activity.
 *
 * @param {Object} target object whose `_subscribe` method is replaced in place
 * @param {Function} subscribeFn called with `this === target` and the value
 *   returned by the original `_subscribe`; must return the subscription to hand back
 * @param {Function} unsubscribeFn called with `this === target` and the result of
 *   the subscription's original `_unsubscribe` (or `undefined` when it had none)
 * @returns {Object} `target`, for chaining
 */
function hookSubscribe(target, subscribeFn, unsubscribeFn) {
  const existingSubscribeFn = target._subscribe;

  target._subscribe = (...subscribeArgs) => {
    const subscription = subscribeFn.call(target, existingSubscribeFn.apply(target, subscribeArgs));
    const existingUnsubscribeFn = subscription._unsubscribe;

    subscription._unsubscribe = (...unsubscribeArgs) => {
      // The original teardown is a method of the subscription, so it must run
      // with the subscription (not the observable) as its receiver; it may
      // also be absent, in which case there is nothing to delegate to.
      const result = typeof existingUnsubscribeFn === 'function'
        ? existingUnsubscribeFn.apply(subscription, unsubscribeArgs)
        : undefined;
      return unsubscribeFn.call(target, result);
    };

    return subscription;
  };

  return target;
}
/**
 * Wraps `observable` with `publish().refCount()` and exposes a companion
 * stream that reports the wrapper's `refCount` property whenever a
 * subscription is added or removed.
 *
 * @param {Object} observable source exposing `.publish()`; presumably an Rx Observable
 * @returns {Object} `counted.takeUntil(disposed)` with two properties assigned on it:
 *   `refCount` (the count stream) and `dispose()` (emits on `disposed`, which
 *   both returned streams are chained to via `takeUntil`)
 */
function refCountSubject(observable) {
  // Emitter of the "disposed" signal; populated while `disposed` has a producer.
  let disposedNext = null;
  const disposed = Rx.Observable.create((instance) => {
    disposedNext = instance.next.bind(instance);
    return () => {
      // Was `disposeNext = null`, which leaked a global and left the emitter set.
      disposedNext = null;
    };
  }).publish();
  disposed.connect();

  // Emitter for the count stream; only the most recent subscriber is retained.
  let countNext = null;
  const count = Rx.Observable.create((instance) => {
    countNext = instance.next.bind(instance);
    return () => {
      countNext = null;
    };
  }).takeUntil(disposed);

  const counted = observable.publish().refCount();

  hookSubscribe(
    counted,
    (subscription) => {
      if (countNext) {
        countNext(counted.refCount);
      }
      return subscription;
    },
    () => {
      if (countNext) {
        countNext(counted.refCount - 1);
      }
    }
  );

  return Object.assign(counted.takeUntil(disposed), {
    refCount: count,
    dispose: dispose,
  });

  function dispose() {
    timestamped('COUNT dispose()');
    if (disposedNext) {
      disposedNext();
    }
  }
}
/**
 * Builds a "cold behaviour subject" around `observable`.
 *
 * Every subscription to the returned `value` first receives the last value
 * stored from `observable` (or `initial` while nothing is stored) and then the
 * emissions of `observable`, until `dispose()` is called. `dispose` is also
 * passed to `.do()` as the completion callback of `observable`.
 *
 * @param {Object} observable source exposing `.do()`; presumably an Rx Observable
 * @param {*} initial value delivered while no source value is stored
 * @returns {{value: Object, dispose: Function, clear: Function, isDisposed: boolean, isValid: boolean}}
 *   `isDisposed` and `isValid` are live accessors; `clear()` forgets the stored
 *   value and sends `initial` to every active subscriber
 */
function coldBehaviourSubject(observable, initial) {
  let isDisposed = false;
  let isValid = false;
  let current;
  let observed;
  // Observers of the seed stream that are still subscribed; used by clear().
  const seedObservers = [];

  // Emitter of the "disposed" signal; populated while `disposed` has a producer.
  let disposedNext = null;
  const disposed = Rx.Observable.create((instance) => {
    disposedNext = instance.next.bind(instance);
    return () => {
      // Was `disposeNext = null`, which leaked a global and left the emitter set.
      disposedNext = null;
    };
  }).publish();
  disposed.connect();

  const value = Rx.Observable.defer(() => {
    banner('SUBJECT');
    if (isDisposed) {
      return Rx.Observable.empty();
    }
    observed = observed || observable.do(storeCurrent, undefined, dispose);
    return Rx.Observable.merge(
      Rx.Observable.create((instance) => {
        // Decide by validity, not truthiness: stored values such as 0 or ''
        // must be replayed rather than replaced by `initial`.
        const seed = isValid ? current : initial;
        timestamped('SUBJECT ~', seed);
        seedObservers.push(instance);
        instance.next(seed);
        return () => {
          const index = seedObservers.indexOf(instance);
          if (index !== -1) {
            seedObservers.splice(index, 1);
          }
        };
      }),
      observed
    ).takeUntil(disposed);
  });

  // Accessors instead of plain properties: the object is built once, so
  // copying the variables would freeze them at their initial values.
  return {
    value: value,
    dispose: dispose,
    clear: clear,
    get isDisposed() {
      return isDisposed;
    },
    get isValid() {
      return isValid;
    },
  };

  function storeCurrent(x) {
    timestamped('SUBJECT >', x);
    current = x;
    isValid = true;
  }

  function clear() {
    current = undefined;
    isValid = false;
    // Iterate over a copy in case an observer unsubscribes while being notified.
    seedObservers.slice().forEach((instance) => instance.next(initial));
  }

  function dispose() {
    timestamped('SUBJECT dispose()');
    isDisposed = true;
    if (disposedNext) {
      disposedNext();
    }
  }
}
/**
 * Derives an item stream from `source`: an identity `map` followed by
 * `publish()` and `refCount()`.
 *
 * @param {Object} source object exposing `.map()`; presumably an Rx Observable
 * @returns {Object} whatever the final `refCount()` call returns
 */
function getItem(source) {
  const identity = (x) => x;
  const mapped = source.map(identity);
  const published = mapped.publish();
  return published.refCount();
}
// -------------
// Demo driver: wraps the source with refCountSubject, then with
// coldBehaviourSubject using the string 'null' as the initial value.
var refCounted = refCountSubject(getSource());
var service = coldBehaviourSubject(refCounted, 'null');
// Log every value of the count stream and its completion.
refCounted.refCount
.subscribe((count)=> {
timestamped('COUNT ', count);
}, null, ()=> {
timestamped('COUNT lifecycle complete');
});
// Filled by add(); nothing in this listing reads or unsubscribes these.
var subscriptions = [];
banner('START');
// add(label) prints its banner immediately and returns the function that
// setTimeout runs later, so banners 'a' and 'b' appear right after 'START'.
setTimeout(add('a'), 500);
setTimeout(add('b'), 1500);
setTimeout(() => {
banner('CLEANUP');
// either of the following should work
// NOTE(review): only refCounted.dispose() is called here; the other option is
// presumably service.dispose() — confirm.
refCounted.dispose();
// Subscribes once more after disposal.
add('after')();
}, 2500);
/**
 * Announces `label` right away and returns a function that, when run, creates
 * two item streams from `service.value` and subscribes three times (twice to
 * the first item, once to the second), storing the subscriptions in
 * `subscriptions`.
 *
 * @param {string} label name used in the log output
 * @returns {Function} the function that performs the subscriptions
 */
function add(label) {
  banner(label);

  return function delayed() {
    banner('BEGIN', label);
    timestamped('subscribe', label, ' item x2, item x1');

    const firstItem = getItem(service.value);
    const secondItem = getItem(service.value);

    const logValue = (x) => timestamped(' ', label, '>', x);
    const ignoreValue = () => {};
    const logComplete = () => timestamped('COMPLETE', label);

    const valueSubscription = firstItem.subscribe(logValue);
    const silentSubscription = firstItem.subscribe(ignoreValue);
    const completeSubscription = secondItem.subscribe(null, null, logComplete);
    subscriptions.push(valueSubscription, silentSubscription, completeSubscription);

    banner('BEGUN', label);
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment