ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.14
Committed: Wed Aug 5 23:24:58 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.13: +23 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
34     our $VERSION = '0.01';
35 root 1.5 our @EXPORT = qw(
36 root 1.8 %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node
37    
38     NODE $NODE node_of snd kil _any_
39 root 1.12 resolve_node initialise_node
40 root 1.5 );
41 root 1.1
42     our $DEFAULT_SECRET;
43 root 1.10 our $DEFAULT_PORT = "4040";
44 root 1.1
45     our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
46     our $CONNECT_TIMEOUT = 30; # includes handshake
47    
48 root 1.4 =item $AnyEvent::MP::Base::WARN
49    
50     This value is called with an error or warning message, when e.g. a connection
51     could not be created, authorisation failed and so on.
52    
53     The default simply logs the message to STDERR.
54    
55     =cut
56    
57 root 1.3 our $WARN = sub {
58 root 1.8 my $msg = $_[0];
59     $msg =~ s/\n$//;
60     warn "$msg\n";
61 root 1.3 };
62    
63 root 1.1 sub nonce($) {
64     my $nonce;
65    
66     if (open my $fh, "</dev/urandom") {
67     sysread $fh, $nonce, $_[0];
68     } else {
69     # shit...
70     our $nonce_init;
71     unless ($nonce_init++) {
72     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
73     }
74    
75     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
76     }
77    
78     $nonce
79     }
80    
81 root 1.14 sub asciibits($) {
82     my $data = $_[0];
83    
84     if (eval "use Math::GMP 2.05; 1") {
85     $data = Math::GMP::get_str_gmp (
86     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
87     62
88     );
89     } else {
90     $data = MIME::Base64::encode_base64 $data, "";
91     $data =~ s/=//;
92     $data =~ s/\//s/g;
93     $data =~ s/\+/p/g;
94     }
95    
96     $data
97     }
98    
99 root 1.1 sub default_secret {
100     unless (defined $DEFAULT_SECRET) {
101     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
102     sysread $fh, $DEFAULT_SECRET, -s $fh;
103     } else {
104 root 1.2 $DEFAULT_SECRET = nonce 32;
105 root 1.1 }
106     }
107    
108     $DEFAULT_SECRET
109     }
110    
111 root 1.6 sub gen_uniq {
112 root 1.14 asciibits pack "wNa*", $$, time, nonce 2
113 root 1.6 }
114    
115 root 1.14 our $NODE = asciibits nonce 12;
116 root 1.6 our $UNIQ = gen_uniq; # per-process/node unique cookie
117 root 1.1 our $ID = "a";
118     our $PUBLIC = 0;
119 root 1.13 our $SLAVE = 0;
120 root 1.1
121     our %NODE; # node id to transport mapping, or "undef", for local node
122 root 1.8 our (%PORT, %PORT_DATA); # local ports
123 root 1.5
124     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
125     our %LMON; # monitored _local_ ports
126    
127 root 1.8 our %REG; # registered port names
128    
129     our %LISTENER;
130 root 1.1
131 root 1.5 our $SRCNODE; # holds the sending node during _inject
132    
133 root 1.8 sub NODE() {
134     $NODE
135     }
136    
137     sub node_of($) {
138     my ($noderef, undef) = split /#/, $_[0], 2;
139    
140     $noderef
141     }
142 root 1.1
143     sub _ANY_() { 1 }
144     sub _any_() { \&_ANY_ }
145    
146 root 1.13 sub TRACE() { 1 }
147    
148 root 1.1 sub _inject {
149 root 1.14 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
150 root 1.5 &{ $PORT{+shift} or return };
151 root 1.1 }
152    
153     sub add_node {
154     my ($noderef) = @_;
155    
156     return $NODE{$noderef}
157     if exists $NODE{$noderef};
158    
159     for (split /,/, $noderef) {
160     return $NODE{$noderef} = $NODE{$_}
161     if exists $NODE{$_};
162     }
163    
164 root 1.10 # new node, check validity
165 root 1.13 my $node;
166 root 1.10
167 root 1.13 if ($noderef =~ /^slave\/.+$/) {
168     $node = new AnyEvent::MP::Node::Slave $noderef;
169    
170     } else {
171     for (split /,/, $noderef) {
172     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
173     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
174    
175     $port > 0
176     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
177 root 1.10
178 root 1.13 AnyEvent::Socket::parse_address $host
179     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
180     }
181 root 1.10
182 root 1.13 # TODO: for indirect sends, use a different class
183     $node = new AnyEvent::MP::Node::Direct $noderef;
184 root 1.10 }
185    
186 root 1.1 $NODE{$_} = $node
187     for $noderef, split /,/, $noderef;
188    
189     $node
190     }
191    
192     sub snd(@) {
193 root 1.13 my ($noderef, $portid) = split /#/, shift, 2;
194    
195 root 1.14 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
196 root 1.1
197 root 1.5 ($NODE{$noderef} || add_node $noderef)
198 root 1.13 ->send (["$portid", @_]);
199 root 1.5 }
200    
201 root 1.8 sub kil(@) {
202 root 1.13 my ($noderef, $portid) = split /#/, shift, 2;
203 root 1.5
204 root 1.13 length $portid
205     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
206 root 1.10
207 root 1.7 ($NODE{$noderef} || add_node $noderef)
208 root 1.13 ->kill ("$portid", @_);
209 root 1.1 }
210    
211 root 1.10 sub resolve_node($) {
212     my ($noderef) = @_;
213    
214     my $cv = AE::cv;
215     my @res;
216    
217     $cv->begin (sub {
218     my %seen;
219     my @refs;
220     for (sort { $a->[0] <=> $b->[0] } @res) {
221     push @refs, $_->[1] unless $seen{$_->[1]}++
222     }
223     shift->send (join ",", @refs);
224     });
225    
226     $noderef = $DEFAULT_PORT unless length $noderef;
227    
228     my $idx;
229     for my $t (split /,/, $noderef) {
230     my $pri = ++$idx;
231    
232     if ($t =~ /^\d*$/) {
233     require POSIX;
234     my $nodename = (POSIX::uname ())[1];
235    
236     $cv->begin;
237     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
238     for (@_) {
239     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
240     push @res, [
241     $pri += 1e-5,
242     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
243     ];
244     }
245     $cv->end;
246     };
247    
248     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
249     #
250     # for (@ipv4) {
251     # push @res, [
252     # $pri,
253     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
254     # ];
255     # }
256     } else {
257     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
258     or Carp::croak "$t: unparsable transport descriptor";
259    
260     $cv->begin;
261     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
262     for (@_) {
263     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
264     push @res, [
265     $pri += 1e-5,
266     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
267     ];
268     }
269     $cv->end;
270     }
271     }
272     }
273    
274     $cv->end;
275    
276     $cv
277     }
278    
279 root 1.12 sub initialise_node($;@) {
280     my ($noderef, @others) = @_;
281 root 1.1
282 root 1.12 if ($noderef =~ /^slave\/(.*)$/) {
283 root 1.13 $SLAVE = AE::cv;
284 root 1.12 my $name = $1;
285 root 1.14 $name = $NODE unless length $name;
286 root 1.12 $noderef = AE::cv;
287     $noderef->send ("slave/$name");
288 root 1.13
289     @others
290     or Carp::croak "seed nodes must be specified for slave nodes";
291    
292 root 1.12 } else {
293 root 1.13 $PUBLIC = 1;
294 root 1.12 $noderef = resolve_node $noderef;
295     }
296    
297     @others = map $_->recv, map +(resolve_node $_), @others;
298 root 1.1
299 root 1.12 $NODE = $noderef->recv;
300 root 1.1
301     for my $t (split /,/, $NODE) {
302     $NODE{$t} = $NODE{""};
303    
304     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
305    
306     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
307     sub {
308     my ($tp) = @_;
309    
310 root 1.5 # TODO: urgs
311     my $node = add_node $tp->{remote_node};
312     $node->{trial}{accept} = $tp;
313 root 1.1 },
314     ;
315     }
316    
317 root 1.13 (add_node $_)->connect for @others;
318    
319     if ($SLAVE) {
320     $SLAVE->recv;
321     $SLAVE = 1;
322     }
323 root 1.1 }
324    
325     #############################################################################
326     # self node code
327    
328 root 1.5 our %node_req = (
329 root 1.13 # internal services
330    
331 root 1.5 # monitoring
332     mon0 => sub { # disable monitoring
333     my $portid = shift;
334     my $node = $SRCNODE;
335     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
336     },
337     mon1 => sub { # enable monitoring
338     my $portid = shift;
339     my $node = $SRCNODE;
340     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
341 root 1.8 $node->send (["", kil => $portid, @_]);
342 root 1.5 });
343     },
344 root 1.7 kil => sub {
345 root 1.8 my $cbs = delete $SRCNODE->{lmon}{+shift}
346 root 1.5 or return;
347    
348 root 1.8 $_->(@_) for @$cbs;
349 root 1.5 },
350 root 1.13 # node changed its name (for slave nodes)
351     iam => sub {
352     $SRCNODE->{noderef} = $_[0];
353     $NODE{$_[0]} = $SRCNODE;
354     },
355    
356     # public services
357 root 1.5
358     # well-known-port lookup
359 root 1.8 lookup => sub {
360     my $name = shift;
361     my $port = $REG{$name};
362     #TODO: check vailidity
363     snd @_, $port;
364 root 1.5 },
365    
366     # relay message to another node / generic echo
367     relay => sub {
368     &snd;
369     },
370    
371     # random garbage
372     eval => sub {
373     my @res = eval shift;
374     snd @_, "$@", @res if @_;
375     },
376     time => sub {
377     snd @_, AE::time;
378     },
379     devnull => sub {
380     #
381     },
382     );
383    
384 root 1.9 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
385 root 1.5 $PORT{""} = sub { &{ $node_req{+shift} or return } };
386 root 1.1
387 root 1.4 =back
388    
389 root 1.1 =head1 SEE ALSO
390    
391     L<AnyEvent::MP>.
392    
393     =head1 AUTHOR
394    
395     Marc Lehmann <schmorp@schmorp.de>
396     http://home.schmorp.de/
397    
398     =cut
399    
400     1
401