-
-
Save tinovyatkin/858d9ff869abf518c14779e1e6b9bed7 to your computer and use it in GitHub Desktop.
RxJS HTTP Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Rx HTTP Server
// A first attempt at creating a minimal reactive Node.js HTTP server.
//
// Has:
// * URL/querystring parsing
// * body parsing
// * some kind of routing
// * some kind of error handling
// * minimalist request logging
//
// Does not have:
// * robust routing
// * cookie parsing
// * solid error handling
// * file uploads
const Rx = require("rxjs/Rx"),
  fs = require("fs"),
  http = require("http"),
  mime = require("mime"),
  path = require("path"),
  url = require("url");
// Wraps a Node.js ReadableStream in a shared Observable.
//
// The stream is paused as soon as this function is called and is resumed
// when the observable is subscribed to. 'data' chunks become `next`
// notifications, 'error' becomes `error`, and 'end' becomes `complete`.
// Unsubscribing detaches the three listeners again.
//
// @param stream  a Node.js readable stream
// @returns       the observable, with `.share()` applied
const fromReadableStream = stream => {
  // hold the data back until somebody is actually listening
  stream.pause();

  const source$ = Rx.Observable.create(observer => {
    // event name -> listener, in the order they are attached
    const handlers = [
      ['data', chunk => observer.next(chunk)],
      ['error', err => observer.error(err)],
      ['end', () => observer.complete()],
    ];

    handlers.forEach(([eventName, handler]) => stream.on(eventName, handler));
    stream.resume();

    // teardown: remove exactly the listeners registered above
    return () => {
      handlers.forEach(([eventName, handler]) => stream.removeListener(eventName, handler));
    };
  });

  return source$.share();
};
// Creates an HTTP server observable.
//
// Subscribing creates a server via `http.createServer` and starts listening
// on `port`. Every incoming request is emitted as a `{ req, res }` object.
// Server-level 'error' events (for example a port that is already in use)
// are delivered through the observable's error channel instead of being left
// as an unhandled 'error' event, which would crash the process.
// Unsubscribing closes the server.
//
// @param http  object exposing `createServer(listener)` (the Node `http` module)
// @param port  port passed to `server.listen`
// @returns     an Observable of `{ req, res }` connection objects
const createHttpRxStream = (http, port) => {
  return Rx.Observable.create(observer => {
    // emit one connection event per request
    const server = http.createServer((req, res) => observer.next({ req, res }));

    // must be registered before listen() so bind failures are caught too
    server.on('error', err => observer.error(err));
    server.listen(port);

    // close the server as our unsubscriber fn
    return () => server.close();
  });
};
// URL parsing middleware.
// Parses `req.url` (query string included) and copies the resulting `query`,
// `hash`, `pathname` and `search` fields onto the request object.
const urlParser = ({req}) => {
  const { query, hash, pathname, search } = url.parse(req.url, true);
  Object.assign(req, { query, hash, pathname, search });
};
// Request logging middleware.
// Writes one line per request to the console: host, method, content type,
// URL and user agent. A missing content type or user agent is shown as `-`.
const logger = ({req}) => {
  const { headers, method, url: requestUrl } = req;
  const contentType = headers['content-type'] || '-';
  const userAgent = headers['user-agent'] || '-';
  console.log(`${headers.host} - - ${method} ${contentType} ${requestUrl} - ${userAgent}`);
};
// Body parsing middleware.
//
// Adds the `rawBody` buffer and the parsed `body` (object or string) to the
// request object. It returns an observable, so it must be added to the stream
// with `flatMap` rather than `do` or a regular `map`.
//
// * Requests other than POST / PUT / PATCH pass straight through.
// * The media type is taken from the Content-Type header with any parameters
//   (e.g. `; charset=utf-8`) stripped and compared case-insensitively, so
//   `application/json; charset=utf-8` is still parsed as JSON.
// * On failure a response is written (400 "Bad JSON" for a JSON syntax error,
//   500 otherwise) and the connection is dropped from the stream: the returned
//   observable completes without emitting.
//
// PRO - subsequent handlers have ready access to the body data
// CON - waits on body parsing before continuing, so there is a small lag on
//       requests that have no need for the body data
//
// @param conn  `{ req, res }` connection object
// @returns     Observable emitting `conn` once the body has been parsed
const bodyParser = (conn) => {
  const { req, res } = conn;
  if (req.method !== "POST" && req.method !== "PUT" && req.method !== "PATCH") {
    return Rx.Observable.of(conn);
  }

  // media type only; header parameters are irrelevant for choosing a parser
  const mediaType = (req.headers["content-type"] || "")
    .split(";")[0]
    .trim()
    .toLowerCase();

  return fromReadableStream(req)
    .toArray()
    .map(chunks => Buffer.concat(chunks))
    .do(rawBody => { req.rawBody = rawBody; })
    .map(rawBody => {
      switch (mediaType) {
        case "application/json":
        case "application/javascript":
          return JSON.parse(rawBody);
        default:
          return rawBody.toString();
      }
    })
    .do(body => { req.body = body; })
    .do(() => console.log("request body: ", req.body, typeof req.body))
    .map(() => conn)
    .catch(err => {
      console.log("Error caught: ", err);
      if (err instanceof SyntaxError) {
        res.writeHead(400, { 'Content-Type': 'text/plain' });
        res.end("Bad JSON");
      } else {
        res.writeHead(500, { 'Content-Type': 'text/plain' });
        res.end("Internal Server Error");
      }
      // The response has been sent; emit nothing further for this connection.
      // (Re-subscribing to the already-consumed request stream would leave an
      // inner observable that never completes.)
      return Rx.Observable.empty();
    });
};
// Trunkline subscription endpoint (sink): answers every connection it
// receives with a plain-text 404 and logs stream errors / completion.
const _404 = {
  next({ res }) {
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end("Not Found");
  },
  error(err) {
    console.log("default server connection stream error: ", err);
  },
  complete() {
    console.log("default server connection stream completed");
  },
};
// Index handler: this is where the root of a client web app would be
// delivered. For now it replies with a plain-text greeting.
const _index = {
  next({ res }) {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end("Hello World\n");
  },
  error(err) {
    console.log("index server connection stream error: ", err);
  },
  complete() {
    console.log("index server connection stream completed");
  },
};
// Interaction tracking handler: all tracking requests (POST /interaction)
// should end up here. Replies with a fixed JSON acknowledgement.
const _tracker = {
  next({ res }) {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end('{ "ok": true }');
  },
  error(err) {
    console.log("`tracking` resource connection stream error: ", err);
  },
  complete() {
    console.log("`tracking` resource connection stream completed");
  },
};
// Takes a folder path (relative to this file's directory) and returns a
// subscriber that serves matching files from that folder.
// Ideally this would also have a mechanism to re-emit connections into
// another stream.
//
// Behaviour of `next`:
// * "/" is mapped to "/index.html".
// * The request path is normalised and must stay inside the static root;
//   anything that escapes it (e.g. "/../secret") gets a 403 and no file
//   system access is attempted.
// * A missing file (ENOENT) is delegated to the `_404` handler.
// * Any other read error is answered with a 400 and the error message.
// * File contents are sent as the raw buffer so binary files (images, fonts)
//   are not corrupted by a string conversion.
//
// @param dir  folder to serve, e.g. "/public"
// @returns    observer object with `next`, `error` and `complete`
const createStaticSubscriber = (dir) => {
  const root = path.join(__dirname, dir);
  return {
    next: (conn) => {
      const { req, res } = conn;
      const requested = req.pathname === "/" ? "/index.html" : String(req.pathname);
      // path.join collapses ".." segments, which makes the containment check reliable
      const pathname = path.join(root, requested);
      if (pathname !== root && !pathname.startsWith(root + path.sep)) {
        console.log("static file request outside of root rejected: ", req.pathname);
        res.writeHead(403, { 'Content-Type': 'text/plain' });
        res.end("Forbidden");
        return;
      }
      console.log("get static file at ", pathname);
      fs.readFile(pathname, (err, file) => {
        if (err) {
          if (err.code === "ENOENT") {
            // TODO - fix up this stuff here
            console.log("static file 404");
            return _404.next(conn);
          }
          console.log("problem getting the file", err);
          res.writeHead(400, { 'Content-Type': 'text/plain' });
          res.end(err.message);
          return;
        }
        res.writeHead(200, { 'Content-Type': mime.lookup(pathname) });
        // send the buffer untouched; toString() would mangle binary content
        res.end(file);
      });
    },
    error: err => console.log(`${dir} static resource connection stream error: `, err),
    complete: () => console.log(`${dir} static resource connection stream completed`)
  };
};
// Builds the server stream and shares it between all subscribers.
// Each connection on port 8000 goes through URL parsing, request logging and
// body parsing (merged in with flatMap because bodyParser returns an
// observable). The pipeline is wrapped in `onErrorResumeNext` before being
// shared.
const connection$ = createHttpRxStream(http, 8000)
  .do(urlParser)
  .do(logger)
  .flatMap(bodyParser);
