Last active
October 29, 2015 02:50
-
-
Save tapmodo/19d0567b3e5bfe95c350 to your computer and use it in GitHub Desktop.
This is a first stab at a batch uploader for InfluxDB. It seems to work. If you want to use it for any other queue-batching operation, you can override the write() and _send() methods.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var _ = require('lodash'); | |
var debug = require('debug')('influx:batch'); | |
/**
 * Batching writer: points are collected in an in-memory queue and handed to
 * `db` in chunks by the prototype methods.
 *
 * @constructor
 * @param {Object} db - Client object; stored as-is on `this.db`.
 * @param {Object} [options] - Passed straight to `this.setOptions()`.
 */
function SeriesWriter(db, options) {
  this.db = db;

  // Options are resolved before the queue state below is created.
  this.setOptions(options);

  // Initial state: nothing in flight, no timer scheduled, nothing queued.
  this.sending = false;
  this.timer = null;
  this.queue = [];
}
/**
 * Default option values; `setOptions()` copies them and lets the
 * caller-supplied options override individual keys.
 */
SeriesWriter.defaults = {
  // First argument handed to db.writePoints().
  series: '_internal',

  // Delay, in milliseconds, used for the retry/flush timer.
  interval: 2000,

  // Upper bound on the number of queued points sent in one call.
  chunkSize: 500,

  // Queue length at which canSend() starts reporting true.
  chunkMin: 100,

  // When true, write() adds an ISO-8601 `time` to values that have none.
  autoTime: true,

  // Third argument handed to db.writePoints().
  write_opts: {}
};
/**
 * Instance methods of SeriesWriter.
 *
 * Points are pushed onto `this.queue` by write() and handed to
 * `this.db.writePoints(series, points, write_opts, callback)` in chunks of at
 * most `options.chunkSize`. Only one timer is kept at a time (`this.timer`).
 */
SeriesWriter.prototype = {
  // Assigning a fresh object to `prototype` drops the default `constructor`
  // link, so restore it explicitly.
  constructor: SeriesWriter,

  /**
   * Queue a single point and try to send.
   *
   * @param {Object} values - Field values of the point. When it has no truthy
   *   `time` and `options.autoTime` is enabled, a shallow copy stamped with the
   *   current ISO-8601 time is queued; the caller's object is left untouched.
   * @param {Object} [tags] - Queued alongside the values, unmodified.
   * @returns {SeriesWriter} this, for chaining.
   */
  write: function(values, tags) {
    var point = values;
    var needsTime = !values.time && this.options.autoTime;

    // Stamp a copy rather than the argument: a caller that reuses one object
    // for several writes would otherwise keep the first timestamp forever.
    if (needsTime && typeof values === 'object') {
      point = _.assign({}, values, { time: (new Date()).toISOString() });
    }

    this.queue.push([point, tags]);
    this._trigger();
    return this;
  },

  /**
   * Replace the current options with `SeriesWriter.defaults` overridden by
   * `options`.
   *
   * Besides the keys listed in the defaults, an optional
   * `onError(err, points)` function is honoured by _send().
   *
   * @param {Object} [options]
   * @returns {SeriesWriter} this, for chaining.
   */
  setOptions: function(options) {
    this.options = _.assign({}, SeriesWriter.defaults, options || {});
    return this;
  },

  /**
   * Whether enough points are queued to send without waiting.
   *
   * The two comparisons are OR-ed, so the effective threshold is the smaller
   * of `chunkSize` and `chunkMin`.
   *
   * @returns {boolean}
   */
  canSend: function() {
    return (this.queue.length >= this.options.chunkSize) ||
      (this.queue.length >= this.options.chunkMin);
  },

  /**
   * Send the next chunk now, bypassing the `sending` and canSend() checks.
   * Does nothing when the queue is empty.
   *
   * @returns {SeriesWriter} this, for chaining.
   */
  flush: function() {
    this._trigger(true);
    return this;
  },

  /**
   * (Re)start the timer: cancel any pending one and call `_trigger(flush)`
   * after `options.interval` milliseconds.
   *
   * @param {boolean} [flush] - Forwarded to _trigger() when the timer fires.
   */
  _delay: function(flush) {
    if (this.timer) clearTimeout(this.timer);
    this.timer = setTimeout(this._trigger.bind(this, flush), this.options.interval);
  },

  /**
   * Take the next chunk off the queue and send it, or schedule a later
   * attempt when that is not possible yet.
   *
   * @param {boolean} [flush] - When truthy, send even if a write is in flight
   *   or the queue is below the canSend() threshold.
   */
  _trigger: function(flush) {
    if (!this.queue.length) return;

    if (!flush) {
      // A write is in flight: look again after the interval.
      if (this.sending) return this._delay();
      // Too few points: wait one interval, then send whatever is queued.
      if (!this.canSend()) return this._delay(true);
    }

    // Always take at least one point so a zero or invalid chunkSize cannot
    // produce empty batches.
    var size = Math.max(1, Math.floor(this.options.chunkSize) || 1);
    var points = this.queue.slice(0, size);
    this.queue = this.queue.slice(size);

    this.sending = true;
    this._send(points);
  },

  /**
   * Hand one chunk to `db.writePoints` and continue with the queue once the
   * callback fires.
   *
   * On error, `options.onError(err, points)` is called (with the writer as
   * `this`) when it is a function, and the next attempt is postponed by one
   * interval. Without a handler the error is rethrown from the callback.
   *
   * @param {Array} points - Array of `[values, tags]` pairs.
   */
  _send: function(points) {
    debug('sending %s points', points.length);

    var self = this;
    var series = this.options.series;
    var opts = this.options.write_opts;

    this.db.writePoints(series, points, opts, function(err) {
      self.sending = false;

      if (err) {
        var onError = self.options.onError;
        // No handler configured: rethrow, as nothing else can report it.
        if (typeof onError !== 'function') throw err;

        onError.call(self, err, points);
        // Back off for one interval instead of retrying immediately.
        return self._delay(true);
      }

      self._trigger();
    });
  }
};
module.exports = SeriesWriter; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment