ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.2
Committed: Fri Jul 31 20:55:46 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.1: +246 -18 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.2 NODE # returns this node identifier
10     $NODE # contains this node identifier
11    
12     snd $port, type => data...;
13    
14     rcv $port, smartmatch => $cb->($port, @msg);
15    
16     # examples:
17     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18     rcv $port1, pong => sub { warn "pong received\n" };
19     snd $port2, ping => $port1;
20    
21     # more, smarter, matches (_any_ is exported by this module)
22     rcv $port, [child_died => $pid] => sub { ...
23     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24    
25 root 1.1 =head1 DESCRIPTION
26    
27 root 1.2 This module (-family) implements a simple message passing framework.
28    
29     Despite its simplicity, you can securely message other processes running
30     on the same or other hosts.
31    
32     =head1 CONCEPTS
33    
34     =over 4
35    
36     =item port
37    
38     A port is something you can send messages to with the C<snd> function, and
39     you can register C<rcv> handlers with. All C<rcv> handlers will receive
40     messages they match, messages will not be queued.
41    
42     =item port id - C<pid@host#portname>
43    
44     A port id is always the node id, a hash-mark (C<#>) as separator, followed
45     by a port name.
46    
47     A port name can be a well known port (basically an identifier/bareword),
48     or a generated name, consisting of node id, a dot (C<.>), and an
49     identifier.
50    
51     =item node
52    
53     A node is a single process containing at least one port - the node
54     port. You can send messages to node ports to let them create new ports,
55     among other things.
56    
57     Initially, nodes are either private (single-process only) or hidden
58     (connected to a father node only). Only when they epxlicitly "go public"
59     can you send them messages form unrelated other nodes.
60    
61     Public nodes automatically connect to all other public nodes in a network
62     when they connect, creating a full mesh.
63    
64     =item node id - C<host:port>, C<id@host>, C<id>
65    
66     A node ID is a string that either uniquely identifies a given node (For
67     private and hidden nodes), or contains a recipe on how to reach a given
68     node (for public nodes).
69    
70     =back
71    
72     =head1 FUNCTIONS
73    
74     =over 4
75    
76 root 1.1 =cut
77    
78     package AnyEvent::MP;
79    
80 root 1.2 use AnyEvent::MP::Util ();
81     use AnyEvent::MP::Node;
82     use AnyEvent::MP::Transport;
83    
84     use utf8;
85 root 1.1 use common::sense;
86    
87 root 1.2 use Carp ();
88    
89 root 1.1 use AE ();
90    
91 root 1.2 use base "Exporter";
92    
93 root 1.1 our $VERSION = '0.0';
94 root 1.2 our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_);
95    
96     our $DEFAULT_SECRET;
97     our $DEFAULT_PORT = "4040";
98 root 1.1
99 root 1.2 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
100     our $CONNECT_TIMEOUT = 30; # includes handshake
101 root 1.1
102 root 1.2 sub default_secret {
103     unless (defined $DEFAULT_SECRET) {
104     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
105     sysread $fh, $DEFAULT_SECRET, -s $fh;
106     } else {
107     $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
108 root 1.1 }
109 root 1.2 }
110    
111     $DEFAULT_SECRET
112     }
113    
114     our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
115     our $PUBLIC = 0;
116     our $NODE;
117     our $PORT;
118    
119     our %NODE; # node id to transport mapping, or "undef", for local node
120     our %PORT; # local ports
121     our %LISTENER; # local transports
122    
123     sub NODE() { $NODE }
124    
125     {
126     use POSIX ();
127     my $nodename = (POSIX::uname)[1];
128     $NODE = "$$\@$nodename";
129     }
130 root 1.1
131 root 1.2 sub _ANY_() { 1 }
132     sub _any_() { \&_ANY_ }
133    
134     sub add_node {
135     my ($noderef) = @_;
136    
137     return $NODE{$noderef}
138     if exists $NODE{$noderef};
139    
140     for (split /,/, $noderef) {
141     return $NODE{$noderef} = $NODE{$_}
142     if exists $NODE{$_};
143 root 1.1 }
144    
145 root 1.2 # for indirect sends, use a different class
146     my $node = new AnyEvent::MP::Node::Direct $noderef;
147    
148     $NODE{$_} = $node
149     for $noderef, split /,/, $noderef;
150    
151     $node
152     }
153    
154     sub snd($@) {
155     my ($noderef, $port) = split /#/, shift, 2;
156    
157     add_node $noderef
158     unless exists $NODE{$noderef};
159    
160     $NODE{$noderef}->send ([$port, [@_]]);
161     }
162    
163     sub _inject {
164     my ($port, $msg) = @{+shift};
165    
166     $port = $PORT{$port}
167     or return;
168    
169     use Data::Dumper;
170     warn Dumper $msg;
171 root 1.1 }
172    
173 root 1.2 sub normalise_noderef($) {
174     my ($noderef) = @_;
175    
176     my $cv = AE::cv;
177     my @res;
178    
179     $cv->begin (sub {
180     my %seen;
181     my @refs;
182     for (sort { $a->[0] <=> $b->[0] } @res) {
183     push @refs, $_->[1] unless $seen{$_->[1]}++
184     }
185     shift->send (join ",", @refs);
186     });
187    
188     $noderef = $DEFAULT_PORT unless length $noderef;
189 root 1.1
190 root 1.2 my $idx;
191     for my $t (split /,/, $noderef) {
192     my $pri = ++$idx;
193    
194     #TODO: this should be outside normalise_noderef and in become_public
195     if ($t =~ /^\d*$/) {
196     my $nodename = (POSIX::uname)[1];
197    
198     $cv->begin;
199     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
200     for (@_) {
201     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
202     push @res, [
203     $pri += 1e-5,
204     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
205     ];
206     }
207     $cv->end;
208     };
209    
210     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
211     #
212     # for (@ipv4) {
213     # push @res, [
214     # $pri,
215     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
216     # ];
217     # }
218 root 1.1 } else {
219 root 1.2 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
220     or Carp::croak "$t: unparsable transport descriptor";
221    
222     $cv->begin;
223     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
224     for (@_) {
225     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
226     push @res, [
227     $pri += 1e-5,
228     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
229     ];
230     }
231     $cv->end;
232     }
233 root 1.1 }
234     }
235    
236 root 1.2 $cv->end;
237    
238     $cv
239     }
240    
241     sub become_public {
242     return if $PUBLIC;
243    
244     my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
245     my @args = @_;
246    
247     $NODE = (normalise_noderef $noderef)->recv;
248    
249     my $self = new AnyEvent::MP::Node::Self noderef => $NODE;
250    
251     $NODE{""} = $self; # empty string == local node
252    
253     for my $t (split /,/, $NODE) {
254     $NODE{$t} = $self;
255    
256     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
257    
258     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
259     @args,
260     on_error => sub {
261     die "on_error<@_>\n";#d#
262     },
263     on_connect => sub {
264     my ($tp) = @_;
265    
266     $NODE{$tp->{remote_id}} = $_[0];
267     },
268     sub {
269     my ($tp) = @_;
270    
271     $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
272     },
273     ;
274     }
275    
276     $PUBLIC = 1;
277 root 1.1 }
278    
279 root 1.2 =back
280    
281 root 1.1 =head1 SEE ALSO
282    
283     L<AnyEvent>.
284    
285     =head1 AUTHOR
286    
287     Marc Lehmann <schmorp@schmorp.de>
288     http://home.schmorp.de/
289    
290     =cut
291    
292     1
293