Skip to content

Instantly share code, notes, and snippets.

@n7st
Last active November 23, 2016 15:08
Show Gist options
  • Save n7st/f5556c9731880f9a6bf5990028d68996 to your computer and use it in GitHub Desktop.
Save n7st/f5556c9731880f9a6bf5990028d68996 to your computer and use it in GitHub Desktop.
InfluxDB AnyEvent Server and Example Client
#!/usr/bin/env perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Impl::EV;
use AnyEvent::Socket qw/tcp_connect/;
use InfluxDB::LineProtocol qw/data2line/;
use Socket qw/SOL_SOCKET SO_REUSEPORT/;
################################################################################
sub main {
my $cv = AnyEvent->condvar;
my $timer = AnyEvent->timer(after => 0.001, interval => 120.000, cb => sub {
my $line = build_line();
printf STDOUT ("Sending: %s\n", $line);
transaction($line);
});
$cv->recv;
return 0;
}
sub transaction {
my $line = shift;
my $host = '127.0.0.1';
my $port = 8888;
tcp_connect($host, $port, sub {
my $sock = shift || die "Failed to connect: $!";
my $handle;
$handle = AnyEvent::Handle->new(
fh => $sock,
on_eof => sub {
$handle->destroy();
},
on_read => sub {
my $handle = shift;
print $handle->rbuf();
$handle->destroy();
},
);
$handle->push_write("$line\r\n\r\n");
$handle->push_shutdown();
}, sub {
my $sock = shift;
setsockopt($sock, SOL_SOCKET, SO_REUSEPORT, 1) or die $!;
return undef;
});
}
sub build_line {
my $sensors = `sensors | grep 'Physical id 0:'`;
my ($temp) = $sensors =~ /\+(\d+)/;
$temp = sprintf("%.2f", $temp);
return data2line('temperature', {
machine => 'galerius',
type => 'desktop',
internal => $temp,
external => $temp, # temporary
});
}
################################################################################
exit main();
#!/usr/bin/env perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::InfluxDB;
use AnyEvent::Socket;
use AnyEvent::Handle;
use InfluxDB::LineProtocol qw/data2line/;
use EV;
my $cv = AE::cv;
my $db = AnyEvent::InfluxDB->new(
server => 'http://192.168.0.40:8086',
username => 'grafana',
password => 'grafana',
on_request => sub {
my $method = shift;
my $url = shift;
my $post_data = shift;
printf STDOUT ("Method: %s | URL: %s\n", $method, $url);
printf STDOUT ("Data: %s", $post_data) if $post_data;
},
);
tcp_server undef, 8888, sub {
my $fh = shift;
my $host = shift;
my $port = shift;
my $handle = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
my $handle = shift;
my $fatal = shift;
my $msg = shift;
AE::log error => $msg;
$handle->destroy;
$cv->send;
},
);
$handle->push_read(line => sub {
my $test = shift;
my $line = shift;
$db->write(
database => 'snmp',
data => $line,
on_success => sub { print STDOUT "Wrote $line\n"; },
on_error => sub { print STDERR "Error writing $line: @_\n"; },
);
$handle->on_drain(sub {
$handle->fh->close;
undef $handle;
});
});
};
$cv->recv;
__END__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment