ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.27 by root, Tue Aug 4 22:13:45 2009 UTC vs.
Revision 1.32 by root, Wed Aug 5 19:58:46 2009 UTC

43 43
44=over 4 44=over 4
45 45
46=item port 46=item port
47 47
48A port is something you can send messages to with the C<snd> function, and 48A port is something you can send messages to (with the C<snd> function).
49you can register C<rcv> handlers with. All C<rcv> handlers will receive 49
50messages they match, messages will not be queued. 50Some ports allow you to register C<rcv> handlers that can match specific
51messages. All C<rcv> handlers will receive messages they match, messages
52will not be queued.
51 53
52=item port id - C<noderef#portname> 54=item port id - C<noderef#portname>
53 55
54A port id is always the noderef, a hash-mark (C<#>) as separator, followed 56A port id is normaly the concatenation of a noderef, a hash-mark (C<#>) as
55by a port name (a printable string of unspecified format). 57separator, and a port name (a printable string of unspecified format). An
58exception is the the node port, whose ID is identical to its node
59reference.
56 60
57=item node 61=item node
58 62
59A node is a single process containing at least one port - the node 63A node is a single process containing at least one port - the node
60port. You can send messages to node ports to let them create new ports, 64port. You can send messages to node ports to find existing ports or to
61among other things. 65create new ports, among other things.
62 66
63Initially, nodes are either private (single-process only) or hidden 67Nodes are either private (single-process only), slaves (connected to a
64(connected to a master node only). Only when they epxlicitly "become 68master node only) or public nodes (connectable from unrelated nodes).
65public" can you send them messages from unrelated other nodes.
66 69
67=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id> 70=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
68 71
69A noderef is a string that either uniquely identifies a given node (for 72A node reference is a string that either simply identifies the node (for
70private and hidden nodes), or contains a recipe on how to reach a given 73private and slave nodes), or contains a recipe on how to reach a given
71node (for public nodes). 74node (for public nodes).
75
76This recipe is simply a comma-separated list of C<address:port> pairs (for
77TCP/IP, other protocols might look different).
78
79Node references come in two flavours: resolved (containing only numerical
80addresses) or unresolved (where hostnames are used instead of addresses).
81
82Before using an unresolved node reference in a message you first have to
83resolve it.
72 84
73=back 85=back
74 86
75=head1 VARIABLES/FUNCTIONS 87=head1 VARIABLES/FUNCTIONS
76 88
91use base "Exporter"; 103use base "Exporter";
92 104
93our $VERSION = '0.1'; 105our $VERSION = '0.1';
94our @EXPORT = qw( 106our @EXPORT = qw(
95 NODE $NODE *SELF node_of _any_ 107 NODE $NODE *SELF node_of _any_
96 become_slave become_public 108 resolve_node initialise_node
97 snd rcv mon kil reg psub 109 snd rcv mon kil reg psub
98 port 110 port
99); 111);
100 112
101our $SELF; 113our $SELF;
115 127
116=item $noderef = node_of $portid 128=item $noderef = node_of $portid
117 129
118Extracts and returns the noderef from a portid or a noderef. 130Extracts and returns the noderef from a portid or a noderef.
119 131
132=item $cv = resolve_node $noderef
133
134Takes an unresolved node reference that may contain hostnames and
135abbreviated IDs, resolves all of them and returns a resolved node
136reference.
137
138In addition to C<address:port> pairs allowed in resolved noderefs, the
139following forms are supported:
140
141=over 4
142
143=item the empty string
144
145An empty-string component gets resolved as if the default port (4040) was
146specified.
147
148=item naked port numbers (e.g. C<1234>)
149
150These are resolved by prepending the local nodename and a colon, to be
151further resolved.
152
153=item hostnames (e.g. C<localhost:1234>, C<localhost>)
154
155These are resolved by using AnyEvent::DNS to resolve them, optionally
156looking up SRV records for the C<aemp=4040> port, if no port was
157specified.
158
159=back
160
120=item $SELF 161=item $SELF
121 162
122Contains the current port id while executing C<rcv> callbacks or C<psub> 163Contains the current port id while executing C<rcv> callbacks or C<psub>
123blocks. 164blocks.
124 165
148JSON is used, then only strings, numbers and arrays and hashes consisting 189JSON is used, then only strings, numbers and arrays and hashes consisting
149of those are allowed (no objects). When Storable is used, then anything 190of those are allowed (no objects). When Storable is used, then anything
150that Storable can serialise and deserialise is allowed, and for the local 191that Storable can serialise and deserialise is allowed, and for the local
151node, anything can be passed. 192node, anything can be passed.
152 193
153=item kil $portid[, @reason]
154
155Kill the specified port with the given C<@reason>.
156
157If no C<@reason> is specified, then the port is killed "normally" (linked
158ports will not be kileld, or even notified).
159
160Otherwise, linked ports get killed with the same reason (second form of
161C<mon>, see below).
162
163Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
164will be reported as reason C<< die => $@ >>.
165
166Transport/communication errors are reported as C<< transport_error =>
167$message >>.
168
169=item $guard = mon $portid, $cb->(@reason)
170
171=item $guard = mon $portid, $otherport
172
173=item $guard = mon $portid, $otherport, @msg
174
175Monitor the given port and do something when the port is killed.
176
177In the first form, the callback is simply called with any number
178of C<@reason> elements (no @reason means that the port was deleted
179"normally"). Note also that I<< the callback B<must> never die >>, so use
180C<eval> if unsure.
181
182In the second form, the other port will be C<kil>'ed with C<@reason>, iff
183a @reason was specified, i.e. on "normal" kils nothing happens, while
184under all other conditions, the other port is killed with the same reason.
185
186In the last form, a message of the form C<@msg, @reason> will be C<snd>.
187
188Example: call a given callback when C<$port> is killed.
189
190 mon $port, sub { warn "port died because of <@_>\n" };
191
192Example: kill ourselves when C<$port> is killed abnormally.
193
194 mon $port, $self;
195
196Example: send us a restart message another C<$port> is killed.
197
198 mon $port, $self => "restart";
199
200=cut
201
202sub mon {
203 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
204
205 my $node = $NODE{$noderef} || add_node $noderef;
206
207 #TODO: ports must not be references
208 if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
209 if (@_) {
210 # send a kill info message
211 my (@msg) = ($cb, @_);
212 $cb = sub { snd @msg, @_ };
213 } else {
214 # simply kill other port
215 my $port = $cb;
216 $cb = sub { kil $port, @_ if @_ };
217 }
218 }
219
220 $node->monitor ($port, $cb);
221
222 defined wantarray
223 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
224}
225
226=item $guard = mon_guard $port, $ref, $ref...
227
228Monitors the given C<$port> and keeps the passed references. When the port
229is killed, the references will be freed.
230
231Optionally returns a guard that will stop the monitoring.
232
233This function is useful when you create e.g. timers or other watchers and
234want to free them when the port gets killed:
235
236 $port->rcv (start => sub {
237 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
238 undef $timer if 0.9 < rand;
239 });
240 });
241
242=cut
243
244sub mon_guard {
245 my ($port, @refs) = @_;
246
247 mon $port, sub { 0 && @refs }
248}
249
250=item lnk $port1, $port2
251
252Link two ports. This is simply a shorthand for:
253
254 mon $port1, $port2;
255 mon $port2, $port1;
256
257It means that if either one is killed abnormally, the other one gets
258killed as well.
259
260=item $local_port = port 194=item $local_port = port
261 195
262Create a new local port object that supports message matching. 196Create a new local port object that can be used either as a pattern
197matching port ("full port") or a single-callback port ("miniport"),
198depending on how C<rcv> callbacks are bound to the object.
263 199
264=item $portid = port { my @msg = @_; $finished } 200=item $portid = port { my @msg = @_; $finished }
265 201
266Creates a "mini port", that is, a very lightweight port without any 202Creates a "mini port", that is, a very lightweight port without any
267pattern matching behind it, and returns its ID. 203pattern matching behind it, and returns its ID.
273The message will be passed as-is, no extra argument (i.e. no port id) will 209The message will be passed as-is, no extra argument (i.e. no port id) will
274be passed to the callback. 210be passed to the callback.
275 211
276If you need the local port id in the callback, this works nicely: 212If you need the local port id in the callback, this works nicely:
277 213
278 my $port; $port = miniport { 214 my $port; $port = port {
279 snd $otherport, reply => $port; 215 snd $otherport, reply => $port;
280 }; 216 };
281 217
282=cut 218=cut
283 219
344 my ($portid, $name) = @_; 280 my ($portid, $name) = @_;
345 281
346 $REG{$name} = $portid; 282 $REG{$name} = $portid;
347} 283}
348 284
285=item rcv $portid, $callback->(@msg)
286
287Replaces the callback on the specified miniport (or newly created port
288object, see C<port>). Full ports are configured with the following calls:
289
349=item rcv $portid, tagstring => $callback->(@msg), ... 290=item rcv $portid, tagstring => $callback->(@msg), ...
350 291
351=item rcv $portid, $smartmatch => $callback->(@msg), ... 292=item rcv $portid, $smartmatch => $callback->(@msg), ...
352 293
353=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... 294=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
354 295
355Register callbacks to be called on matching messages on the given port. 296Register callbacks to be called on matching messages on the given full
297port (or newly created port).
356 298
357The callback has to return a true value when its work is done, after 299The callback has to return a true value when its work is done, after
358which is will be removed, or a false value in which case it will stay 300which is will be removed, or a false value in which case it will stay
359registered. 301registered.
360 302
376also the most efficient match (by far). 318also the most efficient match (by far).
377 319
378=cut 320=cut
379 321
380sub rcv($@) { 322sub rcv($@) {
323 my $portid = shift;
381 my ($noderef, $port) = split /#/, shift, 2; 324 my ($noderef, $port) = split /#/, $port, 2;
382 325
383 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 326 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
384 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; 327 or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
385 328
386 my $self = $PORT_DATA{$port} 329 my $self = $PORT_DATA{$port}
401 : push @{ $self->{rc0}{$match->[0]} }, [$cb]; 344 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
402 } else { 345 } else {
403 push @{ $self->{any} }, [$cb, $match]; 346 push @{ $self->{any} }, [$cb, $match];
404 } 347 }
405 } 348 }
349
350 $portid
406} 351}
407 352
408=item $closure = psub { BLOCK } 353=item $closure = psub { BLOCK }
409 354
410Remembers C<$SELF> and creates a closure out of the BLOCK. When the 355Remembers C<$SELF> and creates a closure out of the BLOCK. When the
441 $res 386 $res
442 } 387 }
443 } 388 }
444} 389}
445 390
391=item $guard = mon $portid, $cb->(@reason)
392
393=item $guard = mon $portid, $otherport
394
395=item $guard = mon $portid, $otherport, @msg
396
397Monitor the given port and do something when the port is killed.
398
399In the first form, the callback is simply called with any number
400of C<@reason> elements (no @reason means that the port was deleted
401"normally"). Note also that I<< the callback B<must> never die >>, so use
402C<eval> if unsure.
403
404In the second form, the other port will be C<kil>'ed with C<@reason>, iff
405a @reason was specified, i.e. on "normal" kils nothing happens, while
406under all other conditions, the other port is killed with the same reason.
407
408In the last form, a message of the form C<@msg, @reason> will be C<snd>.
409
410Example: call a given callback when C<$port> is killed.
411
412 mon $port, sub { warn "port died because of <@_>\n" };
413
414Example: kill ourselves when C<$port> is killed abnormally.
415
416 mon $port, $self;
417
418Example: send us a restart message another C<$port> is killed.
419
420 mon $port, $self => "restart";
421
422=cut
423
424sub mon {
425 my ($noderef, $port) = split /#/, shift, 2;
426
427 my $node = $NODE{$noderef} || add_node $noderef;
428
429 my $cb = shift;
430
431 unless (ref $cb) {
432 if (@_) {
433 # send a kill info message
434 my (@msg) = ($cb, @_);
435 $cb = sub { snd @msg, @_ };
436 } else {
437 # simply kill other port
438 my $port = $cb;
439 $cb = sub { kil $port, @_ if @_ };
440 }
441 }
442
443 $node->monitor ($port, $cb);
444
445 defined wantarray
446 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
447}
448
449=item $guard = mon_guard $port, $ref, $ref...
450
451Monitors the given C<$port> and keeps the passed references. When the port
452is killed, the references will be freed.
453
454Optionally returns a guard that will stop the monitoring.
455
456This function is useful when you create e.g. timers or other watchers and
457want to free them when the port gets killed:
458
459 $port->rcv (start => sub {
460 my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
461 undef $timer if 0.9 < rand;
462 });
463 });
464
465=cut
466
467sub mon_guard {
468 my ($port, @refs) = @_;
469
470 mon $port, sub { 0 && @refs }
471}
472
473=item lnk $port1, $port2
474
475Link two ports. This is simply a shorthand for:
476
477 mon $port1, $port2;
478 mon $port2, $port1;
479
480It means that if either one is killed abnormally, the other one gets
481killed as well.
482
483=item kil $portid[, @reason]
484
485Kill the specified port with the given C<@reason>.
486
487If no C<@reason> is specified, then the port is killed "normally" (linked
488ports will not be kileld, or even notified).
489
490Otherwise, linked ports get killed with the same reason (second form of
491C<mon>, see below).
492
493Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
494will be reported as reason C<< die => $@ >>.
495
496Transport/communication errors are reported as C<< transport_error =>
497$message >>.
498
446=back 499=back
447 500
448=head1 FUNCTIONS FOR NODES 501=head1 FUNCTIONS FOR NODES
449 502
450=over 4 503=over 4
451 504
452=item become_public endpoint... 505=item become_public $noderef
453 506
454Tells the node to become a public node, i.e. reachable from other nodes. 507Tells the node to become a public node, i.e. reachable from other nodes.
455 508
456If no arguments are given, or the first argument is C<undef>, then 509The first argument is the (unresolved) node reference of the local node
457AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the 510(if missing then the empty string is used).
458local nodename resolves to.
459 511
460Otherwise the first argument must be an array-reference with transport 512It is quite common to not specify anything, in which case the local node
461endpoints ("ip:port", "hostname:port") or port numbers (in which case the 513tries to listen on the default port, or to only specify a port number, in
462local nodename is used as hostname). The endpoints are all resolved and 514which case AnyEvent::MP tries to guess the local addresses.
463will become the node reference.
464 515
465=cut 516=cut
466 517
467=back 518=back
468 519
471Nodes understand the following messages sent to them. Many of them take 522Nodes understand the following messages sent to them. Many of them take
472arguments called C<@reply>, which will simply be used to compose a reply 523arguments called C<@reply>, which will simply be used to compose a reply
473message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and 524message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
474the remaining arguments are simply the message data. 525the remaining arguments are simply the message data.
475 526
527While other messages exist, they are not public and subject to change.
528
476=over 4 529=over 4
477 530
478=cut 531=cut
479 532
480=item lookup => $name, @reply 533=item lookup => $name, @reply
582authentication and can use TLS. 635authentication and can use TLS.
583 636
584AEMP can use a proven protocol - SSL/TLS - to protect connections and 637AEMP can use a proven protocol - SSL/TLS - to protect connections and
585securely authenticate nodes. 638securely authenticate nodes.
586 639
640=item * The AEMP protocol is optimised for both text-based and binary
641communications.
642
643The AEMP protocol, unlike the erlang protocol, supports both
644language-independent text-only protocols (good for debugging) and binary,
645language-specific serialisers (e.g. Storable).
646
647It has also been carefully designed to be implementable in other languages
648with a minimum of work while gracefully degrading fucntionality to make the
649protocol simple.
650
587=back 651=back
588 652
589=head1 SEE ALSO 653=head1 SEE ALSO
590 654
591L<AnyEvent>. 655L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines