ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
Revision: 1.1
Committed: Thu Jan 18 10:17:15 2018 UTC (6 years, 4 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::ZabbixSender;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 I't implements the zabbix version 2.0-3.4 protocol for item data
15 submission.
16
17 =head2 METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::ZabbixSender;
24
25 use common::sense;
26
27 use Errno ();
28 use Scalar::Util ();
29
30 use AnyEvent ();
31 use AnyEvent::Socket ();
32 use AnyEvent::Handle ();
33
34 our $VERSION = '0.1';
35
36 =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37
38 Creates a (virtual) connection to a zabbix server. Since each submission
39 requires a new TCP connection, creating the connection object does not
40 actually contact the server.
41
42 The connection object will linger in the destructor until all data has
43 been submitted or thrown away.
44
45 You can specify various configuration parameters. The term C<@items>
46 refers to an array with C<[key, value, clock]> array-refs.
47
48 =over 4
49
50 =item server => "$hostname:$port" (default: C<localhost:10051>)
51
52 The zabbix server to connect to.
53
54 =item host => $name (default: local nodename)
55
56 The submission host, the "technical" name from tghe zabbix configuration.
57
58 =item delay => $seconds (default: C<0>)
59
60 If non-zero, then the module will gather data submissions for up to this
61 number of seconds before actually submitting them as a single batch.
62
63 Submissions can get batched even if C<0>, as events submitted while the
64 connection is being established or retried will be batched together in any
65 case.
66
67 =item queue_time => $seconds (default: C<3600>)
68
69 The amount of time a data item will be queued until it is thrown away when
70 the server cannot be reached.
71
72 =item linger_time => $seconds (default: same as C<queue_time>)
73
74 The amount of time the module will linger in its destructor until all
75 items have been submitted.
76
77 =item retry_min => $seconds (default: C<30>)
78
79 =item retry_max => $seconds (default: C<300>)
80
81 The minimum and maximum retry times when the server cannot be reached.
82
83 =item on_error => $cb->(\@items, $msg) (default: log and continue)
84
85 Called on any protocol errors - these generally indicate that something
86 other than a zabbix server is running on a port. The given key-value pairs
87 are the lost items.
88
89 =item on_loss => $cb->(\@items) (default: not called)
90
91 Will be called when some data items are thrown away (this happens if the
92 server isn't reachable for at least C<queue_time> seconds),
93
94 =item on_response => $cb->(\@items, \%response) (default: not called)
95
96 Will be called with the (generally rather useless) response form the
97 zabbix server.
98
99 =back
100
101 =cut
102
103 our $NOP = sub { };
104
105 my $json = eval { require JSON::XS; JSON::XS->new } || do { require JSON::PP; JSON::PP->new };
106
107 $json->utf8;
108
109 sub new {
110 my $class = shift;
111 my $self = bless {
112 server => "localhost:10051",
113 delay => 0,
114 retry_min => 30,
115 retry_max => 300,
116 queue_time => 3600,
117 on_loss => $NOP,
118 on_response => $NOP,
119 on_error => sub {
120 AE::log 4 => $_[0]; # error
121 },
122 @_,
123 }, $class;
124
125 ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
126
127 $self->{host} //= do {
128 require POSIX;
129 (POSIX::uname())[1]
130 };
131
132 $self->{linger_time} //= $self->{queue_time};
133
134 $self
135 }
136
137 sub DESTROY {
138 my ($self) = @_;
139
140 $self->_wait;
141
142 %$self = ();
143 }
144
145 sub _wait {
146 my ($self) = @_;
147
148 return unless @{ $self->{queue} };
149
150 my $cv = AE::cv;
151
152 my $to = AE::timer $self->{linger_time}, 0, $cv;
153 local $self->{on_clear} = $cv;
154
155 $cv->recv;
156 }
157
158 =item $zbx->submit ($k, $v[, $clock[, $host]])
159
160 Submits a new key-value pair to the zabbix server. If C<$clock> is missing
161 or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
162 is missing, then the default set during object creation is used.
163
164 =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
165
166 Like C<submit>, but submits many key-value pairs at once.
167
168 =cut
169
170 sub submit_multiple {
171 my ($self, $kvcs) = @_;
172
173 push @{ $self->{queue} }, [AE::now, $kvcs];
174
175 $self->_send
176 unless $self->{sending};
177 }
178
179 sub submit {
180 my ($self, $k, $v, $clock, $host) = @_;
181
182 push @{ $self->{queue} }, [AE::now, [[$k, $v, $clock, $host]]];
183
184 $self->_send;
185 }
186
187 # start sending
188 sub _send {
189 my ($self) = @_;
190
191 if ($self->{delay}) {
192 Scalar::Util::weaken $self;
193 $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
194 delete $self->{delay_w};
195 $self->{send_immediate} = 1;
196 $self->_send2 unless $self->{sending}++;
197 };
198 } else {
199 $self->{send_immediate} = 1;
200 $self->_send2 unless $self->{sending}++;
201 }
202 }
203
204 # actually do send
205 sub _send2 {
206 my ($self) = @_;
207
208 Scalar::Util::weaken $self;
209 $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
210 my ($fh) = @_;
211
212 $fh
213 or return $self->_retry;
214
215 delete $self->{retry};
216
217 my $data = delete $self->{queue};
218 my $items = [map @{ $_->[1] }, @$data];
219
220 my $fail = sub {
221 $self->{on_error}($items, $_[0]);
222 $self->_retry;
223 };
224
225 $self->{hdl} = new AnyEvent::Handle
226 fh => $fh,
227 on_error => sub {
228 $fail->($_[2]);
229 },
230 on_read => sub {
231 if (13 <= length $_[0]{rbuf}) {
232 my ($zbxd, $version, $length) = unpack "a4 C Q<", $_[0]{rbuf};
233
234 $zbxd eq "ZBXD"
235 or return $fail->("protocol mismatch");
236 $version == 1
237 or return $fail->("protocol version mismatch");
238
239 if (13 + $length <= length $_[0]{rbuf}) {
240 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
241 or return $fail->("protocol error");
242 use Data::Dump; ddx $res;#d#
243
244 $self->{on_response}($items, $res);
245 $self->_send2 if delete $self->{send_immediate};
246
247 $self->{on_clear}() unless @{ $self->{queue} };
248 }
249 }
250 },
251 ;
252
253 my $json = $json->encode ({
254 request => "sender data",
255 clock => AE::now,
256 data => [
257 map {
258 my $slot = $_;
259
260 map {
261 key => $_->[0],
262 value => $_->[1],
263 clock => $_->[2] // $slot->[0],
264 host => $_->[3] // $self->{host},
265 }, @{ $slot->[1] }
266 } @$data
267 ],
268 });
269
270 warn $json;
271
272 $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
273 };
274 }
275
276 sub _retry {
277 my ($self) = @_;
278
279 Scalar::Util::weaken $self;
280
281 delete $self->{hdl};
282
283 my $expire = AE::now - $self->{queue_time};
284 while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
285 $self->{on_loss}([shift @{ $self->{queue} }]);
286 }
287
288 unless (@{ $self->{queue} }) {
289 $self->{on_clear}();
290 return;
291 }
292
293 my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
294 $retry = $self->{retry_max} if $retry > $self->{retry_max};
295 warn "retry $retry\n";#d#
296 $self->{retry_w} = AE::timer $retry, 0, sub {
297 delete $self->{retry_w};
298 $self->_send2;
299 };
300 }
301
302 =back
303
304 =head1 SEE ALSO
305
306 L<AnyEvent>.
307
308 =head1 AUTHOR
309
310 Marc Lehmann <schmorp@schmorp.de>
311 http://home.schmorp.de/
312
313 =cut
314
315 1
316