ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.47 by root, Thu Aug 13 01:57:10 2009 UTC vs.
Revision 1.52 by root, Fri Aug 14 15:13:20 2009 UTC

9 $NODE # contains this node's noderef 9 $NODE # contains this node's noderef
10 NODE # returns this node's noderef 10 NODE # returns this node's noderef
11 NODE $port # returns the noderef of the port 11 NODE $port # returns the noderef of the port
12 12
13 $SELF # receiving/own port id in rcv callbacks 13 $SELF # receiving/own port id in rcv callbacks
14
15 # initialise the node so it can send/receive messages
16 initialise_node; # -OR-
17 initialise_node "localhost:4040"; # -OR-
18 initialise_node "slave/", "localhost:4040"
14 19
15 # ports are message endpoints 20 # ports are message endpoints
16 21
17 # sending messages 22 # sending messages
18 snd $port, type => data...; 23 snd $port, type => data...;
19 snd $port, @msg; 24 snd $port, @msg;
20 snd @msg_with_first_element_being_a_port; 25 snd @msg_with_first_element_being_a_port;
21 26
22 # miniports 27 # creating/using ports, the simple way
23 my $miniport = port { my @msg = @_; 0 }; 28 my $somple_port = port { my @msg = @_; 0 };
24 29
25 # full ports 30 # creating/using ports, tagged message matching
26 my $port = port; 31 my $port = port;
27 rcv $port, smartmatch => $cb->(@msg);
28 rcv $port, ping => sub { snd $_[0], "pong"; 0 }; 32 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
29 rcv $port, pong => sub { warn "pong received\n"; 0 }; 33 rcv $port, pong => sub { warn "pong received\n"; 0 };
30 34
31 # remote ports 35 # create a port on another node
32 my $port = spawn $node, $initfunc, @initdata; 36 my $port = spawn $node, $initfunc, @initdata;
33
34 # more, smarter, matches (_any_ is exported by this module)
35 rcv $port, [child_died => $pid] => sub { ...
36 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
37 37
38 # monitoring 38 # monitoring
39 mon $port, $cb->(@msg) # callback is invoked on death 39 mon $port, $cb->(@msg) # callback is invoked on death
40 mon $port, $otherport # kill otherport on abnormal death 40 mon $port, $otherport # kill otherport on abnormal death
41 mon $port, $otherport, @msg # send message on death 41 mon $port, $otherport, @msg # send message on death
143 kil $SELF, die => $msg; 143 kil $SELF, die => $msg;
144} 144}
145 145
146=item $thisnode = NODE / $NODE 146=item $thisnode = NODE / $NODE
147 147
148The C<NODE> function returns, and the C<$NODE> variable contains 148The C<NODE> function returns, and the C<$NODE> variable contains the
149the noderef of the local node. The value is initialised by a call 149noderef of the local node. The value is initialised by a call to
150to C<become_public> or C<become_slave>, after which all local port 150C<initialise_node>.
151identifiers become invalid.
152 151
153=item $noderef = node_of $port 152=item $noderef = node_of $port
154 153
155Extracts and returns the noderef from a portid or a noderef. 154Extracts and returns the noderef from a port ID or a noderef.
156 155
157=item initialise_node $noderef, $seednode, $seednode... 156=item initialise_node $noderef, $seednode, $seednode...
158 157
159=item initialise_node "slave/", $master, $master... 158=item initialise_node "slave/", $master, $master...
160 159
163it should know the noderefs of some other nodes in the network. 162it should know the noderefs of some other nodes in the network.
164 163
165This function initialises a node - it must be called exactly once (or 164This function initialises a node - it must be called exactly once (or
166never) before calling other AnyEvent::MP functions. 165never) before calling other AnyEvent::MP functions.
167 166
168All arguments are noderefs, which can be either resolved or unresolved. 167All arguments (optionally except for the first) are noderefs, which can be
168either resolved or unresolved.
169
170The first argument will be looked up in the configuration database first
171(if it is C<undef> then the current nodename will be used instead) to find
172the relevant configuration profile (see L<aemp>). If none is found then
173the default configuration is used. The configuration supplies additional
174seed/master nodes and can override the actual noderef.
169 175
170There are two types of networked nodes, public nodes and slave nodes: 176There are two types of networked nodes, public nodes and slave nodes:
171 177
172=over 4 178=over 4
173 179
174=item public nodes 180=item public nodes
175 181
176For public nodes, C<$noderef> must either be a (possibly unresolved) 182For public nodes, C<$noderef> (supplied either directly to
177noderef, in which case it will be resolved, or C<undef> (or missing), in 183C<initialise_node> or indirectly via a profile or the nodename) must be a
178which case the noderef will be guessed. 184noderef (possibly unresolved, in which case it will be resolved).
179 185
180Afterwards, the node will bind itself on all endpoints and try to connect 186After resolving, the node will bind itself on all endpoints and try to
181to all additional C<$seednodes> that are specified. Seednodes are optional 187connect to all additional C<$seednodes> that are specified. Seednodes are
182and can be used to quickly bootstrap the node into an existing network. 188optional and can be used to quickly bootstrap the node into an existing
189network.
183 190
184=item slave nodes 191=item slave nodes
185 192
186When the C<$noderef> is the special string C<slave/>, then the node will 193When the C<$noderef> (either as given or overriden by the config file)
194is the special string C<slave/>, then the node will become a slave
187become a slave node. Slave nodes cannot be contacted from outside and will 195node. Slave nodes cannot be contacted from outside and will route most of
188route most of their traffic to the master node that they attach to. 196their traffic to the master node that they attach to.
189 197
190At least one additional noderef is required: The node will try to connect 198At least one additional noderef is required (either by specifying it
191to all of them and will become a slave attached to the first node it can 199directly or because it is part of the configuration profile): The node
192successfully connect to. 200will try to connect to all of them and will become a slave attached to the
201first node it can successfully connect to.
193 202
194=back 203=back
195 204
196This function will block until all nodes have been resolved and, for slave 205This function will block until all nodes have been resolved and, for slave
197nodes, until it has successfully established a connection to a master 206nodes, until it has successfully established a connection to a master
198server. 207server.
199 208
200Example: become a public node listening on the default node. 209Example: become a public node listening on the guessed noderef, or the one
210specified via C<aemp> for the current node. This should be the most common
211form of invocation for "daemon"-type nodes.
201 212
202 initialise_node; 213 initialise_node;
214
215Example: become a slave node to any of the the seednodes specified via
216C<aemp>. This form is often used for commandline clients.
217
218 initialise_node "slave/";
219
220Example: become a slave node to any of the specified master servers. This
221form is also often used for commandline clients.
222
223 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
203 224
204Example: become a public node, and try to contact some well-known master 225Example: become a public node, and try to contact some well-known master
205servers to become part of the network. 226servers to become part of the network.
206 227
207 initialise_node undef, "master1", "master2"; 228 initialise_node undef, "master1", "master2";
210 231
211 initialise_node 4041; 232 initialise_node 4041;
212 233
213Example: become a public node, only visible on localhost port 4044. 234Example: become a public node, only visible on localhost port 4044.
214 235
215 initialise_node "locahost:4044"; 236 initialise_node "localhost:4044";
216
217Example: become a slave node to any of the specified master servers.
218
219 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
220 237
221=item $cv = resolve_node $noderef 238=item $cv = resolve_node $noderef
222 239
223Takes an unresolved node reference that may contain hostnames and 240Takes an unresolved node reference that may contain hostnames and
224abbreviated IDs, resolves all of them and returns a resolved node 241abbreviated IDs, resolves all of them and returns a resolved node
261=item snd $port, type => @data 278=item snd $port, type => @data
262 279
263=item snd $port, @msg 280=item snd $port, @msg
264 281
265Send the given message to the given port ID, which can identify either 282Send the given message to the given port ID, which can identify either
266a local or a remote port, and can be either a string or soemthignt hat 283a local or a remote port, and must be a port ID.
267stringifies a sa port ID (such as a port object :).
268 284
269While the message can be about anything, it is highly recommended to use a 285While the message can be about anything, it is highly recommended to use a
270string as first element (a portid, or some word that indicates a request 286string as first element (a port ID, or some word that indicates a request
271type etc.). 287type etc.).
272 288
273The message data effectively becomes read-only after a call to this 289The message data effectively becomes read-only after a call to this
274function: modifying any argument is not allowed and can cause many 290function: modifying any argument is not allowed and can cause many
275problems. 291problems.
280that Storable can serialise and deserialise is allowed, and for the local 296that Storable can serialise and deserialise is allowed, and for the local
281node, anything can be passed. 297node, anything can be passed.
282 298
283=item $local_port = port 299=item $local_port = port
284 300
285Create a new local port object that can be used either as a pattern 301Create a new local port object and returns its port ID. Initially it has
286matching port ("full port") or a single-callback port ("miniport"), 302no callbacks set and will throw an error when it receives messages.
287depending on how C<rcv> callbacks are bound to the object.
288 303
289=item $port = port { my @msg = @_; $finished } 304=item $local_port = port { my @msg = @_ }
290 305
291Creates a "miniport", that is, a very lightweight port without any pattern 306Creates a new local port, and returns its ID. Semantically the same as
292matching behind it, and returns its ID. Semantically the same as creating
293a port and calling C<rcv $port, $callback> on it. 307creating a port and calling C<rcv $port, $callback> on it.
294 308
295The block will be called for every message received on the port. When the 309The block will be called for every message received on the port, with the
296callback returns a true value its job is considered "done" and the port 310global variable C<$SELF> set to the port ID. Runtime errors will cause the
297will be destroyed. Otherwise it will stay alive. 311port to be C<kil>ed. The message will be passed as-is, no extra argument
312(i.e. no port ID) will be passed to the callback.
298 313
299The message will be passed as-is, no extra argument (i.e. no port id) will 314If you want to stop/destroy the port, simply C<kil> it:
300be passed to the callback.
301 315
302If you need the local port id in the callback, this works nicely: 316 my $port = port {
303 317 my @msg = @_;
304 my $port; $port = port { 318 ...
305 snd $otherport, reply => $port; 319 kil $SELF;
306 }; 320 };
307 321
308=cut 322=cut
309 323
310sub rcv($@); 324sub rcv($@);
325
326sub _kilme {
327 die "received message on port without callback";
328}
311 329
312sub port(;&) { 330sub port(;&) {
313 my $id = "$UNIQ." . $ID++; 331 my $id = "$UNIQ." . $ID++;
314 my $port = "$NODE#$id"; 332 my $port = "$NODE#$id";
315 333
316 if (@_) { 334 rcv $port, shift || \&_kilme;
317 rcv $port, shift;
318 } else {
319 $PORT{$id} = sub { }; # nop
320 }
321 335
322 $port 336 $port
323} 337}
324 338
325=item reg $port, $name
326
327=item reg $name
328
329Registers the given port (or C<$SELF><<< if missing) under the name
330C<$name>. If the name already exists it is replaced.
331
332A port can only be registered under one well known name.
333
334A port automatically becomes unregistered when it is killed.
335
336=cut
337
338sub reg(@) {
339 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
340
341 $REG{$_[0]} = $port;
342}
343
344=item rcv $port, $callback->(@msg) 339=item rcv $local_port, $callback->(@msg)
345 340
346Replaces the callback on the specified miniport (after converting it to 341Replaces the default callback on the specified port. There is no way to
347one if required). 342remove the default callback: use C<sub { }> to disable it, or better
348 343C<kil> the port when it is no longer needed.
349=item rcv $port, tagstring => $callback->(@msg), ...
350
351=item rcv $port, $smartmatch => $callback->(@msg), ...
352
353=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
354
355Register callbacks to be called on matching messages on the given full
356port (after converting it to one if required) and return the port.
357
358The callback has to return a true value when its work is done, after
359which is will be removed, or a false value in which case it will stay
360registered.
361 344
362The global C<$SELF> (exported by this module) contains C<$port> while 345The global C<$SELF> (exported by this module) contains C<$port> while
363executing the callback. 346executing the callback. Runtime errors during callback execution will
347result in the port being C<kil>ed.
364 348
365Runtime errors during callback execution will result in the port being 349The default callback received all messages not matched by a more specific
366C<kil>ed. 350C<tag> match.
367 351
368If the match is an array reference, then it will be matched against the 352=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
369first elements of the message, otherwise only the first element is being
370matched.
371 353
372Any element in the match that is specified as C<_any_> (a function 354Register callbacks to be called on messages starting with the given tag on
373exported by this module) matches any single element of the message. 355the given port (and return the port), or unregister it (when C<$callback>
356is C<$undef>).
374 357
375While not required, it is highly recommended that the first matching 358The original message will be passed to the callback, after the first
376element is a string identifying the message. The one-string-only match is 359element (the tag) has been removed. The callback will use the same
377also the most efficient match (by far). 360environment as the default callback (see above).
378 361
379Example: create a port and bind receivers on it in one go. 362Example: create a port and bind receivers on it in one go.
380 363
381 my $port = rcv port, 364 my $port = rcv port,
382 msg1 => sub { ...; 0 }, 365 msg1 => sub { ... },
383 msg2 => sub { ...; 0 }, 366 msg2 => sub { ... },
384 ; 367 ;
385 368
386Example: create a port, bind receivers and send it in a message elsewhere 369Example: create a port, bind receivers and send it in a message elsewhere
387in one go: 370in one go:
388 371
389 snd $otherport, reply => 372 snd $otherport, reply =>
390 rcv port, 373 rcv port,
391 msg1 => sub { ...; 0 }, 374 msg1 => sub { ... },
392 ... 375 ...
393 ; 376 ;
394 377
395=cut 378=cut
396 379
399 my ($noderef, $portid) = split /#/, $port, 2; 382 my ($noderef, $portid) = split /#/, $port, 2;
400 383
401 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 384 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
402 or Carp::croak "$port: rcv can only be called on local ports, caught"; 385 or Carp::croak "$port: rcv can only be called on local ports, caught";
403 386
404 if (@_ == 1) { 387 while (@_) {
388 if (ref $_[0]) {
389 if (my $self = $PORT_DATA{$portid}) {
390 "AnyEvent::MP::Port" eq ref $self
391 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
392
393 $self->[2] = shift;
394 } else {
405 my $cb = shift; 395 my $cb = shift;
406 delete $PORT_DATA{$portid};
407 $PORT{$portid} = sub { 396 $PORT{$portid} = sub {
408 local $SELF = $port; 397 local $SELF = $port;
409 eval { 398 eval { &$cb }; _self_die if $@;
410 &$cb 399 };
411 and kil $port;
412 }; 400 }
413 _self_die if $@; 401 } elsif (defined $_[0]) {
414 };
415 } else {
416 my $self = $PORT_DATA{$portid} ||= do { 402 my $self = $PORT_DATA{$portid} ||= do {
417 my $self = bless { 403 my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port";
418 id => $port,
419 }, "AnyEvent::MP::Port";
420 404
421 $PORT{$portid} = sub { 405 $PORT{$portid} = sub {
422 local $SELF = $port; 406 local $SELF = $port;
423 407
424 eval {
425 for (@{ $self->{rc0}{$_[0]} }) { 408 if (my $cb = $self->[1]{$_[0]}) {
426 $_ && &{$_->[0]} 409 shift;
427 && undef $_; 410 eval { &$cb }; _self_die if $@;
428 } 411 } else {
429
430 for (@{ $self->{rcv}{$_[0]} }) {
431 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
432 && &{$_->[0]} 412 &{ $self->[0] };
433 && undef $_;
434 }
435
436 for (@{ $self->{any} }) {
437 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
438 && &{$_->[0]}
439 && undef $_;
440 } 413 }
441 }; 414 };
442 _self_die if $@; 415
416 $self
443 }; 417 };
444 418
445 $self
446 };
447
448 "AnyEvent::MP::Port" eq ref $self 419 "AnyEvent::MP::Port" eq ref $self
449 or Carp::croak "$port: rcv can only be called on message matching ports, caught"; 420 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
450 421
451 while (@_) {
452 my ($match, $cb) = splice @_, 0, 2; 422 my ($tag, $cb) = splice @_, 0, 2;
453 423
454 if (!ref $match) { 424 if (defined $cb) {
455 push @{ $self->{rc0}{$match} }, [$cb]; 425 $self->[1]{$tag} = $cb;
456 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
457 my ($type, @match) = @$match;
458 @match
459 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
460 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
461 } else { 426 } else {
462 push @{ $self->{any} }, [$cb, $match]; 427 delete $self->[1]{$tag};
463 } 428 }
464 } 429 }
465 } 430 }
466 431
467 $port 432 $port
771convenience functionality. 736convenience functionality.
772 737
773This means that AEMP requires a less tightly controlled environment at the 738This means that AEMP requires a less tightly controlled environment at the
774cost of longer node references and a slightly higher management overhead. 739cost of longer node references and a slightly higher management overhead.
775 740
741=item Erlang has a "remote ports are like local ports" philosophy, AEMP
742uses "local ports are like remote ports".
743
744The failure modes for local ports are quite different (runtime errors
745only) then for remote ports - when a local port dies, you I<know> it dies,
746when a connection to another node dies, you know nothing about the other
747port.
748
749Erlang pretends remote ports are as reliable as local ports, even when
750they are not.
751
752AEMP encourages a "treat remote ports differently" philosophy, with local
753ports being the special case/exception, where transport errors cannot
754occur.
755
776=item * Erlang uses processes and a mailbox, AEMP does not queue. 756=item * Erlang uses processes and a mailbox, AEMP does not queue.
777 757
778Erlang uses processes that selctively receive messages, and therefore 758Erlang uses processes that selectively receive messages, and therefore
779needs a queue. AEMP is event based, queuing messages would serve no useful 759needs a queue. AEMP is event based, queuing messages would serve no
780purpose. 760useful purpose. For the same reason the pattern-matching abilities of
761AnyEvent::MP are more limited, as there is little need to be able to
762filter messages without dequeing them.
781 763
782(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP). 764(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
783 765
784=item * Erlang sends are synchronous, AEMP sends are asynchronous. 766=item * Erlang sends are synchronous, AEMP sends are asynchronous.
785 767
786Sending messages in Erlang is synchronous and blocks the process. AEMP 768Sending messages in Erlang is synchronous and blocks the process (and
787sends are immediate, connection establishment is handled in the 769so does not need a queue that can overflow). AEMP sends are immediate,
788background. 770connection establishment is handled in the background.
789 771
790=item * Erlang can silently lose messages, AEMP cannot. 772=item * Erlang suffers from silent message loss, AEMP does not.
791 773
792Erlang makes few guarantees on messages delivery - messages can get lost 774Erlang makes few guarantees on messages delivery - messages can get lost
793without any of the processes realising it (i.e. you send messages a, b, 775without any of the processes realising it (i.e. you send messages a, b,
794and c, and the other side only receives messages a and c). 776and c, and the other side only receives messages a and c).
795 777
807eventually be killed - it cannot happen that a node detects a port as dead 789eventually be killed - it cannot happen that a node detects a port as dead
808and then later sends messages to it, finding it is still alive. 790and then later sends messages to it, finding it is still alive.
809 791
810=item * Erlang can send messages to the wrong port, AEMP does not. 792=item * Erlang can send messages to the wrong port, AEMP does not.
811 793
812In Erlang it is quite possible that a node that restarts reuses a process 794In Erlang it is quite likely that a node that restarts reuses a process ID
813ID known to other nodes for a completely different process, causing 795known to other nodes for a completely different process, causing messages
814messages destined for that process to end up in an unrelated process. 796destined for that process to end up in an unrelated process.
815 797
816AEMP never reuses port IDs, so old messages or old port IDs floating 798AEMP never reuses port IDs, so old messages or old port IDs floating
817around in the network will not be sent to an unrelated port. 799around in the network will not be sent to an unrelated port.
818 800
819=item * Erlang uses unprotected connections, AEMP uses secure 801=item * Erlang uses unprotected connections, AEMP uses secure

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines