ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.6
Committed: Sat Aug 1 10:02:33 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.5: +6 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.2 NODE # returns this node identifier
10     $NODE # contains this node identifier
11    
12     snd $port, type => data...;
13    
14     rcv $port, smartmatch => $cb->($port, @msg);
15    
16     # examples:
17     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18     rcv $port1, pong => sub { warn "pong received\n" };
19     snd $port2, ping => $port1;
20    
21     # more, smarter, matches (_any_ is exported by this module)
22     rcv $port, [child_died => $pid] => sub { ...
23     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24    
25 root 1.1 =head1 DESCRIPTION
26    
27 root 1.2 This module (-family) implements a simple message passing framework.
28    
29     Despite its simplicity, you can securely message other processes running
30     on the same or other hosts.
31    
32 root 1.6 At the moment, this module family is severly brokena nd underdocumented,
33     so do not use. This was uploaded mainly to resreve the CPAN namespace -
34     stay tuned!
35    
36 root 1.2 =head1 CONCEPTS
37    
38     =over 4
39    
40     =item port
41    
42     A port is something you can send messages to with the C<snd> function, and
43     you can register C<rcv> handlers with. All C<rcv> handlers will receive
44     messages they match, messages will not be queued.
45    
46 root 1.3 =item port id - C<noderef#portname>
47 root 1.2
48 root 1.3 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
49     by a port name (a printable string of unspecified format).
50 root 1.2
51     =item node
52    
53     A node is a single process containing at least one port - the node
54     port. You can send messages to node ports to let them create new ports,
55     among other things.
56    
57     Initially, nodes are either private (single-process only) or hidden
58 root 1.3 (connected to a master node only). Only when they epxlicitly "become
59     public" can you send them messages from unrelated other nodes.
60 root 1.2
61 root 1.5 =item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
62 root 1.2
63 root 1.3 A noderef is a string that either uniquely identifies a given node (for
64 root 1.2 private and hidden nodes), or contains a recipe on how to reach a given
65     node (for public nodes).
66    
67     =back
68    
69 root 1.3 =head1 VARIABLES/FUNCTIONS
70 root 1.2
71     =over 4
72    
73 root 1.1 =cut
74    
75     package AnyEvent::MP;
76    
77 root 1.2 use AnyEvent::MP::Util ();
78     use AnyEvent::MP::Node;
79     use AnyEvent::MP::Transport;
80    
81     use utf8;
82 root 1.1 use common::sense;
83    
84 root 1.2 use Carp ();
85    
86 root 1.1 use AE ();
87    
88 root 1.2 use base "Exporter";
89    
90 root 1.6 our $VERSION = '0.01';
91 root 1.2 our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_);
92    
93     our $DEFAULT_SECRET;
94     our $DEFAULT_PORT = "4040";
95 root 1.1
96 root 1.2 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
97     our $CONNECT_TIMEOUT = 30; # includes handshake
98 root 1.1
99 root 1.2 sub default_secret {
100     unless (defined $DEFAULT_SECRET) {
101     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102     sysread $fh, $DEFAULT_SECRET, -s $fh;
103     } else {
104     $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
105 root 1.1 }
106 root 1.2 }
107    
108     $DEFAULT_SECRET
109     }
110    
111 root 1.3 =item NODE / $NODE
112    
113     The C<NODE ()> function and the C<$NODE> variable contain the noderef of
114     the local node. The value is initialised by a call to C<become_public> or
115     C<become_slave>, after which all local port identifiers become invalid.
116    
117     =cut
118    
119 root 1.2 our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
120 root 1.6 our $ID = "a0";
121 root 1.2 our $PUBLIC = 0;
122     our $NODE;
123     our $PORT;
124    
125     our %NODE; # node id to transport mapping, or "undef", for local node
126     our %PORT; # local ports
127     our %LISTENER; # local transports
128    
129     sub NODE() { $NODE }
130    
131     {
132     use POSIX ();
133     my $nodename = (POSIX::uname)[1];
134     $NODE = "$$\@$nodename";
135     }
136 root 1.1
137 root 1.2 sub _ANY_() { 1 }
138     sub _any_() { \&_ANY_ }
139    
140     sub add_node {
141     my ($noderef) = @_;
142    
143     return $NODE{$noderef}
144     if exists $NODE{$noderef};
145    
146     for (split /,/, $noderef) {
147     return $NODE{$noderef} = $NODE{$_}
148     if exists $NODE{$_};
149 root 1.1 }
150    
151 root 1.2 # for indirect sends, use a different class
152     my $node = new AnyEvent::MP::Node::Direct $noderef;
153    
154     $NODE{$_} = $node
155     for $noderef, split /,/, $noderef;
156    
157     $node
158     }
159    
160 root 1.3 =item snd $portid, type => @data
161    
162     =item snd $portid, @msg
163    
164     Send the given message to the given port ID, which can identify either a
165     local or a remote port.
166    
167     While the message can be about anything, it is highly recommended to use
168     a constant string as first element.
169    
170     The message data effectively becomes read-only after a call to this
171     function: modifying any argument is not allowed and can cause many
172     problems.
173    
174     The type of data you can transfer depends on the transport protocol: when
175     JSON is used, then only strings, numbers and arrays and hashes consisting
176     of those are allowed (no objects). When Storable is used, then anything
177     that Storable can serialise and deserialise is allowed, and for the local
178     node, anything can be passed.
179    
180     =cut
181    
182 root 1.4 sub snd(@) {
183 root 1.2 my ($noderef, $port) = split /#/, shift, 2;
184    
185     add_node $noderef
186     unless exists $NODE{$noderef};
187    
188 root 1.3 $NODE{$noderef}->send (["$port", [@_]]);
189     }
190    
191     =item rcv $portid, type => $callback->(@msg)
192    
193     =item rcv $portid, $smartmatch => $callback->(@msg)
194    
195     =item rcv $portid, [$smartmatch...] => $callback->(@msg)
196    
197     Register a callback on the port identified by C<$portid>, which I<must> be
198     a local port.
199    
200     The callback has to return a true value when its work is done, after
201     which is will be removed, or a false value in which case it will stay
202     registered.
203    
204     If the match is an array reference, then it will be matched against the
205     first elements of the message, otherwise only the first element is being
206     matched.
207    
208     Any element in the match that is specified as C<_any_> (a function
209     exported by this module) matches any single element of the message.
210    
211     While not required, it is highly recommended that the first matching
212     element is a string identifying the message. The one-string-only match is
213     also the most efficient match (by far).
214    
215     =cut
216    
217     sub rcv($@) {
218     my ($port, $match, $cb) = @_;
219    
220     my $port = $PORT{$port}
221     or do {
222     my ($noderef, $lport) = split /#/, $port;
223     "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
224     or Carp::croak "$port: can only rcv on local ports";
225    
226     $PORT{$lport}
227     or Carp::croak "$port: port does not exist";
228    
229     $PORT{$port} = $PORT{$lport} # also return
230     };
231    
232     if (!ref $match) {
233     push @{ $port->{rc0}{$match} }, [$cb];
234     } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
235     my ($type, @match) = @$match;
236     @match
237     ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match]
238     : push @{ $port->{rc0}{$match->[0]} }, [$cb];
239     } else {
240     push @{ $port->{any} }, [$cb, $match];
241     }
242 root 1.2 }
243    
244     sub _inject {
245     my ($port, $msg) = @{+shift};
246    
247     $port = $PORT{$port}
248     or return;
249    
250 root 1.3 @_ = @$msg;
251    
252     for (@{ $port->{rc0}{$msg->[0]} }) {
253     $_ && &{$_->[0]}
254     && undef $_;
255     }
256    
257     for (@{ $port->{rcv}{$msg->[0]} }) {
258     $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1]
259     && &{$_->[0]}
260     && undef $_;
261     }
262    
263     for (@{ $port->{any} }) {
264     $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1]
265     && &{$_->[0]}
266     && undef $_;
267     }
268 root 1.1 }
269    
270 root 1.2 sub normalise_noderef($) {
271     my ($noderef) = @_;
272    
273     my $cv = AE::cv;
274     my @res;
275    
276     $cv->begin (sub {
277     my %seen;
278     my @refs;
279     for (sort { $a->[0] <=> $b->[0] } @res) {
280     push @refs, $_->[1] unless $seen{$_->[1]}++
281     }
282     shift->send (join ",", @refs);
283     });
284    
285     $noderef = $DEFAULT_PORT unless length $noderef;
286 root 1.1
287 root 1.2 my $idx;
288     for my $t (split /,/, $noderef) {
289     my $pri = ++$idx;
290    
291     #TODO: this should be outside normalise_noderef and in become_public
292     if ($t =~ /^\d*$/) {
293     my $nodename = (POSIX::uname)[1];
294    
295     $cv->begin;
296     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
297     for (@_) {
298     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
299     push @res, [
300     $pri += 1e-5,
301     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
302     ];
303     }
304     $cv->end;
305     };
306    
307     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
308     #
309     # for (@ipv4) {
310     # push @res, [
311     # $pri,
312     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
313     # ];
314     # }
315 root 1.1 } else {
316 root 1.2 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
317     or Carp::croak "$t: unparsable transport descriptor";
318    
319     $cv->begin;
320     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
321     for (@_) {
322     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
323     push @res, [
324     $pri += 1e-5,
325     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
326     ];
327     }
328     $cv->end;
329     }
330 root 1.1 }
331     }
332    
333 root 1.2 $cv->end;
334    
335     $cv
336     }
337    
338     sub become_public {
339     return if $PUBLIC;
340    
341     my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
342     my @args = @_;
343    
344     $NODE = (normalise_noderef $noderef)->recv;
345    
346     for my $t (split /,/, $NODE) {
347 root 1.3 $NODE{$t} = $NODE{""};
348 root 1.2
349     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
350    
351     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
352     @args,
353     on_error => sub {
354     die "on_error<@_>\n";#d#
355     },
356     on_connect => sub {
357     my ($tp) = @_;
358    
359     $NODE{$tp->{remote_id}} = $_[0];
360     },
361     sub {
362     my ($tp) = @_;
363    
364     $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
365     },
366     ;
367     }
368    
369     $PUBLIC = 1;
370 root 1.1 }
371    
372 root 1.4 =back
373    
374     =head1 NODE MESSAGES
375    
376 root 1.5 Nodes understand the following messages sent to them. Many of them take
377     arguments called C<@reply>, which will simply be used to compose a reply
378     message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
379     the remaining arguments are simply the message data.
380 root 1.4
381     =over 4
382    
383     =cut
384    
385 root 1.3 #############################################################################
386     # self node code
387    
388     sub _new_port($) {
389     my ($name) = @_;
390    
391     my ($noderef, $portname) = split /#/, $name;
392    
393     $PORT{$name} =
394     $PORT{$portname} = {
395     names => [$name, $portname],
396     };
397     }
398    
399     $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
400     _new_port "";
401    
402 root 1.4 =item relay => $port, @msg
403    
404     Simply forwards the message to the given port.
405    
406     =cut
407    
408 root 1.3 rcv "", relay => \&snd;
409    
410 root 1.4 =item eval => $string[ @reply]
411    
412     Evaluates the given string. If C<@reply> is given, then a message of the
413 root 1.5 form C<@reply, $@, @evalres> is sent.
414    
415     Example: crash another node.
416    
417     snd $othernode, eval => "exit";
418 root 1.4
419     =cut
420    
421     rcv "", eval => sub {
422     my (undef, $string, @reply) = @_;
423     my @res = eval $string;
424     snd @reply, "$@", @res if @reply;
425     };
426    
427     =item time => @reply
428    
429     Replies the the current node time to C<@reply>.
430    
431 root 1.5 Example: tell the current node to send the current time to C<$myport> in a
432     C<timereply> message.
433    
434     snd $NODE, time => $myport, timereply => 1, 2;
435     # => snd $myport, timereply => 1, 2, <time>
436    
437 root 1.4 =cut
438    
439     rcv "", time => sub { shift; snd @_, AE::time };
440    
441 root 1.2 =back
442    
443 root 1.1 =head1 SEE ALSO
444    
445     L<AnyEvent>.
446    
447     =head1 AUTHOR
448    
449     Marc Lehmann <schmorp@schmorp.de>
450     http://home.schmorp.de/
451    
452     =cut
453    
454     1
455