ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.30 by root, Tue Aug 4 23:35:51 2009 UTC vs.
Revision 1.33 by root, Wed Aug 5 22:40:51 2009 UTC

103use base "Exporter"; 103use base "Exporter";
104 104
105our $VERSION = '0.1'; 105our $VERSION = '0.1';
106our @EXPORT = qw( 106our @EXPORT = qw(
107 NODE $NODE *SELF node_of _any_ 107 NODE $NODE *SELF node_of _any_
108 resolve_node 108 resolve_node initialise_node
109 become_slave become_public
110 snd rcv mon kil reg psub 109 snd rcv mon kil reg psub
111 port 110 port
112); 111);
113 112
114our $SELF; 113our $SELF;
124The C<NODE> function returns, and the C<$NODE> variable contains 123The C<NODE> function returns, and the C<$NODE> variable contains
125the noderef of the local node. The value is initialised by a call 124the noderef of the local node. The value is initialised by a call
126to C<become_public> or C<become_slave>, after which all local port 125to C<become_public> or C<become_slave>, after which all local port
127identifiers become invalid. 126identifiers become invalid.
128 127
129=item $noderef = node_of $portid 128=item $noderef = node_of $port
130 129
131Extracts and returns the noderef from a portid or a noderef. 130Extracts and returns the noderef from a portid or a noderef.
132 131
133=item $cv = resolve_node $noderef 132=item $cv = resolve_node $noderef
134 133
168 167
169Due to some quirks in how perl exports variables, it is impossible to 168Due to some quirks in how perl exports variables, it is impossible to
170just export C<$SELF>, all the symbols called C<SELF> are exported by this 169just export C<$SELF>, all the symbols called C<SELF> are exported by this
171module, but only C<$SELF> is currently used. 170module, but only C<$SELF> is currently used.
172 171
173=item snd $portid, type => @data 172=item snd $port, type => @data
174 173
175=item snd $portid, @msg 174=item snd $port, @msg
176 175
177Send the given message to the given port ID, which can identify either 176Send the given message to the given port ID, which can identify either
178a local or a remote port, and can be either a string or soemthignt hat 177a local or a remote port, and can be either a string or soemthignt hat
179stringifies a sa port ID (such as a port object :). 178stringifies a sa port ID (such as a port object :).
180 179
190JSON is used, then only strings, numbers and arrays and hashes consisting 189JSON is used, then only strings, numbers and arrays and hashes consisting
191of those are allowed (no objects). When Storable is used, then anything 190of those are allowed (no objects). When Storable is used, then anything
192that Storable can serialise and deserialise is allowed, and for the local 191that Storable can serialise and deserialise is allowed, and for the local
193node, anything can be passed. 192node, anything can be passed.
194 193
195=item kil $portid[, @reason] 194=item $local_port = port
196 195
197Kill the specified port with the given C<@reason>. 196Create a new local port object that can be used either as a pattern
197matching port ("full port") or a single-callback port ("miniport"),
198depending on how C<rcv> callbacks are bound to the object.
198 199
199If no C<@reason> is specified, then the port is killed "normally" (linked 200=item $port = port { my @msg = @_; $finished }
200ports will not be kileld, or even notified).
201 201
202Otherwise, linked ports get killed with the same reason (second form of 202Creates a "miniport", that is, a very lightweight port without any pattern
203C<mon>, see below). 203matching behind it, and returns its ID. Semantically the same as creating
204a port and calling C<rcv $port, $callback> on it.
204 205
205Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks 206The block will be called for every message received on the port. When the
206will be reported as reason C<< die => $@ >>. 207callback returns a true value its job is considered "done" and the port
208will be destroyed. Otherwise it will stay alive.
207 209
208Transport/communication errors are reported as C<< transport_error => 210The message will be passed as-is, no extra argument (i.e. no port id) will
209$message >>. 211be passed to the callback.
210 212
213If you need the local port id in the callback, this works nicely:
214
215 my $port; $port = port {
216 snd $otherport, reply => $port;
217 };
218
219=cut
220
221sub rcv($@);
222
223sub port(;&) {
224 my $id = "$UNIQ." . $ID++;
225 my $port = "$NODE#$id";
226
227 if (@_) {
228 rcv $port, shift;
229 } else {
230 $PORT{$id} = sub { }; # nop
231 }
232
233 $port
234}
235
236=item reg $port, $name
237
238Registers the given port under the name C<$name>. If the name already
239exists it is replaced.
240
241A port can only be registered under one well known name.
242
243A port automatically becomes unregistered when it is killed.
244
245=cut
246
247sub reg(@) {
248 my ($port, $name) = @_;
249
250 $REG{$name} = $port;
251}
252
253=item rcv $port, $callback->(@msg)
254
255Replaces the callback on the specified miniport (after converting it to
256one if required).
257
258=item rcv $port, tagstring => $callback->(@msg), ...
259
260=item rcv $port, $smartmatch => $callback->(@msg), ...
261
262=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
263
264Register callbacks to be called on matching messages on the given full
265port (after converting it to one if required).
266
267The callback has to return a true value when its work is done, after
268which is will be removed, or a false value in which case it will stay
269registered.
270
271The global C<$SELF> (exported by this module) contains C<$port> while
272executing the callback.
273
274Runtime errors wdurign callback execution will result in the port being
275C<kil>ed.
276
277If the match is an array reference, then it will be matched against the
278first elements of the message, otherwise only the first element is being
279matched.
280
281Any element in the match that is specified as C<_any_> (a function
282exported by this module) matches any single element of the message.
283
284While not required, it is highly recommended that the first matching
285element is a string identifying the message. The one-string-only match is
286also the most efficient match (by far).
287
288=cut
289
290sub rcv($@) {
291 my $port = shift;
292 my ($noderef, $portid) = split /#/, $port, 2;
293
294 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
295 or Carp::croak "$port: rcv can only be called on local ports, caught";
296
297 if (@_ == 1) {
298 my $cb = shift;
299 delete $PORT_DATA{$portid};
300 $PORT{$portid} = sub {
301 local $SELF = $port;
302 eval {
303 &$cb
304 and kil $port;
305 };
306 _self_die if $@;
307 };
308 } else {
309 my $self = $PORT_DATA{$portid} ||= do {
310 my $self = bless {
311 id => $port,
312 }, "AnyEvent::MP::Port";
313
314 $PORT{$portid} = sub {
315 local $SELF = $port;
316
317 eval {
318 for (@{ $self->{rc0}{$_[0]} }) {
319 $_ && &{$_->[0]}
320 && undef $_;
321 }
322
323 for (@{ $self->{rcv}{$_[0]} }) {
324 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
325 && &{$_->[0]}
326 && undef $_;
327 }
328
329 for (@{ $self->{any} }) {
330 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
331 && &{$_->[0]}
332 && undef $_;
333 }
334 };
335 _self_die if $@;
336 };
337
338 $self
339 };
340
341 "AnyEvent::MP::Port" eq ref $self
342 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
343
344 while (@_) {
345 my ($match, $cb) = splice @_, 0, 2;
346
347 if (!ref $match) {
348 push @{ $self->{rc0}{$match} }, [$cb];
349 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
350 my ($type, @match) = @$match;
351 @match
352 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
353 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
354 } else {
355 push @{ $self->{any} }, [$cb, $match];
356 }
357 }
358 }
359
360 $port
361}
362
363=item $closure = psub { BLOCK }
364
365Remembers C<$SELF> and creates a closure out of the BLOCK. When the
366closure is executed, sets up the environment in the same way as in C<rcv>
367callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
368
369This is useful when you register callbacks from C<rcv> callbacks:
370
371 rcv delayed_reply => sub {
372 my ($delay, @reply) = @_;
373 my $timer = AE::timer $delay, 0, psub {
374 snd @reply, $SELF;
375 };
376 };
377
378=cut
379
380sub psub(&) {
381 my $cb = shift;
382
383 my $port = $SELF
384 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
385
386 sub {
387 local $SELF = $port;
388
389 if (wantarray) {
390 my @res = eval { &$cb };
391 _self_die if $@;
392 @res
393 } else {
394 my $res = eval { &$cb };
395 _self_die if $@;
396 $res
397 }
398 }
399}
400
211=item $guard = mon $portid, $cb->(@reason) 401=item $guard = mon $port, $cb->(@reason)
212 402
213=item $guard = mon $portid, $otherport 403=item $guard = mon $port, $otherport
214 404
215=item $guard = mon $portid, $otherport, @msg 405=item $guard = mon $port, $otherport, @msg
216 406
217Monitor the given port and do something when the port is killed. 407Monitor the given port and do something when the port is killed.
218 408
219In the first form, the callback is simply called with any number 409In the first form, the callback is simply called with any number
220of C<@reason> elements (no @reason means that the port was deleted 410of C<@reason> elements (no @reason means that the port was deleted
298 mon $port2, $port1; 488 mon $port2, $port1;
299 489
300It means that if either one is killed abnormally, the other one gets 490It means that if either one is killed abnormally, the other one gets
301killed as well. 491killed as well.
302 492
303=item $local_port = port 493=item kil $port[, @reason]
304 494
305Create a new local port object that supports message matching. 495Kill the specified port with the given C<@reason>.
306 496
307=item $portid = port { my @msg = @_; $finished } 497If no C<@reason> is specified, then the port is killed "normally" (linked
498ports will not be kileld, or even notified).
308 499
309Creates a "mini port", that is, a very lightweight port without any 500Otherwise, linked ports get killed with the same reason (second form of
310pattern matching behind it, and returns its ID. 501C<mon>, see below).
311 502
312The block will be called for every message received on the port. When the 503Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
313callback returns a true value its job is considered "done" and the port 504will be reported as reason C<< die => $@ >>.
314will be destroyed. Otherwise it will stay alive.
315 505
316The message will be passed as-is, no extra argument (i.e. no port id) will 506Transport/communication errors are reported as C<< transport_error =>
317be passed to the callback. 507$message >>.
318
319If you need the local port id in the callback, this works nicely:
320
321 my $port; $port = miniport {
322 snd $otherport, reply => $port;
323 };
324
325=cut
326
327sub port(;&) {
328 my $id = "$UNIQ." . $ID++;
329 my $port = "$NODE#$id";
330
331 if (@_) {
332 my $cb = shift;
333 $PORT{$id} = sub {
334 local $SELF = $port;
335 eval {
336 &$cb
337 and kil $id;
338 };
339 _self_die if $@;
340 };
341 } else {
342 my $self = bless {
343 id => "$NODE#$id",
344 }, "AnyEvent::MP::Port";
345
346 $PORT_DATA{$id} = $self;
347 $PORT{$id} = sub {
348 local $SELF = $port;
349
350 eval {
351 for (@{ $self->{rc0}{$_[0]} }) {
352 $_ && &{$_->[0]}
353 && undef $_;
354 }
355
356 for (@{ $self->{rcv}{$_[0]} }) {
357 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
358 && &{$_->[0]}
359 && undef $_;
360 }
361
362 for (@{ $self->{any} }) {
363 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
364 && &{$_->[0]}
365 && undef $_;
366 }
367 };
368 _self_die if $@;
369 };
370 }
371
372 $port
373}
374
375=item reg $portid, $name
376
377Registers the given port under the name C<$name>. If the name already
378exists it is replaced.
379
380A port can only be registered under one well known name.
381
382A port automatically becomes unregistered when it is killed.
383
384=cut
385
386sub reg(@) {
387 my ($portid, $name) = @_;
388
389 $REG{$name} = $portid;
390}
391
392=item rcv $portid, tagstring => $callback->(@msg), ...
393
394=item rcv $portid, $smartmatch => $callback->(@msg), ...
395
396=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
397
398Register callbacks to be called on matching messages on the given port.
399
400The callback has to return a true value when its work is done, after
401which is will be removed, or a false value in which case it will stay
402registered.
403
404The global C<$SELF> (exported by this module) contains C<$portid> while
405executing the callback.
406
407Runtime errors wdurign callback execution will result in the port being
408C<kil>ed.
409
410If the match is an array reference, then it will be matched against the
411first elements of the message, otherwise only the first element is being
412matched.
413
414Any element in the match that is specified as C<_any_> (a function
415exported by this module) matches any single element of the message.
416
417While not required, it is highly recommended that the first matching
418element is a string identifying the message. The one-string-only match is
419also the most efficient match (by far).
420
421=cut
422
423sub rcv($@) {
424 my ($noderef, $port) = split /#/, shift, 2;
425
426 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
427 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
428
429 my $self = $PORT_DATA{$port}
430 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
431
432 "AnyEvent::MP::Port" eq ref $self
433 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
434
435 while (@_) {
436 my ($match, $cb) = splice @_, 0, 2;
437
438 if (!ref $match) {
439 push @{ $self->{rc0}{$match} }, [$cb];
440 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
441 my ($type, @match) = @$match;
442 @match
443 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
444 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
445 } else {
446 push @{ $self->{any} }, [$cb, $match];
447 }
448 }
449}
450
451=item $closure = psub { BLOCK }
452
453Remembers C<$SELF> and creates a closure out of the BLOCK. When the
454closure is executed, sets up the environment in the same way as in C<rcv>
455callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
456
457This is useful when you register callbacks from C<rcv> callbacks:
458
459 rcv delayed_reply => sub {
460 my ($delay, @reply) = @_;
461 my $timer = AE::timer $delay, 0, psub {
462 snd @reply, $SELF;
463 };
464 };
465
466=cut
467
468sub psub(&) {
469 my $cb = shift;
470
471 my $port = $SELF
472 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
473
474 sub {
475 local $SELF = $port;
476
477 if (wantarray) {
478 my @res = eval { &$cb };
479 _self_die if $@;
480 @res
481 } else {
482 my $res = eval { &$cb };
483 _self_die if $@;
484 $res
485 }
486 }
487}
488 508
489=back 509=back
490 510
491=head1 FUNCTIONS FOR NODES 511=head1 FUNCTIONS FOR NODES
492 512
493=over 4 513=over 4
494 514
495=item become_public $noderef 515=item initialise_node $noderef, $seednode, $seednode...
496 516
497Tells the node to become a public node, i.e. reachable from other nodes. 517=item initialise_node "slave/", $master, $master...
498 518
499The first argument is the (unresolved) node reference of the local node 519Initialises a node - must be called exactly once before calling other
500(if missing then the empty string is used). 520AnyEvent::MP functions when talking to other nodes is required.
501 521
502It is quite common to not specify anything, in which case the local node 522All arguments are noderefs, which can be either resolved or unresolved.
503tries to listen on the default port, or to only specify a port number, in 523
504which case AnyEvent::MP tries to guess the local addresses. 524There are two types of networked nodes, public nodes and slave nodes:
525
526=over 4
527
528=item public nodes
529
530For public nodes, C<$noderef> must either be a (possibly unresolved)
531noderef, in which case it will be resolved, or C<undef> (or missing), in
532which case the noderef will be guessed.
533
534Afterwards, the node will bind itself on all endpoints and try to connect
535to all additional C<$seednodes> that are specified. Seednodes are optional
536and can be used to quickly bootstrap the node into an existing network.
537
538=item slave nodes
539
540When the C<$noderef> is the special string C<slave/>, then the node will
541become a slave node. Slave nodes cannot be contacted from outside and will
542route most of their traffic to the master node that they attach to.
543
544At least one additional noderef is required: The node will try to connect
545to all of them and will become a slave attached to the first node it can
546successfully connect to.
547
548=back
549
550This function will block until all nodes have been resolved and, for slave
551nodes, until it has successfully established a connection to a master
552server.
553
554Example: become a public node listening on the default node.
555
556 initialise_node;
557
558Example: become a public node, and try to contact some well-known master
559servers to become part of the network.
560
561 initialise_node undef, "master1", "master2";
562
563Example: become a public node listening on port C<4041>.
564
565 initialise_node 4041;
566
567Example: become a public node, only visible on localhost port 4044.
568
569 initialise_node "locahost:4044";
570
571Example: become a slave node to any of the specified master servers.
572
573 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
505 574
506=cut 575=cut
507 576
508=back 577=back
509 578

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines