ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.32 by root, Wed Aug 5 19:58:46 2009 UTC vs.
Revision 1.33 by root, Wed Aug 5 22:40:51 2009 UTC

123The C<NODE> function returns, and the C<$NODE> variable contains 123The C<NODE> function returns, and the C<$NODE> variable contains
124the noderef of the local node. The value is initialised by a call 124the noderef of the local node. The value is initialised by a call
125to C<become_public> or C<become_slave>, after which all local port 125to C<become_public> or C<become_slave>, after which all local port
126identifiers become invalid. 126identifiers become invalid.
127 127
128=item $noderef = node_of $portid 128=item $noderef = node_of $port
129 129
130Extracts and returns the noderef from a portid or a noderef. 130Extracts and returns the noderef from a portid or a noderef.
131 131
132=item $cv = resolve_node $noderef 132=item $cv = resolve_node $noderef
133 133
167 167
168Due to some quirks in how perl exports variables, it is impossible to 168Due to some quirks in how perl exports variables, it is impossible to
169just export C<$SELF>, all the symbols called C<SELF> are exported by this 169just export C<$SELF>, all the symbols called C<SELF> are exported by this
170module, but only C<$SELF> is currently used. 170module, but only C<$SELF> is currently used.
171 171
172=item snd $portid, type => @data 172=item snd $port, type => @data
173 173
174=item snd $portid, @msg 174=item snd $port, @msg
175 175
176Send the given message to the given port ID, which can identify either 176Send the given message to the given port ID, which can identify either
177a local or a remote port, and can be either a string or soemthignt hat 177a local or a remote port, and can be either a string or soemthignt hat
178stringifies a sa port ID (such as a port object :). 178stringifies a sa port ID (such as a port object :).
179 179
195 195
196Create a new local port object that can be used either as a pattern 196Create a new local port object that can be used either as a pattern
197matching port ("full port") or a single-callback port ("miniport"), 197matching port ("full port") or a single-callback port ("miniport"),
198depending on how C<rcv> callbacks are bound to the object. 198depending on how C<rcv> callbacks are bound to the object.
199 199
200=item $portid = port { my @msg = @_; $finished } 200=item $port = port { my @msg = @_; $finished }
201 201
202Creates a "mini port", that is, a very lightweight port without any 202Creates a "miniport", that is, a very lightweight port without any pattern
203pattern matching behind it, and returns its ID. 203matching behind it, and returns its ID. Semantically the same as creating
204a port and calling C<rcv $port, $callback> on it.
204 205
205The block will be called for every message received on the port. When the 206The block will be called for every message received on the port. When the
206callback returns a true value its job is considered "done" and the port 207callback returns a true value its job is considered "done" and the port
207will be destroyed. Otherwise it will stay alive. 208will be destroyed. Otherwise it will stay alive.
208 209
215 snd $otherport, reply => $port; 216 snd $otherport, reply => $port;
216 }; 217 };
217 218
218=cut 219=cut
219 220
221sub rcv($@);
222
220sub port(;&) { 223sub port(;&) {
221 my $id = "$UNIQ." . $ID++; 224 my $id = "$UNIQ." . $ID++;
222 my $port = "$NODE#$id"; 225 my $port = "$NODE#$id";
223 226
224 if (@_) { 227 if (@_) {
228 rcv $port, shift;
229 } else {
230 $PORT{$id} = sub { }; # nop
231 }
232
233 $port
234}
235
236=item reg $port, $name
237
238Registers the given port under the name C<$name>. If the name already
239exists it is replaced.
240
241A port can only be registered under one well known name.
242
243A port automatically becomes unregistered when it is killed.
244
245=cut
246
247sub reg(@) {
248 my ($port, $name) = @_;
249
250 $REG{$name} = $port;
251}
252
253=item rcv $port, $callback->(@msg)
254
255Replaces the callback on the specified miniport (after converting it to
256one if required).
257
258=item rcv $port, tagstring => $callback->(@msg), ...
259
260=item rcv $port, $smartmatch => $callback->(@msg), ...
261
262=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
263
264Register callbacks to be called on matching messages on the given full
265port (after converting it to one if required).
266
267The callback has to return a true value when its work is done, after
268which is will be removed, or a false value in which case it will stay
269registered.
270
271The global C<$SELF> (exported by this module) contains C<$port> while
272executing the callback.
273
274Runtime errors wdurign callback execution will result in the port being
275C<kil>ed.
276
277If the match is an array reference, then it will be matched against the
278first elements of the message, otherwise only the first element is being
279matched.
280
281Any element in the match that is specified as C<_any_> (a function
282exported by this module) matches any single element of the message.
283
284While not required, it is highly recommended that the first matching
285element is a string identifying the message. The one-string-only match is
286also the most efficient match (by far).
287
288=cut
289
290sub rcv($@) {
291 my $port = shift;
292 my ($noderef, $portid) = split /#/, $port, 2;
293
294 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
295 or Carp::croak "$port: rcv can only be called on local ports, caught";
296
297 if (@_ == 1) {
225 my $cb = shift; 298 my $cb = shift;
299 delete $PORT_DATA{$portid};
226 $PORT{$id} = sub { 300 $PORT{$portid} = sub {
227 local $SELF = $port; 301 local $SELF = $port;
228 eval { 302 eval {
229 &$cb 303 &$cb
230 and kil $id; 304 and kil $port;
231 }; 305 };
232 _self_die if $@; 306 _self_die if $@;
233 }; 307 };
234 } else { 308 } else {
309 my $self = $PORT_DATA{$portid} ||= do {
235 my $self = bless { 310 my $self = bless {
236 id => "$NODE#$id", 311 id => $port,
237 }, "AnyEvent::MP::Port"; 312 }, "AnyEvent::MP::Port";
238 313
239 $PORT_DATA{$id} = $self;
240 $PORT{$id} = sub { 314 $PORT{$portid} = sub {
241 local $SELF = $port; 315 local $SELF = $port;
242 316
243 eval { 317 eval {
244 for (@{ $self->{rc0}{$_[0]} }) { 318 for (@{ $self->{rc0}{$_[0]} }) {
245 $_ && &{$_->[0]} 319 $_ && &{$_->[0]}
246 && undef $_; 320 && undef $_;
247 } 321 }
248 322
249 for (@{ $self->{rcv}{$_[0]} }) { 323 for (@{ $self->{rcv}{$_[0]} }) {
250 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] 324 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
251 && &{$_->[0]} 325 && &{$_->[0]}
252 && undef $_; 326 && undef $_;
253 } 327 }
254 328
255 for (@{ $self->{any} }) { 329 for (@{ $self->{any} }) {
256 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] 330 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
257 && &{$_->[0]} 331 && &{$_->[0]}
258 && undef $_; 332 && undef $_;
333 }
259 } 334 };
335 _self_die if $@;
260 }; 336 };
261 _self_die if $@; 337
338 $self
262 }; 339 };
263 }
264 340
265 $port
266}
267
268=item reg $portid, $name
269
270Registers the given port under the name C<$name>. If the name already
271exists it is replaced.
272
273A port can only be registered under one well known name.
274
275A port automatically becomes unregistered when it is killed.
276
277=cut
278
279sub reg(@) {
280 my ($portid, $name) = @_;
281
282 $REG{$name} = $portid;
283}
284
285=item rcv $portid, $callback->(@msg)
286
287Replaces the callback on the specified miniport (or newly created port
288object, see C<port>). Full ports are configured with the following calls:
289
290=item rcv $portid, tagstring => $callback->(@msg), ...
291
292=item rcv $portid, $smartmatch => $callback->(@msg), ...
293
294=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
295
296Register callbacks to be called on matching messages on the given full
297port (or newly created port).
298
299The callback has to return a true value when its work is done, after
300which is will be removed, or a false value in which case it will stay
301registered.
302
303The global C<$SELF> (exported by this module) contains C<$portid> while
304executing the callback.
305
306Runtime errors wdurign callback execution will result in the port being
307C<kil>ed.
308
309If the match is an array reference, then it will be matched against the
310first elements of the message, otherwise only the first element is being
311matched.
312
313Any element in the match that is specified as C<_any_> (a function
314exported by this module) matches any single element of the message.
315
316While not required, it is highly recommended that the first matching
317element is a string identifying the message. The one-string-only match is
318also the most efficient match (by far).
319
320=cut
321
322sub rcv($@) {
323 my $portid = shift;
324 my ($noderef, $port) = split /#/, $port, 2;
325
326 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
327 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
328
329 my $self = $PORT_DATA{$port}
330 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
331
332 "AnyEvent::MP::Port" eq ref $self 341 "AnyEvent::MP::Port" eq ref $self
333 or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; 342 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
334 343
335 while (@_) { 344 while (@_) {
336 my ($match, $cb) = splice @_, 0, 2; 345 my ($match, $cb) = splice @_, 0, 2;
337 346
338 if (!ref $match) { 347 if (!ref $match) {
339 push @{ $self->{rc0}{$match} }, [$cb]; 348 push @{ $self->{rc0}{$match} }, [$cb];
340 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { 349 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
341 my ($type, @match) = @$match; 350 my ($type, @match) = @$match;
342 @match 351 @match
343 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] 352 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
344 : push @{ $self->{rc0}{$match->[0]} }, [$cb]; 353 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
345 } else { 354 } else {
346 push @{ $self->{any} }, [$cb, $match]; 355 push @{ $self->{any} }, [$cb, $match];
356 }
347 } 357 }
348 } 358 }
349 359
350 $portid 360 $port
351} 361}
352 362
353=item $closure = psub { BLOCK } 363=item $closure = psub { BLOCK }
354 364
355Remembers C<$SELF> and creates a closure out of the BLOCK. When the 365Remembers C<$SELF> and creates a closure out of the BLOCK. When the
386 $res 396 $res
387 } 397 }
388 } 398 }
389} 399}
390 400
391=item $guard = mon $portid, $cb->(@reason) 401=item $guard = mon $port, $cb->(@reason)
392 402
393=item $guard = mon $portid, $otherport 403=item $guard = mon $port, $otherport
394 404
395=item $guard = mon $portid, $otherport, @msg 405=item $guard = mon $port, $otherport, @msg
396 406
397Monitor the given port and do something when the port is killed. 407Monitor the given port and do something when the port is killed.
398 408
399In the first form, the callback is simply called with any number 409In the first form, the callback is simply called with any number
400of C<@reason> elements (no @reason means that the port was deleted 410of C<@reason> elements (no @reason means that the port was deleted
478 mon $port2, $port1; 488 mon $port2, $port1;
479 489
480It means that if either one is killed abnormally, the other one gets 490It means that if either one is killed abnormally, the other one gets
481killed as well. 491killed as well.
482 492
483=item kil $portid[, @reason] 493=item kil $port[, @reason]
484 494
485Kill the specified port with the given C<@reason>. 495Kill the specified port with the given C<@reason>.
486 496
487If no C<@reason> is specified, then the port is killed "normally" (linked 497If no C<@reason> is specified, then the port is killed "normally" (linked
488ports will not be kileld, or even notified). 498ports will not be kileld, or even notified).
500 510
501=head1 FUNCTIONS FOR NODES 511=head1 FUNCTIONS FOR NODES
502 512
503=over 4 513=over 4
504 514
505=item become_public $noderef 515=item initialise_node $noderef, $seednode, $seednode...
506 516
507Tells the node to become a public node, i.e. reachable from other nodes. 517=item initialise_node "slave/", $master, $master...
508 518
509The first argument is the (unresolved) node reference of the local node 519Initialises a node - must be called exactly once before calling other
510(if missing then the empty string is used). 520AnyEvent::MP functions when talking to other nodes is required.
511 521
512It is quite common to not specify anything, in which case the local node 522All arguments are noderefs, which can be either resolved or unresolved.
513tries to listen on the default port, or to only specify a port number, in 523
514which case AnyEvent::MP tries to guess the local addresses. 524There are two types of networked nodes, public nodes and slave nodes:
525
526=over 4
527
528=item public nodes
529
530For public nodes, C<$noderef> must either be a (possibly unresolved)
531noderef, in which case it will be resolved, or C<undef> (or missing), in
532which case the noderef will be guessed.
533
534Afterwards, the node will bind itself on all endpoints and try to connect
535to all additional C<$seednodes> that are specified. Seednodes are optional
536and can be used to quickly bootstrap the node into an existing network.
537
538=item slave nodes
539
540When the C<$noderef> is the special string C<slave/>, then the node will
541become a slave node. Slave nodes cannot be contacted from outside and will
542route most of their traffic to the master node that they attach to.
543
544At least one additional noderef is required: The node will try to connect
545to all of them and will become a slave attached to the first node it can
546successfully connect to.
547
548=back
549
550This function will block until all nodes have been resolved and, for slave
551nodes, until it has successfully established a connection to a master
552server.
553
554Example: become a public node listening on the default node.
555
556 initialise_node;
557
558Example: become a public node, and try to contact some well-known master
559servers to become part of the network.
560
561 initialise_node undef, "master1", "master2";
562
563Example: become a public node listening on port C<4041>.
564
565 initialise_node 4041;
566
567Example: become a public node, only visible on localhost port 4044.
568
569 initialise_node "locahost:4044";
570
571Example: become a slave node to any of the specified master servers.
572
573 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
515 574
516=cut 575=cut
517 576
518=back 577=back
519 578

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines