Created
January 15, 2020 17:45
-
-
Save TheXenocide/c337db0edc7601736d3f5a5e57c7117c to your computer and use it in GitHub Desktop.
Prototype Blocking Streams Over SharedArrayBuffer With Web Workers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// this could take advantage of other text encodings using existing JS libraries, but for now just using UTF16 with no BOM and both state and current buffer length slots | |
// also could stand to be updated to use "let" where appropriate, etc. it was just a quick demo | |
function StreamWriter(sharedBuffer) { | |
// This could be optimized, but for simplicity, stateView is 2 UInt16s | |
// stateView[0] = stream state ( | |
// 0 = ready for initialization, | |
// 1 = ready for final read | |
// 2 = ready for partial read, | |
// 3 = ready for next write | |
// 4 = complete | |
// stateView[1] = string length to read | |
// additional members could be added to identify if the stream is ready for reuse, etc. | |
console.log("Initializing StreamWriter State. Buffer Length: " + sharedBuffer.byteLength) | |
var stateView = new Uint16Array(sharedBuffer, 0, 2); // buffer, byteStart = 0, typed length (aka 2 UInt16 = 4 bytes) | |
var bufferView = new Uint16Array(sharedBuffer, 2 * 2, (sharedBuffer.byteLength - (2 * 2)) / 2); // byteStart = 4, typed length = (bytes - 4) / 2 | |
if (stateView[0] != 0) throw "Shared Buffer is not ready for stream initialization"; | |
stateView[0] = 3; | |
this.writeString = function (text) { | |
if (stateView[0] < 3) throw "Shared Buffer is not ready for writing"; | |
if (stateView[0] > 3) stateView[0] = 2; | |
console.log("Text Length: " + text.length + ", Buffer View Length: " + bufferView.length) | |
if (text.length <= bufferView.length) { | |
console.log("Writing single block") | |
for (var i = 0; i < text.length; i++) { | |
bufferView[i] = text.charCodeAt(i); | |
} | |
console.log("Setting length"); | |
stateView[1] = text.length; | |
console.log("Setting state to: 1, ready for final read"); | |
// we always set the state flag last to ensure all other data/length is written | |
stateView[0] = 1; // ready for final read | |
} | |
else { | |
console.log("Writing multiple blocks"); | |
var writtenSoFar = 0; | |
while (writtenSoFar < text.length) { | |
console.log("Written so far: " + writtenSoFar) | |
// block until the reader has read the buffer | |
while (stateView[0] != 3) { } // 3 = ready for next write | |
// this could use a ternary staement, but it's written long form for readability | |
var toWriteThisTime = text.length - writtenSoFar; | |
if (toWriteThisTime > bufferView.length) | |
toWriteThisTime = bufferView.length; | |
console.log("To write this time: " + writtenSoFar) | |
for (var textIndex = writtenSoFar, outputIndex = 0; outputIndex < toWriteThisTime; textIndex++ , outputIndex++) { | |
bufferView[outputIndex] = text.charCodeAt(textIndex); | |
} | |
writtenSoFar += toWriteThisTime; | |
console.log("Setting length") | |
stateView[1] = toWriteThisTime; | |
if (writtenSoFar < text.length) { | |
console.log("Setting state to: 2, ready for partial read") | |
stateView[0] = 2; // ready for partial read | |
} else { | |
console.log("Setting state to: 1, ready for final read"); | |
stateView[0] = 1; // ready for final read | |
} | |
} | |
} | |
} | |
// we could also implement a writeAsync that employs setTimeout loops or microtasks but this initial prototype was built to block on purpose | |
} | |
function StreamReader(sharedBuffer) { | |
console.log("Initializing StreamWriter State. Buffer Length: " + sharedBuffer.byteLength) | |
var stateView = new Uint16Array(sharedBuffer, 0, 2); | |
// commented because one side or the other has to perform initialization; might be a better way to coordinate this | |
// if (stateView[0] > 2) throw "Shared Buffer is not ready for stream initialization"; | |
this.readString = function () { | |
var text = ""; | |
while (stateView[0] != 4) { | |
console.log("Waiting for stream state to be readable; current state: " + stateView[0]); | |
var waitCount = 0; | |
// block until there is text to be read | |
while (stateView[0] == 0 || stateView[0] > 2) { // 1 = ready for final read, 2 = ready for partial read | |
waitCount++; | |
if ((waitCount % 100000) == 0) { | |
console.log("Still waiting for readable state; current state: " + stateView[0]); | |
} | |
} | |
var bufferTextLength = stateView[1]; | |
console.log("Preparing to read; current buffer size: " + bufferTextLength); | |
var bufferView = new Uint16Array(sharedBuffer, 2 * 2, bufferTextLength); | |
if (stateView[0] == 1) { // 1 = ready for final read | |
console.log("Reading final text"); | |
text += String.fromCharCode.apply(null, bufferView); | |
stateView[0] = 4; // 4 = complete | |
} else if (stateView[0] == 2) { // 2 = ready for partial read | |
console.log("Reading partial text"); | |
text += String.fromCharCode.apply(null, bufferView); | |
stateView[0] = 3; // 3 = ready for next write | |
} else { | |
throw "Unexpected state transation; current state: " + stateView[0]; | |
} | |
} | |
console.log("Read completed"); | |
return text; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <refererence path="Stream.js" /> | |
var streamWorker = new Worker("StreamWorker.js"); | |
function reverseText(text) { | |
var requestBuffer = new SharedArrayBuffer(65536); | |
var responseBuffer = new SharedArrayBuffer(65536); | |
var requestWriter = new StreamWriter(requestBuffer); | |
var responseReader = new StreamReader(responseBuffer); | |
streamWorker.postMessage({ | |
requestBuffer: requestBuffer, | |
responseBuffer: responseBuffer | |
}); | |
console.log("About to write to request stream"); | |
requestWriter.writeString(text); | |
console.log("About to read from response stream"); | |
var result = responseReader.readString(); | |
//console.log("REUSLT: " + result); | |
return result; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
importScripts("Stream.js"); | |
function reverseString(str) { | |
// Check input | |
if (!str || str.length < 2 || | |
typeof str !== 'string') { | |
return 'Not valid'; | |
} | |
// Take empty array revArray | |
const revArray = []; | |
const length = str.length - 1; | |
// Looping from the end | |
for (let i = length; i >= 0; i--) { | |
revArray.push(str[i]); | |
} | |
// Joining the array elements | |
return revArray.join(''); | |
} | |
// this was a self.addListener-type thing before and probably should be; was changed during troubleshooting | |
onmessage = function (e) { | |
console.log("Received data streams"); | |
var requestReader = new StreamReader(e.data.requestBuffer); | |
var responseWriter = new StreamWriter(e.data.responseBuffer); | |
var reqText = requestReader.readString(); | |
var respText = reverseString(reqText); | |
responseWriter.writeString(respText); | |
} | |
console.log("Loaded StreamWorker"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
These streams are blocking, which is not exactly advisable, but this sample shows how to use typed array views over shared buffers in a way that coordinates data transfer with a web worker without using
postMessage
. It's very much a quick prototype so there's plenty of cleanup that could be done (read: "includes free typos and outdated JS practices!"), but I thought it might be worth sharing, if only for basic demonstration of some newer/less blogged about technologies.String reverse logic from here: https://www.geeksforgeeks.org/reverse-a-string-in-javascript/