ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-ZabbixSender/ZabbixSender.pm
(Generate patch)

Comparing AnyEvent-ZabbixSender/ZabbixSender.pm (file contents):
Revision 1.1 by root, Thu Jan 18 10:17:15 2018 UTC vs.
Revision 1.2 by root, Thu Jan 18 10:48:10 2018 UTC

78 78
79=item retry_max => $seconds (default: C<300>) 79=item retry_max => $seconds (default: C<300>)
80 80
81The minimum and maximum retry times when the server cannot be reached. 81The minimum and maximum retry times when the server cannot be reached.
82 82
83=item on_error => $cb->(\@items, $msg) (default: log and continue) 83=item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84 84
85Called on any protocol errors - these generally indicate that something 85Called on any protocol errors - these generally indicate that something
86other than a zabbix server is running on a port. The given key-value pairs 86other than a zabbix server is running on a port. The given key-value pairs
87are the lost items. 87are the lost items.
88 88
89=item on_loss => $cb->(\@items) (default: not called) 89=item on_loss => $cb->$zbx, (\@items) (default: log and continue)
90 90
91Will be called when some data items are thrown away (this happens if the 91Will be called when some data items are thrown away (this happens if the
92server isn't reachable for at least C<queue_time> seconds), 92server isn't reachable for at least C<queue_time> seconds),
93 93
94=item on_response => $cb->(\@items, \%response) (default: not called) 94=item on_response => $cb->$zbx, (\@items, \%response) (default: not called)
95 95
96Will be called with the (generally rather useless) response form the 96Will be called with the (generally rather useless) response form the
97zabbix server. 97zabbix server.
98 98
99=back 99=back
112 server => "localhost:10051", 112 server => "localhost:10051",
113 delay => 0, 113 delay => 0,
114 retry_min => 30, 114 retry_min => 30,
115 retry_max => 300, 115 retry_max => 300,
116 queue_time => 3600, 116 queue_time => 3600,
117 on_loss => $NOP,
118 on_response => $NOP, 117 on_response => $NOP,
119 on_error => sub { 118 on_error => sub {
120 AE::log 4 => $_[0]; # error 119 AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[1]"; # error
121 }, 120 },
121 on_loss => sub {
122 my $nitems = @{ $_[1] };
123 AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
124 },
125
122 @_, 126 @_,
127
128 on_clear => $NOP,
123 }, $class; 129 }, $class;
124 130
125 ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051; 131 ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
126 132
127 $self->{host} //= do { 133 $self->{host} //= do {
143} 149}
144 150
145sub _wait { 151sub _wait {
146 my ($self) = @_; 152 my ($self) = @_;
147 153
148 return unless @{ $self->{queue} }; 154 while (@{ $self->{queue} } || $self->{sending}) {
149
150 my $cv = AE::cv; 155 my $cv = AE::cv;
151 156
152 my $to = AE::timer $self->{linger_time}, 0, $cv; 157 my $to = AE::timer $self->{linger_time}, 0, $cv;
153 local $self->{on_clear} = $cv; 158 local $self->{on_clear} = $cv;
154 159
155 $cv->recv; 160 $cv->recv;
161 }
156} 162}
157 163
158=item $zbx->submit ($k, $v[, $clock[, $host]]) 164=item $zbx->submit ($k, $v[, $clock[, $host]])
159 165
160Submits a new key-value pair to the zabbix server. If C<$clock> is missing 166Submits a new key-value pair to the zabbix server. If C<$clock> is missing
216 222
217 my $data = delete $self->{queue}; 223 my $data = delete $self->{queue};
218 my $items = [map @{ $_->[1] }, @$data]; 224 my $items = [map @{ $_->[1] }, @$data];
219 225
220 my $fail = sub { 226 my $fail = sub {
221 $self->{on_error}($items, $_[0]); 227 $self->{on_error}($self, $items, $_[0]);
222 $self->_retry; 228 $self->_retry;
223 }; 229 };
224 230
225 $self->{hdl} = new AnyEvent::Handle 231 $self->{hdl} = new AnyEvent::Handle
226 fh => $fh, 232 fh => $fh,
235 or return $fail->("protocol mismatch"); 241 or return $fail->("protocol mismatch");
236 $version == 1 242 $version == 1
237 or return $fail->("protocol version mismatch"); 243 or return $fail->("protocol version mismatch");
238 244
239 if (13 + $length <= length $_[0]{rbuf}) { 245 if (13 + $length <= length $_[0]{rbuf}) {
246 delete $self->{hdl};
247
240 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) } 248 my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
241 or return $fail->("protocol error"); 249 or return $fail->("protocol error");
242 use Data::Dump; ddx $res;#d#
243 250
244 $self->{on_response}($items, $res); 251 $self->{on_response}($self, $items, $res);
252
253 delete $self->{sending};
254
245 $self->_send2 if delete $self->{send_immediate}; 255 $self->_send2 if delete $self->{send_immediate} && $self->{queue};
246 256
247 $self->{on_clear}() unless @{ $self->{queue} }; 257 $self->{on_clear}();
248 } 258 }
249 } 259 }
250 }, 260 },
251 ; 261 ;
252 262
265 }, @{ $slot->[1] } 275 }, @{ $slot->[1] }
266 } @$data 276 } @$data
267 ], 277 ],
268 }); 278 });
269 279
270 warn $json;
271
272 $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json); 280 $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
273 }; 281 };
274} 282}
275 283
276sub _retry { 284sub _retry {
280 288
281 delete $self->{hdl}; 289 delete $self->{hdl};
282 290
283 my $expire = AE::now - $self->{queue_time}; 291 my $expire = AE::now - $self->{queue_time};
284 while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) { 292 while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
285 $self->{on_loss}([shift @{ $self->{queue} }]); 293 $self->{on_loss}($self, [shift @{ $self->{queue} }]);
286 } 294 }
287 295
288 unless (@{ $self->{queue} }) { 296 unless (@{ $self->{queue} }) {
297 delete $self->{sending};
289 $self->{on_clear}(); 298 $self->{on_clear}();
290 return; 299 return;
291 } 300 }
292 301
293 my $retry = $self->{retry_min} * 2 ** $self->{retry}++; 302 my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
294 $retry = $self->{retry_max} if $retry > $self->{retry_max}; 303 $retry = $self->{retry_max} if $retry > $self->{retry_max};
295 warn "retry $retry\n";#d#
296 $self->{retry_w} = AE::timer $retry, 0, sub { 304 $self->{retry_w} = AE::timer $retry, 0, sub {
297 delete $self->{retry_w}; 305 delete $self->{retry_w};
298 $self->_send2; 306 $self->_send2;
299 }; 307 };
300} 308}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines