ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.31 by root, Wed Aug 5 19:55:58 2009 UTC vs.
Revision 1.33 by root, Wed Aug 5 22:40:51 2009 UTC

123The C<NODE> function returns, and the C<$NODE> variable contains 123The C<NODE> function returns, and the C<$NODE> variable contains
124the noderef of the local node. The value is initialised by a call 124the noderef of the local node. The value is initialised by a call
125to C<become_public> or C<become_slave>, after which all local port 125to C<become_public> or C<become_slave>, after which all local port
126identifiers become invalid. 126identifiers become invalid.
127 127
128=item $noderef = node_of $portid 128=item $noderef = node_of $port
129 129
130Extracts and returns the noderef from a portid or a noderef. 130Extracts and returns the noderef from a portid or a noderef.
131 131
132=item $cv = resolve_node $noderef 132=item $cv = resolve_node $noderef
133 133
167 167
168Due to some quirks in how perl exports variables, it is impossible to 168Due to some quirks in how perl exports variables, it is impossible to
169just export C<$SELF>, all the symbols called C<SELF> are exported by this 169just export C<$SELF>, all the symbols called C<SELF> are exported by this
170module, but only C<$SELF> is currently used. 170module, but only C<$SELF> is currently used.
171 171
172=item snd $portid, type => @data 172=item snd $port, type => @data
173 173
174=item snd $portid, @msg 174=item snd $port, @msg
175 175
176Send the given message to the given port ID, which can identify either 176Send the given message to the given port ID, which can identify either
177a local or a remote port, and can be either a string or soemthignt hat 177a local or a remote port, and can be either a string or soemthignt hat
178stringifies a sa port ID (such as a port object :). 178stringifies a sa port ID (such as a port object :).
179 179
189JSON is used, then only strings, numbers and arrays and hashes consisting 189JSON is used, then only strings, numbers and arrays and hashes consisting
190of those are allowed (no objects). When Storable is used, then anything 190of those are allowed (no objects). When Storable is used, then anything
191that Storable can serialise and deserialise is allowed, and for the local 191that Storable can serialise and deserialise is allowed, and for the local
192node, anything can be passed. 192node, anything can be passed.
193 193
194=item kil $portid[, @reason] 194=item $local_port = port
195 195
196Kill the specified port with the given C<@reason>. 196Create a new local port object that can be used either as a pattern
197matching port ("full port") or a single-callback port ("miniport"),
198depending on how C<rcv> callbacks are bound to the object.
197 199
198If no C<@reason> is specified, then the port is killed "normally" (linked 200=item $port = port { my @msg = @_; $finished }
199ports will not be kileld, or even notified).
200 201
201Otherwise, linked ports get killed with the same reason (second form of 202Creates a "miniport", that is, a very lightweight port without any pattern
202C<mon>, see below). 203matching behind it, and returns its ID. Semantically the same as creating
204a port and calling C<rcv $port, $callback> on it.
203 205
204Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks 206The block will be called for every message received on the port. When the
205will be reported as reason C<< die => $@ >>. 207callback returns a true value its job is considered "done" and the port
208will be destroyed. Otherwise it will stay alive.
206 209
207Transport/communication errors are reported as C<< transport_error => 210The message will be passed as-is, no extra argument (i.e. no port id) will
208$message >>. 211be passed to the callback.
209 212
213If you need the local port id in the callback, this works nicely:
214
215 my $port; $port = port {
216 snd $otherport, reply => $port;
217 };
218
219=cut
220
221sub rcv($@);
222
223sub port(;&) {
224 my $id = "$UNIQ." . $ID++;
225 my $port = "$NODE#$id";
226
227 if (@_) {
228 rcv $port, shift;
229 } else {
230 $PORT{$id} = sub { }; # nop
231 }
232
233 $port
234}
235
236=item reg $port, $name
237
238Registers the given port under the name C<$name>. If the name already
239exists it is replaced.
240
241A port can only be registered under one well known name.
242
243A port automatically becomes unregistered when it is killed.
244
245=cut
246
247sub reg(@) {
248 my ($port, $name) = @_;
249
250 $REG{$name} = $port;
251}
252
253=item rcv $port, $callback->(@msg)
254
255Replaces the callback on the specified miniport (after converting it to
256one if required).
257
258=item rcv $port, tagstring => $callback->(@msg), ...
259
260=item rcv $port, $smartmatch => $callback->(@msg), ...
261
262=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
263
264Register callbacks to be called on matching messages on the given full
265port (after converting it to one if required).
266
267The callback has to return a true value when its work is done, after
268which is will be removed, or a false value in which case it will stay
269registered.
270
271The global C<$SELF> (exported by this module) contains C<$port> while
272executing the callback.
273
274Runtime errors wdurign callback execution will result in the port being
275C<kil>ed.
276
277If the match is an array reference, then it will be matched against the
278first elements of the message, otherwise only the first element is being
279matched.
280
281Any element in the match that is specified as C<_any_> (a function
282exported by this module) matches any single element of the message.
283
284While not required, it is highly recommended that the first matching
285element is a string identifying the message. The one-string-only match is
286also the most efficient match (by far).
287
288=cut
289
290sub rcv($@) {
291 my $port = shift;
292 my ($noderef, $portid) = split /#/, $port, 2;
293
294 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
295 or Carp::croak "$port: rcv can only be called on local ports, caught";
296
297 if (@_ == 1) {
298 my $cb = shift;
299 delete $PORT_DATA{$portid};
300 $PORT{$portid} = sub {
301 local $SELF = $port;
302 eval {
303 &$cb
304 and kil $port;
305 };
306 _self_die if $@;
307 };
308 } else {
309 my $self = $PORT_DATA{$portid} ||= do {
310 my $self = bless {
311 id => $port,
312 }, "AnyEvent::MP::Port";
313
314 $PORT{$portid} = sub {
315 local $SELF = $port;
316
317 eval {
318 for (@{ $self->{rc0}{$_[0]} }) {
319 $_ && &{$_->[0]}
320 && undef $_;
321 }
322
323 for (@{ $self->{rcv}{$_[0]} }) {
324 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
325 && &{$_->[0]}
326 && undef $_;
327 }
328
329 for (@{ $self->{any} }) {
330 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
331 && &{$_->[0]}
332 && undef $_;
333 }
334 };
335 _self_die if $@;
336 };
337
338 $self
339 };
340
341 "AnyEvent::MP::Port" eq ref $self
342 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
343
344 while (@_) {
345 my ($match, $cb) = splice @_, 0, 2;
346
347 if (!ref $match) {
348 push @{ $self->{rc0}{$match} }, [$cb];
349 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
350 my ($type, @match) = @$match;
351 @match
352 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
353 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
354 } else {
355 push @{ $self->{any} }, [$cb, $match];
356 }
357 }
358 }
359
360 $port
361}
362
363=item $closure = psub { BLOCK }
364
365Remembers C<$SELF> and creates a closure out of the BLOCK. When the
366closure is executed, sets up the environment in the same way as in C<rcv>
367callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
368
369This is useful when you register callbacks from C<rcv> callbacks:
370
371 rcv delayed_reply => sub {
372 my ($delay, @reply) = @_;
373 my $timer = AE::timer $delay, 0, psub {
374 snd @reply, $SELF;
375 };
376 };
377
378=cut
379
380sub psub(&) {
381 my $cb = shift;
382
383 my $port = $SELF
384 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
385
386 sub {
387 local $SELF = $port;
388
389 if (wantarray) {
390 my @res = eval { &$cb };
391 _self_die if $@;
392 @res
393 } else {
394 my $res = eval { &$cb };
395 _self_die if $@;
396 $res
397 }
398 }
399}
400
210=item $guard = mon $portid, $cb->(@reason) 401=item $guard = mon $port, $cb->(@reason)
211 402
212=item $guard = mon $portid, $otherport 403=item $guard = mon $port, $otherport
213 404
214=item $guard = mon $portid, $otherport, @msg 405=item $guard = mon $port, $otherport, @msg
215 406
216Monitor the given port and do something when the port is killed. 407Monitor the given port and do something when the port is killed.
217 408
218In the first form, the callback is simply called with any number 409In the first form, the callback is simply called with any number
219of C<@reason> elements (no @reason means that the port was deleted 410of C<@reason> elements (no @reason means that the port was deleted
297 mon $port2, $port1; 488 mon $port2, $port1;
298 489
299It means that if either one is killed abnormally, the other one gets 490It means that if either one is killed abnormally, the other one gets
300killed as well. 491killed as well.
301 492
302=item $local_port = port 493=item kil $port[, @reason]
303 494
304Create a new local port object that can be used either as a pattern 495Kill the specified port with the given C<@reason>.
305matching port ("full port") or a single-callback port ("miniport"),
306depending on how C<rcv> callbacks are bound to the object.
307 496
308=item $portid = port { my @msg = @_; $finished } 497If no C<@reason> is specified, then the port is killed "normally" (linked
498ports will not be kileld, or even notified).
309 499
310Creates a "mini port", that is, a very lightweight port without any 500Otherwise, linked ports get killed with the same reason (second form of
311pattern matching behind it, and returns its ID. 501C<mon>, see below).
312 502
313The block will be called for every message received on the port. When the 503Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
314callback returns a true value its job is considered "done" and the port 504will be reported as reason C<< die => $@ >>.
315will be destroyed. Otherwise it will stay alive.
316 505
317The message will be passed as-is, no extra argument (i.e. no port id) will 506Transport/communication errors are reported as C<< transport_error =>
318be passed to the callback. 507$message >>.
319
320If you need the local port id in the callback, this works nicely:
321
322 my $port; $port = port {
323 snd $otherport, reply => $port;
324 };
325
326=cut
327
328sub port(;&) {
329 my $id = "$UNIQ." . $ID++;
330 my $port = "$NODE#$id";
331
332 if (@_) {
333 my $cb = shift;
334 $PORT{$id} = sub {
335 local $SELF = $port;
336 eval {
337 &$cb
338 and kil $id;
339 };
340 _self_die if $@;
341 };
342 } else {
343 my $self = bless {
344 id => "$NODE#$id",
345 }, "AnyEvent::MP::Port";
346
347 $PORT_DATA{$id} = $self;
348 $PORT{$id} = sub {
349 local $SELF = $port;
350
351 eval {
352 for (@{ $self->{rc0}{$_[0]} }) {
353 $_ && &{$_->[0]}
354 && undef $_;
355 }
356
357 for (@{ $self->{rcv}{$_[0]} }) {
358 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
359 && &{$_->[0]}
360 && undef $_;
361 }
362
363 for (@{ $self->{any} }) {
364 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
365 && &{$_->[0]}
366 && undef $_;
367 }
368 };
369 _self_die if $@;
370 };
371 }
372
373 $port
374}
375
376=item reg $portid, $name
377
378Registers the given port under the name C<$name>. If the name already
379exists it is replaced.
380
381A port can only be registered under one well known name.
382
383A port automatically becomes unregistered when it is killed.
384
385=cut
386
387sub reg(@) {
388 my ($portid, $name) = @_;
389
390 $REG{$name} = $portid;
391}
392
393=item rcv $portid, $callback->(@msg)
394
395Replaces the callback on the specified miniport (or newly created port
396object, see C<port>). Full ports are configured with the following calls:
397
398=item rcv $portid, tagstring => $callback->(@msg), ...
399
400=item rcv $portid, $smartmatch => $callback->(@msg), ...
401
402=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
403
404Register callbacks to be called on matching messages on the given port.
405
406The callback has to return a true value when its work is done, after
407which is will be removed, or a false value in which case it will stay
408registered.
409
410The global C<$SELF> (exported by this module) contains C<$portid> while
411executing the callback.
412
413Runtime errors wdurign callback execution will result in the port being
414C<kil>ed.
415
416If the match is an array reference, then it will be matched against the
417first elements of the message, otherwise only the first element is being
418matched.
419
420Any element in the match that is specified as C<_any_> (a function
421exported by this module) matches any single element of the message.
422
423While not required, it is highly recommended that the first matching
424element is a string identifying the message. The one-string-only match is
425also the most efficient match (by far).
426
427=cut
428
429sub rcv($@) {
430 my $portid = shift;
431 my ($noderef, $port) = split /#/, $port, 2;
432
433 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
434 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
435
436 my $self = $PORT_DATA{$port}
437 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
438
439 "AnyEvent::MP::Port" eq ref $self
440 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
441
442 while (@_) {
443 my ($match, $cb) = splice @_, 0, 2;
444
445 if (!ref $match) {
446 push @{ $self->{rc0}{$match} }, [$cb];
447 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
448 my ($type, @match) = @$match;
449 @match
450 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
451 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
452 } else {
453 push @{ $self->{any} }, [$cb, $match];
454 }
455 }
456
457 $portid
458}
459
460=item $closure = psub { BLOCK }
461
462Remembers C<$SELF> and creates a closure out of the BLOCK. When the
463closure is executed, sets up the environment in the same way as in C<rcv>
464callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
465
466This is useful when you register callbacks from C<rcv> callbacks:
467
468 rcv delayed_reply => sub {
469 my ($delay, @reply) = @_;
470 my $timer = AE::timer $delay, 0, psub {
471 snd @reply, $SELF;
472 };
473 };
474
475=cut
476
477sub psub(&) {
478 my $cb = shift;
479
480 my $port = $SELF
481 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
482
483 sub {
484 local $SELF = $port;
485
486 if (wantarray) {
487 my @res = eval { &$cb };
488 _self_die if $@;
489 @res
490 } else {
491 my $res = eval { &$cb };
492 _self_die if $@;
493 $res
494 }
495 }
496}
497 508
498=back 509=back
499 510
500=head1 FUNCTIONS FOR NODES 511=head1 FUNCTIONS FOR NODES
501 512
502=over 4 513=over 4
503 514
504=item become_public $noderef 515=item initialise_node $noderef, $seednode, $seednode...
505 516
506Tells the node to become a public node, i.e. reachable from other nodes. 517=item initialise_node "slave/", $master, $master...
507 518
508The first argument is the (unresolved) node reference of the local node 519Initialises a node - must be called exactly once before calling other
509(if missing then the empty string is used). 520AnyEvent::MP functions when talking to other nodes is required.
510 521
511It is quite common to not specify anything, in which case the local node 522All arguments are noderefs, which can be either resolved or unresolved.
512tries to listen on the default port, or to only specify a port number, in 523
513which case AnyEvent::MP tries to guess the local addresses. 524There are two types of networked nodes, public nodes and slave nodes:
525
526=over 4
527
528=item public nodes
529
530For public nodes, C<$noderef> must either be a (possibly unresolved)
531noderef, in which case it will be resolved, or C<undef> (or missing), in
532which case the noderef will be guessed.
533
534Afterwards, the node will bind itself on all endpoints and try to connect
535to all additional C<$seednodes> that are specified. Seednodes are optional
536and can be used to quickly bootstrap the node into an existing network.
537
538=item slave nodes
539
540When the C<$noderef> is the special string C<slave/>, then the node will
541become a slave node. Slave nodes cannot be contacted from outside and will
542route most of their traffic to the master node that they attach to.
543
544At least one additional noderef is required: The node will try to connect
545to all of them and will become a slave attached to the first node it can
546successfully connect to.
547
548=back
549
550This function will block until all nodes have been resolved and, for slave
551nodes, until it has successfully established a connection to a master
552server.
553
554Example: become a public node listening on the default node.
555
556 initialise_node;
557
558Example: become a public node, and try to contact some well-known master
559servers to become part of the network.
560
561 initialise_node undef, "master1", "master2";
562
563Example: become a public node listening on port C<4041>.
564
565 initialise_node 4041;
566
567Example: become a public node, only visible on localhost port 4044.
568
569 initialise_node "locahost:4044";
570
571Example: become a slave node to any of the specified master servers.
572
573 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
514 574
515=cut 575=cut
516 576
517=back 577=back
518 578

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines