ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.30 by root, Tue Aug 4 23:35:51 2009 UTC vs.
Revision 1.34 by root, Wed Aug 5 23:50:46 2009 UTC

103use base "Exporter"; 103use base "Exporter";
104 104
105our $VERSION = '0.1'; 105our $VERSION = '0.1';
106our @EXPORT = qw( 106our @EXPORT = qw(
107 NODE $NODE *SELF node_of _any_ 107 NODE $NODE *SELF node_of _any_
108 resolve_node 108 resolve_node initialise_node
109 become_slave become_public
110 snd rcv mon kil reg psub 109 snd rcv mon kil reg psub
111 port 110 port
112); 111);
113 112
114our $SELF; 113our $SELF;
124The C<NODE> function returns, and the C<$NODE> variable contains 123The C<NODE> function returns, and the C<$NODE> variable contains
125the noderef of the local node. The value is initialised by a call 124the noderef of the local node. The value is initialised by a call
126to C<become_public> or C<become_slave>, after which all local port 125to C<become_public> or C<become_slave>, after which all local port
127identifiers become invalid. 126identifiers become invalid.
128 127
129=item $noderef = node_of $portid 128=item $noderef = node_of $port
130 129
131Extracts and returns the noderef from a portid or a noderef. 130Extracts and returns the noderef from a portid or a noderef.
131
132=item initialise_node $noderef, $seednode, $seednode...
133
134=item initialise_node "slave/", $master, $master...
135
136Before a node can talk to other nodes on the network it has to initialise
137itself - the minimum a node needs to know is it's own name, and optionally
138it should know the noderefs of some other nodes in the network.
139
140This function initialises a node - it must be called exactly once (or
141never) before calling other AnyEvent::MP functions.
142
143All arguments are noderefs, which can be either resolved or unresolved.
144
145There are two types of networked nodes, public nodes and slave nodes:
146
147=over 4
148
149=item public nodes
150
151For public nodes, C<$noderef> must either be a (possibly unresolved)
152noderef, in which case it will be resolved, or C<undef> (or missing), in
153which case the noderef will be guessed.
154
155Afterwards, the node will bind itself on all endpoints and try to connect
156to all additional C<$seednodes> that are specified. Seednodes are optional
157and can be used to quickly bootstrap the node into an existing network.
158
159=item slave nodes
160
161When the C<$noderef> is the special string C<slave/>, then the node will
162become a slave node. Slave nodes cannot be contacted from outside and will
163route most of their traffic to the master node that they attach to.
164
165At least one additional noderef is required: The node will try to connect
166to all of them and will become a slave attached to the first node it can
167successfully connect to.
168
169=back
170
171This function will block until all nodes have been resolved and, for slave
172nodes, until it has successfully established a connection to a master
173server.
174
175Example: become a public node listening on the default node.
176
177 initialise_node;
178
179Example: become a public node, and try to contact some well-known master
180servers to become part of the network.
181
182 initialise_node undef, "master1", "master2";
183
184Example: become a public node listening on port C<4041>.
185
186 initialise_node 4041;
187
188Example: become a public node, only visible on localhost port 4044.
189
190 initialise_node "locahost:4044";
191
192Example: become a slave node to any of the specified master servers.
193
194 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
132 195
133=item $cv = resolve_node $noderef 196=item $cv = resolve_node $noderef
134 197
135Takes an unresolved node reference that may contain hostnames and 198Takes an unresolved node reference that may contain hostnames and
136abbreviated IDs, resolves all of them and returns a resolved node 199abbreviated IDs, resolves all of them and returns a resolved node
168 231
169Due to some quirks in how perl exports variables, it is impossible to 232Due to some quirks in how perl exports variables, it is impossible to
170just export C<$SELF>, all the symbols called C<SELF> are exported by this 233just export C<$SELF>, all the symbols called C<SELF> are exported by this
171module, but only C<$SELF> is currently used. 234module, but only C<$SELF> is currently used.
172 235
173=item snd $portid, type => @data 236=item snd $port, type => @data
174 237
175=item snd $portid, @msg 238=item snd $port, @msg
176 239
177Send the given message to the given port ID, which can identify either 240Send the given message to the given port ID, which can identify either
178a local or a remote port, and can be either a string or soemthignt hat 241a local or a remote port, and can be either a string or soemthignt hat
179stringifies a sa port ID (such as a port object :). 242stringifies a sa port ID (such as a port object :).
180 243
190JSON is used, then only strings, numbers and arrays and hashes consisting 253JSON is used, then only strings, numbers and arrays and hashes consisting
191of those are allowed (no objects). When Storable is used, then anything 254of those are allowed (no objects). When Storable is used, then anything
192that Storable can serialise and deserialise is allowed, and for the local 255that Storable can serialise and deserialise is allowed, and for the local
193node, anything can be passed. 256node, anything can be passed.
194 257
195=item kil $portid[, @reason] 258=item $local_port = port
196 259
197Kill the specified port with the given C<@reason>. 260Create a new local port object that can be used either as a pattern
261matching port ("full port") or a single-callback port ("miniport"),
262depending on how C<rcv> callbacks are bound to the object.
198 263
199If no C<@reason> is specified, then the port is killed "normally" (linked 264=item $port = port { my @msg = @_; $finished }
200ports will not be kileld, or even notified).
201 265
202Otherwise, linked ports get killed with the same reason (second form of 266Creates a "miniport", that is, a very lightweight port without any pattern
203C<mon>, see below). 267matching behind it, and returns its ID. Semantically the same as creating
268a port and calling C<rcv $port, $callback> on it.
204 269
205Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks 270The block will be called for every message received on the port. When the
206will be reported as reason C<< die => $@ >>. 271callback returns a true value its job is considered "done" and the port
272will be destroyed. Otherwise it will stay alive.
207 273
208Transport/communication errors are reported as C<< transport_error => 274The message will be passed as-is, no extra argument (i.e. no port id) will
209$message >>. 275be passed to the callback.
210 276
277If you need the local port id in the callback, this works nicely:
278
279 my $port; $port = port {
280 snd $otherport, reply => $port;
281 };
282
283=cut
284
285sub rcv($@);
286
287sub port(;&) {
288 my $id = "$UNIQ." . $ID++;
289 my $port = "$NODE#$id";
290
291 if (@_) {
292 rcv $port, shift;
293 } else {
294 $PORT{$id} = sub { }; # nop
295 }
296
297 $port
298}
299
300=item reg $port, $name
301
302Registers the given port under the name C<$name>. If the name already
303exists it is replaced.
304
305A port can only be registered under one well known name.
306
307A port automatically becomes unregistered when it is killed.
308
309=cut
310
311sub reg(@) {
312 my ($port, $name) = @_;
313
314 $REG{$name} = $port;
315}
316
317=item rcv $port, $callback->(@msg)
318
319Replaces the callback on the specified miniport (after converting it to
320one if required).
321
322=item rcv $port, tagstring => $callback->(@msg), ...
323
324=item rcv $port, $smartmatch => $callback->(@msg), ...
325
326=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
327
328Register callbacks to be called on matching messages on the given full
329port (after converting it to one if required).
330
331The callback has to return a true value when its work is done, after
332which is will be removed, or a false value in which case it will stay
333registered.
334
335The global C<$SELF> (exported by this module) contains C<$port> while
336executing the callback.
337
338Runtime errors wdurign callback execution will result in the port being
339C<kil>ed.
340
341If the match is an array reference, then it will be matched against the
342first elements of the message, otherwise only the first element is being
343matched.
344
345Any element in the match that is specified as C<_any_> (a function
346exported by this module) matches any single element of the message.
347
348While not required, it is highly recommended that the first matching
349element is a string identifying the message. The one-string-only match is
350also the most efficient match (by far).
351
352=cut
353
354sub rcv($@) {
355 my $port = shift;
356 my ($noderef, $portid) = split /#/, $port, 2;
357
358 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
359 or Carp::croak "$port: rcv can only be called on local ports, caught";
360
361 if (@_ == 1) {
362 my $cb = shift;
363 delete $PORT_DATA{$portid};
364 $PORT{$portid} = sub {
365 local $SELF = $port;
366 eval {
367 &$cb
368 and kil $port;
369 };
370 _self_die if $@;
371 };
372 } else {
373 my $self = $PORT_DATA{$portid} ||= do {
374 my $self = bless {
375 id => $port,
376 }, "AnyEvent::MP::Port";
377
378 $PORT{$portid} = sub {
379 local $SELF = $port;
380
381 eval {
382 for (@{ $self->{rc0}{$_[0]} }) {
383 $_ && &{$_->[0]}
384 && undef $_;
385 }
386
387 for (@{ $self->{rcv}{$_[0]} }) {
388 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
389 && &{$_->[0]}
390 && undef $_;
391 }
392
393 for (@{ $self->{any} }) {
394 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
395 && &{$_->[0]}
396 && undef $_;
397 }
398 };
399 _self_die if $@;
400 };
401
402 $self
403 };
404
405 "AnyEvent::MP::Port" eq ref $self
406 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
407
408 while (@_) {
409 my ($match, $cb) = splice @_, 0, 2;
410
411 if (!ref $match) {
412 push @{ $self->{rc0}{$match} }, [$cb];
413 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
414 my ($type, @match) = @$match;
415 @match
416 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
417 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
418 } else {
419 push @{ $self->{any} }, [$cb, $match];
420 }
421 }
422 }
423
424 $port
425}
426
427=item $closure = psub { BLOCK }
428
429Remembers C<$SELF> and creates a closure out of the BLOCK. When the
430closure is executed, sets up the environment in the same way as in C<rcv>
431callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
432
433This is useful when you register callbacks from C<rcv> callbacks:
434
435 rcv delayed_reply => sub {
436 my ($delay, @reply) = @_;
437 my $timer = AE::timer $delay, 0, psub {
438 snd @reply, $SELF;
439 };
440 };
441
442=cut
443
444sub psub(&) {
445 my $cb = shift;
446
447 my $port = $SELF
448 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
449
450 sub {
451 local $SELF = $port;
452
453 if (wantarray) {
454 my @res = eval { &$cb };
455 _self_die if $@;
456 @res
457 } else {
458 my $res = eval { &$cb };
459 _self_die if $@;
460 $res
461 }
462 }
463}
464
211=item $guard = mon $portid, $cb->(@reason) 465=item $guard = mon $port, $cb->(@reason)
212 466
213=item $guard = mon $portid, $otherport 467=item $guard = mon $port, $otherport
214 468
215=item $guard = mon $portid, $otherport, @msg 469=item $guard = mon $port, $otherport, @msg
216 470
217Monitor the given port and do something when the port is killed. 471Monitor the given port and do something when the port is killed.
218 472
219In the first form, the callback is simply called with any number 473In the first form, the callback is simply called with any number
220of C<@reason> elements (no @reason means that the port was deleted 474of C<@reason> elements (no @reason means that the port was deleted
298 mon $port2, $port1; 552 mon $port2, $port1;
299 553
300It means that if either one is killed abnormally, the other one gets 554It means that if either one is killed abnormally, the other one gets
301killed as well. 555killed as well.
302 556
303=item $local_port = port 557=item kil $port[, @reason]
304 558
305Create a new local port object that supports message matching. 559Kill the specified port with the given C<@reason>.
306 560
307=item $portid = port { my @msg = @_; $finished } 561If no C<@reason> is specified, then the port is killed "normally" (linked
562ports will not be kileld, or even notified).
308 563
309Creates a "mini port", that is, a very lightweight port without any 564Otherwise, linked ports get killed with the same reason (second form of
310pattern matching behind it, and returns its ID. 565C<mon>, see below).
311 566
312The block will be called for every message received on the port. When the 567Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
313callback returns a true value its job is considered "done" and the port 568will be reported as reason C<< die => $@ >>.
314will be destroyed. Otherwise it will stay alive.
315 569
316The message will be passed as-is, no extra argument (i.e. no port id) will 570Transport/communication errors are reported as C<< transport_error =>
317be passed to the callback. 571$message >>.
318
319If you need the local port id in the callback, this works nicely:
320
321 my $port; $port = miniport {
322 snd $otherport, reply => $port;
323 };
324
325=cut
326
327sub port(;&) {
328 my $id = "$UNIQ." . $ID++;
329 my $port = "$NODE#$id";
330
331 if (@_) {
332 my $cb = shift;
333 $PORT{$id} = sub {
334 local $SELF = $port;
335 eval {
336 &$cb
337 and kil $id;
338 };
339 _self_die if $@;
340 };
341 } else {
342 my $self = bless {
343 id => "$NODE#$id",
344 }, "AnyEvent::MP::Port";
345
346 $PORT_DATA{$id} = $self;
347 $PORT{$id} = sub {
348 local $SELF = $port;
349
350 eval {
351 for (@{ $self->{rc0}{$_[0]} }) {
352 $_ && &{$_->[0]}
353 && undef $_;
354 }
355
356 for (@{ $self->{rcv}{$_[0]} }) {
357 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
358 && &{$_->[0]}
359 && undef $_;
360 }
361
362 for (@{ $self->{any} }) {
363 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
364 && &{$_->[0]}
365 && undef $_;
366 }
367 };
368 _self_die if $@;
369 };
370 }
371
372 $port
373}
374
375=item reg $portid, $name
376
377Registers the given port under the name C<$name>. If the name already
378exists it is replaced.
379
380A port can only be registered under one well known name.
381
382A port automatically becomes unregistered when it is killed.
383
384=cut
385
386sub reg(@) {
387 my ($portid, $name) = @_;
388
389 $REG{$name} = $portid;
390}
391
392=item rcv $portid, tagstring => $callback->(@msg), ...
393
394=item rcv $portid, $smartmatch => $callback->(@msg), ...
395
396=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
397
398Register callbacks to be called on matching messages on the given port.
399
400The callback has to return a true value when its work is done, after
401which is will be removed, or a false value in which case it will stay
402registered.
403
404The global C<$SELF> (exported by this module) contains C<$portid> while
405executing the callback.
406
407Runtime errors wdurign callback execution will result in the port being
408C<kil>ed.
409
410If the match is an array reference, then it will be matched against the
411first elements of the message, otherwise only the first element is being
412matched.
413
414Any element in the match that is specified as C<_any_> (a function
415exported by this module) matches any single element of the message.
416
417While not required, it is highly recommended that the first matching
418element is a string identifying the message. The one-string-only match is
419also the most efficient match (by far).
420
421=cut
422
423sub rcv($@) {
424 my ($noderef, $port) = split /#/, shift, 2;
425
426 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
427 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
428
429 my $self = $PORT_DATA{$port}
430 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
431
432 "AnyEvent::MP::Port" eq ref $self
433 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
434
435 while (@_) {
436 my ($match, $cb) = splice @_, 0, 2;
437
438 if (!ref $match) {
439 push @{ $self->{rc0}{$match} }, [$cb];
440 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
441 my ($type, @match) = @$match;
442 @match
443 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
444 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
445 } else {
446 push @{ $self->{any} }, [$cb, $match];
447 }
448 }
449}
450
451=item $closure = psub { BLOCK }
452
453Remembers C<$SELF> and creates a closure out of the BLOCK. When the
454closure is executed, sets up the environment in the same way as in C<rcv>
455callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
456
457This is useful when you register callbacks from C<rcv> callbacks:
458
459 rcv delayed_reply => sub {
460 my ($delay, @reply) = @_;
461 my $timer = AE::timer $delay, 0, psub {
462 snd @reply, $SELF;
463 };
464 };
465
466=cut
467
468sub psub(&) {
469 my $cb = shift;
470
471 my $port = $SELF
472 or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
473
474 sub {
475 local $SELF = $port;
476
477 if (wantarray) {
478 my @res = eval { &$cb };
479 _self_die if $@;
480 @res
481 } else {
482 my $res = eval { &$cb };
483 _self_die if $@;
484 $res
485 }
486 }
487}
488
489=back
490
491=head1 FUNCTIONS FOR NODES
492
493=over 4
494
495=item become_public $noderef
496
497Tells the node to become a public node, i.e. reachable from other nodes.
498
499The first argument is the (unresolved) node reference of the local node
500(if missing then the empty string is used).
501
502It is quite common to not specify anything, in which case the local node
503tries to listen on the default port, or to only specify a port number, in
504which case AnyEvent::MP tries to guess the local addresses.
505
506=cut
507 572
508=back 573=back
509 574
510=head1 NODE MESSAGES 575=head1 NODE MESSAGES
511 576

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines