ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.5
Committed: Sat Aug 1 07:44:02 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.4: +16 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.2 NODE # returns this node identifier
10     $NODE # contains this node identifier
11    
12     snd $port, type => data...;
13    
14     rcv $port, smartmatch => $cb->($port, @msg);
15    
16     # examples:
17     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18     rcv $port1, pong => sub { warn "pong received\n" };
19     snd $port2, ping => $port1;
20    
21     # more, smarter, matches (_any_ is exported by this module)
22     rcv $port, [child_died => $pid] => sub { ...
23     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24    
25 root 1.1 =head1 DESCRIPTION
26    
27 root 1.2 This module (-family) implements a simple message passing framework.
28    
29     Despite its simplicity, you can securely message other processes running
30     on the same or other hosts.
31    
32     =head1 CONCEPTS
33    
34     =over 4
35    
36     =item port
37    
38     A port is something you can send messages to with the C<snd> function, and
39     you can register C<rcv> handlers with. All C<rcv> handlers will receive
40     messages they match, messages will not be queued.
41    
42 root 1.3 =item port id - C<noderef#portname>
43 root 1.2
44 root 1.3 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
45     by a port name (a printable string of unspecified format).
46 root 1.2
47     =item node
48    
49     A node is a single process containing at least one port - the node
50     port. You can send messages to node ports to let them create new ports,
51     among other things.
52    
53     Initially, nodes are either private (single-process only) or hidden
54 root 1.3 (connected to a master node only). Only when they explicitly "become
55     public" can you send them messages from unrelated other nodes.
56 root 1.2
57 root 1.5 =item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
58 root 1.2
59 root 1.3 A noderef is a string that either uniquely identifies a given node (for
60 root 1.2 private and hidden nodes), or contains a recipe on how to reach a given
61     node (for public nodes).
62    
63     =back
64    
65 root 1.3 =head1 VARIABLES/FUNCTIONS
66 root 1.2
67     =over 4
68    
69 root 1.1 =cut
70    
71     package AnyEvent::MP;
72    
73 root 1.2 use AnyEvent::MP::Util ();
74     use AnyEvent::MP::Node;
75     use AnyEvent::MP::Transport;
76    
77     use utf8;
78 root 1.1 use common::sense;
79    
80 root 1.2 use Carp ();
81    
82 root 1.1 use AE ();
83    
84 root 1.2 use base "Exporter";
85    
# Module version and the symbols exported into every importing package.
our $VERSION = '0.0';
our @EXPORT  = ('NODE', '$NODE', '$PORT', 'snd', 'rcv', '_any_');

# Shared secret (left undefined here; default_secret fills it in on first
# use) and the port used when a transport descriptor does not name one.
our ($DEFAULT_SECRET, $DEFAULT_PORT);
$DEFAULT_PORT = "4040";

# Connection timing.
our ($CONNECT_INTERVAL, $CONNECT_TIMEOUT) = (
   5,  # new connect every 5s, at least
   30, # includes handshake
);
94 root 1.1
# Returns the default shared secret, computing it on first use.
#
# The secret is read from $HOME/.aemp-secret when that file can be opened;
# otherwise a fresh one is generated with AnyEvent::MP::Util::nonce. The
# result is cached in $DEFAULT_SECRET, so later calls return the same value.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      # three-argument open: the path can never be misread as a mode or
      # have its surrounding whitespace stripped
      if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
         sysread $fh, $DEFAULT_SECRET, -s $fh;
      } else {
         $DEFAULT_SECRET = AnyEvent::MP::Util::nonce (32);
      }
   }

   $DEFAULT_SECRET
}
106    
107 root 1.3 =item NODE / $NODE
108    
109     The C<NODE ()> function and the C<$NODE> variable contain the noderef of
110     the local node. The value is initialised by a call to C<become_public> or
111     C<become_slave>, after which all local port identifiers become invalid.
112    
113     =cut
114    
use POSIX ();

# per-process/node unique cookie: process id and current time, both in hex
our $UNIQ = join ".", map { sprintf "%x", $_ } $$, time;

our $PUBLIC = 0; # become_public sets this to 1
our ($NODE, $PORT);

our (
   %NODE,     # node id to transport mapping, or "undef", for local node
   %PORT,     # local ports
   %LISTENER, # local transports
);

