From 8193a31245b5bf963aecd692c486ac4fa79e3c35 Mon Sep 17 00:00:00 2001 From: Omri <516342+omrilotan@users.noreply.github.com> Date: Tue, 24 Apr 2018 14:33:34 +0300 Subject: [PATCH] Add StatsD support (with bulking) (#1994) * Add StatsD support with bulking --- lib/cli/cli.js | 12 ++ lib/plugins/graphite/data-generator.js | 9 +- lib/plugins/graphite/graphite-sender.js | 26 +++ lib/plugins/graphite/helpers/format-entry.js | 14 ++ lib/plugins/graphite/helpers/is-statsd.js | 6 + lib/plugins/graphite/index.js | 12 +- lib/plugins/graphite/sender.js | 48 +++-- lib/plugins/graphite/statsd-sender.js | 30 ++++ test/graphiteTests.js | 176 ++++++++++++++++++- 9 files changed, 311 insertions(+), 22 deletions(-) create mode 100644 lib/plugins/graphite/graphite-sender.js create mode 100644 lib/plugins/graphite/helpers/format-entry.js create mode 100644 lib/plugins/graphite/helpers/is-statsd.js create mode 100644 lib/plugins/graphite/statsd-sender.js diff --git a/lib/cli/cli.js b/lib/cli/cli.js index eca24ef21..8c634df8a 100644 --- a/lib/cli/cli.js +++ b/lib/cli/cli.js @@ -408,6 +408,18 @@ module.exports.parseCommandLine = function parseCommandLine() { 'Send the tags as Array or a String. In Graphite 1.0 the tags is a array. Before a String', group: 'Graphite' }) + .option('graphite.statsd', { + default: graphiteConfig.statsd, + type: 'boolean', + describe: 'Uses the StatsD interface', + group: 'Graphite' + }) + .option('graphite.bulkSize', { + default: graphiteConfig.bulkSize, + type: 'number', + describe: 'Break up number of metrics to send with each request.', + group: 'Graphite' + }) /** Plugins */ .option('plugins.list', { describe: 'List all configured plugins in the log.', diff --git a/lib/plugins/graphite/data-generator.js b/lib/plugins/graphite/data-generator.js index 35f227eb5..959ca079e 100644 --- a/lib/plugins/graphite/data-generator.js +++ b/lib/plugins/graphite/data-generator.js @@ -3,7 +3,9 @@ const flatten = require('../../support/flattenMessage'), util = require('util'), graphiteUtil = require('../../support/tsdbUtil'), - reduce = require('lodash.reduce'); + reduce = require('lodash.reduce'), + formatEntry = require('./helpers/format-entry'), + isStatsd = require('./helpers/is-statsd'); function keyPathFromMessage(message, options, includeQueryParams) { let typeParts = message.type.split('.'); @@ -53,6 +55,7 @@ class GraphiteDataGenerator { this.namespace = namespace; this.includeQueryParams = !!includeQueryParams; this.options = options; + this.entryFormat = isStatsd(options.graphite) ? 'statsd' : 'graphite'; } dataFromMessage(message, time) { @@ -68,7 +71,9 @@ class GraphiteDataGenerator { flatten.flattenMessageData(message), (entries, value, key) => { const fullKey = util.format('%s.%s.%s', this.namespace, keypath, key); - entries.push(util.format('%s %s %s', fullKey, value, timestamp)); + entries.push( + util.format(formatEntry(this.entryFormat), fullKey, value, timestamp) + ); return entries; }, [] diff --git a/lib/plugins/graphite/graphite-sender.js b/lib/plugins/graphite/graphite-sender.js new file mode 100644 index 000000000..871d2d145 --- /dev/null +++ b/lib/plugins/graphite/graphite-sender.js @@ -0,0 +1,26 @@ +'use strict'; + +const net = require('net'), + Promise = require('bluebird'), + Sender = require('./sender'); + +class GraphiteSender extends Sender { + get facility() { + return 'Graphite'; + } + + bulk(data) { + this.log(data); + + return new Promise((resolve, reject) => { + const socket = net.connect(this.port, this.host, () => { + socket.write(data); + socket.end(); + resolve(); + }); + socket.on('error', reject); + }); + } +} + +module.exports = GraphiteSender; diff --git a/lib/plugins/graphite/helpers/format-entry.js b/lib/plugins/graphite/helpers/format-entry.js new file mode 100644 index 000000000..94a187796 --- /dev/null +++ b/lib/plugins/graphite/helpers/format-entry.js @@ -0,0 +1,14 @@ +/** + * picks the correct format for a graphite entry + * @param {string} [type='graphite'] ['statsd', 'graphite'] + * @return {string} The string template + */ +module.exports = type => { + switch (type) { + case 'statsd': + return '%s:%s|t'; + case 'graphite': + default: + return '%s %s %s'; + } +}; diff --git a/lib/plugins/graphite/helpers/is-statsd.js b/lib/plugins/graphite/helpers/is-statsd.js new file mode 100644 index 000000000..c88c199be --- /dev/null +++ b/lib/plugins/graphite/helpers/is-statsd.js @@ -0,0 +1,6 @@ +/** + * Graphite options qualify for StatsD use + * @param {Object} opts graphite options + * @return {boolean} + */ +module.exports = (opts = {}) => opts.statsd === true; diff --git a/lib/plugins/graphite/index.js b/lib/plugins/graphite/index.js index c535dbcc3..40fe7f382 100644 --- a/lib/plugins/graphite/index.js +++ b/lib/plugins/graphite/index.js @@ -1,17 +1,21 @@ 'use strict'; const isEmpty = require('lodash.isempty'); -const Sender = require('./sender'); +const GraphiteSender = require('./graphite-sender'); +const StatsDSender = require('./statsd-sender'); const merge = require('lodash.merge'); const log = require('intel').getLogger('sitespeedio.plugin.graphite'); const sendAnnotations = require('./send-annotation'); const DataGenerator = require('./data-generator'); +const isStatsd = require('./helpers/is-statsd'); const throwIfMissing = require('../../support/util').throwIfMissing; const defaultConfig = { port: 2003, namespace: 'sitespeed_io.default', - includeQueryParams: false + includeQueryParams: false, + statsd: false, + bulkSize: null }; module.exports = { @@ -19,8 +23,10 @@ module.exports = { throwIfMissing(options.graphite, ['host'], 'graphite'); const opts = merge({}, defaultConfig, options.graphite); this.options = options; + const SenderConstructor = isStatsd(opts) ? StatsDSender : GraphiteSender; + this.filterRegistry = context.filterRegistry; - this.sender = new Sender(opts.host, opts.port); + this.sender = new SenderConstructor(opts.host, opts.port, opts.bulkSize); this.dataGenerator = new DataGenerator( opts.namespace, opts.includeQueryParams, diff --git a/lib/plugins/graphite/sender.js b/lib/plugins/graphite/sender.js index 0780b7673..5252628c5 100644 --- a/lib/plugins/graphite/sender.js +++ b/lib/plugins/graphite/sender.js @@ -1,27 +1,43 @@ 'use strict'; -const net = require('net'), - log = require('intel').getLogger('sitespeedio.plugin.graphite'), - Promise = require('bluebird'); +const Promise = require('bluebird'), + log = require('intel').getLogger('sitespeedio.plugin.graphite'); -class GraphiteSender { - constructor(host, port) { +class Sender { + constructor(host, port, bulkSize) { this.host = host; this.port = port; + this.bulkSize = bulkSize; + } + + get facility() { + return 'None'; + } + + log(data) { + log.debug('Send data to %s %s:%s', this.facility, this.host, this.port); + log.verbose(`Sending ${data}`); } send(data) { - log.debug('Send data to Graphite %s:%s', this.host, this.port); - log.verbose('Sending ' + data); - return new Promise((resolve, reject) => { - let socket = net.connect(this.port, this.host, () => { - socket.write(data); - socket.end(); - resolve(); - }); - socket.on('error', reject); - }); + return this[this.bulkSize ? 'bulks' : 'bulk'](data); + } + + bulk() { + throw new ReferenceError('bulk function not implemented'); + } + + bulks(data) { + const lines = data.split('\n'); + const promises = []; + const bulkSize = this.bulkSize || lines.length; + + while (lines.length) { + promises.push(this.bulk(lines.splice(-bulkSize).join('\n'))); + } + + return Promise.all(promises); } } -module.exports = GraphiteSender; +module.exports = Sender; diff --git a/lib/plugins/graphite/statsd-sender.js b/lib/plugins/graphite/statsd-sender.js new file mode 100644 index 000000000..aa5745f08 --- /dev/null +++ b/lib/plugins/graphite/statsd-sender.js @@ -0,0 +1,30 @@ +'use strict'; + +const dgram = require('dgram'), + Promise = require('bluebird'), + Sender = require('./sender'); + +class StatsDSender extends Sender { + get facility() { + return 'StatsD'; + } + + bulk(data) { + this.log(data); + + return new Promise((resolve, reject) => { + const client = dgram.createSocket('udp4'); + + client.send( + data, + 0, + data.length, + this.port, + this.host, + error => (client.close() && error ? reject(error) : resolve()) + ); + }); + } +} + +module.exports = StatsDSender; diff --git a/test/graphiteTests.js b/test/graphiteTests.js index 846dba07d..d938b8d69 100644 --- a/test/graphiteTests.js +++ b/test/graphiteTests.js @@ -64,9 +64,183 @@ describe('graphite', function() { connectivity: 'cable' }); var data = generator.dataFromMessage(message, moment()); + expect(data).to.match( - /ns.summary.sub_domain_com.chrome.cable.domains.www.sitespeed.io.dns.median/ + /ns.summary.sub_domain_com.chrome.cable.domains.www.sitespeed.io.dns.median [\d]{1,} [\d]*/ + ); + }); + + it('should generate data in statsD format', function() { + const message = { + type: 'domains.summary', + timestamp: '2016-01-08T12:59:06+01:00', + source: 'domains', + data: { + 'www.sitespeed.io': { + dns: { + median: '0', + mean: '13', + min: '0', + p10: '0', + p90: '40', + p99: '40', + max: '40' + } + } + }, + group: 'sub_domain_com' + }; + + let generator = new DataGenerator('ns', false, { + _: ['sub_domain_com'], + browser: 'chrome', + connectivity: 'cable', + graphite: { statsd: true } + }); + var data = generator.dataFromMessage(message, moment()); + + expect(data).to.match( + /ns.summary.sub_domain_com.chrome.cable.domains.www.sitespeed.io.dns.median:[\d]{1,}\|t/ ); }); }); + + describe('index', function() { + const messageMaker = require('../lib/support/messageMaker'); + const filterRegistry = require('../lib/support/filterRegistry'); + const intel = require('intel'); + const statsHelpers = require('../lib/support/statsHelpers'); + const context = { messageMaker, filterRegistry, intel, statsHelpers }; + let plugin; + let options; + + beforeEach(function() { + plugin = require('../lib/plugins/graphite'); + options = { + graphite: { + host: '127.0.0.1' + } + }; + }); + + it('Should use graphite interface by default', function() { + plugin.open(context, options); + + expect(plugin.sender.facility).to.match(/graphite/i); + }); + + it('Should use statsd interface', function() { + Object.assign(options.graphite, { + statsd: true + }); + + plugin.open(context, options); + expect(plugin.sender.facility).to.match(/statsd/i); + }); + + it('Should use graphite interface by default', function() { + plugin.open(context, options); + + expect(plugin.sender.facility).to.match(/graphite/i); + }); + }); + + describe('helpers/is-statsd', function() { + const isStatsD = require('../lib/plugins/graphite/helpers/is-statsd'); + + it('Should be set to statsd', function() { + expect(isStatsD({ statsd: true })).to.be.true; + }); + it('Should not be set to statsd', function() { + ['true', 1, null, false, undefined].forEach( + value => expect(isStatsD({ statsd: value })).to.be.false + ); + }); + }); + + describe('helpers/format-entry', function() { + const formatEntry = require('../lib/plugins/graphite/helpers/format-entry'); + + it('Should retrieve the format of statsd', function() { + expect(formatEntry('statsd')).to.equal('%s:%s|t'); + }); + + it('Should retrieve the default format of graphite', function() { + ['StatsD', 'stats', 'graphite', null, false, undefined].forEach(value => + expect(formatEntry(value)).to.equal('%s %s %s') + ); + }); + }); + + describe('GraphiteSender', function() { + const GraphiteSender = require('../lib/plugins/graphite/graphite-sender'); + const net = require('net'); + const { connect } = net; + + afterEach(function() { + net.connect = connect; + }); + + function mock(fn) { + net.connect = (host, port, callback) => { + setTimeout(callback, 0); + return { write: fn, end: () => null, on: () => null }; + }; + } + + it('Should send data to graphite via net', function(done) { + mock(() => done()); + + const sender = new GraphiteSender('127.0.0.1', '2003'); + sender.send('some.data'); + }); + + it('Should send data to graphite in bulks', function(done) { + let sent = 0; + mock(() => { + ++sent === 2 && done(); + }); + + const sender = new GraphiteSender('127.0.0.1', '2003', 2); + sender.send('some.data.1\nmore.data.2\nmore.data.3\nmore.data.4'); + }); + }); + + describe('StatsDSender', function() { + const StatsDSender = require('../lib/plugins/graphite/statsd-sender'); + const dgram = require('dgram'); + const { createSocket } = dgram; + + afterEach(function() { + dgram.createSocket = createSocket; + }); + + function mock(fn) { + dgram.createSocket = () => ({ send: fn }); + } + + it('Should send data to statsd via dgram', function() { + let sent = false; + mock(() => { + sent = true; + }); + + const sender = new StatsDSender('127.0.0.1', '8125'); + sender.send('some.data'); + + expect(sent).to.be.true; + }); + + it('Should send data to statsd in bulks', function() { + let sent = 0; + mock(() => { + sent++; + }); + + const sender = new StatsDSender('127.0.0.1', '8125', 2); + sender.send('some.data.1\nmore.data.2\nmore.data.3\nmore.data.4'); + + expect(sent).to.equal(2); + }); + }); });