ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.18
Committed: Mon Aug 10 01:37:19 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.17: +7 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
34 root 1.17 our $VERSION = '0.4';
35 root 1.5 our @EXPORT = qw(
36 root 1.15 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
37 root 1.8
38     NODE $NODE node_of snd kil _any_
39 root 1.12 resolve_node initialise_node
40 root 1.5 );
41 root 1.1
42     our $DEFAULT_SECRET;
43 root 1.10 our $DEFAULT_PORT = "4040";
44 root 1.1
45 root 1.18 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
46     our $CONNECT_TIMEOUT = 5; # includes handshake
47     our $CONNECT_TIMEOUT_MAX = 90; # never retry less often
48 root 1.1
49 root 1.4 =item $AnyEvent::MP::Base::WARN
50    
51     This code reference is called with an error or warning message when, for
52     example, a connection could not be created or authorisation failed.
53    
54     The default simply logs the message to STDERR.
55    
56     =cut
57    
# Default warning handler: strips one trailing newline from the message
# (the first argument) and emits it via warn with exactly one newline.
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;

   warn $text, "\n";
};
63    
# nonce $octets
#
# Returns a string of exactly $octets random octets.
#
# The octets are read from /dev/urandom when it can be opened. Reads are
# repeated until the requested amount has been collected, as a single
# sysread may legitimately return fewer octets than requested. Whatever is
# still missing afterwards (device unavailable, read error, eof) is filled
# from perl's rand, which is seeded once per process from the time, the pid
# and the output of some system commands.
sub nonce($) {
   my $want  = $_[0];
   my $nonce = "";

   # three-arg open: the mode can never be taken from the file name
   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      while (length ($nonce) < $want) {
         # append at the current end of the buffer
         my $got = sysread $fh, $nonce, $want - length ($nonce), length ($nonce);
         last unless $got; # undef on error, 0 on eof
      }
   }

   if (length ($nonce) < $want) {
      # no (or not enough) kernel randomness, fall back to rand
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $want - length ($nonce);
   }

   $nonce
}
81    
# asciibits $octets
#
# Encodes a binary string as a compact string made of the characters
# 0-9, A-Z and a-z only.
#
# When Math::GMP (2.05 or newer) can be loaded, the octets are interpreted
# as one big hexadecimal number and printed in base 62. Otherwise the data
# is base64-encoded, all padding is removed and the two non-alphanumeric
# base64 characters are replaced ("/" becomes "s", "+" becomes "p").
sub asciibits($) {
   my $data = $_[0];

   # the feature probe below must not leak its error to the caller
   local $@;

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=" characters, so remove all of them
      $data =~ tr/=//d;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
99    
# default_secret
#
# Returns the shared secret, computing and caching it in $DEFAULT_SECRET on
# first use. The secret is the content of $HOME/.aemp-secret when that file
# can be read and is not empty, otherwise 32 random octets from nonce.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      # three-arg open, so odd characters in $HOME cannot change the mode
      if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
         binmode $fh;

         my $got = sysread $fh, my $secret, -s $fh;

         # an empty or unreadable file must not result in an empty secret
         $DEFAULT_SECRET = $secret if $got;
      }

      $DEFAULT_SECRET = nonce (32)
         unless defined $DEFAULT_SECRET;
   }

   $DEFAULT_SECRET
}
111    
# gen_uniq
#
# Builds a short identifier from the process id, the current time and two
# random octets, encoded with asciibits.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
115    
116 root 1.1 our $PUBLIC = 0;
117 root 1.13 our $SLAVE = 0;
118 root 1.1
119 root 1.15 our $NODE = asciibits nonce 16;
120     our $RUNIQ = $NODE; # remote uniq value
121     our $UNIQ = gen_uniq; # per-process/node unique cookie
122     our $ID = "a";
123    
124 root 1.1 our %NODE; # node id to transport mapping, or "undef", for local node
125 root 1.8 our (%PORT, %PORT_DATA); # local ports
126 root 1.5
127     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
128     our %LMON; # monitored _local_ ports
129    
130 root 1.8 our %LISTENER;
131 root 1.1
132 root 1.5 our $SRCNODE; # holds the sending node during _inject
133    
# NODE
#
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() { $NODE }
137    
# node_of $portid
#
# Returns the node part of a port id, that is, everything before the first
# "#". A string without "#" is returned unchanged.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
143 root 1.1
# _ANY_ always returns true; _any_ returns a reference to it, so the
# reference can be recognised by identity.
sub _ANY_() {
   1
}