# Accessor for the noderef of the local node.
sub NODE() { $NODE }

# Initial noderef: "<pid>@<nodename>", nodename as reported by uname.
$NODE = $$ . "\@" . (POSIX::uname ())[1];

# _ANY_ is the wildcard predicate (always true); _any_ hands out a
# reference to it for use inside match patterns.
sub _ANY_() { 1 }
sub _any_() { \&_ANY_ }
134    
135     sub add_node {
136     my ($noderef) = @_;
137    
138     return $NODE{$noderef}
139     if exists $NODE{$noderef};
140    
141     for (split /,/, $noderef) {
142     return $NODE{$noderef} = $NODE{$_}
143     if exists $NODE{$_};
144 root 1.1 }
145    
146 root 1.2 # for indirect sends, use a different class
147     my $node = new AnyEvent::MP::Node::Direct $noderef;
148    
149     $NODE{$_} = $node
150     for $noderef, split /,/, $noderef;
151    
152     $node
153     }
154    
155 root 1.3 =item snd $portid, type => @data
156    
157     =item snd $portid, @msg
158    
159     Send the given message to the given port ID, which can identify either a
160     local or a remote port.
161    
162     While the message can be about anything, it is highly recommended to use
163     a constant string as first element.
164    
165     The message data effectively becomes read-only after a call to this
166     function: modifying any argument is not allowed and can cause many
167     problems.
168    
169     The type of data you can transfer depends on the transport protocol: when
170     JSON is used, then only strings, numbers and arrays and hashes consisting
171     of those are allowed (no objects). When Storable is used, then anything
172     that Storable can serialise and deserialise is allowed, and for the local
173     node, anything can be passed.
174    
175     =cut
176    
# Sends a message to a port.
#
# The first argument is the port id ("noderef#portname"); everything after
# it is the message. The node object for the noderef is looked up in %NODE
# (created through add_node when missing) and handed [portname, [message]]
# via its send method, whose return value is passed through.
sub snd(@) {
   my $portid = shift;
   my ($noderef, $port) = split /#/, $portid, 2;

   exists $NODE{$noderef}
      or add_node ($noderef);

   my $node = $NODE{$noderef};

   $node->send (["$port", [@_]]);
}
185    
186     =item rcv $portid, type => $callback->(@msg)
187    
188     =item rcv $portid, $smartmatch => $callback->(@msg)
189    
190     =item rcv $portid, [$smartmatch...] => $callback->(@msg)
191    
192     Register a callback on the port identified by C<$portid>, which I<must> be
193     a local port.
194    
195     The callback has to return a true value when its work is done, after
196     which it will be removed, or a false value in which case it will stay
197     registered.
198    
199     If the match is an array reference, then it will be matched against the
200     first elements of the message, otherwise only the first element is being
201     matched.
202    
203     Any element in the match that is specified as C<_any_> (a function
204     exported by this module) matches any single element of the message.
205    
206     While not required, it is highly recommended that the first matching
207     element is a string identifying the message. The one-string-only match is
208     also the most efficient match (by far).
209    
210     =cut
211    
# Registers $cb on the local port identified by $portid.
#
# $portid is looked up in %PORT directly; failing that it is split into
# "noderef#portname", the noderef must map to an AnyEvent::MP::Node::Self
# object in %NODE, the bare port name must exist in %PORT, and the full id
# is then cached as an alias for that port. Croaks when the port is not
# local or does not exist.
#
# $match selects the handler list the callback is appended to:
#   - plain scalar                      -> {rc0}{$match}       (type only)
#   - array whose first element is a
#     plain scalar, with more elements  -> {rcv}{type}         (type + args)
#   - same, but only one element        -> {rc0}{type}
#   - anything else                     -> {any}               (full match)
sub rcv($@) {
   my ($portid, $match, $cb) = @_;

   # "||" rather than "or": the value produced by the fallback block must
   # end up in $port, otherwise the handler would be pushed onto a fresh
   # throwaway hash and silently lost.
   my $port = $PORT{$portid} || do {
      # limit 2, like snd: the port name itself may contain "#"
      my ($noderef, $lport) = split /#/, $portid, 2;

      "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
         or Carp::croak ("$portid: can only rcv on local ports");

      $PORT{$lport}
         or Carp::croak ("$portid: port does not exist");

      $PORT{$portid} = $PORT{$lport} # also return
   };

   if (!ref $match) {
      push @{ $port->{rc0}{$match} }, [$cb];
   } elsif ("ARRAY" eq ref $match && !ref $match->[0]) {
      my ($type, @match) = @$match;

      if (@match) {
         push @{ $port->{rcv}{$type} }, [$cb, \@match];
      } else {
         push @{ $port->{rc0}{$type} }, [$cb];
      }
   } else {
      push @{ $port->{any} }, [$cb, $match];
   }
}
238    
# Delivers one message to the handlers of a local port.
#
# Takes a single array reference [$portname, \@msg]. Unknown ports are
# ignored. Handlers are tried in three passes, all keyed on the message:
#   rc0 - registered for the message type ($msg->[0]) only;
#   rcv - registered for the type plus a pattern for the elements after it;
#   any - registered with a pattern for the leading elements of the message.
#
# Callbacks are invoked as &$cb, i.e. they see (and share) this function's
# @_, which holds the complete message including the type. A callback that
# returns true is unregistered by undefining its slot.
#
# NOTE: patterns are compared with the smartmatch operator (~~), which is
# deprecated in recent Perl releases.
sub _inject {
   my ($port, $msg) = @{+shift};

   $port = $PORT{$port}
      or return;

   @_ = @$msg;

   for (@{ $port->{rc0}{$msg->[0]} }) {
      $_ && &{$_->[0]}
         && undef $_;
   }

   for (@{ $port->{rcv}{$msg->[0]} }) {
      # The pattern covers the elements after the type, so take exactly as
      # many message elements as the pattern has (1 .. count). Using the
      # last index here would yield one element too few and never match.
      $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
         && &{$_->[0]}
         && undef $_;
   }

   for (@{ $port->{any} }) {
      $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
         && &{$_->[0]}
         && undef $_;
   }
}
264    
# Resolves a comma-separated list of transport descriptors into a
# normalised noderef.
#
# Returns a condvar; once all lookups have finished it is sent the
# resolved "host:port" strings joined by commas, duplicates removed,
# ordered by the position of the descriptor that produced them (and, within
# one descriptor, by resolver order). An empty noderef stands for
# $DEFAULT_PORT. Croaks on a descriptor that parse_hostport rejects.
sub normalise_noderef($) {
   my ($noderef) = @_;

   my $done = AE::cv;
   my @found; # [priority, "host:port"] entries added by the resolver callbacks

   # outermost begin/end pair: its callback runs after every resolver has
   # reported back and produces the final list
   $done->begin (sub {
      my (%dup, @addresses);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         next if $dup{$entry->[1]}++;
         push @addresses, $entry->[1];
      }

      shift->send (join ",", @addresses);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   # builds the resolve_sockaddr callback for one descriptor: every address
   # found is recorded with a priority slightly above the previous one
   my $collector = sub {
      my ($pri) = @_;

      sub {
         for my $addr (@_) {
            my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($addr->[3]);

            push @found, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
            ];
         }

         $done->end;
      }
   };

   my $position = 0;

   for my $t (split /,/, $noderef) {
      my $pri = ++$position;
      my ($host, $service);

      #TODO: this should be outside normalise_noderef and in become_public
      if ($t =~ /^\d*$/) {
         # bare (or empty) port number: resolve against the local nodename
         ($host, $service) = ((POSIX::uname ())[1], $t || "aemp=$DEFAULT_PORT");

#        my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
#
#        for (@ipv4) {
#           push @res, [
#              $pri,
#              AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
#           ];
#        }
      } else {
         ($host, $service) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak "$t: unparsable transport descriptor";
      }

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $service, "tcp", 0, undef, $collector->($pri));
   }

   $done->end;

   $done
}
332    
# Turns the local node into a public node; a no-op once $PUBLIC is set.
#
# The first argument is a noderef string or an array reference of transport
# descriptors (joined with commas); remaining arguments are passed through
# to AnyEvent::MP::Transport::mp_server. The noderef is normalised
# (blocking on the condvar), stored in $NODE, and for each resulting
# address the local node object is aliased in %NODE and a listener is
# started and kept in %LISTENER.
sub become_public {
   return if $PUBLIC;

   my $spec        = shift;
   my @listen_args = @_;

   my $noderef = join ",", ref $spec ? @$spec : $spec;

   my $normalised = normalise_noderef $noderef;
   $NODE = $normalised->recv;

   my $on_error = sub {
      die "on_error<@_>\n";#d#
   };

   # a connected transport is registered under its remote node id
   my $on_connect = sub {
      $NODE{$_[0]{remote_id}} = $_[0];
   };

   # an accepted transport is registered under "peerhost:peerport"
   my $on_accept = sub {
      my ($transport) = @_;

      $NODE{"$transport->{peerhost}:$transport->{peerport}"} = $transport;
   };

   for my $address (split /,/, $NODE) {
      $NODE{$address} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($address);

      $LISTENER{$address} = AnyEvent::MP::Transport::mp_server ($host, $port,
         @listen_args,
         on_error   => $on_error,
         on_connect => $on_connect,
         $on_accept,
      );
   }

   $PUBLIC = 1;
}
366    
367 root 1.4 =back
368    
369     =head1 NODE MESSAGES
370    
371 root 1.5 Nodes understand the following messages sent to them. Many of them take
372     arguments called C<@reply>, which will simply be used to compose a reply
373     message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
374     the remaining arguments are simply the message data.
375 root 1.4
376     =over 4
377    
378     =cut
379    
380 root 1.3 #############################################################################
381     # self node code
382    
# Creates a new local port and returns its (empty) port structure.
#
# $name is a full port id ("noderef#portname"). The same hash is stored in
# %PORT under both the full id and the bare port name; both names are
# recorded in the port's {names} list.
sub _new_port($) {
   my ($name) = @_;

   # limit 2, like snd: everything after the first "#" is the port name,
   # so names containing "#" are registered under the key snd will use
   my (undef, $portname) = split /#/, $name, 2;

   $PORT{$name} =
   $PORT{$portname} = {
      names => [$name, $portname],
   };
}
393    
# Register the local node object under the empty noderef and create the
# port with the empty name, on which the node message handlers are
# registered.
$NODE{""} = AnyEvent::MP::Node::Self->new (noderef => $NODE);
_new_port "";
396    
397 root 1.4 =item relay => $port, @msg
398    
399     Simply forwards the message to the given port.
400    
401     =cut
402    
# Handlers are called with the complete message, type included, so the
# leading "relay" has to be dropped before the rest is handed to snd
# (passing \&snd directly would address a port named "relay"). The
# message is copied instead of shifted because @_ is shared with the
# dispatcher. Returns 0 so the handler stays registered.
rcv "", relay => sub {
   my (undef, @msg) = @_;
   snd @msg if @msg;
   0
};
404    
405 root 1.4 =item eval => $string[ @reply]
406    
407     Evaluates the given string. If C<@reply> is given, then a message of the
408 root 1.5 form C<@reply, $@, @evalres> is sent.
409    
410     Example: crash another node.
411    
412     snd $othernode, eval => "exit";
413 root 1.4
414     =cut
415    
# SECURITY: this evaluates a string received in a message; any peer that
# can send messages to this node can run arbitrary code in this process.
rcv "", eval => sub {
   my (undef, $string, @reply) = @_;
   my @res = eval $string;
   snd @reply, "$@", @res if @reply;
   0 # a true return value would unregister this handler
};
421    
422     =item time => @reply
423    
424     Replies with the current node time to C<@reply>.
425    
426 root 1.5 Example: tell the current node to send the current time to C<$myport> in a
427     C<timereply> message.
428    
429     snd $NODE, time => $myport, timereply => 1, 2;
430     # => snd $myport, timereply => 1, 2, <time>
431    
432 root 1.4 =cut
433    
# The message is copied rather than shifted: @_ is shared with the
# dispatcher, and shifting it would alter what later handlers for the same
# message get to see. Returns 0 so the handler stays registered.
rcv "", time => sub {
   my (undef, @reply) = @_;
   snd @reply, AE::time;
   0
};
435    
436 root 1.2 =back
437    
438 root 1.1 =head1 SEE ALSO
439    
440     L<AnyEvent>.
441    
442     =head1 AUTHOR
443    
444     Marc Lehmann <schmorp@schmorp.de>
445     http://home.schmorp.de/
446    
447     =cut
448    
449     1
450