ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.16
Committed: Sat Aug 8 00:22:16 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.15: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Base;
22
23 use common::sense;
24 use Carp ();
25 use MIME::Base64 ();
26
27 use AE ();
28
29 use AnyEvent::MP::Node;
30 use AnyEvent::MP::Transport;
31
32 use base "Exporter";
33
34 our $VERSION = '0.01';
35 our @EXPORT = qw(
36 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
37
38 NODE $NODE node_of snd kil _any_
39 resolve_node initialise_node
40 );
41
42 our $DEFAULT_SECRET;
43 our $DEFAULT_PORT = "4040";
44
45 our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
46 our $CONNECT_TIMEOUT = 30; # includes handshake
47
48 =item $AnyEvent::MP::Base::WARN
49
50 This value is called with an error or warning message, when e.g. a connection
51 could not be created, authorisation failed and so on.
52
53 The default simply logs the message to STDERR.
54
55 =cut
56
57 our $WARN = sub {
58 my $msg = $_[0];
59 $msg =~ s/\n$//;
60 warn "$msg\n";
61 };
62
63 sub nonce($) {
64 my $nonce;
65
66 if (open my $fh, "</dev/urandom") {
67 sysread $fh, $nonce, $_[0];
68 } else {
69 # shit...
70 our $nonce_init;
71 unless ($nonce_init++) {
72 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
73 }
74
75 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
76 }
77
78 $nonce
79 }
80
81 sub asciibits($) {
82 my $data = $_[0];
83
84 if (eval "use Math::GMP 2.05; 1") {
85 $data = Math::GMP::get_str_gmp (
86 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
87 62
88 );
89 } else {
90 $data = MIME::Base64::encode_base64 $data, "";
91 $data =~ s/=//;
92 $data =~ s/\//s/g;
93 $data =~ s/\+/p/g;
94 }
95
96 $data
97 }
98
99 sub default_secret {
100 unless (defined $DEFAULT_SECRET) {
101 if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102 sysread $fh, $DEFAULT_SECRET, -s $fh;
103 } else {
104 $DEFAULT_SECRET = nonce 32;
105 }
106 }
107
108 $DEFAULT_SECRET
109 }
110
111 sub gen_uniq {
112 asciibits pack "wNa*", $$, time, nonce 2
113 }
114
115 our $PUBLIC = 0;
116 our $SLAVE = 0;
117
118 our $NODE = asciibits nonce 16;
119 our $RUNIQ = $NODE; # remote uniq value
120 our $UNIQ = gen_uniq; # per-process/node unique cookie
121 our $ID = "a";
122
123 our %NODE; # node id to transport mapping, or "undef", for local node
124 our (%PORT, %PORT_DATA); # local ports
125
126 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
127 our %LMON; # monitored _local_ ports
128
129 our %REG; # registered port names
130
131 our %LISTENER;
132
133 our $SRCNODE; # holds the sending node during _inject
134
135 sub NODE() {
136 $NODE
137 }
138
139 sub node_of($) {
140 my ($noderef, undef) = split /#/, $_[0], 2;
141
142 $noderef
143 }
144
145 sub _ANY_() { 1 }
146 sub _any_() { \&_ANY_ }
147
148 sub TRACE() { 0 }
149
150 sub _inject {
151 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
152 &{ $PORT{+shift} or return };
153 }
154
155 sub add_node {
156 my ($noderef) = @_;
157
158 return $NODE{$noderef}
159 if exists $NODE{$noderef};
160
161 for (split /,/, $noderef) {
162 return $NODE{$noderef} = $NODE{$_}
163 if exists $NODE{$_};
164 }
165
166 # new node, check validity
167 my $node;
168
169 if ($noderef =~ /^slave\/.+$/) {
170 $node = new AnyEvent::MP::Node::Slave $noderef;
171
172 } else {
173 for (split /,/, $noderef) {
174 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
175 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
176
177 $port > 0
178 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
179
180 AnyEvent::Socket::parse_address $host
181 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
182 }
183
184 # TODO: for indirect sends, use a different class
185 $node = new AnyEvent::MP::Node::Direct $noderef;
186 }
187
188 $NODE{$_} = $node
189 for $noderef, split /,/, $noderef;
190
191 $node
192 }
193
194 sub snd(@) {
195 my ($noderef, $portid) = split /#/, shift, 2;
196
197 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
198
199 ($NODE{$noderef} || add_node $noderef)
200 ->send (["$portid", @_]);
201 }
202
203 sub kil(@) {
204 my ($noderef, $portid) = split /#/, shift, 2;
205
206 length $portid
207 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
208
209 ($NODE{$noderef} || add_node $noderef)
210 ->kill ("$portid", @_);
211 }
212
213 sub resolve_node($) {
214 my ($noderef) = @_;
215
216 my $cv = AE::cv;
217 my @res;
218
219 $cv->begin (sub {
220 my %seen;
221 my @refs;
222 for (sort { $a->[0] <=> $b->[0] } @res) {
223 push @refs, $_->[1] unless $seen{$_->[1]}++
224 }
225 shift->send (join ",", @refs);
226 });
227
228 $noderef = $DEFAULT_PORT unless length $noderef;
229
230 my $idx;
231 for my $t (split /,/, $noderef) {
232 my $pri = ++$idx;
233
234 if ($t =~ /^\d*$/) {
235 require POSIX;
236 my $nodename = (POSIX::uname ())[1];
237
238 $cv->begin;
239 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
240 for (@_) {
241 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
242 push @res, [
243 $pri += 1e-5,
244 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
245 ];
246 }
247 $cv->end;
248 };
249
250 # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
251 #
252 # for (@ipv4) {
253 # push @res, [
254 # $pri,
255 # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
256 # ];
257 # }
258 } else {
259 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
260 or Carp::croak "$t: unparsable transport descriptor";
261
262 $cv->begin;
263 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
264 for (@_) {
265 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
266 push @res, [
267 $pri += 1e-5,
268 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
269 ];
270 }
271 $cv->end;
272 }
273 }
274 }
275
276 $cv->end;
277
278 $cv
279 }
280
281 sub initialise_node(@) {
282 my ($noderef, @others) = @_;
283
284 if ($noderef =~ /^slave\/(.*)$/) {
285 $SLAVE = AE::cv;
286 my $name = $1;
287 $name = $NODE unless length $name;
288 $noderef = AE::cv;
289 $noderef->send ("slave/$name");
290
291 @others
292 or Carp::croak "seed nodes must be specified for slave nodes";
293
294 } else {
295 $PUBLIC = 1;
296 $noderef = resolve_node $noderef;
297 }
298
299 @others = map $_->recv, map +(resolve_node $_), @others;
300
301 $NODE = $noderef->recv;
302
303 for my $t (split /,/, $NODE) {
304 $NODE{$t} = $NODE{""};
305
306 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
307
308 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
309 sub {
310 my ($tp) = @_;
311
312 # TODO: urgs
313 my $node = add_node $tp->{remote_node};
314 $node->{trial}{accept} = $tp;
315 },
316 ;
317 }
318
319 (add_node $_)->connect for @others;
320
321 if ($SLAVE) {
322 $SLAVE->recv;
323 $SLAVE = 1;
324 }
325 }
326
327 #############################################################################
328 # self node code
329
330 sub load_func($) {
331 my $func = $_[0];
332
333 unless (defined &$func) {
334 my $pkg = $func;
335 do {
336 $pkg =~ s/::[^:]+$//
337 or return sub { die "unable to resolve $func" };
338 eval "require $pkg";
339 } until defined &$func;
340 }
341
342 \&$func
343 }
344
345 our %node_req = (
346 # internal services
347
348 # monitoring
349 mon0 => sub { # disable monitoring
350 my $portid = shift;
351 my $node = $SRCNODE;
352 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
353 },
354 mon1 => sub { # enable monitoring
355 my $portid = shift;
356 my $node = $SRCNODE;
357 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
358 $node->send (["", kil => $portid, @_]);
359 });
360 },
361 kil => sub {
362 my $cbs = delete $SRCNODE->{lmon}{+shift}
363 or return;
364
365 $_->(@_) for @$cbs;
366 },
367 # node changed its name (for slave nodes)
368 iam => sub {
369 $SRCNODE->{noderef} = $_[0];
370 $NODE{$_[0]} = $SRCNODE;
371 },
372
373 # public services
374
375 # well-known-port lookup
376 lookup => sub {
377 my $name = shift;
378 my $port = $REG{$name};
379 #TODO: check validity
380 snd @_, $port;
381 },
382
383 # relay message to another node / generic echo
384 relay => sub {
385 &snd;
386 },
387 relay_multiple => sub {
388 snd @$_ for @_
389 },
390
391 # random garbage
392 eval => sub {
393 my @res = eval shift;
394 snd @_, "$@", @res if @_;
395 },
396 time => sub {
397 snd @_, AE::time;
398 },
399 devnull => sub {
400 #
401 },
402 );
403
404 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
405 $PORT{""} = sub {
406 my $tag = shift;
407 eval { &{ $node_req{$tag} ||= load_func $tag } };
408 $WARN->("error processing node message: $@") if $@;
409 };
410
411 =back
412
413 =head1 SEE ALSO
414
415 L<AnyEvent::MP>.
416
417 =head1 AUTHOR
418
419 Marc Lehmann <schmorp@schmorp.de>
420 http://home.schmorp.de/
421
422 =cut
423
424 1
425