const server_ = Rx.Observable.onErrorResumeNext(connection$).share();
// Rx Routing
//
// Takes a trunk stream and a dictionary of branching predicate functions and
// returns a matching dictionary of branch subjects. Each branch receives the
// trunk events for which its predicate returns truthy; an event may go to
// several branches. A `default` branch is added to the returned dictionary
// and receives every event that matched none of the predicates.
//
// When the trunk completes, every branch - including `default` - completes.
// Trunk errors are only logged; they are not forwarded to the branches.
//
// Note: a case named "default" is shadowed by the built-in default branch.
// To consider: some kind of "break" / stopPropagation functionality to stop
// the event once it matches.
//
// @param trunk$  source observable (anything with `subscribe(observer)`)
// @param cases   object mapping branch name -> predicate(event)
// @returns       object mapping branch name -> Rx.Subject, plus `default`
const branchStream = (trunk$, cases) => {
  const branches = {};
  // [predicate, subject] pairs in the key order of `cases`
  const branchList = Object.keys(cases).map(key => {
    const branch = new Rx.Subject();
    branches[key] = branch;
    return [cases[key], branch];
  });
  branches.default = new Rx.Subject();

  trunk$.subscribe({
    next: (e) => {
      let matched = false;
      branchList.forEach(([predicate, branch]) => {
        if (predicate(e)) {
          branch.next(e);
          matched = true;
        }
      });
      if (!matched) {
        branches.default.next(e);
      }
    },
    error: err => console.error(err),
    complete: () => {
      branchList.forEach(([, branch]) => branch.complete());
      // the default branch has to terminate together with the others
      branches.default.complete();
    }
  });

  return branches;
};
// Route table: branch name -> predicate over the `{ req, res }` connection.
// An event is delivered to every branch whose predicate matches.
const routes = {
  "getAll$": ({req}) => req.method === "GET",
  "postInteraction$": ({req}) => req.pathname === "/interaction" && req.method === "POST"
};
const Router = branchStream(server_, routes);

// GET static subscription
// should handle all requests in this stream with a static file or 404
// need to consider various test cases for propagating events that match multiple routes
Router.getAll$.subscribe(createStaticSubscriber("/public"));

// POST Interaction subscription
// we should try to collect over a time interval and process all the interaction requests at once
Router.postInteraction$.subscribe(_tracker);

// Send everything that isn't routed somewhere to 404ville
Router.default.subscribe(_404);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment