ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.16
Committed: Sat Aug 8 00:22:16 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.15: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
34     our $VERSION = '0.01';
35 root 1.5 our @EXPORT = qw(
36 root 1.15 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
37 root 1.8
38     NODE $NODE node_of snd kil _any_
39 root 1.12 resolve_node initialise_node
40 root 1.5 );
41 root 1.1
42     our $DEFAULT_SECRET;
43 root 1.10 our $DEFAULT_PORT = "4040";
44 root 1.1
45     our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
46     our $CONNECT_TIMEOUT = 30; # includes handshake
47    
48 root 1.4 =item $AnyEvent::MP::Base::WARN
49    
50     This value is called with an error or warning message, when e.g. a connection
51     could not be created, authorisation failed and so on.
52    
53     The default simply logs the message to STDERR.
54    
55     =cut
56    
57 root 1.3 our $WARN = sub {
58 root 1.8 my $msg = $_[0];
59     $msg =~ s/\n$//;
60     warn "$msg\n";
61 root 1.3 };
62    
63 root 1.1 sub nonce($) {
64     my $nonce;
65    
66     if (open my $fh, "</dev/urandom") {
67     sysread $fh, $nonce, $_[0];
68     } else {
69     # shit...
70     our $nonce_init;
71     unless ($nonce_init++) {
72     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
73     }
74    
75     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
76     }
77    
78     $nonce
79     }
80    
81 root 1.14 sub asciibits($) {
82     my $data = $_[0];
83    
84     if (eval "use Math::GMP 2.05; 1") {
85     $data = Math::GMP::get_str_gmp (
86     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
87     62
88     );
89     } else {
90     $data = MIME::Base64::encode_base64 $data, "";
91     $data =~ s/=//;
92     $data =~ s/\//s/g;
93     $data =~ s/\+/p/g;
94     }
95    
96     $data
97     }
98    
99 root 1.1 sub default_secret {
100     unless (defined $DEFAULT_SECRET) {
101     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102     sysread $fh, $DEFAULT_SECRET, -s $fh;
103     } else {
104 root 1.2 $DEFAULT_SECRET = nonce 32;
105 root 1.1 }
106     }
107    
108     $DEFAULT_SECRET
109     }
110    
111 root 1.6 sub gen_uniq {
112 root 1.14 asciibits pack "wNa*", $$, time, nonce 2
113 root 1.6 }
114    
115 root 1.1 our $PUBLIC = 0;
116 root 1.13 our $SLAVE = 0;
117 root 1.1
118 root 1.15 our $NODE = asciibits nonce 16;
119     our $RUNIQ = $NODE; # remote uniq value
120     our $UNIQ = gen_uniq; # per-process/node unique cookie
121     our $ID = "a";
122    
123 root 1.1 our %NODE; # node id to transport mapping, or "undef", for local node
124 root 1.8 our (%PORT, %PORT_DATA); # local ports
125 root 1.5
126     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
127     our %LMON; # monitored _local_ ports
128    
129 root 1.8 our %REG; # registered port names
130    
131     our %LISTENER;
132 root 1.1
133 root 1.5 our $SRCNODE; # holds the sending node during _inject
134    
135 root 1.8 sub NODE() {
136     $NODE
137     }
138    
139     sub node_of($) {
140     my ($noderef, undef) = split /#/, $_[0], 2;
141    
142     $noderef
143     }
144 root 1.1
145     sub _ANY_() { 1 }
146     sub _any_() { \&_ANY_ }
147    
148 root 1.16 sub TRACE() { 0 }
149 root 1.13
150 root 1.1 sub _inject {
151 root 1.14 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
152 root 1.5 &{ $PORT{+shift} or return };
153 root 1.1 }
154    
155     sub add_node {
156     my ($noderef) = @_;
157    
158     return $NODE{$noderef}
159     if exists $NODE{$noderef};
160    
161     for (split /,/, $noderef) {
162     return $NODE{$noderef} = $NODE{$_}
163     if exists $NODE{$_};
164     }
165    
166 root 1.10 # new node, check validity
167 root 1.13 my $node;
168 root 1.10
169 root 1.13 if ($noderef =~ /^slave\/.+$/) {
170     $node = new AnyEvent::MP::Node::Slave $noderef;
171    
172     } else {
173     for (split /,/, $noderef) {
174     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
175     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
176    
177     $port > 0
178     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
179 root 1.10
180 root 1.13 AnyEvent::Socket::parse_address $host
181     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
182     }
183 root 1.10
184 root 1.13 # TODO: for indirect sends, use a different class
185     $node = new AnyEvent::MP::Node::Direct $noderef;
186 root 1.10 }
187    
188 root 1.1 $NODE{$_} = $node
189     for $noderef, split /,/, $noderef;
190    
191     $node
192     }
193    
194     sub snd(@) {
195 root 1.13 my ($noderef, $portid) = split /#/, shift, 2;
196    
197 root 1.14 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
198 root 1.1
199 root 1.5 ($NODE{$noderef} || add_node $noderef)
200 root 1.13 ->send (["$portid", @_]);
201 root 1.5 }
202    
203 root 1.8 sub kil(@) {
204 root 1.13 my ($noderef, $portid) = split /#/, shift, 2;
205 root 1.5
206 root 1.13 length $portid
207     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
208 root 1.10
209 root 1.7 ($NODE{$noderef} || add_node $noderef)
210 root 1.13 ->kill ("$portid", @_);
211 root 1.1 }
212    
213 root 1.10 sub resolve_node($) {
214     my ($noderef) = @_;
215    
216     my $cv = AE::cv;
217     my @res;
218    
219     $cv->begin (sub {
220     my %seen;
221     my @refs;
222     for (sort { $a->[0] <=> $b->[0] } @res) {
223     push @refs, $_->[1] unless $seen{$_->[1]}++
224     }
225     shift->send (join ",", @refs);
226     });
227    
228     $noderef = $DEFAULT_PORT unless length $noderef;
229    
230     my $idx;
231     for my $t (split /,/, $noderef) {
232     my $pri = ++$idx;
233    
234     if ($t =~ /^\d*$/) {
235     require POSIX;
236     my $nodename = (POSIX::uname ())[1];
237    
238     $cv->begin;
239     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
240     for (@_) {
241     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
242     push @res, [
243     $pri += 1e-5,
244     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
245     ];
246     }
247     $cv->end;
248     };
249    
250     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
251     #
252     # for (@ipv4) {
253     # push @res, [
254     # $pri,
255     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
256     # ];
257     # }
258     } else {
259     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
260     or Carp::croak "$t: unparsable transport descriptor";
261    
262     $cv->begin;
263     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
264     for (@_) {
265     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
266     push @res, [
267     $pri += 1e-5,
268     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
269     ];
270     }
271     $cv->end;
272     }
273     }
274     }
275    
276     $cv->end;
277    
278     $cv
279     }
280    
281 root 1.16 sub initialise_node(@) {
282 root 1.12 my ($noderef, @others) = @_;
283 root 1.1
284 root 1.12 if ($noderef =~ /^slave\/(.*)$/) {
285 root 1.13 $SLAVE = AE::cv;
286 root 1.12 my $name = $1;
287 root 1.14 $name = $NODE unless length $name;
288 root 1.12 $noderef = AE::cv;
289     $noderef->send ("slave/$name");
290 root 1.13
291     @others
292     or Carp::croak "seed nodes must be specified for slave nodes";
293    
294 root 1.12 } else {
295 root 1.13 $PUBLIC = 1;
296 root 1.12 $noderef = resolve_node $noderef;
297     }
298    
299     @others = map $_->recv, map +(resolve_node $_), @others;
300 root 1.1
301 root 1.12 $NODE = $noderef->recv;
302 root 1.1
303     for my $t (split /,/, $NODE) {
304     $NODE{$t} = $NODE{""};
305    
306     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
307    
308     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
309     sub {
310     my ($tp) = @_;
311    
312 root 1.5 # TODO: urgs
313     my $node = add_node $tp->{remote_node};
314     $node->{trial}{accept} = $tp;
315 root 1.1 },
316     ;
317     }
318    
319 root 1.13 (add_node $_)->connect for @others;
320    
321     if ($SLAVE) {
322     $SLAVE->recv;
323     $SLAVE = 1;
324     }
325 root 1.1 }
326    
327     #############################################################################
328     # self node code
329    
330 root 1.15 sub load_func($) {
331     my $func = $_[0];
332    
333     unless (defined &$func) {
334     my $pkg = $func;
335     do {
336     $pkg =~ s/::[^:]+$//
337     or return sub { die "unable to resolve $func" };
338     eval "require $pkg";
339     } until defined &$func;
340     }
341    
342     \&$func
343     }
344    
345 root 1.5 our %node_req = (
346 root 1.13 # internal services
347    
348 root 1.5 # monitoring
349     mon0 => sub { # disable monitoring
350     my $portid = shift;
351     my $node = $SRCNODE;
352     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
353     },
354     mon1 => sub { # enable monitoring
355     my $portid = shift;
356     my $node = $SRCNODE;
357     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
358 root 1.8 $node->send (["", kil => $portid, @_]);
359 root 1.5 });
360     },
361 root 1.7 kil => sub {
362 root 1.8 my $cbs = delete $SRCNODE->{lmon}{+shift}
363 root 1.5 or return;
364    
365 root 1.8 $_->(@_) for @$cbs;
366 root 1.5 },
367 root 1.13 # node changed its name (for slave nodes)
368     iam => sub {
369     $SRCNODE->{noderef} = $_[0];
370     $NODE{$_[0]} = $SRCNODE;
371     },
372    
373     # public services
374 root 1.5
375     # well-known-port lookup
376 root 1.8 lookup => sub {
377     my $name = shift;
378     my $port = $REG{$name};
379 root 1.15 #TODO: check validity
380 root 1.8 snd @_, $port;
381 root 1.5 },
382    
383     # relay message to another node / generic echo
384     relay => sub {
385     &snd;
386     },
387 root 1.15 relay_multiple => sub {
388     snd @$_ for @_
389     },
390 root 1.5
391     # random garbage
392     eval => sub {
393     my @res = eval shift;
394     snd @_, "$@", @res if @_;
395     },
396     time => sub {
397     snd @_, AE::time;
398     },
399     devnull => sub {
400     #
401     },
402     );
403    
404 root 1.9 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
405 root 1.15 $PORT{""} = sub {
406     my $tag = shift;
407     eval { &{ $node_req{$tag} ||= load_func $tag } };
408     $WARN->("error processing node message: $@") if $@;
409     };
410 root 1.1
411 root 1.4 =back
412    
413 root 1.1 =head1 SEE ALSO
414    
415     L<AnyEvent::MP>.
416    
417     =head1 AUTHOR
418    
419     Marc Lehmann <schmorp@schmorp.de>
420     http://home.schmorp.de/
421    
422     =cut
423    
424     1
425