ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.3
Committed: Sat Aug 1 07:11:45 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.2: +130 -23 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP - multi-processing/message-passing framework
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP;
8
9 NODE # returns this node identifier
10 $NODE # contains this node identifier
11
12 snd $port, type => data...;
13
14 rcv $port, smartmatch => $cb->($port, @msg);
15
16 # examples:
17 rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
18 rcv $port1, pong => sub { warn "pong received\n" };
19 snd $port2, ping => $port1;
20
21 # more, smarter, matches (_any_ is exported by this module)
22 rcv $port, [child_died => $pid] => sub { ...
23 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
24
25 =head1 DESCRIPTION
26
27 This module (-family) implements a simple message passing framework.
28
29 Despite its simplicity, you can securely message other processes running
30 on the same or other hosts.
31
32 =head1 CONCEPTS
33
34 =over 4
35
36 =item port
37
38 A port is something you can send messages to with the C<snd> function, and
39 you can register C<rcv> handlers with. All C<rcv> handlers will receive
40 messages they match, messages will not be queued.
41
42 =item port id - C<noderef#portname>
43
44 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
45 by a port name (a printable string of unspecified format).
46
47 =item node
48
49 A node is a single process containing at least one port - the node
50 port. You can send messages to node ports to let them create new ports,
51 among other things.
52
53 Initially, nodes are either private (single-process only) or hidden
54 (connected to a master node only). Only when they epxlicitly "become
55 public" can you send them messages from unrelated other nodes.
56
57 =item noderef - C<host:port,host:port...>, C<id@noderef, C<id>
58
59 A noderef is a string that either uniquely identifies a given node (for
60 private and hidden nodes), or contains a recipe on how to reach a given
61 node (for public nodes).
62
63 =back
64
65 =head1 VARIABLES/FUNCTIONS
66
67 =over 4
68
69 =cut
70
71 package AnyEvent::MP;
72
73 use AnyEvent::MP::Util ();
74 use AnyEvent::MP::Node;
75 use AnyEvent::MP::Transport;
76
77 use utf8;
78 use common::sense;
79
80 use Carp ();
81
82 use AE ();
83
84 use base "Exporter";
85
86 our $VERSION = '0.0';
87 our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_);
88
89 our $DEFAULT_SECRET;
90 our $DEFAULT_PORT = "4040";
91
92 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
93 our $CONNECT_TIMEOUT = 30; # includes handshake
94
95 sub default_secret {
96 unless (defined $DEFAULT_SECRET) {
97 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
98 sysread $fh, $DEFAULT_SECRET, -s $fh;
99 } else {
100 $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32;
101 }
102 }
103
104 $DEFAULT_SECRET
105 }
106
107 =item NODE / $NODE
108
109 The C<NODE ()> function and the C<$NODE> variable contain the noderef of
110 the local node. The value is initialised by a call to C<become_public> or
111 C<become_slave>, after which all local port identifiers become invalid.
112
113 =cut
114
115 our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie
116 our $PUBLIC = 0;
117 our $NODE;
118 our $PORT;
119
120 our %NODE; # node id to transport mapping, or "undef", for local node
121 our %PORT; # local ports
122 our %LISTENER; # local transports
123
124 sub NODE() { $NODE }
125
126 {
127 use POSIX ();
128 my $nodename = (POSIX::uname)[1];
129 $NODE = "$$\@$nodename";
130 }
131
132 sub _ANY_() { 1 }
133 sub _any_() { \&_ANY_ }
134
135 sub add_node {
136 my ($noderef) = @_;
137
138 return $NODE{$noderef}
139 if exists $NODE{$noderef};
140
141 for (split /,/, $noderef) {
142 return $NODE{$noderef} = $NODE{$_}
143 if exists $NODE{$_};
144 }
145
146 # for indirect sends, use a different class
147 my $node = new AnyEvent::MP::Node::Direct $noderef;
148
149 $NODE{$_} = $node
150 for $noderef, split /,/, $noderef;
151
152 $node
153 }
154
155 =item snd $portid, type => @data
156
157 =item snd $portid, @msg
158
159 Send the given message to the given port ID, which can identify either a
160 local or a remote port.
161
162 While the message can be about anything, it is highly recommended to use
163 a constant string as first element.
164
165 The message data effectively becomes read-only after a call to this
166 function: modifying any argument is not allowed and can cause many
167 problems.
168
169 The type of data you can transfer depends on the transport protocol: when
170 JSON is used, then only strings, numbers and arrays and hashes consisting
171 of those are allowed (no objects). When Storable is used, then anything
172 that Storable can serialise and deserialise is allowed, and for the local
173 node, anything can be passed.
174
175 =cut
176
177 sub snd($@) {
178 my ($noderef, $port) = split /#/, shift, 2;
179
180 add_node $noderef
181 unless exists $NODE{$noderef};
182
183 $NODE{$noderef}->send (["$port", [@_]]);
184 }
185
186 =item rcv $portid, type => $callback->(@msg)
187
188 =item rcv $portid, $smartmatch => $callback->(@msg)
189
190 =item rcv $portid, [$smartmatch...] => $callback->(@msg)
191
192 Register a callback on the port identified by C<$portid>, which I<must> be
193 a local port.
194
195 The callback has to return a true value when its work is done, after
196 which is will be removed, or a false value in which case it will stay
197 registered.
198
199 If the match is an array reference, then it will be matched against the
200 first elements of the message, otherwise only the first element is being
201 matched.
202
203 Any element in the match that is specified as C<_any_> (a function
204 exported by this module) matches any single element of the message.
205
206 While not required, it is highly recommended that the first matching
207 element is a string identifying the message. The one-string-only match is
208 also the most efficient match (by far).
209
210 =cut
211
212 sub rcv($@) {
213 my ($port, $match, $cb) = @_;
214
215 my $port = $PORT{$port}
216 or do {
217 my ($noderef, $lport) = split /#/, $port;
218 "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef}
219 or Carp::croak "$port: can only rcv on local ports";
220
221 $PORT{$lport}
222 or Carp::croak "$port: port does not exist";
223
224 $PORT{$port} = $PORT{$lport} # also return
225 };
226
227 if (!ref $match) {
228 push @{ $port->{rc0}{$match} }, [$cb];
229 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
230 my ($type, @match) = @$match;
231 @match
232 ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match]
233 : push @{ $port->{rc0}{$match->[0]} }, [$cb];
234 } else {
235 push @{ $port->{any} }, [$cb, $match];
236 }
237 }
238
239 sub _inject {
240 my ($port, $msg) = @{+shift};
241
242 $port = $PORT{$port}
243 or return;
244
245 @_ = @$msg;
246
247 for (@{ $port->{rc0}{$msg->[0]} }) {
248 $_ && &{$_->[0]}
249 && undef $_;
250 }
251
252 for (@{ $port->{rcv}{$msg->[0]} }) {
253 $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1]
254 && &{$_->[0]}
255 && undef $_;
256 }
257
258 for (@{ $port->{any} }) {
259 $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1]
260 && &{$_->[0]}
261 && undef $_;
262 }
263 }
264
265 sub normalise_noderef($) {
266 my ($noderef) = @_;
267
268 my $cv = AE::cv;
269 my @res;
270
271 $cv->begin (sub {
272 my %seen;
273 my @refs;
274 for (sort { $a->[0] <=> $b->[0] } @res) {
275 push @refs, $_->[1] unless $seen{$_->[1]}++
276 }
277 shift->send (join ",", @refs);
278 });
279
280 $noderef = $DEFAULT_PORT unless length $noderef;
281
282 my $idx;
283 for my $t (split /,/, $noderef) {
284 my $pri = ++$idx;
285
286 #TODO: this should be outside normalise_noderef and in become_public
287 if ($t =~ /^\d*$/) {
288 my $nodename = (POSIX::uname)[1];
289
290 $cv->begin;
291 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
292 for (@_) {
293 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
294 push @res, [
295 $pri += 1e-5,
296 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
297 ];
298 }
299 $cv->end;
300 };
301
302 # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
303 #
304 # for (@ipv4) {
305 # push @res, [
306 # $pri,
307 # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
308 # ];
309 # }
310 } else {
311 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
312 or Carp::croak "$t: unparsable transport descriptor";
313
314 $cv->begin;
315 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
316 for (@_) {
317 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
318 push @res, [
319 $pri += 1e-5,
320 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
321 ];
322 }
323 $cv->end;
324 }
325 }
326 }
327
328 $cv->end;
329
330 $cv
331 }
332
333 sub become_public {
334 return if $PUBLIC;
335
336 my $noderef = join ",", ref $_[0] ? @{+shift} : shift;
337 my @args = @_;
338
339 $NODE = (normalise_noderef $noderef)->recv;
340
341 for my $t (split /,/, $NODE) {
342 $NODE{$t} = $NODE{""};
343
344 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
345
346 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
347 @args,
348 on_error => sub {
349 die "on_error<@_>\n";#d#
350 },
351 on_connect => sub {
352 my ($tp) = @_;
353
354 $NODE{$tp->{remote_id}} = $_[0];
355 },
356 sub {
357 my ($tp) = @_;
358
359 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp;
360 },
361 ;
362 }
363
364 $PUBLIC = 1;
365 }
366
367 #############################################################################
368 # self node code
369
370 sub _new_port($) {
371 my ($name) = @_;
372
373 my ($noderef, $portname) = split /#/, $name;
374
375 $PORT{$name} =
376 $PORT{$portname} = {
377 names => [$name, $portname],
378 };
379 }
380
381 $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
382 _new_port "";
383
384 rcv "", relay => \&snd;
385
386 =back
387
388 =head1 SEE ALSO
389
390 L<AnyEvent>.
391
392 =head1 AUTHOR
393
394 Marc Lehmann <schmorp@schmorp.de>
395 http://home.schmorp.de/
396
397 =cut
398
399 1
400