ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
Revision: 1.5
Committed: Sat Sep 14 19:42:52 2019 UTC (5 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1, HEAD
Changes since 1.4: +2 -2 lines
Log Message:
1.1

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::ZabbixSender;
8    
9     =head1 DESCRIPTION
10    
11     This module is an L<AnyEvent> user; you need to make sure that you use and
12     run a supported event loop.
13    
14     It implements the zabbix version 2.0-3.4 protocol for item data
15     submission.
16    
17     =head2 METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::ZabbixSender;
24    
25     use common::sense;
26    
27     use Errno ();
28     use Scalar::Util ();
29    
30     use AnyEvent ();
31     use AnyEvent::Socket ();
32     use AnyEvent::Handle ();
33    
34 root 1.5 our $VERSION = '1.1';
35 root 1.1
36     =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37    
38     Creates a (virtual) connection to a zabbix server. Since each submission
39     requires a new TCP connection, creating the connection object does not
40     actually contact the server.
41    
42     The connection object will linger in the destructor until all data has
43     been submitted or thrown away.
44    
45     You can specify various configuration parameters. The term C<@items>
46     refers to an array with C<[key, value, clock, host]> array-refs.
47    
48     =over 4
49    
50     =item server => "$hostname:$port" (default: C<localhost:10051>)
51    
52     The zabbix server to connect to.
53    
54     =item host => $name (default: local nodename)
55    
56     The submission host, the "technical" name from the zabbix configuration.
57    
58     =item delay => $seconds (default: C<0>)
59    
60     If non-zero, then the module will gather data submissions for up to this
61     number of seconds before actually submitting them as a single batch.
62    
63     Submissions can get batched even if C<0>, as events submitted while the
64     connection is being established or retried will be batched together in any
65     case.
66    
67     =item queue_time => $seconds (default: C<3600>)
68    
69     The amount of time a data item will be queued until it is thrown away when
70     the server cannot be reached.
71    
72     =item linger_time => $seconds (default: same as C<queue_time>)
73    
74     The amount of time the module will linger in its destructor until all
75     items have been submitted.
76    
77     =item retry_min => $seconds (default: C<30>)
78    
79     =item retry_max => $seconds (default: C<300>)
80    
81     The minimum and maximum retry times when the server cannot be reached.
82    
83 root 1.2 =item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84 root 1.1
85     Called on any protocol errors - these generally indicate that something
86     other than a zabbix server is running on a port. The given key-value pairs
87     are the lost items.
88    
89 root 1.3 =item on_loss => $cb->($zbx, \@items) (default: log and continue)
90 root 1.1
91     Will be called when some data items are thrown away (this happens if the
92     server isn't reachable for at least C<queue_time> seconds).
93    
94 root 1.3 =item on_response => $cb->($zbx, \@items, \%response) (default: not called)
95 root 1.1
96     Will be called with the (generally rather useless) response from the
97     zabbix server.
98    
99     =back
100    
101     =cut
102    
# Shared do-nothing callback, used as the default for optional hooks.
our $NOP = sub { };

# File-wide JSON codec working on UTF-8 encoded octets. JSON::XS is
# preferred when it can be loaded, JSON::PP is the fallback.
my $json = (
      eval { require JSON::XS; JSON::XS->new }
   || do   { require JSON::PP; JSON::PP->new }
)->utf8;
108    
# Constructor. Caller-supplied key => value pairs override the built-in
# defaults; on_clear is internal and is always reset to the no-op callback.
# Only parses the server address and fills in derived defaults - nothing is
# connected here.
sub new {
   my ($class, @args) = @_;

   my %defaults = (
      server      => "localhost:10051",
      delay       => 0,
      retry_min   => 30,
      retry_max   => 300,
      queue_time  => 3600,
      on_response => $NOP,
      on_error    => sub {
         my ($zbx, undef, $msg) = @_;
         AE::log (4 => "$zbx->{zhost}:$zbx->{zport}: $msg"); # error
      },
      on_loss     => sub {
         my ($zbx, $lost) = @_;
         my $nitems = @$lost;
         AE::log (5 => "$zbx->{zhost}:$zbx->{zport}: $nitems items lost"); # warn
      },
   );

   # later pairs win: defaults < caller arguments < forced internals
   my $self = bless { %defaults, @args, on_clear => $NOP }, $class;

   # split "host:port", falling back to the standard zabbix trapper port
   @$self{qw(zhost zport)} = AnyEvent::Socket::parse_hostport ($self->{server}, 10051);

   # default submission host is the local nodename
   unless (defined $self->{host}) {
      require POSIX;
      $self->{host} = (POSIX::uname ())[1];
   }

   $self->{linger_time} = $self->{queue_time}
      unless defined $self->{linger_time};

   return $self;
}
142    
# Destructor: lingers (by running the event loop in _wait) until pending
# data has been dealt with, then drops all state - watchers, handle and
# queue - by emptying the object hash.
sub DESTROY {
   my ($self) = @_;

   # _wait runs the event loop, and the callbacks executed from it use
   # eval and perform I/O. Localise the global status variables so that
   # destroying the object cannot clobber them for the code that caused
   # the destruction (for example an exception travelling in $@).
   local ($., $@, $!, $^E, $?);

   $self->_wait;

   %$self = ();
}
150    
# Blocks, running the event loop, until the queue is empty and no
# submission is in progress, or until linger_time seconds have passed,
# whichever comes first.
sub _wait {
   my ($self) = @_;

   my ($cv, $expired);

   # One timer bounds the whole wait. Re-arming the timer on every loop
   # iteration would let the wait continue indefinitely while the server is
   # unreachable, and would spin with linger_time => 0.
   my $to = AE::timer ($self->{linger_time}, 0, sub {
      $expired = 1;
      $cv->send if $cv;
   });

   until ($expired) {
      last unless @{ $self->{queue} || [] } || $self->{sending};

      $cv = AE::cv;

      # on_clear may fire although new data has been queued meanwhile,
      # which is why the condition is re-checked after every wakeup
      local $self->{on_clear} = $cv;

      $cv->recv;
   }
}
163    
164     =item $zbx->submit ($k, $v[, $clock[, $host]])
165    
166     Submits a new key-value pair to the zabbix server. If C<$clock> is missing
167     or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
168     is missing, then the default set during object creation is used.
169    
170     =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
171    
172     Like C<submit>, but submits many key-value pairs at once.
173    
174     =cut
175    
# Queues a batch of [$k, $v, $clock, $host] array-refs, stamped with the
# current event loop time, and triggers submission.
sub submit_multiple {
   my ($self, $kvcs) = @_;

   push @{ $self->{queue} }, [AE::now, $kvcs];

   # Always go through _send, exactly like submit does: _send itself avoids
   # starting a second transfer, but it must run so that send_immediate is
   # set and data queued while a transfer is in flight gets sent once
   # that transfer has finished instead of being left in the queue.
   $self->_send;
}
184    
# Queues a single key-value pair (with optional clock and host) as a
# one-element batch stamped with the current event loop time, then
# triggers submission.
sub submit {
   my $self = shift;
   my ($key, $value, $clock, $host) = @_;

   my $slot = [AE::now, [[$key, $value, $clock, $host]]];
   push @{ $self->{queue} }, $slot;

   $self->_send;
}
192    
# start sending: either right away, or - when a batching delay is
# configured - once the delay timer has fired
sub _send {
   my ($self) = @_;

   # flags that queued data wants to go out and starts a transfer unless
   # one is already in progress
   my $kick = sub {
      $self->{send_immediate} = 1;
      $self->_send2 unless $self->{sending}++;
   };

   if (my $batch_delay = $self->{delay}) {
      # the timer callback is stored inside the object, so it must not
      # hold a strong reference back to it
      Scalar::Util::weaken ($self);

      # at most one delay timer is pending at any time
      $self->{delay_w} ||= AE::timer ($batch_delay, 0, sub {
         delete $self->{delay_w};
         $kick->();
      });
   } else {
      $kick->();
   }
}
209    
# actually do send: connects to the server, takes over the whole queue,
# submits it as a single "sender data" request and waits for the reply.
# Connection failures go to _retry with the queue intact; protocol errors
# report the taken items via on_error before going to _retry.
sub _send2 {
   my ($self) = @_;

   # all callbacks below are (indirectly) stored inside the object
   Scalar::Util::weaken ($self);

   $self->{connect_w} = AnyEvent::Socket::tcp_connect ($self->{zhost}, $self->{zport}, sub {
      my ($fh) = @_;

      $fh
         or return $self->_retry;

      delete $self->{retry};

      delete $self->{send_immediate};
      my $data  = delete $self->{queue} || [];
      my $items = [map @{ $_->[1] }, @$data];

      my $fail = sub {
         $self->{on_error}($self, $items, $_[0]);
         $self->_retry;
      };

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => $fh,
         on_error => sub {
            $fail->($_[2]);
         },
         on_read  => sub {
            my $rbuf = \$_[0]{rbuf};

            # header: "ZBXD", one version octet, 64 bit little-endian length
            return if 13 > length $$rbuf;

            my ($zbxd, $version, $length) = unpack "a4 C Q<", $$rbuf;

            $zbxd eq "ZBXD"
               or return $fail->("protocol mismatch");
            $version == 1
               or return $fail->("protocol version mismatch");

            return if 13 + $length > length $$rbuf;

            # decode exactly the announced number of octets, and copy them
            # out before the handle (and its buffer) is released
            my $body = substr $$rbuf, 13, $length;

            delete $self->{hdl};

            my $res = eval { $json->decode ($body) }
               or return $fail->("protocol error");

            $self->{on_response}($self, $items, $res);

            my $immediate = delete $self->{send_immediate};

            if ($immediate && @{ $self->{queue} || [] }) {
               # More data arrived while this batch was in flight. Stay
               # marked as sending while the follow-up transfer runs, so
               # that a submit in the meantime cannot open a second,
               # concurrent connection that would replace this one.
               $self->{sending} = 1;
               $self->_send2;
            } else {
               delete $self->{sending};
            }

            $self->{on_clear}();
         },
      );

      my $request = $json->encode ({
         request => "sender data",
         clock   => int (AE::now),
         data    => [
            map {
               my $slot = $_;

               map +{
                  key   => $_->[0],
                  value => $_->[1],
                  # items without a clock use the time they were queued
                  clock => int ($_->[2] // $slot->[0]),
                  host  => $_->[3] // $self->{host},
               }, @{ $slot->[1] }
            } @$data
         ],
      });

      $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $request);
   });
}
284    
# Called after a failed attempt: drops the handle, expires queue entries
# older than queue_time (reporting them via on_loss) and, if anything is
# left, schedules another attempt using exponential backoff between
# retry_min and retry_max. With an empty queue the object becomes idle.
sub _retry {
   my ($self) = @_;

   # the retry timer callback is stored inside the object
   Scalar::Util::weaken ($self);

   delete $self->{hdl};

   my $queue  = $self->{queue} || [];
   my $now    = AE::now;
   my $expire = $now - $self->{queue_time};

   while (@$queue && $queue->[0][0] < $expire) {
      my $slot = shift @$queue;

      # a queue slot is [submit time, \@items]; on_loss is documented to
      # receive the lost items themselves, not the slot wrapper
      $self->{on_loss}($self, $slot->[1]);
   }

   unless (@$queue) {
      delete $self->{sending};
      $self->{on_clear}();
      return;
   }

   # retry_min, doubled for every consecutive failure, capped at retry_max
   my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
   $retry = $self->{retry_max} if $retry > $self->{retry_max};

   $self->{retry_w} = AE::timer ($retry, 0, sub {
      delete $self->{retry_w};
      $self->_send2;
   });
}
310    
311     =back
312    
313     =head1 SEE ALSO
314    
315     L<AnyEvent>.
316    
317     =head1 AUTHOR
318    
319     Marc Lehmann <schmorp@schmorp.de>
320     http://home.schmorp.de/
321    
322     =cut
323    
324     1
325