ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.13
Committed: Wed Aug 5 22:40:51 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.12: +50 -19 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Base;
22
23 use common::sense;
24 use Carp ();
25 use MIME::Base64 ();
26
27 use AE ();
28
29 use AnyEvent::MP::Node;
30 use AnyEvent::MP::Transport;
31
32 use base "Exporter";
33
our $VERSION = '0.01';

# Everything below is exported by default: shared state first, then the
# functions/constants that the higher level modules build upon.
our @EXPORT = (
   qw(%NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node),
   qw(NODE $NODE node_of snd kil _any_),
   qw(resolve_node initialise_node),
);

# shared secret, filled in lazily by default_secret
our $DEFAULT_SECRET;
# port used when a transport descriptor does not name one
our $DEFAULT_PORT = "4040";

# new connect every 5s, at least
our $CONNECT_INTERVAL = 5;
# includes handshake
our $CONNECT_TIMEOUT = 30;
47
48 =item $AnyEvent::MP::Base::WARN
49
50 This code reference is called with an error or warning message when, for
51 example, a connection could not be created or authorisation failed.
52
53 The default simply logs the message to STDERR.
54
55 =cut
56
# Default warning handler: strips one trailing newline from the message (if
# any) and emits it via warn with exactly one newline appended, so perl does
# not add its own "at FILE line N" suffix.
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;

   warn "$text\n";
};
62
# nonce $length
#
# Returns a string of $length random octets.
#
# The octets are read from /dev/urandom when that device can be opened. If it
# cannot be opened, or delivers fewer octets than requested, the remainder is
# filled from perl's rand, which is seeded once per process from the time, the
# process id and a checksum over the output of "ps" and "ipconfig".
sub nonce($) {
   my ($len) = @_;
   my $nonce = "";

   # three-argument open: the file name can never be parsed as a mode
   if (open my $fh, "<", "/dev/urandom") {
      # sysread may return fewer octets than asked for, so keep appending
      while ((my $have = length $nonce) < $len) {
         sysread $fh, $nonce, $len - $have, $have
            or last; # read error (undef) or EOF (0): stop using the device
      }
   }

   if (length ($nonce) < $len) {
      # no usable random device - fall back to the (much weaker) builtin PRNG
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
80
# default_secret
#
# Returns the default shared secret, computing and caching it in
# $DEFAULT_SECRET on first use.
#
# The secret is the content of $HOME/.aemp-secret when that file can be opened
# and read; otherwise 32 random octets from nonce are used, which means the
# secret then differs from process to process.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      # three-argument open, so characters in $HOME cannot change the open mode
      if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
         my $secret;

         # only accept the file content when the read itself succeeded
         $DEFAULT_SECRET = $secret
            if defined (sysread $fh, $secret, -s $fh);
      }

      # no secret file, or it could not be read
      $DEFAULT_SECRET = nonce (32)
         unless defined $DEFAULT_SECRET;
   }

   $DEFAULT_SECRET
}
92
# gen_uniq
#
# Builds a short printable cookie from the process id and the current time:
# both are packed (BER integer + 32 bit big-endian), base64-encoded without
# line breaks, and the trailing "=" padding is removed.
sub gen_uniq {
   my $packed = pack "wN", $$, time;

   (my $cookie = MIME::Base64::encode_base64 ($packed, "")) =~ s/=+$//;

   return $cookie;
}
99
# noderef of the local node; starts out as 32 random hex digits and is
# replaced by initialise_node
our $NODE = unpack "H*", nonce (16);
# per-process/node unique cookie
our $UNIQ = gen_uniq ();
our $ID = "a";
# set to 1 by initialise_node for non-slave nodes
our $PUBLIC = 0;
# set by initialise_node for slave nodes
our $SLAVE = 0;

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

# registered port names
our %REG;

# listening sockets, keyed by "host:port" (filled by initialise_node)
our %LISTENER;

# holds the sending node during _inject
our $SRCNODE;
117
# NODE
#
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
   return $NODE;
}
121
# node_of $portid
#
# Returns the node part of a port id, i.e. everything before the first "#"
# (the whole string when it contains no "#").
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   return $parts[0];
}
127
# _ANY_ is a constant-true sub; _any_ returns a reference to it, so the
# reference itself can be used as a unique marker value.
sub _ANY_() { 1 }