sub _any_() {
   \&_ANY_
}

# compile-time switch for the SND/RCV trace output
sub TRACE() {
   0
}
148 root 1.13
# _inject $portid, @msg
#
# Delivers a message to the local port $portid by calling the callback
# registered in %PORT with the remaining arguments. Messages for ports
# that are not registered are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $handler = $PORT{+shift}
      or return;

   # called with the current @_, i.e. the message without the port id
   &$handler;
}
153    
# add_node $noderef
#
# Returns the node object for $noderef, creating and registering it in
# %NODE when necessary. A noderef is either "slave/<name>" or a
# comma-separated list of resolved host:port endpoints. When one of the
# endpoints is already known, its node object is reused for the whole
# noderef. Croaks when an endpoint of a new noderef is not fully resolved.
sub add_node {
   my ($noderef) = @_;

   # the exact noderef is already registered
   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @endpoints = split /,/, $noderef;

   # one of its endpoints is already registered: alias the full noderef
   for my $endpoint (@endpoints) {
      return $NODE{$noderef} = $NODE{$endpoint}
         if exists $NODE{$endpoint};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Slave->new ($noderef);

   } else {
      for my $endpoint (@endpoints) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
            or Carp::croak "$noderef: not a resolved node reference ('$endpoint' not parsable)";

         $port > 0
            or Carp::croak "$noderef: not a resolved node reference ('$endpoint' contains non-numeric port)";

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak "$noderef: not a resolved node reference ('$endpoint' contains unresolved address)";
      }

      # TODO: for indirect sends, use a different class
      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef and under every single endpoint
   $NODE{$_} = $node
      for $noderef, @endpoints;

   $node
}
192    
# snd $portid, @msg
#
# Sends @msg to the port $portid ("noderef#port"). The node object is
# looked up in %NODE and created with add_node when it is not yet known.
sub snd(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["$portid", @_]);
}
201    
# kil $portid, @reason
#
# Kills the port $portid ("noderef#port"), passing @reason on to the
# node's kill method. Croaks when the port part is empty, as that would
# address the node port itself.
sub kil(@) {
   my $target = shift;
   my ($noderef, $portid) = split /#/, $target, 2;

   Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
211    
# resolve_node $noderef
#
# Starts resolving a comma-separated list of transport descriptors and
# returns a condvar that is sent the resolved noderef (a comma-separated
# list of host:port strings) once all lookups have finished.
#
# A descriptor consisting only of digits (or nothing) is taken as a port
# on the local host name as reported by POSIX::uname; anything else is
# parsed as host:port. An empty $noderef stands for $DEFAULT_PORT.
# Results keep the order of their descriptors and duplicates are removed.
# Croaks when a descriptor cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs collected by the lookups

   # group callback, runs when every begin has been matched by an end
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];

         next if $seen{$addr}++;
         push @refs, $addr;
      }

      $done->send (join ",", @refs);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   # creates the lookup callback for the descriptor with priority $pri,
   # each address found gets a slightly higher value than the previous one
   my $collector = sub {
      my ($pri) = @_;

      sub {
         for my $sockaddr (@_) {
            my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($sockaddr->[3]);

            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
            ];
         }

         $cv->end;
      }
   };

   my $idx;
   for my $t (split /,/, $noderef) {
      my $on_resolved = $collector->(++$idx);

      my ($host, $service);

      if ($t =~ /^\d*$/) {
         # port only: use the local host name
         require POSIX;

         $host    = (POSIX::uname ())[1];
         $service = $t || "aemp=$DEFAULT_PORT";
      } else {
         ($host, $service) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak "$t: unparsable transport descriptor";
      }

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $service, "tcp", 0, undef, $on_resolved);
   }

   $cv->end;

   $cv
}
279    
# initialise_node $noderef, @seednodes
#
# Sets up the local node. A $noderef of the form "slave/<name>" (the name
# defaults to the current $NODE) selects slave mode, which requires at
# least one seed node; any other value is resolved with resolve_node and
# makes the node public. Afterwards a listener is created for every
# endpoint of the final noderef and connections to all seed nodes are
# started. In slave mode the call blocks until $SLAVE is signalled or
# $CONNECT_TIMEOUT seconds have passed, and croaks in the latter case.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   if ($noderef =~ /^slave\/(.*)$/) {
      my $name = $1;

      $SLAVE = AE::cv;
      $name = $NODE unless length $name;

      # slave names need no resolving, so use a condvar that is ready already
      my $ready = AE::cv;
      $ready->send ("slave/$name");
      $noderef = $ready;

      @others
         or Carp::croak "seed nodes must be specified for slave nodes";

   } else {
      $PUBLIC = 1;
      $noderef = resolve_node ($noderef);
   }

   # start all lookups first, then wait for each of them
   my @pending = map +(resolve_node ($_)), @others;
   @others = map $_->recv, @pending;

   $NODE = $noderef->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      my $on_accept = sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      };

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, $on_accept);
   }

   (add_node ($_))->connect for @others;

   if ($SLAVE) {
      # the timer object must stay alive while waiting
      my $timeout = AE::timer ($CONNECT_TIMEOUT, 0, sub { $SLAVE->() });

      $SLAVE->recv
         or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";

      $SLAVE = 1;
   }
}
327    
328     #############################################################################
329     # self node code
330    
# load_func $name
#
# Returns a code reference for the fully qualified function $name.
#
# When the function is not defined yet, the enclosing packages are loaded
# with require, from the longest package name to the shortest, until the
# function exists. When that fails, a code reference is returned that dies
# with "unable to resolve '$name'" when called.
#
# $name can originate from a message sent by another node, so it is only
# ever interpolated into the require expression when it has the form of a
# plain qualified identifier. Anything else is treated as unresolvable.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # function names are used as symbolic references

   unless (defined &$func) {
      my $unresolved = sub { die "unable to resolve '$func'" };

      # refuse names that could smuggle code into the string eval below
      $func =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
         or return $unresolved;

      # do not clobber the caller's error variable
      local $@;

      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return $unresolved;
         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
345    
# Services of the node port, keyed by the message tag. Each handler is
# called with the remaining elements of the message, and $SRCNODE holds the
# node object the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;
      my $remote = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $remote->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $remote = $SRCNODE;

      # when the local port dies, tell the monitoring node about it
      my $notify = sub {
         $remote->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $remote->{rmon}{$portid} = $notify);
   },
   kil => sub {
      my $portid = shift;

      my $callbacks = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$callbacks;
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my ($noderef) = @_;

      $SRCNODE->{noderef} = $noderef;
      $NODE{$noderef} = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   relay => sub {
      &snd;
   },
   relay_multiple => sub {
      snd (@$_) for @_
   },

   # random garbage

   # NOTE(review): evaluates a string received from another node, which
   # gives every node that can send messages the ability to run arbitrary
   # code here - confirm that this is intended.
   eval => sub {
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
);
396    
# Register the local node under the empty noderef and under its own name.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port (empty port id) dispatches on the first message element:
# known tags are served from %node_req, everything else is resolved as a
# function name with load_func and remembered in %node_req.
$PORT{""} = sub {
   my $tag = shift;

   # Exceptions are reported through $WARN instead of being propagated.
   # Success is detected by the eval completing, not by $@ being empty,
   # as $@ can be reset before it is inspected.
   eval {
      &{ $node_req{$tag} ||= load_func ($tag) };
      1
   } or $WARN->("error processing node message: " . ($@ || "unknown error"));
};
403 root 1.1
404 root 1.4 =back
405    
406 root 1.1 =head1 SEE ALSO
407    
408     L<AnyEvent::MP>.
409    
410     =head1 AUTHOR
411    
412     Marc Lehmann <schmorp@schmorp.de>
413     http://home.schmorp.de/
414    
415     =cut
416    
417     1
418