Skip to content

Instantly share code, notes, and snippets.

@uzimith
Last active December 21, 2017 02:04
Show Gist options
  • Save uzimith/5376295af0daff63f4e85d087b4234e7 to your computer and use it in GitHub Desktop.
Save uzimith/5376295af0daff63f4e85d087b4234e7 to your computer and use it in GitHub Desktop.
stream API / async iteration
{
"presets": ["env"],
"plugins": ["transform-runtime", "transform-async-generator-functions"]
}
import { Readable, Writable } from 'stream'
const sleep = msec => new Promise(resolve => setTimeout(resolve, msec))
const fetchFromRemote = async (n) => {
await sleep(1000)
return n
}
// Stream API
// (出力を求める計算を行って その出力が求まった時 、その結果を非同期に出す)
class MyReadable extends Readable {
constructor(max) {
super();
this.max = max;
this.state = 0;
}
async _read() {
console.log("(stream) fetchFromRemote call", this.state)
const result = await fetchFromRemote(this.state)
console.log("(stream) fetchFromRemote called", this.state)
this.state++
if (result <= this.max) {
this.push(result.toString());
} else {
this.push(null);
}
}
}
class MyWritable extends Writable {
async _write(chunk, encoding, callback) {
console.log("(Stream) process start")
console.log(chunk.toString())
console.log("(Stream) process finish")
console.log()
callback()
}
}
new MyReadable(3).pipe(new MyWritable())
// Async Iterator
async function* read(max) {
let state = 0
while (true) {
console.log(" (iterator) fetchFromRemote call", state)
const result = await fetchFromRemote(state)
console.log(" (iterator) fetchFromRemote called", state)
state++
if (result <= max) {
yield result.toString();
} else {
break
}
}
}
(async () => {
for await (const chunk of read(3)) {
console.log(" (iterator) process start")
console.log(" ", chunk.toString())
console.log(" (iterator) process finish")
console.log("")
}
})()
console.log(" --- 0s ---")
let count = 1;
setInterval(() => console.log(` --- ${count++}s --`),1000);
// (iterator) fetchFromRemote call 0
// --- 0s ---
// (stream) fetchFromRemote call 0
// --- 1s --
// (iterator) fetchFromRemote called 0
// (stream) fetchFromRemote called 0
// (Stream) process start
// 0
// (Stream) process finish
//
// (stream) fetchFromRemote call 1
// (iterator) process start
// 0
// (iterator) process finish
//
// (iterator) fetchFromRemote call 1
// --- 2s --
// (stream) fetchFromRemote called 1
// (Stream) process start
// 1
// (Stream) process finish
//
// (stream) fetchFromRemote call 2
// (iterator) fetchFromRemote called 1
// (iterator) process start
// 1
// (iterator) process finish
//
// (iterator) fetchFromRemote call 2
// --- 3s --
// (stream) fetchFromRemote called 2
// (Stream) process start
// 2
// (Stream) process finish
//
// (stream) fetchFromRemote call 3
// (iterator) fetchFromRemote called 2
// (iterator) process start
// 2
// (iterator) process finish
//
// (iterator) fetchFromRemote call 3
// --- 4s --
// (stream) fetchFromRemote called 3
// (Stream) process start
// 3
// (Stream) process finish
//
// (stream) fetchFromRemote call 4
// (iterator) fetchFromRemote called 3
// (iterator) process start
// 3
// (iterator) process finish
//
// (iterator) fetchFromRemote call 4
// --- 5s --
// (stream) fetchFromRemote called 4
// (iterator) fetchFromRemote called 4
// --- 6s --
// --- 7s --
import { Readable, Writable } from 'stream'
const sleep = msec => new Promise(resolve => setTimeout(resolve, msec))
const fetchFromRemote = async (n) => {
await sleep(1000)
return n
}
class MyReadable extends Readable {
constructor(max) {
super();
this.max = max;
this.state = 0;
}
async _read() {
console.log("(stream) fetchFromRemote call", this.state)
const result = await fetchFromRemote(this.state)
console.log("(stream) fetchFromRemote called", this.state)
this.state++
if (result <= this.max) {
this.push(result.toString());
} else {
this.push(null);
}
}
}
class MyWritable extends Writable {
async _write(chunk, encoding, callback) {
console.log("(Stream) process start")
console.log(chunk.toString())
await sleep(3000)
console.log("(Stream) process finish")
console.log()
callback()
}
}
new MyReadable(3).pipe(new MyWritable())
// (stream) fetchFromRemote call 0
// (stream) fetchFromRemote called 0
// (Stream) process start
// 0
// (stream) fetchFromRemote call 1
// (stream) fetchFromRemote called 1
// (stream) fetchFromRemote call 2
// (stream) fetchFromRemote called 2
// (stream) fetchFromRemote call 3
// (Stream) process finish
//
// (Stream) process start
// 1
// (stream) fetchFromRemote called 3
// (stream) fetchFromRemote call 4
// (stream) fetchFromRemote called 4
// (Stream) process finish
//
// (Stream) process start
// 2
// (Stream) process finish
//
// (Stream) process start
// 3
// (Stream) process finish
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment