ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.30 by root, Tue Aug 4 23:35:51 2009 UTC vs.
Revision 1.32 by root, Wed Aug 5 19:58:46 2009 UTC

103use base "Exporter"; 103use base "Exporter";
104 104
105our $VERSION = '0.1'; 105our $VERSION = '0.1';
106our @EXPORT = qw( 106our @EXPORT = qw(
107 NODE $NODE *SELF node_of _any_ 107 NODE $NODE *SELF node_of _any_
108 resolve_node 108 resolve_node initialise_node
109 become_slave become_public
110 snd rcv mon kil reg psub 109 snd rcv mon kil reg psub
111 port 110 port
112); 111);
113 112
114our $SELF; 113our $SELF;
190JSON is used, then only strings, numbers and arrays and hashes consisting 189JSON is used, then only strings, numbers and arrays and hashes consisting
191of those are allowed (no objects). When Storable is used, then anything 190of those are allowed (no objects). When Storable is used, then anything
192that Storable can serialise and deserialise is allowed, and for the local 191that Storable can serialise and deserialise is allowed, and for the local
193node, anything can be passed. 192node, anything can be passed.
194 193
195=item kil $portid[, @reason]
196
197Kill the specified port with the given C<@reason>.
198
199If no C<@reason> is specified, then the port is killed "normally" (linked
200ports will not be kileld, or even notified).
201
202Otherwise, linked ports get killed with the same reason (second form of
203C<mon>, see below).
204
205Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
206will be reported as reason C<< die => $@ >>.
207
208Transport/communication errors are reported as C<< transport_error =>
209$message >>.
210
211=item $guard = mon $portid, $cb->(@reason)
212
213=item $guard = mon $portid, $otherport
214
215=item $guard = mon $portid, $otherport, @msg
216
217Monitor the given port and do something when the port is killed.
218
219In the first form, the callback is simply called with any number
220of C<@reason> elements (no @reason means that the port was deleted
221"normally"). Note also that I<< the callback B<must> never die >>, so use
222C<eval> if unsure.
223
224In the second form, the other port will be C<kil>'ed with C<@reason>, iff
225a @reason was specified, i.e. on "normal" kils nothing happens, while
226under all other conditions, the other port is killed with the same reason.
227
228In the last form, a message of the form C<@msg, @reason> will be C<snd>.
229
230Example: call a given callback when C<$port> is killed.
231
232 mon $port, sub { warn "port died because of <@_>\n" };
233
234Example: kill ourselves when C<$port> is killed abnormally.
235
236 mon $port, $self;
237
238Example: send us a restart message another C<$port> is killed.
239
240 mon $port, $self => "restart";
241
242=cut
243
244sub mon {
245 my ($noderef, $port) = split /#/, shift, 2;
246
247 my $node = $NODE{$noderef} || add_node $noderef;
248
249 my $cb = shift;
250
251 unless (ref $cb) {
252 if (@_) {
253 # send a kill info message
254 my (@msg) = ($cb, @_);
255 $cb = sub { snd @msg, @_ };
256 } else {
257 # simply kill other port
258 my $port = $cb;
259 $cb = sub { kil $port, @_ if @_ };
260 }
261 }
262
263 $node->monitor ($port, $cb);
264
265 defined wantarray
266 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
267}
268
269=item $guard = mon_guard $port, $ref, $ref...
270
271Monitors the given C<$port> and keeps the passed references. When the port
272is killed, the references will be freed.
273
274Optionally returns a guard that will stop the monitoring.
275
276This function is useful when you create e.g. timers or other watchers and
277want to free them when the port gets killed:
278
279 $port->rcv (start => sub {
280 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
281 undef $timer if 0.9 < rand;
282 });
283 });
284
285=cut
286
287sub mon_guard {
288 my ($port, @refs) = @_;
289
290 mon $port, sub { 0 && @refs }
291}
292
293=item lnk $port1, $port2
294
295Link two ports. This is simply a shorthand for:
296
297 mon $port1, $port2;
298 mon $port2, $port1;
299
300It means that if either one is killed abnormally, the other one gets
301killed as well.
302
303=item $local_port = port 194=item $local_port = port
304 195
305Create a new local port object that supports message matching. 196Create a new local port object that can be used either as a pattern
197matching port ("full port") or a single-callback port ("miniport"),
198depending on how C<rcv> callbacks are bound to the object.
306 199
307=item $portid = port { my @msg = @_; $finished } 200=item $portid = port { my @msg = @_; $finished }
308 201
309Creates a "mini port", that is, a very lightweight port without any 202Creates a "mini port", that is, a very lightweight port without any
310pattern matching behind it, and returns its ID. 203pattern matching behind it, and returns its ID.
316The message will be passed as-is, no extra argument (i.e. no port id) will 209The message will be passed as-is, no extra argument (i.e. no port id) will
317be passed to the callback. 210be passed to the callback.
318 211
319If you need the local port id in the callback, this works nicely: 212If you need the local port id in the callback, this works nicely:
320 213
321 my $port; $port = miniport { 214 my $port; $port = port {
322 snd $otherport, reply => $port; 215 snd $otherport, reply => $port;
323 }; 216 };
324 217
325=cut 218=cut
326 219
387 my ($portid, $name) = @_; 280 my ($portid, $name) = @_;
388 281
389 $REG{$name} = $portid; 282 $REG{$name} = $portid;
390} 283}
391 284
285=item rcv $portid, $callback->(@msg)
286
287Replaces the callback on the specified miniport (or newly created port
288object, see C<port>). Full ports are configured with the following calls:
289
392=item rcv $portid, tagstring => $callback->(@msg), ... 290=item rcv $portid, tagstring => $callback->(@msg), ...
393 291
394=item rcv $portid, $smartmatch => $callback->(@msg), ... 292=item rcv $portid, $smartmatch => $callback->(@msg), ...
395 293
396=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... 294=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
397 295
398Register callbacks to be called on matching messages on the given port. 296Register callbacks to be called on matching messages on the given full
297port (or newly created port).
399 298
400The callback has to return a true value when its work is done, after 299The callback has to return a true value when its work is done, after
401which is will be removed, or a false value in which case it will stay 300which is will be removed, or a false value in which case it will stay
402registered. 301registered.
403 302
419also the most efficient match (by far). 318also the most efficient match (by far).
420 319
421=cut 320=cut
422 321
423sub rcv($@) { 322sub rcv($@) {
323 my $portid = shift;
424 my ($noderef, $port) = split /#/, shift, 2; 324 my ($noderef, $port) = split /#/, $port, 2;
425 325
426 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 326 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
427 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; 327 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
428 328
429 my $self = $PORT_DATA{$port} 329 my $self = $PORT_DATA{$port}
444 : push @{ $self->{rc0}{$match->[0]} }, [$cb]; 344 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
445 } else { 345 } else {
446 push @{ $self->{any} }, [$cb, $match]; 346 push @{ $self->{any} }, [$cb, $match];
447 } 347 }
448 } 348 }
349
350 $portid
449} 351}
450 352
451=item $closure = psub { BLOCK } 353=item $closure = psub { BLOCK }
452 354
453Remembers C<$SELF> and creates a closure out of the BLOCK. When the 355Remembers C<$SELF> and creates a closure out of the BLOCK. When the
484 $res 386 $res
485 } 387 }
486 } 388 }
487} 389}
488 390
391=item $guard = mon $portid, $cb->(@reason)
392
393=item $guard = mon $portid, $otherport
394
395=item $guard = mon $portid, $otherport, @msg
396
397Monitor the given port and do something when the port is killed.
398
399In the first form, the callback is simply called with any number
400of C<@reason> elements (no @reason means that the port was deleted
401"normally"). Note also that I<< the callback B<must> never die >>, so use
402C<eval> if unsure.
403
404In the second form, the other port will be C<kil>'ed with C<@reason>, iff
405a @reason was specified, i.e. on "normal" kils nothing happens, while
406under all other conditions, the other port is killed with the same reason.
407
408In the last form, a message of the form C<@msg, @reason> will be C<snd>.
409
410Example: call a given callback when C<$port> is killed.
411
412 mon $port, sub { warn "port died because of <@_>\n" };
413
414Example: kill ourselves when C<$port> is killed abnormally.
415
416 mon $port, $self;
417
418Example: send us a restart message another C<$port> is killed.
419
420 mon $port, $self => "restart";
421
422=cut
423
424sub mon {
425 my ($noderef, $port) = split /#/, shift, 2;
426
427 my $node = $NODE{$noderef} || add_node $noderef;
428
429 my $cb = shift;
430
431 unless (ref $cb) {
432 if (@_) {
433 # send a kill info message
434 my (@msg) = ($cb, @_);
435 $cb = sub { snd @msg, @_ };
436 } else {
437 # simply kill other port
438 my $port = $cb;
439 $cb = sub { kil $port, @_ if @_ };
440 }
441 }
442
443 $node->monitor ($port, $cb);
444
445 defined wantarray
446 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
447}
448
449=item $guard = mon_guard $port, $ref, $ref...
450
451Monitors the given C<$port> and keeps the passed references. When the port
452is killed, the references will be freed.
453
454Optionally returns a guard that will stop the monitoring.
455
456This function is useful when you create e.g. timers or other watchers and
457want to free them when the port gets killed:
458
459 $port->rcv (start => sub {
460 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
461 undef $timer if 0.9 < rand;
462 });
463 });
464
465=cut
466
467sub mon_guard {
468 my ($port, @refs) = @_;
469
470 mon $port, sub { 0 && @refs }
471}
472
473=item lnk $port1, $port2
474
475Link two ports. This is simply a shorthand for:
476
477 mon $port1, $port2;
478 mon $port2, $port1;
479
480It means that if either one is killed abnormally, the other one gets
481killed as well.
482
483=item kil $portid[, @reason]
484
485Kill the specified port with the given C<@reason>.
486
487If no C<@reason> is specified, then the port is killed "normally" (linked
488ports will not be kileld, or even notified).
489
490Otherwise, linked ports get killed with the same reason (second form of
491C<mon>, see below).
492
493Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
494will be reported as reason C<< die => $@ >>.
495
496Transport/communication errors are reported as C<< transport_error =>
497$message >>.
498
489=back 499=back
490 500
491=head1 FUNCTIONS FOR NODES 501=head1 FUNCTIONS FOR NODES
492 502
493=over 4 503=over 4

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines