ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
Revision: 1.4
Committed: Thu Jan 18 16:37:44 2018 UTC (6 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-1_0
Changes since 1.3: +1 -1 lines
Log Message:
1.0

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::ZabbixSender;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 It implements the zabbix version 2.0-3.4 protocol for item data
15 submission.
16
17 =head2 METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::ZabbixSender;
24
25 use common::sense;
26
27 use Errno ();
28 use Scalar::Util ();
29
30 use AnyEvent ();
31 use AnyEvent::Socket ();
32 use AnyEvent::Handle ();
33
34 our $VERSION = '1.0';
35
36 =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37
38 Creates a (virtual) connection to a zabbix server. Since each submission
39 requires a new TCP connection, creating the connection object does not
40 actually contact the server.
41
42 The connection object will linger in the destructor until all data has
43 been submitted or thrown away.
44
45 You can specify various configuration parameters. The term C<@items>
46 refers to an array with C<[key, value, clock]> array-refs.
47
48 =over 4
49
50 =item server => "$hostname:$port" (default: C<localhost:10051>)
51
52 The zabbix server to connect to.
53
54 =item host => $name (default: local nodename)
55
56 The submission host, the "technical" name from the zabbix configuration.
57
58 =item delay => $seconds (default: C<0>)
59
60 If non-zero, then the module will gather data submissions for up to this
61 number of seconds before actually submitting them as a single batch.
62
63 Submissions can get batched even if C<0>, as events submitted while the
64 connection is being established or retried will be batched together in any
65 case.
66
67 =item queue_time => $seconds (default: C<3600>)
68
69 The amount of time a data item will be queued until it is thrown away when
70 the server cannot be reached.
71
72 =item linger_time => $seconds (default: same as C<queue_time>)
73
74 The amount of time the module will linger in its destructor until all
75 items have been submitted.
76
77 =item retry_min => $seconds (default: C<30>)
78
79 =item retry_max => $seconds (default: C<300>)
80
81 The minimum and maximum retry times when the server cannot be reached.
82
83 =item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84
85 Called on any protocol errors - these generally indicate that something
86 other than a zabbix server is running on a port. The given key-value pairs
87 are the lost items.
88
89 =item on_loss => $cb->($zbx, \@items) (default: log and continue)
90
91 Will be called when some data items are thrown away (this happens if the
92 server isn't reachable for at least C<queue_time> seconds).
93
94 =item on_response => $cb->($zbx, \@items, \%response) (default: not called)
95
96 Will be called with the (generally rather useless) response from the
97 zabbix server.
98
99 =back
100
101 =cut
102
# Shared do-nothing callback, used as the default for optional hooks.
our $NOP = sub { };

# File-wide JSON codec: use JSON::XS when it can be loaded, otherwise fall
# back to JSON::PP. The codec is switched to UTF-8 mode right away (the
# utf8 method returns the codec object itself).
my $json = (
   eval { require JSON::XS; JSON::XS->new }
      || do { require JSON::PP; JSON::PP->new }
)->utf8;
108
# Constructor. Builds the object from built-in defaults overlaid with the
# caller's key => value pairs (which may override any default except
# on_clear, which is internal and always reset to the no-op).
#
# Derived fields:
#   zhost/zport  - parsed from "server", port defaulting to 10051
#   host         - defaults to the local nodename as reported by uname
#   linger_time  - defaults to queue_time
#
# Returns the new object; no network activity happens here.
sub new {
   my $class = shift;

   my $self = bless {
      server      => "localhost:10051",
      delay       => 0,
      retry_min   => 30,
      retry_max   => 300,
      queue_time  => 3600,
      on_response => $NOP,
      on_error    => sub {
         # invoked as ($zbx, \@items, $msg): the message is the THIRD
         # argument, the second one is the array of lost items.
         AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[2]"; # error
      },
      on_loss     => sub {
         my $nitems = @{ $_[1] };
         AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
      },

      @_,

      on_clear    => $NOP,
   }, $class;

   ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;

   $self->{host} //= do {
      require POSIX;
      (POSIX::uname())[1]
   };

   $self->{linger_time} //= $self->{queue_time};

   $self
}
142
# Destructor: lingers (via _wait) until pending data has been handled, then
# empties the object hash so all watchers and handles stored in it are
# released.
sub DESTROY {
   my ($self) = @_;

   # _wait runs the event loop, which can execute arbitrary callbacks and
   # evals. Localise the global status variables so a destructor triggered
   # during exception unwinding or at program exit does not clobber the
   # caller's error state or the process exit status.
   local ($., $@, $!, $^E, $?);

   $self->_wait;

   %$self = ();
}
150
# Blocks (by running the event loop through a condvar) until the queue is
# empty and no transfer is in progress, or until linger_time seconds have
# passed in total - whichever comes first.
#
# A single timer covers the whole wait. on_clear is temporarily pointed at
# the condvar so that the sending code wakes us up whenever it becomes
# idle; the loop then re-checks whether anything is still pending.
sub _wait {
   my ($self) = @_;

   # nothing pending: no need to create any watchers
   return unless @{ $self->{queue} } || $self->{sending};

   my ($cv, $expired);

   # one timer for the entire linger period, so linger_time is a real
   # upper bound instead of being restarted on every wake-up
   my $to = AE::timer $self->{linger_time}, 0, sub {
      $expired = 1;
      $cv->send if $cv;
   };

   while (!$expired && (@{ $self->{queue} } || $self->{sending})) {
      $cv = AE::cv;

      local $self->{on_clear} = $cv;

      $cv->recv;
   }
}
163
164 =item $zbx->submit ($k, $v[, $clock[, $host]])
165
166 Submits a new key-value pair to the zabbix server. If C<$clock> is missing
167 or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
168 is missing, then the default set during object creation is used.
169
170 =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
171
172 Like C<submit>, but submits many key-value pairs at once.
173
174 =cut
175
# Queues a batch of [$k, $v, $clock, $host] array-refs as one queue slot,
# stamped with the current event loop time, and triggers sending.
sub submit_multiple {
   my ($self, $kvcs) = @_;

   push @{ $self->{queue} }, [AE::now, $kvcs];

   # Always go through _send, even while a transfer is in flight: _send
   # itself refuses to start a second transfer, but it records (via
   # send_immediate or the delay timer) that more data is waiting. Without
   # that, items queued after the connection was established would sit in
   # the queue until some later submit call.
   $self->_send;
}
184
# Queues a single key/value pair (with optional clock and host) as its own
# queue slot, stamped with the current event loop time, and triggers
# sending.
sub submit {
   my $self = shift;
   my ($key, $value, $clock, $host) = @_;

   my $slot = [AE::now, [[$key, $value, $clock, $host]]];
   push @{ $self->{queue} }, $slot;

   $self->_send
}
192
193 # start sending
# start sending
#
# Without a configured delay, the queue is flagged for immediate sending
# and a transfer is started unless one is already running. With a delay, a
# one-shot timer (at most one at a time) does the same once it fires.
sub _send {
   my ($self) = @_;

   if (!$self->{delay}) {
      $self->{send_immediate} = 1;
      $self->{sending}++ or $self->_send2;
   } else {
      # the timer callback is stored inside the object, so it must only
      # hold a weak reference to it
      Scalar::Util::weaken $self;

      $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
         delete $self->{delay_w};
         $self->{send_immediate} = 1;
         $self->{sending}++ or $self->_send2;
      };
   }
}
209
210 # actually do send
# actually do send
#
# Connects to the server and, once connected, takes the whole queue,
# encodes it as one "sender data" request and writes it with the ZBXD
# header. The response is read back, validated and passed to on_response.
#
# Failure handling:
#   - connect failure: the queue is left untouched and _retry is called
#   - handle/protocol errors after connect: the items already taken from
#     the queue are reported through on_error, then _retry is called
#
# After a successful response another transfer is started right away if
# more data was flagged for immediate sending meanwhile; otherwise the
# object becomes idle and on_clear is invoked.
sub _send2 {
   my ($self) = @_;

   # all callbacks below end up stored inside the object, so they must only
   # hold a weak reference to it
   Scalar::Util::weaken $self;

   $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
      my ($fh) = @_;

      $fh
         or return $self->_retry;

      delete $self->{retry};

      delete $self->{send_immediate};
      my $data  = delete $self->{queue};
      my $items = [map @{ $_->[1] }, @$data];

      my $fail = sub {
         $self->{on_error}($self, $items, $_[0]);
         $self->_retry;
      };

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => $fh,
         on_error => sub {
            $fail->($_[2]);
         },
         on_read  => sub {
            my ($hdl) = @_;

            # header: 4 byte magic, 1 byte version, 8 byte little-endian length
            return if 13 > length $hdl->{rbuf};

            my ($zbxd, $version, $length) = unpack "a4 C Q<", $hdl->{rbuf};

            $zbxd eq "ZBXD"
               or return $fail->("protocol mismatch");
            $version == 1
               or return $fail->("protocol version mismatch");

            # wait until the complete body has arrived
            return if 13 + $length > length $hdl->{rbuf};

            delete $self->{hdl};

            # decode exactly the announced body, ignoring anything after it
            my $res = eval { $json->decode (substr $hdl->{rbuf}, 13, $length) }
               or return $fail->("protocol error");

            $self->{on_response}($self, $items, $res);

            if (delete $self->{send_immediate} && $self->{queue}) {
               # more data is waiting: stay in the "sending" state so that
               # concurrent submits do not start a second connection and
               # _wait keeps waiting for this follow-up transfer
               $self->_send2;
            } else {
               delete $self->{sending};
               $self->{on_clear}();
            }
         },
      );

      my $payload = $json->encode ({
         request => "sender data",
         clock   => int AE::now,
         data    => [
            map {
               my $slot = $_;

               # items without an explicit clock use the time they were queued,
               # items without an explicit host use the object's default host
               map +{
                  key   => $_->[0],
                  value => $_->[1],
                  clock => int ($_->[2] // $slot->[0]),
                  host  => $_->[3] // $self->{host},
               }, @{ $slot->[1] }
            } @$data
         ],
      });

      $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $payload);
   };
}
284
# Called after a failed connect or a failed transfer. Drops the current
# handle, throws away queue slots older than queue_time (reporting their
# items through on_loss) and then either becomes idle (queue empty) or
# schedules another _send2 with exponential backoff between retry_min and
# retry_max seconds.
sub _retry {
   my ($self) = @_;

   # the retry timer callback is stored inside the object, so it must only
   # hold a weak reference to it
   Scalar::Util::weaken $self;

   delete $self->{hdl};

   my $expire = AE::now - $self->{queue_time};
   while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
      # a queue slot is [queue-time, \@items]; on_loss is documented to
      # receive the items themselves, not the internal slot wrapper
      my $slot = shift @{ $self->{queue} };
      $self->{on_loss}($self, [@{ $slot->[1] }]);
   }

   unless (@{ $self->{queue} }) {
      # nothing left to send: become idle and wake up anybody waiting
      delete $self->{sending};
      $self->{on_clear}();
      return;
   }

   # retry_min, 2*retry_min, 4*retry_min, ... capped at retry_max
   my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
   $retry = $self->{retry_max} if $retry > $self->{retry_max};

   $self->{retry_w} = AE::timer $retry, 0, sub {
      delete $self->{retry_w};
      $self->_send2;
   };
}
310
311 =back
312
313 =head1 SEE ALSO
314
315 L<AnyEvent>.
316
317 =head1 AUTHOR
318
319 Marc Lehmann <schmorp@schmorp.de>
320 http://home.schmorp.de/
321
322 =cut
323
324 1
325