Last active
June 20, 2019 19:56
-
-
Save Idorobots/84efbab01276ac30d9c74fd60f42c6f1 to your computer and use it in GitHub Desktop.
Reactive streams?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(define-struct subscriber (on-subscription on-next on-error on-complete)) | |
(define-struct subscription (on-request on-cancel)) | |
(define-struct publisher (on-subscriber on-publish on-error on-complete)) | |
(define (subscription on-request on-cancel) | |
(make-subscription on-request on-cancel)) | |
(define (request sub n) | |
((subscription-on-request sub) n)) | |
(define (cancel sub) | |
((subscription-on-cancel sub))) | |
(define (subscriber on-subscription on-next on-error on-complete) | |
(make-subscriber on-subscription on-next on-error on-complete)) | |
(define (on-subscription s sub) | |
((subscriber-on-subscription s) sub)) | |
(define (on-next s v) | |
((subscriber-on-next s) v)) | |
(define (on-error s e) | |
((subscriber-on-error s) e)) | |
(define (on-complete s) | |
((subscriber-on-complete s))) | |
(define (publisher on-subscriber on-publish on-error on-complete) | |
(make-publisher on-subscriber on-publish on-error on-complete)) | |
(define (subscribe p s) | |
((publisher-on-subscriber p) s)) | |
(define (publish p v) | |
((publisher-on-publish p) v)) | |
(define (error p e) | |
((publisher-on-error p) e)) | |
(define (complete p) | |
((publisher-on-complete p))) | |
;; A useful publisher | |
(define-struct subscription-data (demand queue) #:mutable) | |
(define (queueing-multi-publisher) | |
(let ((subscribers '())) | |
(publisher (lambda (s) | |
(unless (member s (map car subscribers)) | |
(let* ((data (make-subscription-data 0 '())) | |
(sub (subscription | |
(lambda (count) | |
(let* ((q (subscription-data-queue data)) | |
(len (length q))) | |
(cond ((>= len count) | |
(map (curry on-next s) | |
(reverse (drop q (- len count)))) | |
(set-subscription-data-queue! data (take q (- len count)))) | |
((> len 0) | |
(map (curry on-next s) | |
(reverse q)) | |
(set-subscription-data-queue! data '()) | |
(set-subscription-data-demand! data (- count len))) | |
('else | |
(set-subscription-data-demand! data (+ count (subscription-data-demand data))))))) | |
(lambda () | |
(set! subscribers | |
(filter (lambda (e) | |
(not (equal? s (car e)))) | |
subscribers)))))) | |
(set! subscribers (cons (cons s data) subscribers)) | |
(on-subscription s sub)))) | |
(lambda (v) | |
(map (lambda (e) | |
(let* ((s (car e)) | |
(data (cdr e)) | |
(demand (subscription-data-demand data))) | |
(if (> demand 0) | |
(begin (on-next s v) | |
(set-subscription-data-demand! data (- demand 1))) | |
(set-subscription-data-queue! data (cons v (subscription-data-queue data)))))) | |
subscribers)) | |
(lambda (error) | |
(map (lambda (e) | |
(on-error (car e) error)) | |
subscribers) | |
(set! subscribers '())) | |
(lambda () | |
(map (compose on-complete car) subscribers) | |
(set! subscribers '()))))) | |
;; Actual streams | |
(define-struct stream-stage (subscriber publisher)) | |
(define (source-from-publisher p) | |
(make-stream-stage '() p)) | |
(define (flow f) | |
(let* ((in-sub '()) | |
(out-sub '()) | |
(p (publisher (lambda (s) | |
(set! out-sub s) | |
(on-subscription s (subscription | |
(lambda (count) | |
(request in-sub count)) | |
(lambda () | |
(cancel in-sub))))) | |
(lambda (v) | |
(on-next out-sub v)) | |
(lambda (e) | |
(on-error out-sub)) | |
(lambda () | |
(on-complete out-sub)))) | |
(s (subscriber (lambda (s) | |
(set! in-sub s)) | |
(lambda (v) | |
(publish p (f v))) | |
(lambda (e) | |
(error p e)) | |
(lambda () | |
(complete p))))) | |
(make-stream-stage s p))) | |
(define (sink-foreach f) | |
(make-stream-stage (subscriber (curry (flip request) 123423425564889876543) ; All the thing! | |
f | |
f | |
identity) | |
'())) | |
(define (via source flow) | |
(subscribe (stream-stage-publisher source) | |
(stream-stage-subscriber flow)) | |
flow) | |
(define to via) | |
;; Exmaple | |
(define p (queueing-multi-publisher)) | |
(define s (-> (source-from-publisher p) | |
(via (flow (lambda (v) | |
(* 2 v)))) | |
(to (sink-foreach println)))) | |
(publish p 23) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment