ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
Revision: 1.1
Committed: Thu Jan 18 10:17:15 2018 UTC (6 years, 4 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::ZabbixSender;
8    
9     =head1 DESCRIPTION
10    
11     This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14     I't implements the zabbix version 2.0-3.4 protocol for item data
15     submission.
16    
17     =head2 METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::ZabbixSender;
24    
25     use common::sense;
26    
27     use Errno ();
28     use Scalar::Util ();
29    
30     use AnyEvent ();
31     use AnyEvent::Socket ();
32     use AnyEvent::Handle ();
33    
34     our $VERSION = '0.1';
35    
36     =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37    
38     Creates a (virtual) connection to a zabbix server. Since each submission
39     requires a new TCP connection, creating the connection object does not
40     actually contact the server.
41    
42     The connection object will linger in the destructor until all data has
43     been submitted or thrown away.
44    
45     You can specify various configuration parameters. The term C<@items>
46     refers to an array with C<[key, value, clock]> array-refs.
47    
48     =over 4
49    
50     =item server => "$hostname:$port" (default: C<localhost:10051>)
51    
52     The zabbix server to connect to.
53    
54     =item host => $name (default: local nodename)
55    
56     The submission host, the "technical" name from tghe zabbix configuration.
57    
58     =item delay => $seconds (default: C<0>)
59    
60     If non-zero, then the module will gather data submissions for up to this
61     number of seconds before actually submitting them as a single batch.
62    
63     Submissions can get batched even if C<0>, as events submitted while the
64     connection is being established or retried will be batched together in any
65     case.
66    
67     =item queue_time => $seconds (default: C<3600>)
68    
69     The amount of time a data item will be queued until it is thrown away when
70     the server cannot be reached.
71    
72     =item linger_time => $seconds (default: same as C<queue_time>)
73    
74     The amount of time the module will linger in its destructor until all
75     items have been submitted.
76    
77     =item retry_min => $seconds (default: C<30>)
78    
79     =item retry_max => $seconds (default: C<300>)
80    
81     The minimum and maximum retry times when the server cannot be reached.
82    
83     =item on_error => $cb->(\@items, $msg) (default: log and continue)
84    
85     Called on any protocol errors - these generally indicate that something
86     other than a zabbix server is running on a port. The given key-value pairs
87     are the lost items.
88    
89     =item on_loss => $cb->(\@items) (default: not called)
90    
91     Will be called when some data items are thrown away (this happens if the
92     server isn't reachable for at least C<queue_time> seconds),
93    
94     =item on_response => $cb->(\@items, \%response) (default: not called)
95    
96     Will be called with the (generally rather useless) response form the
97     zabbix server.
98    
99     =back
100    
101     =cut
102    
103     our $NOP = sub { };
104    
105     my $json = eval { require JSON::XS; JSON::XS->new } || do { require JSON::PP; JSON::PP->new };
106    
107     $json->utf8;
108    
109     sub new {
110     my $class = shift;
111     my $self = bless {
112     server => "localhost:10051",
113     delay => 0,
114     retry_min => 30,
115     retry_max => 300,
116     queue_time => 3600,
117     on_loss => $NOP,
118     on_response => $NOP,
119     on_error => sub {
120     AE::log 4 => $_[0]; # error
121     },
122     @_,
123     }, $class;
124    
125     ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
126    
127     $self->{host} //= do {
128     require POSIX;
129     (POSIX::uname())[1]
130     };
131    
132     $self->{linger_time} //= $self->{queue_time};
133    
134     $self
135     }
136    
137     sub DESTROY {
138     my ($self) = @_;
139    
140     $self->_wait;
141    
142     %$self = ();
143     }
144    
145     sub _wait {
146     my ($self) = @_;
147    
148     return unless @{ $self->{queue} };
149    
150     my $cv = AE::cv;
151    
152     my $to = AE::timer $self->{linger_time}, 0, $cv;
153     local $self->{on_clear} = $cv;
154    
155     $cv->recv;
156     }
157    
158     =item $zbx->submit ($k, $v[, $clock[, $host]])
159    
160     Submits a new key-value pair to the zabbix server. If C<$clock> is missing
161     or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
162     is missing, then the default set during object creation is used.
163    
164     =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
165    
166     Like C<submit>, but submits many key-value pairs at once.
167    
168     =cut
169    
170     sub submit_multiple {
171     my ($self, $kvcs) = @_;
172    
173     push @{ $self->{queue} }, [AE::now, $kvcs];
174    
175     $self->_send
176     unless $self->{sending};
177     }
178    
179     sub submit {
180     my ($self, $k, $v, $clock, $host) = @_;
181    
182     push @{ $self->{queue} }, [AE::now, [[$k, $v, $clock, $host]]];
183    
184     $self->_send;
185     }
186    
187     # start sending
188     sub _send {
189     my ($self) = @_;
190    
191     if ($self->{delay}) {
192     Scalar::Util::weaken $self;
193     $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
194     delete $self->{delay_w};
195     $self->{send_immediate} = 1;
196     $self->_send2 unless $self->{sending}++;
197     };
198     } else {
199     $self->{send_immediate} = 1;
200     $self->_send2 unless $self->{sending}++;
201     }
202     }
203    
204     # actually do send
205     sub _send2 {
206     my ($self) = @_;
207    
208     Scalar::Util::weaken $self;
209     $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
210     my ($fh) = @_;
211    
212     $fh
213     or return $self->_retry;
214    
215     delete $self->{retry};
216    
217     my $data = delete $self->{queue};
218     my $items = [map @{ $_->[1] }, @$data];
219    
220     my $fail = sub {
221     $self->{on_error}($items, $_[0]);
222     $self->_retry;
223     };
224    
225     $self->{hdl} = new AnyEvent::Handle
226     fh => $fh,
227     on_error => sub {
228     $fail->($_[2]);
229     },
230     on_read => sub {
231     if (13 <= length $_[0]{rbuf}) {
232     my ($zbxd, $version, $length) = unpack "a4 C Q<", $_[0]{rbuf};
233    
234     $zbxd eq "ZBXD"
235     or return $fail->("protocol mismatch");
236     $version == 1
237     or return $fail->("protocol version mismatch");
238    
239     if (13 + $length <= length $_[0]{rbuf}) {
240     my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
241     or return $fail->("protocol error");
242     use Data::Dump; ddx $res;#d#
243    
244     $self->{on_response}($items, $res);
245     $self->_send2 if delete $self->{send_immediate};
246    
247     $self->{on_clear}() unless @{ $self->{queue} };
248     }
249     }
250     },
251     ;
252    
253     my $json = $json->encode ({
254     request => "sender data",
255     clock => AE::now,
256     data => [
257     map {
258     my $slot = $_;
259    
260     map {
261     key => $_->[0],
262     value => $_->[1],
263     clock => $_->[2] // $slot->[0],
264     host => $_->[3] // $self->{host},
265     }, @{ $slot->[1] }
266     } @$data
267     ],
268     });
269    
270     warn $json;
271    
272     $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
273     };
274     }
275    
276     sub _retry {
277     my ($self) = @_;
278    
279     Scalar::Util::weaken $self;
280    
281     delete $self->{hdl};
282    
283     my $expire = AE::now - $self->{queue_time};
284     while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
285     $self->{on_loss}([shift @{ $self->{queue} }]);
286     }
287    
288     unless (@{ $self->{queue} }) {
289     $self->{on_clear}();
290     return;
291     }
292    
293     my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
294     $retry = $self->{retry_max} if $retry > $self->{retry_max};
295     warn "retry $retry\n";#d#
296     $self->{retry_w} = AE::timer $retry, 0, sub {
297     delete $self->{retry_w};
298     $self->_send2;
299     };
300     }
301    
302     =back
303    
304     =head1 SEE ALSO
305    
306     L<AnyEvent>.
307    
308     =head1 AUTHOR
309    
310     Marc Lehmann <schmorp@schmorp.de>
311     http://home.schmorp.de/
312    
313     =cut
314    
315     1
316