… | |
… | |
78 | |
78 | |
79 | =item retry_max => $seconds (default: C<300>) |
79 | =item retry_max => $seconds (default: C<300>) |
80 | |
80 | |
81 | The minimum and maximum retry times when the server cannot be reached. |
81 | The minimum and maximum retry times when the server cannot be reached. |
82 | |
82 | |
83 | =item on_error => $cb->(\@items, $msg) (default: log and continue) |
83 | =item on_error => $cb->($zbx, \@items, $msg) (default: log and continue) |
84 | |
84 | |
85 | Called on any protocol errors - these generally indicate that something |
85 | Called on any protocol errors - these generally indicate that something |
86 | other than a zabbix server is running on a port. The given key-value pairs |
86 | other than a zabbix server is running on a port. The given key-value pairs |
87 | are the lost items. |
87 | are the lost items. |
88 | |
88 | |
89 | =item on_loss => $cb->(\@items) (default: not called) |
89 | =item on_loss => $cb->$zbx, (\@items) (default: log and continue) |
90 | |
90 | |
91 | Will be called when some data items are thrown away (this happens if the |
91 | Will be called when some data items are thrown away (this happens if the |
92 | server isn't reachable for at least C<queue_time> seconds), |
92 | server isn't reachable for at least C<queue_time> seconds), |
93 | |
93 | |
94 | =item on_response => $cb->(\@items, \%response) (default: not called) |
94 | =item on_response => $cb->$zbx, (\@items, \%response) (default: not called) |
95 | |
95 | |
96 | Will be called with the (generally rather useless) response form the |
96 | Will be called with the (generally rather useless) response form the |
97 | zabbix server. |
97 | zabbix server. |
98 | |
98 | |
99 | =back |
99 | =back |
… | |
… | |
112 | server => "localhost:10051", |
112 | server => "localhost:10051", |
113 | delay => 0, |
113 | delay => 0, |
114 | retry_min => 30, |
114 | retry_min => 30, |
115 | retry_max => 300, |
115 | retry_max => 300, |
116 | queue_time => 3600, |
116 | queue_time => 3600, |
117 | on_loss => $NOP, |
|
|
118 | on_response => $NOP, |
117 | on_response => $NOP, |
119 | on_error => sub { |
118 | on_error => sub { |
120 | AE::log 4 => $_[0]; # error |
119 | AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[1]"; # error |
121 | }, |
120 | }, |
|
|
121 | on_loss => sub { |
|
|
122 | my $nitems = @{ $_[1] }; |
|
|
123 | AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn |
|
|
124 | }, |
|
|
125 | |
122 | @_, |
126 | @_, |
|
|
127 | |
|
|
128 | on_clear => $NOP, |
123 | }, $class; |
129 | }, $class; |
124 | |
130 | |
125 | ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051; |
131 | ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051; |
126 | |
132 | |
127 | $self->{host} //= do { |
133 | $self->{host} //= do { |
… | |
… | |
143 | } |
149 | } |
144 | |
150 | |
145 | sub _wait { |
151 | sub _wait { |
146 | my ($self) = @_; |
152 | my ($self) = @_; |
147 | |
153 | |
148 | return unless @{ $self->{queue} }; |
154 | while (@{ $self->{queue} } || $self->{sending}) { |
149 | |
|
|
150 | my $cv = AE::cv; |
155 | my $cv = AE::cv; |
151 | |
156 | |
152 | my $to = AE::timer $self->{linger_time}, 0, $cv; |
157 | my $to = AE::timer $self->{linger_time}, 0, $cv; |
153 | local $self->{on_clear} = $cv; |
158 | local $self->{on_clear} = $cv; |
154 | |
159 | |
155 | $cv->recv; |
160 | $cv->recv; |
|
|
161 | } |
156 | } |
162 | } |
157 | |
163 | |
158 | =item $zbx->submit ($k, $v[, $clock[, $host]]) |
164 | =item $zbx->submit ($k, $v[, $clock[, $host]]) |
159 | |
165 | |
160 | Submits a new key-value pair to the zabbix server. If C<$clock> is missing |
166 | Submits a new key-value pair to the zabbix server. If C<$clock> is missing |
… | |
… | |
216 | |
222 | |
217 | my $data = delete $self->{queue}; |
223 | my $data = delete $self->{queue}; |
218 | my $items = [map @{ $_->[1] }, @$data]; |
224 | my $items = [map @{ $_->[1] }, @$data]; |
219 | |
225 | |
220 | my $fail = sub { |
226 | my $fail = sub { |
221 | $self->{on_error}($items, $_[0]); |
227 | $self->{on_error}($self, $items, $_[0]); |
222 | $self->_retry; |
228 | $self->_retry; |
223 | }; |
229 | }; |
224 | |
230 | |
225 | $self->{hdl} = new AnyEvent::Handle |
231 | $self->{hdl} = new AnyEvent::Handle |
226 | fh => $fh, |
232 | fh => $fh, |
… | |
… | |
235 | or return $fail->("protocol mismatch"); |
241 | or return $fail->("protocol mismatch"); |
236 | $version == 1 |
242 | $version == 1 |
237 | or return $fail->("protocol version mismatch"); |
243 | or return $fail->("protocol version mismatch"); |
238 | |
244 | |
239 | if (13 + $length <= length $_[0]{rbuf}) { |
245 | if (13 + $length <= length $_[0]{rbuf}) { |
|
|
246 | delete $self->{hdl}; |
|
|
247 | |
240 | my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) } |
248 | my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) } |
241 | or return $fail->("protocol error"); |
249 | or return $fail->("protocol error"); |
242 | use Data::Dump; ddx $res;#d# |
|
|
243 | |
250 | |
244 | $self->{on_response}($items, $res); |
251 | $self->{on_response}($self, $items, $res); |
|
|
252 | |
|
|
253 | delete $self->{sending}; |
|
|
254 | |
245 | $self->_send2 if delete $self->{send_immediate}; |
255 | $self->_send2 if delete $self->{send_immediate} && $self->{queue}; |
246 | |
256 | |
247 | $self->{on_clear}() unless @{ $self->{queue} }; |
257 | $self->{on_clear}(); |
248 | } |
258 | } |
249 | } |
259 | } |
250 | }, |
260 | }, |
251 | ; |
261 | ; |
252 | |
262 | |
… | |
… | |
265 | }, @{ $slot->[1] } |
275 | }, @{ $slot->[1] } |
266 | } @$data |
276 | } @$data |
267 | ], |
277 | ], |
268 | }); |
278 | }); |
269 | |
279 | |
270 | warn $json; |
|
|
271 | |
|
|
272 | $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json); |
280 | $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json); |
273 | }; |
281 | }; |
274 | } |
282 | } |
275 | |
283 | |
276 | sub _retry { |
284 | sub _retry { |
… | |
… | |
280 | |
288 | |
281 | delete $self->{hdl}; |
289 | delete $self->{hdl}; |
282 | |
290 | |
283 | my $expire = AE::now - $self->{queue_time}; |
291 | my $expire = AE::now - $self->{queue_time}; |
284 | while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) { |
292 | while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) { |
285 | $self->{on_loss}([shift @{ $self->{queue} }]); |
293 | $self->{on_loss}($self, [shift @{ $self->{queue} }]); |
286 | } |
294 | } |
287 | |
295 | |
288 | unless (@{ $self->{queue} }) { |
296 | unless (@{ $self->{queue} }) { |
|
|
297 | delete $self->{sending}; |
289 | $self->{on_clear}(); |
298 | $self->{on_clear}(); |
290 | return; |
299 | return; |
291 | } |
300 | } |
292 | |
301 | |
293 | my $retry = $self->{retry_min} * 2 ** $self->{retry}++; |
302 | my $retry = $self->{retry_min} * 2 ** $self->{retry}++; |
294 | $retry = $self->{retry_max} if $retry > $self->{retry_max}; |
303 | $retry = $self->{retry_max} if $retry > $self->{retry_max}; |
295 | warn "retry $retry\n";#d# |
|
|
296 | $self->{retry_w} = AE::timer $retry, 0, sub { |
304 | $self->{retry_w} = AE::timer $retry, 0, sub { |
297 | delete $self->{retry_w}; |
305 | delete $self->{retry_w}; |
298 | $self->_send2; |
306 | $self->_send2; |
299 | }; |
307 | }; |
300 | } |
308 | } |