ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
(Generate patch)

Comparing AnyEvent-ZabbixSender/ZabbixSender.pm (file contents):
Revision 1.1 by root, Thu Jan 18 10:17:15 2018 UTC vs.
Revision 1.5 by root, Sat Sep 14 19:42:52 2019 UTC

29 29
30use AnyEvent (); 30use AnyEvent ();
31use AnyEvent::Socket (); 31use AnyEvent::Socket ();
32use AnyEvent::Handle (); 32use AnyEvent::Handle ();
33 33
34our $VERSION = '0.1'; 34our $VERSION = '1.1';
35 35
36=item $zbx = new AnyEvent::ZabbixSender [key => value...] 36=item $zbx = new AnyEvent::ZabbixSender [key => value...]
37 37
38Creates a (virtual) connection to a zabbix server. Since each submission 38Creates a (virtual) connection to a zabbix server. Since each submission
39requires a new TCP connection, creating the connection object does not 39requires a new TCP connection, creating the connection object does not
78 78
79=item retry_max => $seconds (default: C<300>) 79=item retry_max => $seconds (default: C<300>)
80 80
81The minimum and maximum retry times when the server cannot be reached. 81The minimum and maximum retry times when the server cannot be reached.
82 82
83=item on_error => $cb->(\@items, $msg) (default: log and continue) 83=item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84 84
85Called on any protocol errors - these generally indicate that something 85Called on any protocol errors - these generally indicate that something
86other than a zabbix server is running on a port. The given key-value pairs 86other than a zabbix server is running on a port. The given key-value pairs
87are the lost items. 87are the lost items.
88 88
89=item on_loss => $cb->(\@items) (default: not called) 89=item on_loss => $cb->($zbx, \@items) (default: log and continue)
90 90
91Will be called when some data items are thrown away (this happens if the 91Will be called when some data items are thrown away (this happens if the
92server isn't reachable for at least C<queue_time> seconds), 92server isn't reachable for at least C<queue_time> seconds),
93 93
94=item on_response => $cb->(\@items, \%response) (default: not called) 94=item on_response => $cb->($zbx, \@items, \%response) (default: not called)
95 95
96Will be called with the (generally rather useless) response form the 96Will be called with the (generally rather useless) response form the
97zabbix server. 97zabbix server.
98 98
99=back 99=back
112 server => "localhost:10051", 112 server => "localhost:10051",
113 delay => 0, 113 delay => 0,
114 retry_min => 30, 114 retry_min => 30,
115 retry_max => 300, 115 retry_max => 300,
116 queue_time => 3600, 116 queue_time => 3600,
117 on_loss => $NOP,
118 on_response => $NOP, 117 on_response => $NOP,
119 on_error => sub { 118 on_error => sub {
120 AE::log 4 => $_[0]; # error 119 AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[2]"; # error
121 }, 120 },
121 on_loss => sub {
122 my $nitems = @{ $_[1] };
123 AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
124 },
125
122 @_, 126 @_,
127
128 on_clear => $NOP,
123 }, $class; 129 }, $class;
124 130
125 ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051; 131 ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
126 132
127 $self->{host} //= do { 133 $self->{host} //= do {
143} 149}
144 150
145sub _wait { 151sub _wait {
146 my ($self) = @_; 152 my ($self) = @_;
147 153
148 return unless @{ $self->{queue} }; 154 while (@{ $self->{queue} } || $self->{sending}) {
149
150 my $cv = AE::cv; 155 my $cv = AE::cv;
151 156
152 my $to = AE::timer $self->{linger_time}, 0, $cv; 157 my $to = AE::timer $self->{linger_time}, 0, $cv;
153 local $self->{on_clear} = $cv; 158 local $self->{on_clear} = $cv;
154 159
155 $cv->recv; 160 $cv->recv;
161 }
156} 162}
157 163
158=item $zbx->submit ($k, $v[, $clock[, $host]]) 164=item $zbx->submit ($k, $v[, $clock[, $host]])
159 165
160Submits a new key-value pair to the zabbix server. If C<$clock> is missing 166Submits a new key-value pair to the zabbix server. If C<$clock> is missing
212 $fh 218 $fh
213 or return $self->_retry; 219 or return $self->_retry;
214 220
215 delete $self->{retry}; 221 delete $self->{retry};
216 222
223 delete $self->{send_immediate};
217 my $data = delete $self->{queue}; 224 my $data = delete $self->{queue};
218 my $items = [map @{ $_->[1] }, @$data]; 225 my $items = [map @{ $_->[1] }, @$data];
219 226
220 my $fail = sub { 227 my $fail = sub {
221 $self->{on_error}($items, $_[0]); 228 $self->{on_error}($self, $items, $_[0]);
222 $self->_retry; 229 $self->_retry;
223 }; 230 };
224 231
225 $self->{hdl} = new AnyEvent::Handle 232 $self->{hdl} = new AnyEvent::Handle
226 fh => $fh, 233 fh => $fh,
235 or return $fail->("protocol mismatch"); 242 or return $fail->("protocol mismatch");
236 $version == 1 243 $version == 1
237 or return $fail->("protocol version mismatch"); 244 or return $fail->("protocol version mismatch");
238 245
239 if (13 + $length <= length $_[0]{rbuf}) { 246 if (13 + $length <= length $_[0]{rbuf}) {
247 delete $self->{hdl};
248
240 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) } 249 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
241 or return $fail->("protocol error"); 250 or return $fail->("protocol error");
242 use Data::Dump; ddx $res;#d#
243 251
244 $self->{on_response}($items, $res); 252 $self->{on_response}($self, $items, $res);
253
254 delete $self->{sending};
255
245 $self->_send2 if delete $self->{send_immediate}; 256 $self->_send2 if delete $self->{send_immediate} && $self->{queue};
246 257
247 $self->{on_clear}() unless @{ $self->{queue} }; 258 $self->{on_clear}();
248 } 259 }
249 } 260 }
250 }, 261 },
251 ; 262 ;
252 263
253 my $json = $json->encode ({ 264 my $json = $json->encode ({
254 request => "sender data", 265 request => "sender data",
255 clock => AE::now, 266 clock => int AE::now,
256 data => [ 267 data => [
257 map { 268 map {
258 my $slot = $_; 269 my $slot = $_;
259 270
260 map { 271 map {
261 key => $_->[0], 272 key => $_->[0],
262 value => $_->[1], 273 value => $_->[1],
263 clock => $_->[2] // $slot->[0], 274 clock => int ($_->[2] // $slot->[0]),
264 host => $_->[3] // $self->{host}, 275 host => $_->[3] // $self->{host},
265 }, @{ $slot->[1] } 276 }, @{ $slot->[1] }
266 } @$data 277 } @$data
267 ], 278 ],
268 }); 279 });
269 280
270 warn $json;
271
272 $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json); 281 $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
273 }; 282 };
274} 283}
275 284
276sub _retry { 285sub _retry {
280 289
281 delete $self->{hdl}; 290 delete $self->{hdl};
282 291
283 my $expire = AE::now - $self->{queue_time}; 292 my $expire = AE::now - $self->{queue_time};
284 while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) { 293 while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
285 $self->{on_loss}([shift @{ $self->{queue} }]); 294 $self->{on_loss}($self, [shift @{ $self->{queue} }]);
286 } 295 }
287 296
288 unless (@{ $self->{queue} }) { 297 unless (@{ $self->{queue} }) {
298 delete $self->{sending};
289 $self->{on_clear}(); 299 $self->{on_clear}();
290 return; 300 return;
291 } 301 }
292 302
293 my $retry = $self->{retry_min} * 2 ** $self->{retry}++; 303 my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
294 $retry = $self->{retry_max} if $retry > $self->{retry_max}; 304 $retry = $self->{retry_max} if $retry > $self->{retry_max};
295 warn "retry $retry\n";#d#
296 $self->{retry_w} = AE::timer $retry, 0, sub { 305 $self->{retry_w} = AE::timer $retry, 0, sub {
297 delete $self->{retry_w}; 306 delete $self->{retry_w};
298 $self->_send2; 307 $self->_send2;
299 }; 308 };
300} 309}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines