/cvs/AnyEvent-MP/MP.pm
Revision: 1.22
Committed: Tue Aug 4 18:33:30 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.21: +198 -116 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.22 $NODE # contains this node's noderef
10     NODE # returns this node's noderef
11     NODE $port # returns the noderef of the port
12 root 1.2
13     snd $port, type => data...;
14    
15 root 1.22 $SELF # receiving/own port id in rcv callbacks
16    
17 root 1.2 rcv $port, smartmatch => $cb->($port, @msg);
18    
19     # examples:
20     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
21     rcv $port1, pong => sub { warn "pong received\n" };
22     snd $port2, ping => $port1;
23    
24     # more, smarter, matches (_any_ is exported by this module)
25     rcv $port, [child_died => $pid] => sub { ...
26     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
27    
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.2 This module (-family) implements a simple message passing framework.
31    
32     Despite its simplicity, you can securely message other processes running
33     on the same or other hosts.
34    
35 root 1.6 At the moment, this module family is severely broken and underdocumented,
36 root 1.21 so do not use. This was uploaded mainly to reserve the CPAN namespace -
37 root 1.6 stay tuned!
38    
39 root 1.2 =head1 CONCEPTS
40    
41     =over 4
42    
43     =item port
44    
45     A port is something you can send messages to with the C<snd> function and
46     register C<rcv> handlers on. Every C<rcv> handler receives the messages
47     it matches; messages are not queued.
48    
49 root 1.3 =item port id - C<noderef#portname>
50 root 1.2
51 root 1.3 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
52     by a port name (a printable string of unspecified format).
53 root 1.2
54     =item node
55    
56     A node is a single process containing at least one port - the node
57     port. You can send messages to node ports to let them create new ports,
58     among other things.
59    
60     Initially, nodes are either private (single-process only) or hidden
61 root 1.3 (connected to a master node only). Only when they explicitly "become
62     public" can you send them messages from unrelated other nodes.
63 root 1.2
64 root 1.5 =item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
65 root 1.2
66 root 1.3 A noderef is a string that either uniquely identifies a given node (for
67 root 1.2 private and hidden nodes), or contains a recipe on how to reach a given
68     node (for public nodes).
69    
70     =back
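
A port id can be taken apart with a plain C<split>, which is also what the C<mon> and C<rcv> code in this file does. The following is a minimal sketch using only core Perl; the helper name C<split_portid> and the example address are hypothetical and not part of the module.

```perl
use strict;
use warnings;

# Split a port id of the form "noderef#portname" into its two parts.
# The limit of 2 keeps any further "#" characters inside the port name.
sub split_portid {
    my ($portid) = @_;
    my ($noderef, $portname) = split /#/, $portid, 2;
    return ($noderef, $portname);
}

# Hypothetical port id, used for illustration only.
my ($noderef, $name) = split_portid ("10.0.0.1:4040#worker.1");
print "noderef=$noderef port=$name\n";
```

Because the noderef itself never contains a hash-mark in the formats listed above, the first C<#> is always the separator.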
71    
72 root 1.3 =head1 VARIABLES/FUNCTIONS
73 root 1.2
74     =over 4
75    
76 root 1.1 =cut
77    
78     package AnyEvent::MP;
79    
80 root 1.8 use AnyEvent::MP::Base;
81 root 1.2
82 root 1.1 use common::sense;
83    
84 root 1.2 use Carp ();
85    
86 root 1.1 use AE ();
87    
88 root 1.2 use base "Exporter";
89    
90 root 1.9 our $VERSION = '0.02';
91 root 1.8 our @EXPORT = qw(
92 root 1.22 NODE $NODE *SELF node_of _any_
93 root 1.8 become_slave become_public
94 root 1.22 snd rcv mon kil reg psub
95     port
96 root 1.8 );
97 root 1.2
98 root 1.22 our $SELF;
99    
100     sub _self_die() {
101     my $msg = $@;
102     $msg =~ s/\n+$// unless ref $msg;
103     kil $SELF, die => $msg;
104     }
105    
106     =item $thisnode = NODE / $NODE
107    
108     The C<NODE> function returns, and the C<$NODE> variable contains
109     the noderef of the local node. The value is initialised by a call
110     to C<become_public> or C<become_slave>, after which all local port
111     identifiers become invalid.
112    
113     =item $noderef = node_of $portid
114    
115     Extracts and returns the noderef from a portid or a noderef.
116    
117     =item $SELF
118    
119     Contains the current port id while executing C<rcv> callbacks or C<psub>
120     blocks.
121 root 1.3
122 root 1.22 =item SELF, %SELF, @SELF...
123    
124     Due to some quirks in how perl exports variables, it is impossible to
125     export only C<$SELF>: all the symbols called C<SELF> are exported by this
126     module, but only C<$SELF> is currently used.
127 root 1.3
128     =item snd $portid, type => @data
129    
130     =item snd $portid, @msg
131    
132 root 1.8 Send the given message to the given port ID, which can identify either
133     a local or a remote port, and can be either a string or something that
134     stringifies as a port ID (such as a port object :).
135    
136     While the message can be about anything, it is highly recommended to use a
137     string as first element (a portid, or some word that indicates a request
138     type etc.).
139 root 1.3
140     The message data effectively becomes read-only after a call to this
141     function: modifying any argument is not allowed and can cause many
142     problems.
143    
144     The type of data you can transfer depends on the transport protocol: when
145     JSON is used, then only strings, numbers and arrays and hashes consisting
146     of those are allowed (no objects). When Storable is used, then anything
147     that Storable can serialise and deserialise is allowed, and for the local
148     node, anything can be passed.
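
The JSON restriction can be illustrated with a short sketch. The text above does not say which JSON module the transport uses, so the core module C<JSON::PP> serves as a stand-in here, and the class name C<My::Object> is made up for the example.

```perl
use strict;
use warnings;
use JSON::PP ();

# Stand-in encoder; the real transport may use a different JSON module.
my $json = JSON::PP->new->canonical;

# Plain strings, numbers, arrays and hashes serialise without trouble.
my $ok = $json->encode (["status", { load => 0.5, tags => ["a", "b"] }]);
print "$ok\n";

# A blessed object is rejected under JSON::PP's default settings,
# so encode dies and eval returns undef.
my $obj     = bless {}, "My::Object";
my $encoded = eval { $json->encode (["status", $obj]) };
print "object rejected: ", (defined $encoded ? "no" : "yes"), "\n";
```

Messages that stay within plain data structures therefore work with every transport, which is the safest choice when the destination node is not known in advance.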
149    
150 root 1.22 =item kil $portid[, @reason]
151    
152     Kill the specified port with the given C<@reason>.
153    
154     If no C<@reason> is specified, then the port is killed "normally" (linked
155     ports will not be killed, or even notified).
156    
157     Otherwise, linked ports get killed with the same reason (second form of
158     C<mon>, see below).
159    
160     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
161     will be reported as reason C<< die => $@ >>.
162    
163     Transport/communication errors are reported as C<< transport_error =>
164     $message >>.
165    
166     =item $guard = mon $portid, $cb->(@reason)
167 root 1.18
168 root 1.21 =item $guard = mon $portid, $otherport
169    
170     =item $guard = mon $portid, $otherport, @msg
171    
172 root 1.22 Monitor the given port and do something when the port is killed.
173    
174     In the first form, the callback is simply called with any number
175     of C<@reason> elements (no @reason means that the port was deleted
176     "normally"). Note also that I<< the callback B<must> never die >>, so use
177     C<eval> if unsure.
178    
179     In the second form, the other port will be C<kil>'ed with C<@reason>, iff
180     a @reason was specified, i.e. on "normal" kils nothing happens, while
181     under all other conditions, the other port is killed with the same reason.
182 root 1.20
183 root 1.22 In the last form, a message of the form C<@msg, @reason> will be C<snd>.
184    
185     Example: call a given callback when C<$port> is killed.
186    
187     mon $port, sub { warn "port died because of <@_>\n" };
188    
189     Example: kill ourselves when C<$port> is killed abnormally.
190    
191     mon $port, $self;
192    
193     Example: send us a restart message when another C<$port> is killed.
194    
195     mon $port, $self => "restart";
196 root 1.18
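The three forms differ only in the callback that ends up being registered. The sketch below shows that mapping in isolation, loosely following the implementation further down; C<snd> and C<kil> are replaced by stand-ins that merely record their arguments, and the helper name C<kill_callback> is hypothetical.

```perl
use strict;
use warnings;

my @log;                                    # records what the stand-ins were asked to do
sub snd { push @log, join " ", "snd", @_ }  # stand-in for the real snd
sub kil { push @log, join " ", "kil", @_ }  # stand-in for the real kil

# Turn the second argument of a mon-style call into a kill callback.
sub kill_callback {
    my ($cb, @msg) = @_;

    return $cb if "CODE" eq ref $cb;        # first form: use the callback as-is

    if (@msg) {
        my @full = ($cb, @msg);             # third form: snd $otherport, @msg, @reason
        return sub { snd (@full, @_) };
    }

    my $port = $cb;                         # second form: propagate abnormal kills only
    return sub { kil ($port, @_) if @_ };
}

kill_callback ("node#b")->();                              # normal kill: nothing happens
kill_callback ("node#b")->("die", "oops");                 # abnormal kill is propagated
kill_callback ("node#b", "restart")->("transport_error");  # message plus reason

print "$_\n" for @log;
```

Only the second and third call leave a trace in C<@log>, which mirrors the rule that a "normal" kill is not propagated to linked ports.
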
197     =cut
198    
199     sub mon {
200 root 1.21 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
201 root 1.18
202 root 1.22 my $node = $NODE{$noderef} || add_node $noderef;
203 root 1.18
204 root 1.21 #TODO: ports must not be references
205     if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
206     if (@_) {
207     # send a kill info message
208     my (@msg) = ($cb, @_);
209     $cb = sub { snd @msg, @_ };
210     } else {
211     # simply kill other port
212     my $port = $cb;
213 root 1.22 $cb = sub { kil $port, @_ if @_ };
214 root 1.21 }
215     }
216 root 1.18
217     $node->monitor ($port, $cb);
218    
219     defined wantarray
220     and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
221     }
222    
223 root 1.21 =item $guard = mon_guard $port, $ref, $ref...
224    
225     Monitors the given C<$port> and keeps the passed references. When the port
226     is killed, the references will be freed.
227    
228     Optionally returns a guard that will stop the monitoring.
229    
230     This function is useful when you create e.g. timers or other watchers and
231     want to free them when the port gets killed:
232    
233     $port->rcv (start => sub {
234     my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
235     undef $timer if 0.9 < rand;
236     });
237     });
238    
239     =cut
240    
241     sub mon_guard {
242     my ($port, @refs) = @_;
243    
244     mon $port, sub { 0 && @refs }
245     }
246    
247 root 1.22 =item $local_port = port
248 root 1.2
249 root 1.22 Create a new local port object that supports message matching.
250 root 1.3
251 root 1.22 =item $portid = port { my @msg = @_; $finished }
252 root 1.10
253 root 1.15 Creates a "mini port", that is, a very lightweight port without any
254     pattern matching behind it, and returns its ID.
255    
256     The block will be called for every message received on the port. When the
257     callback returns a true value its job is considered "done" and the port
258     will be destroyed. Otherwise it will stay alive.
259    
260 root 1.17 The message will be passed as-is, no extra argument (i.e. no port id) will
261 root 1.15 be passed to the callback.
262    
263     If you need the local port id in the callback, this works nicely:
264    
265     my $port; $port = port {
266     snd $otherport, reply => $port;
267     };
268 root 1.10
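The "return true when done" convention can be pictured with a small dispatch table. This is only a sketch of the idea under simplifying assumptions, not the module's code: the registry C<%PORT> here is a local stand-in keyed by a made-up name, and C<deliver> is a hypothetical helper.

```perl
use strict;
use warnings;

my %PORT;   # stand-in registry: port name => callback

# Deliver a message; a true return value from the callback ends the port.
sub deliver {
    my ($name, @msg) = @_;
    my $cb = $PORT{$name} or return 0;      # unknown or already destroyed port
    delete $PORT{$name} if $cb->(@msg);
    return 1;
}

my @seen;
$PORT{counter} = sub {
    push @seen, $_[0];
    return $_[0] eq "stop";                 # true only for the final message
};

deliver (counter => $_) for qw(a b stop c);

print "seen: @seen\n";
print "port alive: ", (exists $PORT{counter} ? "yes" : "no"), "\n";
```

The message C<c> is never seen because the port was removed as soon as the callback returned true for C<stop>.
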
269     =cut
270    
271 root 1.22 sub port(;&) {
272     my $id = "$UNIQ." . $ID++;
273     my $port = "$NODE#$id";
274    
275     if (@_) {
276     my $cb = shift;
277     $PORT{$id} = sub {
278     local $SELF = $port;
279     eval {
280     &$cb
281     and kil $id;
282     };
283     _self_die if $@;
284     };
285     } else {
286     my $self = bless {
287     id => "$NODE#$id",
288     }, "AnyEvent::MP::Port";
289    
290     $PORT_DATA{$id} = $self;
291     $PORT{$id} = sub {
292     local $SELF = $port;
293    
294     eval {
295     for (@{ $self->{rc0}{$_[0]} }) {
296     $_ && &{$_->[0]}
297     && undef $_;
298     }
299    
300     for (@{ $self->{rcv}{$_[0]} }) {
301     $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
302     && &{$_->[0]}
303     && undef $_;
304     }
305    
306     for (@{ $self->{any} }) {
307     $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
308     && &{$_->[0]}
309     && undef $_;
310     }
311     };
312     _self_die if $@;
313     };
314     }
315 root 1.10
316 root 1.22 $port
317 root 1.10 }
318    
319 root 1.22 =item reg $portid, $name
320 root 1.8
321 root 1.22 Registers the given port under the name C<$name>. If the name already
322     exists it is replaced.
323 root 1.8
324 root 1.22 A port can only be registered under one well known name.
325 root 1.8
326 root 1.22 A port automatically becomes unregistered when it is killed.
327 root 1.8
328     =cut
329    
330 root 1.22 sub reg(@) {
331     my ($portid, $name) = @_;
332 root 1.8
333 root 1.22 $REG{$name} = $portid;
334     }
335 root 1.18
336 root 1.22 =item rcv $portid, tagstring => $callback->(@msg), ...
337 root 1.3
338 root 1.22 =item rcv $portid, $smartmatch => $callback->(@msg), ...
339 root 1.3
340 root 1.22 =item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
341 root 1.3
342 root 1.22 Register callbacks to be called on matching messages on the given port.
343 root 1.3
344     The callback has to return a true value when its work is done, after
345     which it will be removed, or a false value in which case it will stay
346     registered.
347    
348 root 1.22 The global C<$SELF> (exported by this module) contains C<$portid> while
349     executing the callback.
350    
351     Runtime errors during callback execution will result in the port being
352     C<kil>ed.
353    
354 root 1.3 If the match is an array reference, then it will be matched against the
355     first elements of the message, otherwise only the first element is being
356     matched.
357    
358     Any element in the match that is specified as C<_any_> (a function
359     exported by this module) matches any single element of the message.
360    
361     While not required, it is highly recommended that the first matching
362     element is a string identifying the message. The one-string-only match is
363     also the most efficient match (by far).
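
The matching rules for array matches can be sketched without the smartmatch operator. The code below is a simplified stand-in, not the module's implementation: it compares elements as strings, uses a hypothetical sentinel C<$ANY> in place of C<_any_>, and ignores message elements beyond the length of the pattern.

```perl
use strict;
use warnings;

my $ANY = \"any";   # hypothetical wildcard sentinel, compared by reference identity

# True if every element of @$pattern matches the message element at the same
# position; elements of the message beyond the pattern are ignored.
sub matches {
    my ($pattern, @msg) = @_;
    return 0 if @$pattern > @msg;
    for my $i (0 .. $#$pattern) {
        my $want = $pattern->[$i];
        next if ref $want && $want == $ANY;
        return 0 unless defined $msg[$i] && $msg[$i] eq $want;
    }
    return 1;
}

for my $case (
    [["child_died", 42], "child_died", 42, "extra"],
    [[$ANY, $ANY, 3],    "x", "y", 3],
    [["child_died", 42], "child_died", 43],
) {
    my ($pattern, @msg) = @$case;
    my $result = matches ($pattern, @msg) ? "match" : "no match";
    print "$result\n";
}
```

The first two cases match and the third does not, since its second element differs from the pattern.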
364    
365     =cut
366    
367     sub rcv($@) {
368 root 1.22 my ($noderef, $port) = split /#/, shift, 2;
369 root 1.3
370 root 1.22 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
371     or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
372    
373     my $self = $PORT_DATA{$port}
374     or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
375    
376     "AnyEvent::MP::Port" eq ref $self
377     or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
378    
379     while (@_) {
380     my ($match, $cb) = splice @_, 0, 2;
381    
382     if (!ref $match) {
383     push @{ $self->{rc0}{$match} }, [$cb];
384     } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
385     my ($type, @match) = @$match;
386     @match
387     ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
388     : push @{ $self->{rc0}{$match->[0]} }, [$cb];
389     } else {
390     push @{ $self->{any} }, [$cb, $match];
391     }
392 root 1.3 }
393 root 1.2 }
394    
395 root 1.22 =item $closure = psub { BLOCK }
396 root 1.2
397 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
398     closure is executed, sets up the environment in the same way as in C<rcv>
399     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
400    
401     This is useful when you register callbacks from C<rcv> callbacks:
402    
403     rcv $port, delayed_reply => sub {
404     my ($delay, @reply) = @_;
405     my $timer; $timer = AE::timer $delay, 0, psub {
406     snd @reply, $SELF; undef $timer;
407     };
408     };
409 root 1.3
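The mechanism rests on Perl's dynamic scoping: the value of C<$SELF> is captured when the closure is created and restored with C<local> when it runs. The sketch below demonstrates this with core Perl only; C<psub_sketch> is a hypothetical name, and instead of killing a port it just records which port would have been killed.

```perl
use strict;
use warnings;

our $SELF;    # dynamically scoped "current port", as in the module
my @killed;   # records ports the stand-in error handler would kill

# Capture the current $SELF and return a closure that restores it when run.
sub psub_sketch (&) {
    my $cb   = shift;
    my $port = $SELF;
    return sub {
        local $SELF = $port;
        my @res = eval { $cb->(@_) };
        push @killed, $port if $@;   # the real module kills the port here
        @res
    };
}

my ($ok, $bad);
{
    local $SELF = "node#p1";         # pretend we are inside a rcv callback
    $ok  = psub_sketch { "running as $SELF" };
    $bad = psub_sketch { die "boom\n" };
}

# $SELF is undefined again here, yet both closures still know their port.
my ($res) = $ok->();
$bad->();

print "$res\n";
print "killed: @killed\n";
```

This is why a timer or I/O callback created inside a C<rcv> handler can still refer to C<$SELF> long after the handler has returned.
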
410 root 1.8 =cut
411 root 1.3
412 root 1.22 sub psub(&) {
413     my $cb = shift;
414 root 1.3
415 root 1.22 my $port = $SELF
416     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
417 root 1.1
418 root 1.22 sub {
419     local $SELF = $port;
420 root 1.2
421 root 1.22 if (wantarray) {
422     my @res = eval { &$cb };
423     _self_die if $@;
424     @res
425     } else {
426     my $res = eval { &$cb };
427     _self_die if $@;
428     $res
429     }
430     }
431 root 1.2 }
432    
433 root 1.8 =back
434    
435     =head1 FUNCTIONS FOR NODES
436    
437     =over 4
438 root 1.2
439 root 1.8 =item become_public endpoint...
440    
441     Tells the node to become a public node, i.e. reachable from other nodes.
442    
443     If no arguments are given, or the first argument is C<undef>, then
444     AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
445     local nodename resolves to.
446    
447     Otherwise the first argument must be an array-reference with transport
448     endpoints ("ip:port", "hostname:port") or port numbers (in which case the
449     local nodename is used as hostname). The endpoints are all resolved and
450     will become the node reference.
451 root 1.2
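The argument handling described above can be sketched as follows. This is an illustration under stated assumptions rather than the module's code: C<expand_endpoints> and the node name C<box.example> are hypothetical, and the sketch only builds "host:port" strings without resolving any names.

```perl
use strict;
use warnings;

# Expand endpoint specifications: bare port numbers get the given node name
# as host, "host:port" strings pass through unchanged, and a missing
# argument falls back to port 4040 on the node name.
sub expand_endpoints {
    my ($nodename, $endpoints) = @_;
    my @specs = $endpoints ? @$endpoints : (4040);
    return map { /^\d+$/ ? "$nodename:$_" : $_ } @specs;
}

print join (",", expand_endpoints ("box.example", [4041, "10.0.0.1:4040"])), "\n";
print join (",", expand_endpoints ("box.example")), "\n";
```

Joining the expanded endpoints with commas yields a string in the C<host:port,host:port...> noderef format listed under CONCEPTS.
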
452 root 1.8 =cut
453 root 1.1
454 root 1.4 =back
455    
456     =head1 NODE MESSAGES
457    
458 root 1.5 Nodes understand the following messages sent to them. Many of them take
459     arguments called C<@reply>, which will simply be used to compose a reply
460     message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
461     the remaining arguments are simply the message data.
462 root 1.4
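How a node turns C<@reply> into an outgoing message can be sketched in a few lines. The helper C<compose_reply>, the port id and the timestamp below are all made up for illustration; the real node would pass the resulting list to C<snd>.

```perl
use strict;
use warnings;

# Compose the message a node would send back for a request carrying @reply:
# the reply port comes first, then the type and any extra data supplied by
# the requester, followed by the result appended by the node.
sub compose_reply {
    my ($reply, @result) = @_;
    my ($port, @prefix) = @$reply;
    return ($port, @prefix, @result);
}

# Hypothetical request data and result value.
my @reply = ("node#myport", "timereply", 1, 2);
my ($port, @msg) = compose_reply (\@reply, 1249410810);

print "to $port: @msg\n";
```

The requester thus chooses both the destination and the leading elements of the answer, which lets one port tell replies to different requests apart.
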
463     =over 4
464    
465     =cut
466    
467 root 1.22 =item lookup => $name, @reply
468 root 1.3
469 root 1.8 Replies with the port ID of the specified well-known port, or C<undef>.
470 root 1.3
471 root 1.7 =item devnull => ...
472    
473     Generic data sink/CPU heat conversion.
474    
475 root 1.4 =item relay => $port, @msg
476    
477     Simply forwards the message to the given port.
478    
479     =item eval => $string[, @reply]
480    
481     Evaluates the given string. If C<@reply> is given, then a message of the
482 root 1.5 form C<@reply, $@, @evalres> is sent.
483    
484     Example: crash another node.
485    
486     snd $othernode, eval => "exit";
487 root 1.4
488     =item time => @reply
489    
490     Replies with the current node time to C<@reply>.
491    
492 root 1.5 Example: tell the current node to send the current time to C<$myport> in a
493     C<timereply> message.
494    
495     snd $NODE, time => $myport, timereply => 1, 2;
496     # => snd $myport, timereply => 1, 2, <time>
497    
498 root 1.2 =back
499    
500 root 1.1 =head1 SEE ALSO
501    
502     L<AnyEvent>.
503    
504     =head1 AUTHOR
505    
506     Marc Lehmann <schmorp@schmorp.de>
507     http://home.schmorp.de/
508    
509     =cut
510    
511     1
512