sub _any_() {
   return \&_ANY_;
}
130
# compile-time switch for the message tracing "warn"s marked with #d#
sub TRACE () { 1 }
132
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the callback registered in
# %PORT for $portid and calls it with the remaining arguments. Messages for
# unknown ports are dropped silently.
sub _inject {
   warn "$SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $port_cb = $PORT{+shift}
      or return;

   # call with the current @_, which no longer contains the port id
   &$port_cb;
}
137
# add_node $noderef
#
# Returns the node object registered in %NODE for $noderef. When neither the
# full noderef nor any of its comma-separated parts is registered yet, a new
# node object is created and registered under the full noderef and under
# every part.
#
# A noderef of the form "slave/NAME" creates an AnyEvent::MP::Node::Slave
# object. Anything else must be a comma-separated list of resolved
# "address:port" endpoints and creates an AnyEvent::MP::Node::Direct object;
# the function croaks when an endpoint cannot be parsed, has a port that is
# not a positive decimal number, or has a host part that is not an address.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @endpoints = split /,/, $noderef;

   # a node already known under one of its endpoints is simply aliased
   for (@endpoints) {
      return $NODE{$noderef} = $NODE{$_}
         if exists $NODE{$_};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Slave->new ($noderef);

   } else {
      for (@endpoints) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($_)
            or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";

         # require digits only - a plain "> 0" would also accept e.g. "80x"
         $port =~ /^[0-9]+$/ && $port > 0
            or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
      }

      # TODO: for indirect sends, use a different class
      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   $NODE{$_} = $node
      for $noderef, @endpoints;

   $node
}
176
# snd $portid, @msg
#
# Sends @msg to the port "NODEREF#PORT": the destination is split at the
# first "#", the node object is looked up in %NODE (or created via add_node)
# and handed the message with the port part as its first element.
sub snd(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   warn "$noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["$portid", @_]);
}
185
# kil $portid, @reason
#
# Asks the node owning "NODEREF#PORT" to kill that port, passing @reason
# along. Croaks when the port part is empty, as that would address the node
# port itself.
sub kil(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
195
# resolve_node $noderef
#
# Starts resolving a comma-separated list of transport descriptors and
# returns a condvar. Once every lookup has finished, the condvar is sent a
# single comma-separated string of "address:port" endpoints, without
# duplicates, ordered by descriptor position and then by lookup result order.
#
# An empty $noderef stands for $DEFAULT_PORT. A descriptor consisting only of
# digits (or nothing) is a port on the local host, whose name is taken from
# POSIX::uname. Any other descriptor is parsed as "host:port" with
# "aemp=$DEFAULT_PORT" as the default service; unparsable ones croak.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "address:port"] pairs collected by the lookups

   # runs when the last outstanding lookup has ended
   $cv->begin (sub {
      my ($done) = @_;
      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $endpoint = $entry->[1];
         next if $seen{$endpoint}++;
         push @refs, $endpoint;
      }

      $done->send (join ",", @refs);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $idx;
   for my $t (split /,/, $noderef) {
      my $pri = ++$idx;

      # shared lookup callback: every result gets a slightly larger priority
      # value than the previous one, so results keep their relative order
      my $collect = sub {
         for my $result (@_) {
            my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            $pri += 1e-5;
            push @res, [
               $pri,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service),
            ];
         }

         $cv->end;
      };

      if ($t =~ /^\d*$/) {
         # bare (or missing) port: resolve the local host name
         require POSIX;
         my $nodename = (POSIX::uname ())[1];

         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, $collect);
      } else {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak "$t: unparsable transport descriptor";

         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, $collect);
      }
   }

   $cv->end;

   $cv
}
263
# initialise_node $noderef, @seednodes
#
# Gives the local node its final noderef, starts listening and connects to
# the seed nodes.
#
# A $noderef of the form "slave/NAME" (NAME defaults to $UNIQ) makes this a
# slave node, which requires at least one seed node. Anything else is
# resolved via resolve_node and marks the node as public. The seed nodes are
# always resolved via resolve_node. For a slave node the call blocks on the
# $SLAVE condvar after connecting, and then sets $SLAVE to 1.
sub initialise_node($;@) {
   my ($noderef, @others) = @_;

   my $self_cv; # condvar delivering our own noderef

   if (my ($name) = $noderef =~ /^slave\/(.*)$/) {
      $SLAVE = AE::cv;

      $name = $UNIQ unless length $name;

      # slave names need no resolving, so the result is available at once
      $self_cv = AE::cv;
      $self_cv->send ("slave/$name");

      Carp::croak "seed nodes must be specified for slave nodes"
         unless @others;

   } else {
      $PUBLIC = 1;
      $self_cv = resolve_node ($noderef);
   }

   # start all seed lookups before waiting for any of them
   my @pending = map +(resolve_node ($_)), @others;
   @others = map $_->recv, @pending;

   $NODE = $self_cv->recv;

   for my $t (split /,/, $NODE) {
      # every endpoint of ours maps to the self node
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      });
   }

   for my $seed (@others) {
      add_node ($seed)->connect;
   }

   if ($SLAVE) {
      $SLAVE->recv;
      $SLAVE = 1;
   }
}
309
310 #############################################################################
311 # self node code
312
# Requests understood by the node port (port id ""): maps the request name,
# i.e. the first message element, to its handler. The handler receives the
# remaining message elements in @_; $SRCNODE is the sending node.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      # copy now - $SRCNODE is only valid while this message is processed
      my $node = $SRCNODE;

      my $notify = sub {
         $node->send (["", kil => $portid, @_]);
      };

      $node->{rmon}{$portid} = $notify;
      $NODE{""}->monitor ($portid, $notify);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my $noderef = $_[0];

      $SRCNODE->{noderef} = $noderef;
      $NODE{$noderef} = $SRCNODE;
   },

   # public services

   # well-known-port lookup
   lookup => sub {
      my $name = shift;
      my $port = $REG{$name};
      #TODO: check validity
      snd @_, $port;
   },

   # relay message to another node / generic echo
   relay => sub {
      snd @_;
   },

   # random garbage
   # NOTE(review): this evaluates a string received from another node as perl
   # code - anybody who can send messages to this node can run arbitrary code.
   eval => sub {
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::time;
   },
   devnull => sub {
      # intentionally does nothing
   },
);
368
# register the local node under "" and under its current noderef
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# the node port: dispatch on the request name, ignore unknown requests
$PORT{""} = sub {
   my $handler = $node_req{+shift}
      or return;

   # call with the current @_, which no longer contains the request name
   &$handler;
};
371
372 =back
373
374 =head1 SEE ALSO
375
376 L<AnyEvent::MP>.
377
378 =head1 AUTHOR
379
380 Marc Lehmann <schmorp@schmorp.de>
381 http://home.schmorp.de/
382
383 =cut
384
385 1
386