-
-
Save jewelsjacobs/63c5477413b83e1b447aefeb7b1e5a77 to your computer and use it in GitHub Desktop.
Node.js DynamoDB writer with exponential backoff
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
var fs = require('fs'); | |
var path = require('path'); | |
var AWS = require('aws-sdk'); | |
var util = require('util'); | |
var stream = require('readable-stream'); | |
var _ = require('lodash'); | |
/**
 * Object-mode Writable stream that queues incoming items and writes them to
 * DynamoDB in batches through an AWS DocumentClient.
 *
 * May be called with or without `new`.
 *
 * @param {Object} [options] - Optional overrides; omit for the original defaults.
 * @param {string} [options.region='eu-west-1'] - Region applied to the global
 *   AWS config.
 * @param {string} [options.configPath='config/aws.json'] - AWS config file,
 *   resolved against the current working directory and loaded only if it exists.
 * @param {Object} [options.docClient] - Pre-built client exposing `batchWrite`;
 *   when given, the global AWS config is left untouched.
 * @throws {Error} If the config file cannot be inspected for a reason other
 *   than it not existing.
 */
function DynamoDbWriter(options) {
  if (!(this instanceof DynamoDbWriter)) {
    return new DynamoDbWriter(options);
  }
  stream.Writable.call(this, {objectMode: true});

  const settings = options || {};
  // DynamoDB's BatchWriteItem accepts at most 25 put/delete requests per call.
  this.maxNumberInBatch = 25;
  this.queue = [];

  if (settings.docClient) {
    // An injected client needs no global AWS configuration.
    this.docClient = settings.docClient;
    return;
  }

  const configFileExists = (filePath) => {
    try {
      fs.statSync(filePath);
      return true;
    } catch (err) {
      if (err.code === 'ENOENT') {
        return false;
      }
      // Anything else (permissions, bad path component) is a real problem;
      // do not pretend the file is there.
      throw err;
    }
  };

  const awsJson = path.resolve(settings.configPath || 'config/aws.json');
  if (configFileExists(awsJson)) {
    AWS.config.loadFromPath(awsJson);
  }
  // Applied after the file is loaded, so this region wins over the file's.
  AWS.config.update({region: settings.region || 'eu-west-1'});
  this.docClient = new AWS.DynamoDB.DocumentClient();
}
util.inherits(DynamoDbWriter, stream.Writable);
/**
 * Wraps an item in the `PutRequest` envelope that `flushToDynamo` places in
 * the `RequestItems` of a batch write.
 *
 * @param {Object} item - The item to store; it is referenced, not copied.
 * @returns {{PutRequest: {Item: Object}}} The put-request wrapper.
 */
DynamoDbWriter.prototype.toPutRequest = function (item) {
  return {
    PutRequest: {
      Item: item
    }
  };
};
/**
 * Sends one batch write and retries whatever DynamoDB reports back as
 * unprocessed, doubling the delay before each successive retry.
 *
 * On a clean write the instance queue is reset to an empty array.
 *
 * @param {number} failureBackoffTime - Milliseconds to wait before the next
 *   retry if items come back unprocessed.
 * @param {Object} params - `batchWrite` parameters (`{RequestItems: ...}`).
 * @param {Function} callback - Called as `callback(err)` when the write fails,
 *   or `callback(null, response)` once nothing is left unprocessed.
 */
DynamoDbWriter.prototype.batchWriteToDynamo = function (failureBackoffTime, params, callback) {
  this.docClient.batchWrite(params, (err, response) => {
    if (err) {
      // Throwing from inside the client callback would surface as an uncaught
      // exception; hand the failure to the caller instead.
      callback(err);
      return;
    }

    const unprocessed = response && response.UnprocessedItems;
    if (unprocessed && Object.keys(unprocessed).length > 0) {
      console.log("Dynamo return unprocessed items, sleeping %s ms before retry", failureBackoffTime);
      setTimeout(() => {
        // Retry only the leftovers, with twice the delay for the next round.
        this.batchWriteToDynamo(failureBackoffTime * 2, {RequestItems: unprocessed}, callback);
      }, failureBackoffTime);
      return;
    }

    console.log("Write to dynamo worked cleanly");
    this.queue = [];
    callback(null, response);
  });
};
/**
 * Writes everything currently queued to DynamoDB as a single batch.
 *
 * The target table is `this.tableName` when that property has been set on the
 * instance, otherwise "Trueput".
 *
 * @param {Function} callback - Forwarded to `batchWriteToDynamo`; invoked
 *   immediately with no arguments when the queue is empty.
 * @returns {*} Whatever `batchWriteToDynamo` (or the callback, for an empty
 *   queue) returns.
 */
DynamoDbWriter.prototype.flushToDynamo = function (callback) {
  if (this.queue.length === 0) {
    // Nothing to send; an empty batch request would only produce an error.
    return callback();
  }

  const putRequests = this.queue.map((item) => this.toPutRequest(item));
  const tableName = this.tableName || "Trueput";
  const params = {
    RequestItems: {
      [tableName]: putRequests
    }
  };
  // 100 ms is the initial retry delay; batchWriteToDynamo doubles it per retry.
  return this.batchWriteToDynamo(100, params, callback);
};
/**
 * Adds an item to the queue and flushes once the batch limit is reached.
 *
 * @param {*} item - The item to queue.
 * @param {number} sizeInBytes - Serialized size of the item; accepted for
 *   interface compatibility and not used here.
 * @param {Function} callback - Invoked immediately when no flush is needed,
 *   otherwise handed to `flushToDynamo`.
 * @returns {*} The result of `flushToDynamo` or of `callback`.
 */
DynamoDbWriter.prototype.enqueue = function (item, sizeInBytes, callback) {
  this.queue.push(item);
  // `>=` rather than `===`: if the queue ever grows past the limit, a strict
  // equality test would never trigger another flush.
  if (this.queue.length >= this.maxNumberInBatch) {
    return this.flushToDynamo(callback);
  }
  return callback();
};
/**
 * Writable hook: measures the chunk's JSON size and queues it.
 *
 * NOTE(review): nothing visible in this file flushes a partially filled queue
 * when the stream ends; confirm the caller handles the trailing items (for
 * example via a `_final` hook or an explicit `flushToDynamo` call).
 *
 * @param {*} data - The object written to the stream.
 * @param {string} encoding - Unused; present to match the Writable contract.
 * @param {Function} callback - Receives an error when `data` has no JSON
 *   representation; otherwise handed to `enqueue`.
 * @returns {*} The result of `enqueue`, or of `callback` on failure.
 */
DynamoDbWriter.prototype._write = function (data, encoding, callback) {
  let serialized;
  try {
    serialized = JSON.stringify(data);
  } catch (err) {
    // Circular structures, BigInt values, etc.: report through the stream
    // callback instead of throwing out of the write hook.
    return callback(err);
  }
  if (serialized === undefined) {
    // JSON.stringify yields undefined for values such as functions.
    return callback(new TypeError('Cannot serialize chunk to JSON'));
  }
  // Byte length, not string length: multi-byte characters count fully.
  const sizeInBytes = Buffer.byteLength(serialized, 'utf8');
  return this.enqueue(data, sizeInBytes, callback);
};
module.exports.DynamoDbWriter = DynamoDbWriter; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment