ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.2
Committed: Fri Jul 31 20:55:46 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.1: +246 -18 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP - multi-processing/message-passing framework
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP;
8
9 NODE # returns this node identifier
10 $NODE # contains this node identifier
11
12 snd $port, type => data...;
13
14 rcv $port, smartmatch => $cb->($port, @msg);
15
16 # examples:
17 rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18 rcv $port1, pong => sub { warn "pong received\n" };
19 snd $port2, ping => $port1;
20
21 # more, smarter, matches (_any_ is exported by this module)
22 rcv $port, [child_died => $pid] => sub { ...
23 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24
25 =head1 DESCRIPTION
26
27 This module (-family) implements a simple message passing framework.
28
29 Despite its simplicity, you can securely message other processes running
30 on the same or other hosts.
31
32 =head1 CONCEPTS
33
34 =over 4
35
36 =item port
37
38 A port is something you can send messages to with the C<snd> function, and
39 you can register C<rcv> handlers with. All C<rcv> handlers will receive
40 messages they match; messages will not be queued.
41
42 =item port id - C<pid@host#portname>
43
44 A port id is always the node id, a hash-mark (C<#>) as separator, followed
45 by a port name.
46
47 A port name can be a well known port (basically an identifier/bareword),
48 or a generated name, consisting of node id, a dot (C<.>), and an
49 identifier.
50
51 =item node
52
53 A node is a single process containing at least one port - the node
54 port. You can send messages to node ports to let them create new ports,
55 among other things.
56
57 Initially, nodes are either private (single-process only) or hidden
58 (connected to a father node only). Only when they explicitly "go public"
59 can you send them messages from unrelated other nodes.
60
61 Public nodes automatically connect to all other public nodes in a network
62 when they connect, creating a full mesh.
63
64 =item node id - C<host:port>, C<id@host>, C<id>
65
66 A node ID is a string that either uniquely identifies a given node (for
67 private and hidden nodes), or contains a recipe on how to reach a given
68 node (for public nodes).
69
70 =back
71
72 =head1 FUNCTIONS
73
74 =over 4
75
76 =cut
77
78 package AnyEvent::MP;
79
80 use AnyEvent::MP::Util ();
81 use AnyEvent::MP::Node;
82 use AnyEvent::MP::Transport;
83
84 use utf8;
85 use common::sense;
86
87 use Carp ();
88
89 use AE ();
90
91 use base "Exporter";
92
# Module version, and the symbols handed to every package that imports us.
our $VERSION = '0.0';
our @EXPORT  = qw(
    NODE $NODE $PORT
    snd rcv
    _any_
);

# Cache for the value computed by default_secret (undefined until first use),
# and the port substituted when a transport descriptor names none.
our $DEFAULT_SECRET;
our $DEFAULT_PORT = '4040';

# Connection pacing.
our $CONNECT_INTERVAL = 5;  # a new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # this budget includes the handshake
101
# Return the default secret, determining it on first use and caching it in
# $DEFAULT_SECRET for all later calls.
#
# The secret is the verbatim content of $HOME/.aemp-secret when that file can
# be opened and read. Otherwise it is whatever AnyEvent::MP::Util::nonce
# returns for an argument of 32 (presumably a fresh random value, in which
# case it is only known to this process - confirm against AnyEvent::MP::Util).
sub default_secret {
    unless (defined $DEFAULT_SECRET) {
        # Three-argument open takes the path literally; the two-argument
        # form would parse mode characters and trim whitespace out of a
        # string that contains the environment-supplied $HOME.
        if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
            binmode $fh; # read the file's bytes untranslated
            local $/;    # slurp mode: a single read returns the whole file
            $DEFAULT_SECRET = <$fh>; # "" for an empty file, undef on error
        }

        # Either there is no readable file, or reading it failed and left
        # the secret undefined: never hand out undef, generate one instead.
        $DEFAULT_SECRET = AnyEvent::MP::Util::nonce(32)
            unless defined $DEFAULT_SECRET;
    }

    $DEFAULT_SECRET
}
113
# Per-process/node unique cookie: process id and load time, both in
# hexadecimal, separated by a dot.
our $UNIQ = join ".", map { sprintf "%x", $_ } $$, time;

our $PUBLIC = 0; # set to 1 at the end of become_public
our $NODE;       # identifier of this node (exported)
our $PORT;       # exported; not assigned anywhere in this part of the file

our %NODE;     # node id to transport mapping, or "undef", for local node
our %PORT;     # local ports
our %LISTENER; # local transports

# Function form of $NODE; the empty prototype allows a bare "NODE" in
# expressions.
sub NODE() { $NODE }

# Initial node identifier: "<pid>@<nodename>", where nodename is the second
# field reported by POSIX::uname.
{
    use POSIX ();
    my (undef, $nodename) = POSIX::uname();
    $NODE = $$ . '@' . $nodename;
}
130
# Marker sub; calling it always yields 1.
sub _ANY_() { 1 }

# Exported wildcard usable in rcv match patterns (see SYNOPSIS): a code
# reference to the marker sub above.
sub _any_() { return \&_ANY_ }
133
# Return the node object for $noderef, creating and registering one on demand.
#
# $noderef is a node id string which may consist of several comma-separated
# ids. The lookup proceeds as follows:
#   1. an existing %NODE entry for the complete string is returned as is;
#   2. otherwise the first comma-separated component that has a %NODE entry
#      wins, and that entry is also registered under the complete string;
#   3. otherwise a new AnyEvent::MP::Node::Direct is constructed from the
#      complete string and registered under it and under every component.
sub add_node {
    my ($noderef) = @_;

    return $NODE{$noderef}
        if exists $NODE{$noderef};

    my @ids = split /,/, $noderef;

    for my $id (@ids) {
        return $NODE{$noderef} = $NODE{$id}
            if exists $NODE{$id};
    }

    # for indirect sends, use a different class
    #
    # Explicit Class->new call: the indirect form "new Class $arg" is parsed
    # heuristically and can silently turn into a plain function call.
    my $node = AnyEvent::MP::Node::Direct->new($noderef);

    $NODE{$_} = $node
        for $noderef, @ids;

    $node
}
153
# Send a message to a port.
#
# The first argument is a port id of the form "noderef#portname" (split at
# the first "#"); all remaining arguments make up the message. The node
# object registered for the noderef in %NODE - created through add_node when
# no entry exists yet - is asked to send the pair [portname, [message]].
# Returns whatever the node's send method returns.
sub snd($@) {
    my $portid = shift;
    my ($noderef, $portname) = split /#/, $portid, 2;

    my $node = exists $NODE{$noderef}
        ? $NODE{$noderef}
        : add_node($noderef);

    $node->send([$portname, [@_]]);
}
162
# Accept a [portname, message] pair addressed to a local port.
#
# Returns without doing anything when %PORT has no true entry for the port
# name. Otherwise the message is only dumped through warn using
# Data::Dumper; nothing in this sub invokes rcv handlers.
sub _inject {
    my ($portname, $msg) = @{ shift() };

    my $port = $PORT{$portname}
        or return;

    use Data::Dumper;
    warn Dumper $msg;
}
172
# Resolve a comma-separated list of transport descriptors into a normalised
# noderef.
#
# Each descriptor is either a string of digits (possibly empty), which is
# taken as a port on the local nodename reported by POSIX::uname, or anything
# else, which is handed to AnyEvent::Socket::parse_hostport. An empty
# $noderef is replaced by $DEFAULT_PORT. Every descriptor is resolved with
# AnyEvent::Socket::resolve_sockaddr and every address found is formatted
# with format_hostport.
#
# Returns an AE condvar. Once all lookups have reported back, it is sent the
# formatted addresses joined by ",", ordered by descriptor position and then
# by the order the resolver delivered them, with duplicates dropped.
#
# Croaks when a non-numeric descriptor cannot be parsed.
sub normalise_noderef($) {
    my ($noderef) = @_;

    my $cv = AE::cv;
    my @res; # [priority, formatted address] pairs from all lookups

    # This outermost begin is matched by the final $cv->end below; the
    # callback given here produces the result once every begin has been
    # balanced by an end.
    $cv->begin (sub {
        my %seen;
        my @refs;
        for (sort { $a->[0] <=> $b->[0] } @res) {
            push @refs, $_->[1] unless $seen{$_->[1]}++
        }
        shift->send (join ",", @refs);
    });

    $noderef = $DEFAULT_PORT unless length $noderef;

    # Start one lookup for ($host, $service) and collect its results. Both
    # kinds of descriptor share this code. $pri is the descriptor's position;
    # each address found nudges it up by 1e-5 so that results of a single
    # lookup keep their relative order but still sort before the next
    # descriptor.
    my $resolve = sub {
        my ($pri, $host, $service) = @_;

        $cv->begin;
        AnyEvent::Socket::resolve_sockaddr($host, $service, "tcp", 0, undef, sub {
            for (@_) {
                my ($port, $addr) = AnyEvent::Socket::unpack_sockaddr($_->[3]);
                push @res, [
                    $pri += 1e-5,
                    AnyEvent::Socket::format_hostport(
                        AnyEvent::Socket::format_address($addr), $port
                    ),
                ];
            }
            $cv->end;
        });
    };

    my $idx;
    for my $t (split /,/, $noderef) {
        my $pri = ++$idx;

        #TODO: this should be outside normalise_noderef and in become_public
        if ($t =~ /^\d*$/) {
            # Port only: resolve it on the local nodename. An empty (or "0")
            # port falls back to the "aemp=$DEFAULT_PORT" service spec
            # (presumably "service name, else this port" - see AnyEvent::Socket).
            my $nodename = (POSIX::uname())[1];

            $resolve->($pri, $nodename, $t || "aemp=$DEFAULT_PORT");
        } else {
            # The list assignment counts the returned elements, so an empty
            # list from parse_hostport triggers the croak.
            my ($host, $port) = AnyEvent::Socket::parse_hostport($t, "aemp=$DEFAULT_PORT")
                or Carp::croak "$t: unparsable transport descriptor";

            $resolve->($pri, $host, $port);
        }
    }

    $cv->end;

    $cv
}
240
# Turn this node into a public node; does nothing when $PUBLIC is already set.
#
# The first argument is a transport descriptor string or a reference to an
# array of them; all further arguments are passed through unchanged to
# AnyEvent::MP::Transport::mp_server.
#
# The descriptors are normalised with normalise_noderef, and ->recv on the
# returned condvar waits for that result, which becomes the new $NODE. An
# AnyEvent::MP::Node::Self object is registered in %NODE under the empty
# string and under every address in $NODE. For each address, the value
# returned by mp_server is kept in %LISTENER. Finally $PUBLIC is set to 1.
sub become_public {
    return if $PUBLIC;

    my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
    my @args = @_;

    $NODE = normalise_noderef($noderef)->recv;

    # Explicit Class->new call instead of the heuristically parsed
    # "new Class args" form.
    my $self = AnyEvent::MP::Node::Self->new(noderef => $NODE);

    $NODE{""} = $self; # empty string == local node

    for my $t (split /,/, $NODE) {
        $NODE{$t} = $self;

        my ($host, $port) = AnyEvent::Socket::parse_hostport($t);

        $LISTENER{$t} = AnyEvent::MP::Transport::mp_server(
            $host, $port,
            @args,
            on_error => sub {
                die "on_error<@_>\n";#d#
            },
            on_connect => sub {
                my ($tp) = @_;

                # register the transport under the id found in its remote_id field
                $NODE{$tp->{remote_id}} = $tp;
            },
            # Trailing callback: registers the transport under its
            # "peerhost:peerport". NOTE(review): when mp_server invokes this
            # relative to on_connect is not visible here - confirm in
            # AnyEvent::MP::Transport.
            sub {
                my ($tp) = @_;

                $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
            },
        );
    }

    $PUBLIC = 1;
}
278
279 =back
280
281 =head1 SEE ALSO
282
283 L<AnyEvent>.
284
285 =head1 AUTHOR
286
287 Marc Lehmann <schmorp@schmorp.de>
288 http://home.schmorp.de/
289
290 =cut
291
292 1
293