Add StatsD support (with bulking) (#1994)

* Add StatsD support with bulking
This commit is contained in:
Omri 2018-04-24 14:33:34 +03:00 committed by Peter Hedenskog
parent 72e69ae089
commit 8193a31245
9 changed files with 311 additions and 22 deletions

View File

@ -408,6 +408,18 @@ module.exports.parseCommandLine = function parseCommandLine() {
'Send the tags as Array or a String. In Graphite 1.0 the tags is a array. Before a String',
group: 'Graphite'
})
.option('graphite.statsd', {
default: graphiteConfig.statsd,
type: 'boolean',
describe: 'Uses the StatsD interface',
group: 'Graphite'
})
.option('graphite.bulkSize', {
default: graphiteConfig.bulkSize,
type: 'number',
describe: 'Break up number of metrics to send with each request.',
group: 'Graphite'
})
/** Plugins */
.option('plugins.list', {
describe: 'List all configured plugins in the log.',

View File

@ -3,7 +3,9 @@
const flatten = require('../../support/flattenMessage'),
util = require('util'),
graphiteUtil = require('../../support/tsdbUtil'),
reduce = require('lodash.reduce');
reduce = require('lodash.reduce'),
formatEntry = require('./helpers/format-entry'),
isStatsd = require('./helpers/is-statsd');
function keyPathFromMessage(message, options, includeQueryParams) {
let typeParts = message.type.split('.');
@ -53,6 +55,7 @@ class GraphiteDataGenerator {
this.namespace = namespace;
this.includeQueryParams = !!includeQueryParams;
this.options = options;
this.entryFormat = isStatsd(options.graphite) ? 'statsd' : 'graphite';
}
dataFromMessage(message, time) {
@ -68,7 +71,9 @@ class GraphiteDataGenerator {
flatten.flattenMessageData(message),
(entries, value, key) => {
const fullKey = util.format('%s.%s.%s', this.namespace, keypath, key);
entries.push(util.format('%s %s %s', fullKey, value, timestamp));
entries.push(
util.format(formatEntry(this.entryFormat), fullKey, value, timestamp)
);
return entries;
},
[]

View File

@ -0,0 +1,26 @@
'use strict';
const net = require('net'),
Promise = require('bluebird'),
Sender = require('./sender');
class GraphiteSender extends Sender {
get facility() {
return 'Graphite';
}
bulk(data) {
this.log(data);
return new Promise((resolve, reject) => {
const socket = net.connect(this.port, this.host, () => {
socket.write(data);
socket.end();
resolve();
});
socket.on('error', reject);
});
}
}
module.exports = GraphiteSender;

View File

@ -0,0 +1,14 @@
/**
* picks the correct format for a graphite entry
* @param {string} [type='graphite'] ['statsd', 'graphite']
* @return {string} The string template
*/
module.exports = type => {
switch (type) {
case 'statsd':
return '%s:%s|t';
case 'graphite':
default:
return '%s %s %s';
}
};

View File

@ -0,0 +1,6 @@
/**
* Graphite options qualify for StatsD use
* @param {Object} opts graphite options
* @return {boolean}
*/
module.exports = (opts = {}) => opts.statsd === true;

View File

@ -1,17 +1,21 @@
'use strict';
const isEmpty = require('lodash.isempty');
const Sender = require('./sender');
const GraphiteSender = require('./graphite-sender');
const StatsDSender = require('./statsd-sender');
const merge = require('lodash.merge');
const log = require('intel').getLogger('sitespeedio.plugin.graphite');
const sendAnnotations = require('./send-annotation');
const DataGenerator = require('./data-generator');
const isStatsd = require('./helpers/is-statsd');
const throwIfMissing = require('../../support/util').throwIfMissing;
const defaultConfig = {
port: 2003,
namespace: 'sitespeed_io.default',
includeQueryParams: false
includeQueryParams: false,
statsd: false,
bulkSize: null
};
module.exports = {
@ -19,8 +23,10 @@ module.exports = {
throwIfMissing(options.graphite, ['host'], 'graphite');
const opts = merge({}, defaultConfig, options.graphite);
this.options = options;
const SenderConstructor = isStatsd(opts) ? StatsDSender : GraphiteSender;
this.filterRegistry = context.filterRegistry;
this.sender = new Sender(opts.host, opts.port);
this.sender = new SenderConstructor(opts.host, opts.port, opts.bulkSize);
this.dataGenerator = new DataGenerator(
opts.namespace,
opts.includeQueryParams,

View File

@ -1,27 +1,43 @@
'use strict';
const net = require('net'),
log = require('intel').getLogger('sitespeedio.plugin.graphite'),
Promise = require('bluebird');
const Promise = require('bluebird'),
log = require('intel').getLogger('sitespeedio.plugin.graphite');
class GraphiteSender {
constructor(host, port) {
class Sender {
constructor(host, port, bulkSize) {
this.host = host;
this.port = port;
this.bulkSize = bulkSize;
}
get facility() {
return 'None';
}
log(data) {
log.debug('Send data to %s %s:%s', this.facility, this.host, this.port);
log.verbose(`Sending ${data}`);
}
send(data) {
log.debug('Send data to Graphite %s:%s', this.host, this.port);
log.verbose('Sending ' + data);
return new Promise((resolve, reject) => {
let socket = net.connect(this.port, this.host, () => {
socket.write(data);
socket.end();
resolve();
});
socket.on('error', reject);
});
return this[this.bulkSize ? 'bulks' : 'bulk'](data);
}
bulk() {
throw new ReferenceError('bulk function not implemented');
}
bulks(data) {
const lines = data.split('\n');
const promises = [];
const bulkSize = this.bulkSize || lines.length;
while (lines.length) {
promises.push(this.bulk(lines.splice(-bulkSize).join('\n')));
}
return Promise.all(promises);
}
}
module.exports = GraphiteSender;
module.exports = Sender;

View File

@ -0,0 +1,30 @@
'use strict';
const dgram = require('dgram'),
Promise = require('bluebird'),
Sender = require('./sender');
class StatsDSender extends Sender {
get facility() {
return 'StatsD';
}
bulk(data) {
this.log(data);
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
client.send(
data,
0,
data.length,
this.port,
this.host,
error => (client.close() && error ? reject(error) : resolve())
);
});
}
}
module.exports = StatsDSender;

View File

@ -64,9 +64,183 @@ describe('graphite', function() {
connectivity: 'cable'
});
var data = generator.dataFromMessage(message, moment());
expect(data).to.match(
/ns.summary.sub_domain_com.chrome.cable.domains.www.sitespeed.io.dns.median/
/ns.summary.sub_domain_com.chrome.cable.domains.www.sitespeed.io.dns.median [\d]{1,} [\d]*/
);
});
it('should generate data in statsD format', function() {
const message = {
type: 'domains.summary',
timestamp: '2016-01-08T12:59:06+01:00',
source: 'domains',
data: {
'www.sitespeed.io': {
dns: {
median: '0',
mean: '13',
min: '0',
p10: '0',
p90: '40',
p99: '40',
max: '40'
}
}
},
group: 'sub_domain_com'
};
let generator = new DataGenerator('ns', false, {
_: ['sub_domain_com'],
browser: 'chrome',
connectivity: 'cable',
graphite: { statsd: true }
});
var data = generator.dataFromMessage(message, moment());
expect(data).to.match(
/ns.summary.sub_domain_com.chrome.cable.domains.www.sitespeed.io.dns.median:[\d]{1,}\|t/
);
});
});
describe('index', function() {
const messageMaker = require('../lib/support/messageMaker');
const filterRegistry = require('../lib/support/filterRegistry');
const intel = require('intel');
const statsHelpers = require('../lib/support/statsHelpers');
const context = { messageMaker, filterRegistry, intel, statsHelpers };
let plugin;
let options;
beforeEach(function() {
plugin = require('../lib/plugins/graphite');
options = {
graphite: {
host: '127.0.0.1'
}
};
});
it('Should use graphite interface by default', function() {
plugin.open(context, options);
expect(plugin.sender.facility).to.match(/graphite/i);
});
it('Should use statsd interface', function() {
Object.assign(options.graphite, {
statsd: true
});
plugin.open(context, options);
expect(plugin.sender.facility).to.match(/statsd/i);
});
it('Should use graphite interface by default', function() {
plugin.open(context, options);
expect(plugin.sender.facility).to.match(/graphite/i);
});
});
describe('helpers/is-statsd', function() {
const isStatsD = require('../lib/plugins/graphite/helpers/is-statsd');
it('Should be set to statsd', function() {
expect(isStatsD({ statsd: true })).to.be.true;
});
it('Should not be set to statsd', function() {
['true', 1, null, false, undefined].forEach(
value => expect(isStatsD({ statsd: value })).to.be.false
);
});
});
describe('helpers/format-entry', function() {
const formatEntry = require('../lib/plugins/graphite/helpers/format-entry');
it('Should retrieve the format of statsd', function() {
expect(formatEntry('statsd')).to.equal('%s:%s|t');
});
it('Should retrieve the default format of graphite', function() {
['StatsD', 'stats', 'graphite', null, false, undefined].forEach(value =>
expect(formatEntry(value)).to.equal('%s %s %s')
);
});
});
describe('GraphiteSender', function() {
const GraphiteSender = require('../lib/plugins/graphite/graphite-sender');
const net = require('net');
const { connect } = net;
afterEach(function() {
net.connect = connect;
});
function mock(fn) {
net.connect = (host, port, callback) => {
setTimeout(callback, 0);
return { write: fn, end: () => null, on: () => null };
};
}
it('Should send data to graphite via net', function(done) {
mock(() => done());
const sender = new GraphiteSender('127.0.0.1', '2003');
sender.send('some.data');
});
it('Should send data to graphite in bulks', function(done) {
let sent = 0;
mock(() => {
++sent === 2 && done();
});
const sender = new GraphiteSender('127.0.0.1', '2003', 2);
sender.send('some.data.1\nmore.data.2\nmore.data.3\nmore.data.4');
});
});
describe('StatsDSender', function() {
const StatsDSender = require('../lib/plugins/graphite/statsd-sender');
const dgram = require('dgram');
const { createSocket } = dgram;
afterEach(function() {
dgram.createSocket = createSocket;
});
function mock(fn) {
dgram.createSocket = () => ({ send: fn });
}
it('Should send data to statsd via dgram', function() {
let sent = false;
mock(() => {
sent = true;
});
const sender = new StatsDSender('127.0.0.1', '8125');
sender.send('some.data');
expect(sent).to.be.true;
});
it('Should send data to statsd in bulks', function() {
let sent = 0;
mock(() => {
sent++;
});
const sender = new StatsDSender('127.0.0.1', '8125', 2);
sender.send('some.data.1\nmore.data.2\nmore.data.3\nmore.data.4');
expect(sent).to.equal(2);
});
});
});