Created
December 17, 2019 04:16
-
-
Save doug-martin/b434a04f164c81da82165f4adcb144ec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const path = require('path'); | |
const fs = require('fs'); | |
const { Transform } = require('stream'); | |
const csv = require('fast-csv'); | |
class PersistStream extends Transform { | |
constructor(args) { | |
super({ objectMode: true, ...(args || {}) }); | |
this.batchSize = 100; | |
this.batch = []; | |
if (args && args.batchSize) { | |
this.batchSize = args.batchSize; | |
} | |
} | |
_transform(record, encoding, callback) { | |
this.batch.push(record); | |
if (this.shouldSaveBatch) { | |
// we have hit our batch size to process the records as a batch | |
this.processRecords() | |
// we successfully processed the records so callback | |
.then(() => callback()) | |
// An error occurred! | |
.catch(err => err(err)); | |
return; | |
} | |
// we shouldnt persist so ignore | |
callback(); | |
} | |
_flush(callback) { | |
if (this.batch.length) { | |
// handle any leftover records that were not persisted because the batch was too small | |
this.processRecords() | |
// we successfully processed the records so callback | |
.then(() => callback()) | |
// An error occurred! | |
.catch(err => err(err)); | |
return; | |
} | |
// no records to persist so just call callback | |
callback(); | |
} | |
pushRecords(records) { | |
// emit each record for down stream processing | |
records.forEach(r => this.push(r)); | |
} | |
get shouldSaveBatch() { | |
// this could be any check, for this example is is record cont | |
return this.batch.length >= this.batchSize; | |
} | |
async processRecords() { | |
// save the records | |
const records = await this.saveBatch(); | |
// besure to emit them | |
this.pushRecords(records); | |
return records; | |
} | |
async saveBatch() { | |
const records = this.batch; | |
this.batch = []; | |
console.log(`Saving batch [noOfRecords=${records.length}]`); | |
// This is where you should save/update/delete the records | |
return new Promise(res => { | |
setTimeout(() => res(records), 100); | |
}); | |
} | |
} | |
const processCsv = ({ file, batchSize }) => | |
new Promise((res, rej) => { | |
let recordCount = 0; | |
fs.createReadStream(file) | |
// catch file read errors | |
.on('error', err => rej(err)) | |
.pipe(csv.parse({ headers: true })) | |
// catch an parsing errors | |
.on('error', err => rej(err)) | |
// pipe into our processing stream | |
.pipe(new PersistStream({ batchSize })) | |
.on('error', err => rej(err)) | |
.on('data', () => { | |
recordCount += 1; | |
}) | |
.on('end', () => res({ event: 'end', recordCount })); | |
}); | |
const file = path.resolve(__dirname, `batch_write.csv`); | |
// end early after 30000 records | |
processCsv({ file, batchSize: 5 }) | |
.then(({ event, recordCount }) => { | |
console.log(`Done Processing [event=${event}] [recordCount=${recordCount}]`); | |
}) | |
.catch(e => { | |
console.error(e.stack); | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
id | first_name | last_name | address | |
---|---|---|---|---|
1 | Bob | Yukon | 1111 State St. Yukon AK | |
2 | Sally | Yukon | 1111 State St. Yukon AK | |
3 | Bobby | Yukon | 1111 State St. Yukon AK | |
4 | Jane | Yukon | 1111 State St. Yukon AK | |
5 | Dick | Yukon | 1111 State St. Yukon AK | |
6 | John | Doe | 1112 State St. Yukon AK | |
7 | Jane | Doe | 1113 State St. Yukon AK | |
8 | Billy | Doe | 1112 State St. Yukon AK | |
9 | Edith | Doe | 1112 State St. Yukon AK | |
10 | Bob | Yukon | 1111 State St. Yukon AK | |
11 | Sally | Yukon | 1111 State St. Yukon AK | |
12 | Bobby | Yukon | 1111 State St. Yukon AK | |
13 | Jane | Yukon | 1111 State St. Yukon AK | |
14 | Dick | Yukon | 1111 State St. Yukon AK | |
15 | John | Doe | 1112 State St. Yukon AK | |
16 | Jane | Doe | 1113 State St. Yukon AK | |
17 | Billy | Doe | 1112 State St. Yukon AK | |
18 | Edith | Doe | 1112 State St. Yukon AK | |
19 | Bob | Yukon | 1111 State St. Yukon AK | |
20 | Sally | Yukon | 1111 State St. Yukon AK | |
21 | Bobby | Yukon | 1111 State St. Yukon AK | |
22 | Jane | Yukon | 1111 State St. Yukon AK | |
23 | Dick | Yukon | 1111 State St. Yukon AK | |
24 | John | Doe | 1112 State St. Yukon AK | |
25 | Jane | Doe | 1113 State St. Yukon AK | |
26 | Billy | Doe | 1112 State St. Yukon AK | |
27 | Edith | Doe | 1112 State St. Yukon AK | |
28 | Bob | Yukon | 1111 State St. Yukon AK | |
29 | Sally | Yukon | 1111 State St. Yukon AK | |
30 | Bobby | Yukon | 1111 State St. Yukon AK | |
31 | Jane | Yukon | 1111 State St. Yukon AK | |
32 | Dick | Yukon | 1111 State St. Yukon AK | |
33 | John | Doe | 1112 State St. Yukon AK | |
34 | Jane | Doe | 1113 State St. Yukon AK | |
35 | Billy | Doe | 1112 State St. Yukon AK | |
36 | Edith | Doe | 1112 State St. Yukon AK |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment