ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
Revision: 1.2
Committed: Thu Jan 18 10:48:10 2018 UTC (6 years, 10 months ago) by root
Branch: MAIN
Changes since 1.1: +28 -20 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::ZabbixSender;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 I't implements the zabbix version 2.0-3.4 protocol for item data
15 submission.
16
17 =head2 METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::ZabbixSender;
24
25 use common::sense;
26
27 use Errno ();
28 use Scalar::Util ();
29
30 use AnyEvent ();
31 use AnyEvent::Socket ();
32 use AnyEvent::Handle ();
33
34 our $VERSION = '0.1';
35
36 =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37
38 Creates a (virtual) connection to a zabbix server. Since each submission
39 requires a new TCP connection, creating the connection object does not
40 actually contact the server.
41
42 The connection object will linger in the destructor until all data has
43 been submitted or thrown away.
44
45 You can specify various configuration parameters. The term C<@items>
46 refers to an array with C<[key, value, clock]> array-refs.
47
48 =over 4
49
50 =item server => "$hostname:$port" (default: C<localhost:10051>)
51
52 The zabbix server to connect to.
53
54 =item host => $name (default: local nodename)
55
56 The submission host, the "technical" name from tghe zabbix configuration.
57
58 =item delay => $seconds (default: C<0>)
59
60 If non-zero, then the module will gather data submissions for up to this
61 number of seconds before actually submitting them as a single batch.
62
63 Submissions can get batched even if C<0>, as events submitted while the
64 connection is being established or retried will be batched together in any
65 case.
66
67 =item queue_time => $seconds (default: C<3600>)
68
69 The amount of time a data item will be queued until it is thrown away when
70 the server cannot be reached.
71
72 =item linger_time => $seconds (default: same as C<queue_time>)
73
74 The amount of time the module will linger in its destructor until all
75 items have been submitted.
76
77 =item retry_min => $seconds (default: C<30>)
78
79 =item retry_max => $seconds (default: C<300>)
80
81 The minimum and maximum retry times when the server cannot be reached.
82
83 =item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84
85 Called on any protocol errors - these generally indicate that something
86 other than a zabbix server is running on a port. The given key-value pairs
87 are the lost items.
88
89 =item on_loss => $cb->$zbx, (\@items) (default: log and continue)
90
91 Will be called when some data items are thrown away (this happens if the
92 server isn't reachable for at least C<queue_time> seconds),
93
94 =item on_response => $cb->$zbx, (\@items, \%response) (default: not called)
95
96 Will be called with the (generally rather useless) response form the
97 zabbix server.
98
99 =back
100
101 =cut
102
103 our $NOP = sub { };
104
105 my $json = eval { require JSON::XS; JSON::XS->new } || do { require JSON::PP; JSON::PP->new };
106
107 $json->utf8;
108
109 sub new {
110 my $class = shift;
111 my $self = bless {
112 server => "localhost:10051",
113 delay => 0,
114 retry_min => 30,
115 retry_max => 300,
116 queue_time => 3600,
117 on_response => $NOP,
118 on_error => sub {
119 AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[1]"; # error
120 },
121 on_loss => sub {
122 my $nitems = @{ $_[1] };
123 AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
124 },
125
126 @_,
127
128 on_clear => $NOP,
129 }, $class;
130
131 ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
132
133 $self->{host} //= do {
134 require POSIX;
135 (POSIX::uname())[1]
136 };
137
138 $self->{linger_time} //= $self->{queue_time};
139
140 $self
141 }
142
143 sub DESTROY {
144 my ($self) = @_;
145
146 $self->_wait;
147
148 %$self = ();
149 }
150
151 sub _wait {
152 my ($self) = @_;
153
154 while (@{ $self->{queue} } || $self->{sending}) {
155 my $cv = AE::cv;
156
157 my $to = AE::timer $self->{linger_time}, 0, $cv;
158 local $self->{on_clear} = $cv;
159
160 $cv->recv;
161 }
162 }
163
164 =item $zbx->submit ($k, $v[, $clock[, $host]])
165
166 Submits a new key-value pair to the zabbix server. If C<$clock> is missing
167 or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
168 is missing, then the default set during object creation is used.
169
170 =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
171
172 Like C<submit>, but submits many key-value pairs at once.
173
174 =cut
175
176 sub submit_multiple {
177 my ($self, $kvcs) = @_;
178
179 push @{ $self->{queue} }, [AE::now, $kvcs];
180
181 $self->_send
182 unless $self->{sending};
183 }
184
185 sub submit {
186 my ($self, $k, $v, $clock, $host) = @_;
187
188 push @{ $self->{queue} }, [AE::now, [[$k, $v, $clock, $host]]];
189
190 $self->_send;
191 }
192
193 # start sending
194 sub _send {
195 my ($self) = @_;
196
197 if ($self->{delay}) {
198 Scalar::Util::weaken $self;
199 $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
200 delete $self->{delay_w};
201 $self->{send_immediate} = 1;
202 $self->_send2 unless $self->{sending}++;
203 };
204 } else {
205 $self->{send_immediate} = 1;
206 $self->_send2 unless $self->{sending}++;
207 }
208 }
209
210 # actually do send
211 sub _send2 {
212 my ($self) = @_;
213
214 Scalar::Util::weaken $self;
215 $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
216 my ($fh) = @_;
217
218 $fh
219 or return $self->_retry;
220
221 delete $self->{retry};
222
223 my $data = delete $self->{queue};
224 my $items = [map @{ $_->[1] }, @$data];
225
226 my $fail = sub {
227 $self->{on_error}($self, $items, $_[0]);
228 $self->_retry;
229 };
230
231 $self->{hdl} = new AnyEvent::Handle
232 fh => $fh,
233 on_error => sub {
234 $fail->($_[2]);
235 },
236 on_read => sub {
237 if (13 <= length $_[0]{rbuf}) {
238 my ($zbxd, $version, $length) = unpack "a4 C Q<", $_[0]{rbuf};
239
240 $zbxd eq "ZBXD"
241 or return $fail->("protocol mismatch");
242 $version == 1
243 or return $fail->("protocol version mismatch");
244
245 if (13 + $length <= length $_[0]{rbuf}) {
246 delete $self->{hdl};
247
248 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
249 or return $fail->("protocol error");
250
251 $self->{on_response}($self, $items, $res);
252
253 delete $self->{sending};
254
255 $self->_send2 if delete $self->{send_immediate} && $self->{queue};
256
257 $self->{on_clear}();
258 }
259 }
260 },
261 ;
262
263 my $json = $json->encode ({
264 request => "sender data",
265 clock => AE::now,
266 data => [
267 map {
268 my $slot = $_;
269
270 map {
271 key => $_->[0],
272 value => $_->[1],
273 clock => $_->[2] // $slot->[0],
274 host => $_->[3] // $self->{host},
275 }, @{ $slot->[1] }
276 } @$data
277 ],
278 });
279
280 $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
281 };
282 }
283
284 sub _retry {
285 my ($self) = @_;
286
287 Scalar::Util::weaken $self;
288
289 delete $self->{hdl};
290
291 my $expire = AE::now - $self->{queue_time};
292 while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
293 $self->{on_loss}($self, [shift @{ $self->{queue} }]);
294 }
295
296 unless (@{ $self->{queue} }) {
297 delete $self->{sending};
298 $self->{on_clear}();
299 return;
300 }
301
302 my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
303 $retry = $self->{retry_max} if $retry > $self->{retry_max};
304 $self->{retry_w} = AE::timer $retry, 0, sub {
305 delete $self->{retry_w};
306 $self->_send2;
307 };
308 }
309
310 =back
311
312 =head1 SEE ALSO
313
314 L<AnyEvent>.
315
316 =head1 AUTHOR
317
318 Marc Lehmann <schmorp@schmorp.de>
319 http://home.schmorp.de/
320
321 =cut
322
323 1
324