ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.1 by root, Thu Jul 30 08:38:50 2009 UTC vs.
Revision 1.2 by root, Fri Jul 31 20:55:46 2009 UTC

4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::MP; 7 use AnyEvent::MP;
8 8
9 NODE # returns this node identifier
10 $NODE # contains this node identifier
11
12 snd $port, type => data...;
13
14 rcv $port, smartmatch => $cb->($port, @msg);
15
16 # examples:
17 rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18 rcv $port1, pong => sub { warn "pong received\n" };
19 snd $port2, ping => $port1;
20
21 # more, smarter, matches (_any_ is exported by this module)
22 rcv $port, [child_died => $pid] => sub { ...
23 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24
9=head1 DESCRIPTION 25=head1 DESCRIPTION
10 26
27This module (-family) implements a simple message passing framework.
28
29Despite its simplicity, you can securely message other processes running
30on the same or other hosts.
31
32=head1 CONCEPTS
33
34=over 4
35
36=item port
37
38A port is something you can send messages to with the C<snd> function, and
39you can register C<rcv> handlers with. All C<rcv> handlers will receive
40messages they match, messages will not be queued.
41
42=item port id - C<pid@host#portname>
43
44A port id is always the node id, a hash-mark (C<#>) as separator, followed
45by a port name.
46
47A port name can be a well known port (basically an identifier/bareword),
48or a generated name, consisting of node id, a dot (C<.>), and an
49identifier.
50
51=item node
52
53A node is a single process containing at least one port - the node
54port. You can send messages to node ports to let them create new ports,
55among other things.
56
57Initially, nodes are either private (single-process only) or hidden
58(connected to a father node only). Only when they epxlicitly "go public"
59can you send them messages form unrelated other nodes.
60
61Public nodes automatically connect to all other public nodes in a network
62when they connect, creating a full mesh.
63
64=item node id - C<host:port>, C<id@host>, C<id>
65
66A node ID is a string that either uniquely identifies a given node (For
67private and hidden nodes), or contains a recipe on how to reach a given
68node (for public nodes).
69
70=back
71
72=head1 FUNCTIONS
73
74=over 4
75
11=cut 76=cut
12 77
13package AnyEvent::MP; 78package AnyEvent::MP;
14 79
80use AnyEvent::MP::Util ();
81use AnyEvent::MP::Node;
82use AnyEvent::MP::Transport;
83
84use utf8;
15use common::sense; 85use common::sense;
16 86
87use Carp ();
88
17use AE (); 89use AE ();
18 90
91use base "Exporter";
92
19our $VERSION = '0.0'; 93our $VERSION = '0.0';
20 94our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_);
21sub nonce($) {
22 my $nonce;
23
24 if (open my $fh, "</dev/urandom") {
25 sysread $fh, $nonce, $_[0];
26 } else {
27 # shit...
28 our $nonce_init;
29 unless ($nonce_init++) {
30 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
31 }
32
33 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
34 }
35
36 $nonce
37}
38 95
39our $DEFAULT_SECRET; 96our $DEFAULT_SECRET;
97our $DEFAULT_PORT = "4040";
98
99our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
100our $CONNECT_TIMEOUT = 30; # includes handshake
40 101
41sub default_secret { 102sub default_secret {
42 unless (defined $DEFAULT_SECRET) { 103 unless (defined $DEFAULT_SECRET) {
43 if (open my $fh, "<$ENV{HOME}/.aemp-secret") { 104 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
44 sysread $fh, $DEFAULT_SECRET, -s $fh; 105 sysread $fh, $DEFAULT_SECRET, -s $fh;
45 } else { 106 } else {
46 $DEFAULT_SECRET = nonce 32; 107 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
47 } 108 }
48 } 109 }
49 110
50 $DEFAULT_SECRET 111 $DEFAULT_SECRET
51} 112}
113
114our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
115our $PUBLIC = 0;
116our $NODE;
117our $PORT;
118
119our %NODE; # node id to transport mapping, or "undef", for local node
120our %PORT; # local ports
121our %LISTENER; # local transports
122
123sub NODE() { $NODE }
124
125{
126 use POSIX ();
127 my $nodename = (POSIX::uname)[1];
128 $NODE = "$$\@$nodename";
129}
130
131sub _ANY_() { 1 }
132sub _any_() { \&_ANY_ }
133
134sub add_node {
135 my ($noderef) = @_;
136
137 return $NODE{$noderef}
138 if exists $NODE{$noderef};
139
140 for (split /,/, $noderef) {
141 return $NODE{$noderef} = $NODE{$_}
142 if exists $NODE{$_};
143 }
144
145 # for indirect sends, use a different class
146 my $node = new AnyEvent::MP::Node::Direct $noderef;
147
148 $NODE{$_} = $node
149 for $noderef, split /,/, $noderef;
150
151 $node
152}
153
154sub snd($@) {
155 my ($noderef, $port) = split /#/, shift, 2;
156
157 add_node $noderef
158 unless exists $NODE{$noderef};
159
160 $NODE{$noderef}->send ([$port, [@_]]);
161}
162
163sub _inject {
164 my ($port, $msg) = @{+shift};
165
166 $port = $PORT{$port}
167 or return;
168
169 use Data::Dumper;
170 warn Dumper $msg;
171}
172
173sub normalise_noderef($) {
174 my ($noderef) = @_;
175
176 my $cv = AE::cv;
177 my @res;
178
179 $cv->begin (sub {
180 my %seen;
181 my @refs;
182 for (sort { $a->[0] <=> $b->[0] } @res) {
183 push @refs, $_->[1] unless $seen{$_->[1]}++
184 }
185 shift->send (join ",", @refs);
186 });
187
188 $noderef = $DEFAULT_PORT unless length $noderef;
189
190 my $idx;
191 for my $t (split /,/, $noderef) {
192 my $pri = ++$idx;
193
194 #TODO: this should be outside normalise_noderef and in become_public
195 if ($t =~ /^\d*$/) {
196 my $nodename = (POSIX::uname)[1];
197
198 $cv->begin;
199 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
200 for (@_) {
201 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
202 push @res, [
203 $pri += 1e-5,
204 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
205 ];
206 }
207 $cv->end;
208 };
209
210# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
211#
212# for (@ipv4) {
213# push @res, [
214# $pri,
215# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
216# ];
217# }
218 } else {
219 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
220 or Carp::croak "$t: unparsable transport descriptor";
221
222 $cv->begin;
223 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
224 for (@_) {
225 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
226 push @res, [
227 $pri += 1e-5,
228 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
229 ];
230 }
231 $cv->end;
232 }
233 }
234 }
235
236 $cv->end;
237
238 $cv
239}
240
241sub become_public {
242 return if $PUBLIC;
243
244 my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
245 my @args = @_;
246
247 $NODE = (normalise_noderef $noderef)->recv;
248
249 my $self = new AnyEvent::MP::Node::Self noderef => $NODE;
250
251 $NODE{""} = $self; # empty string == local node
252
253 for my $t (split /,/, $NODE) {
254 $NODE{$t} = $self;
255
256 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
257
258 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
259 @args,
260 on_error => sub {
261 die "on_error<@_>\n";#d#
262 },
263 on_connect => sub {
264 my ($tp) = @_;
265
266 $NODE{$tp->{remote_id}} = $_[0];
267 },
268 sub {
269 my ($tp) = @_;
270
271 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
272 },
273 ;
274 }
275
276 $PUBLIC = 1;
277}
278
279=back
52 280
53=head1 SEE ALSO 281=head1 SEE ALSO
54 282
55L<AnyEvent>. 283L<AnyEvent>.
56 284

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines