ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.23
Committed: Tue Aug 4 18:46:16 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.22: +5 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.22 $NODE # contains this node's noderef
10     NODE # returns this node's noderef
11     NODE $port # returns the noderef of the port
12 root 1.2
13     snd $port, type => data...;
14    
15 root 1.22 $SELF # receiving/own port id in rcv callbacks
16    
17 root 1.2 rcv $port, smartmatch => $cb->($port, @msg);
18    
19     # examples:
20     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
21     rcv $port1, pong => sub { warn "pong received\n" };
22     snd $port2, ping => $port1;
23    
24     # more, smarter, matches (_any_ is exported by this module)
25     rcv $port, [child_died => $pid] => sub { ...
26     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
27    
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.2 This module (-family) implements a simple message passing framework.
31    
32     Despite its simplicity, you can securely message other processes running
33     on the same or other hosts.
34    
35 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
36     manual page.
37    
38     At the moment, this module family is severly broken and underdocumented,
39 root 1.21 so do not use. This was uploaded mainly to reserve the CPAN namespace -
40 root 1.23 stay tuned! The basic API should be finished, however.
41 root 1.6
42 root 1.2 =head1 CONCEPTS
43    
44     =over 4
45    
46     =item port
47    
48     A port is something you can send messages to with the C<snd> function, and
49     you can register C<rcv> handlers with. All C<rcv> handlers will receive
50     messages they match, messages will not be queued.
51    
52 root 1.3 =item port id - C<noderef#portname>
53 root 1.2
54 root 1.3 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
55     by a port name (a printable string of unspecified format).
56 root 1.2
57     =item node
58    
59     A node is a single process containing at least one port - the node
60     port. You can send messages to node ports to let them create new ports,
61     among other things.
62    
63     Initially, nodes are either private (single-process only) or hidden
64 root 1.3 (connected to a master node only). Only when they epxlicitly "become
65     public" can you send them messages from unrelated other nodes.
66 root 1.2
67 root 1.5 =item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
68 root 1.2
69 root 1.3 A noderef is a string that either uniquely identifies a given node (for
70 root 1.2 private and hidden nodes), or contains a recipe on how to reach a given
71     node (for public nodes).
72    
73     =back
74    
75 root 1.3 =head1 VARIABLES/FUNCTIONS
76 root 1.2
77     =over 4
78    
79 root 1.1 =cut
80    
81     package AnyEvent::MP;
82    
83 root 1.8 use AnyEvent::MP::Base;
84 root 1.2
85 root 1.1 use common::sense;
86    
87 root 1.2 use Carp ();
88    
89 root 1.1 use AE ();
90    
91 root 1.2 use base "Exporter";
92    
93 root 1.9 our $VERSION = '0.02';
94 root 1.8 our @EXPORT = qw(
95 root 1.22 NODE $NODE *SELF node_of _any_
96 root 1.8 become_slave become_public
97 root 1.22 snd rcv mon kil reg psub
98     port
99 root 1.8 );
100 root 1.2
101 root 1.22 our $SELF;
102    
103     sub _self_die() {
104     my $msg = $@;
105     $msg =~ s/\n+$// unless ref $msg;
106     kil $SELF, die => $msg;
107     }
108    
109     =item $thisnode = NODE / $NODE
110    
111     The C<NODE> function returns, and the C<$NODE> variable contains
112     the noderef of the local node. The value is initialised by a call
113     to C<become_public> or C<become_slave>, after which all local port
114     identifiers become invalid.
115    
116     =item $noderef = node_of $portid
117    
118     Extracts and returns the noderef from a portid or a noderef.
119    
120     =item $SELF
121    
122     Contains the current port id while executing C<rcv> callbacks or C<psub>
123     blocks.
124 root 1.3
125 root 1.22 =item SELF, %SELF, @SELF...
126    
127     Due to some quirks in how perl exports variables, it is impossible to
128     just export C<$SELF>, all the symbols called C<SELF> are exported by this
129     module, but only C<$SELF> is currently used.
130 root 1.3
131     =item snd $portid, type => @data
132    
133     =item snd $portid, @msg
134    
135 root 1.8 Send the given message to the given port ID, which can identify either
136     a local or a remote port, and can be either a string or soemthignt hat
137     stringifies a sa port ID (such as a port object :).
138    
139     While the message can be about anything, it is highly recommended to use a
140     string as first element (a portid, or some word that indicates a request
141     type etc.).
142 root 1.3
143     The message data effectively becomes read-only after a call to this
144     function: modifying any argument is not allowed and can cause many
145     problems.
146    
147     The type of data you can transfer depends on the transport protocol: when
148     JSON is used, then only strings, numbers and arrays and hashes consisting
149     of those are allowed (no objects). When Storable is used, then anything
150     that Storable can serialise and deserialise is allowed, and for the local
151     node, anything can be passed.
152    
153 root 1.22 =item kil $portid[, @reason]
154    
155     Kill the specified port with the given C<@reason>.
156    
157     If no C<@reason> is specified, then the port is killed "normally" (linked
158     ports will not be kileld, or even notified).
159    
160     Otherwise, linked ports get killed with the same reason (second form of
161     C<mon>, see below).
162    
163     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
164     will be reported as reason C<< die => $@ >>.
165    
166     Transport/communication errors are reported as C<< transport_error =>
167     $message >>.
168    
169     =item $guard = mon $portid, $cb->(@reason)
170 root 1.18
171 root 1.21 =item $guard = mon $portid, $otherport
172    
173     =item $guard = mon $portid, $otherport, @msg
174    
175 root 1.22 Monitor the given port and do something when the port is killed.
176    
177     In the first form, the callback is simply called with any number
178     of C<@reason> elements (no @reason means that the port was deleted
179     "normally"). Note also that I<< the callback B<must> never die >>, so use
180     C<eval> if unsure.
181    
182     In the second form, the other port will be C<kil>'ed with C<@reason>, iff
183     a @reason was specified, i.e. on "normal" kils nothing happens, while
184     under all other conditions, the other port is killed with the same reason.
185 root 1.20
186 root 1.22 In the last form, a message of the form C<@msg, @reason> will be C<snd>.
187    
188     Example: call a given callback when C<$port> is killed.
189    
190     mon $port, sub { warn "port died because of <@_>\n" };
191    
192     Example: kill ourselves when C<$port> is killed abnormally.
193    
194     mon $port, $self;
195    
196     Example: send us a restart message another C<$port> is killed.
197    
198     mon $port, $self => "restart";
199 root 1.18
200     =cut
201    
202     sub mon {
203 root 1.21 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
204 root 1.18
205 root 1.22 my $node = $NODE{$noderef} || add_node $noderef;
206 root 1.18
207 root 1.21 #TODO: ports must not be references
208     if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
209     if (@_) {
210     # send a kill info message
211     my (@msg) = ($cb, @_);
212     $cb = sub { snd @msg, @_ };
213     } else {
214     # simply kill other port
215     my $port = $cb;
216 root 1.22 $cb = sub { kil $port, @_ if @_ };
217 root 1.21 }
218     }
219 root 1.18
220     $node->monitor ($port, $cb);
221    
222     defined wantarray
223     and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
224     }
225    
226 root 1.21 =item $guard = mon_guard $port, $ref, $ref...
227    
228     Monitors the given C<$port> and keeps the passed references. When the port
229     is killed, the references will be freed.
230    
231     Optionally returns a guard that will stop the monitoring.
232    
233     This function is useful when you create e.g. timers or other watchers and
234     want to free them when the port gets killed:
235    
236     $port->rcv (start => sub {
237     my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
238     undef $timer if 0.9 < rand;
239     });
240     });
241    
242     =cut
243    
244     sub mon_guard {
245     my ($port, @refs) = @_;
246    
247     mon $port, sub { 0 && @refs }
248     }
249    
250 root 1.22 =item $local_port = port
251 root 1.2
252 root 1.22 Create a new local port object that supports message matching.
253 root 1.3
254 root 1.22 =item $portid = port { my @msg = @_; $finished }
255 root 1.10
256 root 1.15 Creates a "mini port", that is, a very lightweight port without any
257     pattern matching behind it, and returns its ID.
258    
259     The block will be called for every message received on the port. When the
260     callback returns a true value its job is considered "done" and the port
261     will be destroyed. Otherwise it will stay alive.
262    
263 root 1.17 The message will be passed as-is, no extra argument (i.e. no port id) will
264 root 1.15 be passed to the callback.
265    
266     If you need the local port id in the callback, this works nicely:
267    
268     my $port; $port = miniport {
269     snd $otherport, reply => $port;
270     };
271 root 1.10
272     =cut
273    
274 root 1.22 sub port(;&) {
275     my $id = "$UNIQ." . $ID++;
276     my $port = "$NODE#$id";
277    
278     if (@_) {
279     my $cb = shift;
280     $PORT{$id} = sub {
281     local $SELF = $port;
282     eval {
283     &$cb
284     and kil $id;
285     };
286     _self_die if $@;
287     };
288     } else {
289     my $self = bless {
290     id => "$NODE#$id",
291     }, "AnyEvent::MP::Port";
292    
293     $PORT_DATA{$id} = $self;
294     $PORT{$id} = sub {
295     local $SELF = $port;
296    
297     eval {
298     for (@{ $self->{rc0}{$_[0]} }) {
299     $_ && &{$_->[0]}
300     && undef $_;
301     }
302    
303     for (@{ $self->{rcv}{$_[0]} }) {
304     $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
305     && &{$_->[0]}
306     && undef $_;
307     }
308    
309     for (@{ $self->{any} }) {
310     $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
311     && &{$_->[0]}
312     && undef $_;
313     }
314     };
315     _self_die if $@;
316     };
317     }
318 root 1.10
319 root 1.22 $port
320 root 1.10 }
321    
322 root 1.22 =item reg $portid, $name
323 root 1.8
324 root 1.22 Registers the given port under the name C<$name>. If the name already
325     exists it is replaced.
326 root 1.8
327 root 1.22 A port can only be registered under one well known name.
328 root 1.8
329 root 1.22 A port automatically becomes unregistered when it is killed.
330 root 1.8
331     =cut
332    
333 root 1.22 sub reg(@) {
334     my ($portid, $name) = @_;
335 root 1.8
336 root 1.22 $REG{$name} = $portid;
337     }
338 root 1.18
339 root 1.22 =item rcv $portid, tagstring => $callback->(@msg), ...
340 root 1.3
341 root 1.22 =item rcv $portid, $smartmatch => $callback->(@msg), ...
342 root 1.3
343 root 1.22 =item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
344 root 1.3
345 root 1.22 Register callbacks to be called on matching messages on the given port.
346 root 1.3
347     The callback has to return a true value when its work is done, after
348     which is will be removed, or a false value in which case it will stay
349     registered.
350    
351 root 1.22 The global C<$SELF> (exported by this module) contains C<$portid> while
352     executing the callback.
353    
354     Runtime errors wdurign callback execution will result in the port being
355     C<kil>ed.
356    
357 root 1.3 If the match is an array reference, then it will be matched against the
358     first elements of the message, otherwise only the first element is being
359     matched.
360    
361     Any element in the match that is specified as C<_any_> (a function
362     exported by this module) matches any single element of the message.
363    
364     While not required, it is highly recommended that the first matching
365     element is a string identifying the message. The one-string-only match is
366     also the most efficient match (by far).
367    
368     =cut
369    
370     sub rcv($@) {
371 root 1.22 my ($noderef, $port) = split /#/, shift, 2;
372 root 1.3
373 root 1.22 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
374     or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
375    
376     my $self = $PORT_DATA{$port}
377     or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
378    
379     "AnyEvent::MP::Port" eq ref $self
380     or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
381    
382     while (@_) {
383     my ($match, $cb) = splice @_, 0, 2;
384    
385     if (!ref $match) {
386     push @{ $self->{rc0}{$match} }, [$cb];
387     } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
388     my ($type, @match) = @$match;
389     @match
390     ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
391     : push @{ $self->{rc0}{$match->[0]} }, [$cb];
392     } else {
393     push @{ $self->{any} }, [$cb, $match];
394     }
395 root 1.3 }
396 root 1.2 }
397    
398 root 1.22 =item $closure = psub { BLOCK }
399 root 1.2
400 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
401     closure is executed, sets up the environment in the same way as in C<rcv>
402     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
403    
404     This is useful when you register callbacks from C<rcv> callbacks:
405    
406     rcv delayed_reply => sub {
407     my ($delay, @reply) = @_;
408     my $timer = AE::timer $delay, 0, psub {
409     snd @reply, $SELF;
410     };
411     };
412 root 1.3
413 root 1.8 =cut
414 root 1.3
415 root 1.22 sub psub(&) {
416     my $cb = shift;
417 root 1.3
418 root 1.22 my $port = $SELF
419     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
420 root 1.1
421 root 1.22 sub {
422     local $SELF = $port;
423 root 1.2
424 root 1.22 if (wantarray) {
425     my @res = eval { &$cb };
426     _self_die if $@;
427     @res
428     } else {
429     my $res = eval { &$cb };
430     _self_die if $@;
431     $res
432     }
433     }
434 root 1.2 }
435    
436 root 1.8 =back
437    
438     =head1 FUNCTIONS FOR NODES
439    
440     =over 4
441 root 1.2
442 root 1.8 =item become_public endpoint...
443    
444     Tells the node to become a public node, i.e. reachable from other nodes.
445    
446     If no arguments are given, or the first argument is C<undef>, then
447     AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
448     local nodename resolves to.
449    
450     Otherwise the first argument must be an array-reference with transport
451     endpoints ("ip:port", "hostname:port") or port numbers (in which case the
452     local nodename is used as hostname). The endpoints are all resolved and
453     will become the node reference.
454 root 1.2
455 root 1.8 =cut
456 root 1.1
457 root 1.4 =back
458    
459     =head1 NODE MESSAGES
460    
461 root 1.5 Nodes understand the following messages sent to them. Many of them take
462     arguments called C<@reply>, which will simply be used to compose a reply
463     message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
464     the remaining arguments are simply the message data.
465 root 1.4
466     =over 4
467    
468     =cut
469    
470 root 1.22 =item lookup => $name, @reply
471 root 1.3
472 root 1.8 Replies with the port ID of the specified well-known port, or C<undef>.
473 root 1.3
474 root 1.7 =item devnull => ...
475    
476     Generic data sink/CPU heat conversion.
477    
478 root 1.4 =item relay => $port, @msg
479    
480     Simply forwards the message to the given port.
481    
482     =item eval => $string[ @reply]
483    
484     Evaluates the given string. If C<@reply> is given, then a message of the
485 root 1.5 form C<@reply, $@, @evalres> is sent.
486    
487     Example: crash another node.
488    
489     snd $othernode, eval => "exit";
490 root 1.4
491     =item time => @reply
492    
493     Replies the the current node time to C<@reply>.
494    
495 root 1.5 Example: tell the current node to send the current time to C<$myport> in a
496     C<timereply> message.
497    
498     snd $NODE, time => $myport, timereply => 1, 2;
499     # => snd $myport, timereply => 1, 2, <time>
500    
501 root 1.2 =back
502    
503 root 1.1 =head1 SEE ALSO
504    
505     L<AnyEvent>.
506    
507     =head1 AUTHOR
508    
509     Marc Lehmann <schmorp@schmorp.de>
510     http://home.schmorp.de/
511    
512     =cut
513    
514     1
515