ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
Revision: 1.2
Committed: Thu Jan 18 10:48:10 2018 UTC (6 years, 4 months ago) by root
Branch: MAIN
Changes since 1.1: +28 -20 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::ZabbixSender;
8    
9     =head1 DESCRIPTION
10    
11     This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14     I't implements the zabbix version 2.0-3.4 protocol for item data
15     submission.
16    
17     =head2 METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::ZabbixSender;
24    
25     use common::sense;
26    
27     use Errno ();
28     use Scalar::Util ();
29    
30     use AnyEvent ();
31     use AnyEvent::Socket ();
32     use AnyEvent::Handle ();
33    
34     our $VERSION = '0.1';
35    
36     =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37    
38     Creates a (virtual) connection to a zabbix server. Since each submission
39     requires a new TCP connection, creating the connection object does not
40     actually contact the server.
41    
42     The connection object will linger in the destructor until all data has
43     been submitted or thrown away.
44    
45     You can specify various configuration parameters. The term C<@items>
46     refers to an array with C<[key, value, clock]> array-refs.
47    
48     =over 4
49    
50     =item server => "$hostname:$port" (default: C<localhost:10051>)
51    
52     The zabbix server to connect to.
53    
54     =item host => $name (default: local nodename)
55    
56     The submission host, the "technical" name from tghe zabbix configuration.
57    
58     =item delay => $seconds (default: C<0>)
59    
60     If non-zero, then the module will gather data submissions for up to this
61     number of seconds before actually submitting them as a single batch.
62    
63     Submissions can get batched even if C<0>, as events submitted while the
64     connection is being established or retried will be batched together in any
65     case.
66    
67     =item queue_time => $seconds (default: C<3600>)
68    
69     The amount of time a data item will be queued until it is thrown away when
70     the server cannot be reached.
71    
72     =item linger_time => $seconds (default: same as C<queue_time>)
73    
74     The amount of time the module will linger in its destructor until all
75     items have been submitted.
76    
77     =item retry_min => $seconds (default: C<30>)
78    
79     =item retry_max => $seconds (default: C<300>)
80    
81     The minimum and maximum retry times when the server cannot be reached.
82    
83 root 1.2 =item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84 root 1.1
85     Called on any protocol errors - these generally indicate that something
86     other than a zabbix server is running on a port. The given key-value pairs
87     are the lost items.
88    
89 root 1.2 =item on_loss => $cb->$zbx, (\@items) (default: log and continue)
90 root 1.1
91     Will be called when some data items are thrown away (this happens if the
92     server isn't reachable for at least C<queue_time> seconds),
93    
94 root 1.2 =item on_response => $cb->$zbx, (\@items, \%response) (default: not called)
95 root 1.1
96     Will be called with the (generally rather useless) response form the
97     zabbix server.
98    
99     =back
100    
101     =cut
102    
103     our $NOP = sub { };
104    
105     my $json = eval { require JSON::XS; JSON::XS->new } || do { require JSON::PP; JSON::PP->new };
106    
107     $json->utf8;
108    
109     sub new {
110     my $class = shift;
111     my $self = bless {
112     server => "localhost:10051",
113     delay => 0,
114     retry_min => 30,
115     retry_max => 300,
116     queue_time => 3600,
117     on_response => $NOP,
118     on_error => sub {
119 root 1.2 AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[1]"; # error
120 root 1.1 },
121 root 1.2 on_loss => sub {
122     my $nitems = @{ $_[1] };
123     AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
124     },
125    
126 root 1.1 @_,
127 root 1.2
128     on_clear => $NOP,
129 root 1.1 }, $class;
130    
131     ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
132    
133     $self->{host} //= do {
134     require POSIX;
135     (POSIX::uname())[1]
136     };
137    
138     $self->{linger_time} //= $self->{queue_time};
139    
140     $self
141     }
142    
143     sub DESTROY {
144     my ($self) = @_;
145    
146     $self->_wait;
147    
148     %$self = ();
149     }
150    
151     sub _wait {
152     my ($self) = @_;
153    
154 root 1.2 while (@{ $self->{queue} } || $self->{sending}) {
155     my $cv = AE::cv;
156 root 1.1
157 root 1.2 my $to = AE::timer $self->{linger_time}, 0, $cv;
158     local $self->{on_clear} = $cv;
159 root 1.1
160 root 1.2 $cv->recv;
161     }
162 root 1.1 }
163    
164     =item $zbx->submit ($k, $v[, $clock[, $host]])
165    
166     Submits a new key-value pair to the zabbix server. If C<$clock> is missing
167     or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
168     is missing, then the default set during object creation is used.
169    
170     =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
171    
172     Like C<submit>, but submits many key-value pairs at once.
173    
174     =cut
175    
176     sub submit_multiple {
177     my ($self, $kvcs) = @_;
178    
179     push @{ $self->{queue} }, [AE::now, $kvcs];
180    
181     $self->_send
182     unless $self->{sending};
183     }
184    
185     sub submit {
186     my ($self, $k, $v, $clock, $host) = @_;
187    
188     push @{ $self->{queue} }, [AE::now, [[$k, $v, $clock, $host]]];
189    
190     $self->_send;
191     }
192    
193     # start sending
194     sub _send {
195     my ($self) = @_;
196    
197     if ($self->{delay}) {
198     Scalar::Util::weaken $self;
199     $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
200     delete $self->{delay_w};
201     $self->{send_immediate} = 1;
202     $self->_send2 unless $self->{sending}++;
203     };
204     } else {
205     $self->{send_immediate} = 1;
206     $self->_send2 unless $self->{sending}++;
207     }
208     }
209    
210     # actually do send
211     sub _send2 {
212     my ($self) = @_;
213    
214     Scalar::Util::weaken $self;
215     $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
216     my ($fh) = @_;
217    
218     $fh
219     or return $self->_retry;
220    
221     delete $self->{retry};
222    
223     my $data = delete $self->{queue};
224     my $items = [map @{ $_->[1] }, @$data];
225    
226     my $fail = sub {
227 root 1.2 $self->{on_error}($self, $items, $_[0]);
228 root 1.1 $self->_retry;
229     };
230    
231     $self->{hdl} = new AnyEvent::Handle
232     fh => $fh,
233     on_error => sub {
234     $fail->($_[2]);
235     },
236     on_read => sub {
237     if (13 <= length $_[0]{rbuf}) {
238     my ($zbxd, $version, $length) = unpack "a4 C Q<", $_[0]{rbuf};
239    
240     $zbxd eq "ZBXD"
241     or return $fail->("protocol mismatch");
242     $version == 1
243     or return $fail->("protocol version mismatch");
244    
245     if (13 + $length <= length $_[0]{rbuf}) {
246 root 1.2 delete $self->{hdl};
247    
248 root 1.1 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
249     or return $fail->("protocol error");
250    
251 root 1.2 $self->{on_response}($self, $items, $res);
252    
253     delete $self->{sending};
254 root 1.1
255 root 1.2 $self->_send2 if delete $self->{send_immediate} && $self->{queue};
256    
257     $self->{on_clear}();
258 root 1.1 }
259     }
260     },
261     ;
262    
263     my $json = $json->encode ({
264     request => "sender data",
265     clock => AE::now,
266     data => [
267     map {
268     my $slot = $_;
269    
270     map {
271     key => $_->[0],
272     value => $_->[1],
273     clock => $_->[2] // $slot->[0],
274     host => $_->[3] // $self->{host},
275     }, @{ $slot->[1] }
276     } @$data
277     ],
278     });
279    
280     $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
281     };
282     }
283    
284     sub _retry {
285     my ($self) = @_;
286    
287     Scalar::Util::weaken $self;
288    
289     delete $self->{hdl};
290    
291     my $expire = AE::now - $self->{queue_time};
292     while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
293 root 1.2 $self->{on_loss}($self, [shift @{ $self->{queue} }]);
294 root 1.1 }
295    
296     unless (@{ $self->{queue} }) {
297 root 1.2 delete $self->{sending};
298 root 1.1 $self->{on_clear}();
299     return;
300     }
301    
302     my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
303     $retry = $self->{retry_max} if $retry > $self->{retry_max};
304     $self->{retry_w} = AE::timer $retry, 0, sub {
305     delete $self->{retry_w};
306     $self->_send2;
307     };
308     }
309    
310     =back
311    
312     =head1 SEE ALSO
313    
314     L<AnyEvent>.
315    
316     =head1 AUTHOR
317    
318     Marc Lehmann <schmorp@schmorp.de>
319     http://home.schmorp.de/
320    
321     =cut
322    
323     1
324