ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.13
Committed: Wed Aug 5 22:40:51 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.12: +50 -19 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
34     our $VERSION = '0.01';
35 root 1.5 our @EXPORT = qw(
36 root 1.8 %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node
37    
38     NODE $NODE node_of snd kil _any_
39 root 1.12 resolve_node initialise_node
40 root 1.5 );
41 root 1.1
42     our $DEFAULT_SECRET;
43 root 1.10 our $DEFAULT_PORT = "4040";
44 root 1.1
45     our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
46     our $CONNECT_TIMEOUT = 30; # includes handshake
47    
48 root 1.4 =item $AnyEvent::MP::Base::WARN
49    
50     This value is called with an error or warning message, when e.g. a connection
51     could not be created, authorisation failed and so on.
52    
53     The default simply logs the message to STDERR.
54    
55     =cut
56    
57 root 1.3 our $WARN = sub {
58 root 1.8 my $msg = $_[0];
59     $msg =~ s/\n$//;
60     warn "$msg\n";
61 root 1.3 };
62    
63 root 1.1 sub nonce($) {
64     my $nonce;
65    
66     if (open my $fh, "</dev/urandom") {
67     sysread $fh, $nonce, $_[0];
68     } else {
69     # shit...
70     our $nonce_init;
71     unless ($nonce_init++) {
72     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
73     }
74    
75     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
76     }
77    
78     $nonce
79     }
80    
81     sub default_secret {
82     unless (defined $DEFAULT_SECRET) {
83     if (open my $fh, "<$ENV{HOME}/.aemp-secret") {
84     sysread $fh, $DEFAULT_SECRET, -s $fh;
85     } else {
86 root 1.2 $DEFAULT_SECRET = nonce 32;
87 root 1.1 }
88     }
89    
90     $DEFAULT_SECRET
91     }
92    
93 root 1.6 sub gen_uniq {
94     my $uniq = pack "wN", $$, time;
95     $uniq = MIME::Base64::encode_base64 $uniq, "";
96     $uniq =~ s/=+$//;
97     $uniq
98     }
99    
100 root 1.13 our $NODE = unpack "H*", nonce 16;
101 root 1.6 our $UNIQ = gen_uniq; # per-process/node unique cookie
102 root 1.1 our $ID = "a";
103     our $PUBLIC = 0;
104 root 1.13 our $SLAVE = 0;
105 root 1.1
106     our %NODE; # node id to transport mapping, or "undef", for local node
107 root 1.8 our (%PORT, %PORT_DATA); # local ports
108 root 1.5
109     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
110     our %LMON; # monitored _local_ ports
111    
112 root 1.8 our %REG; # registered port names
113    
114     our %LISTENER;
115 root 1.1
116 root 1.5 our $SRCNODE; # holds the sending node during _inject
117    
118 root 1.8 sub NODE() {
119     $NODE
120     }
121    
122     sub node_of($) {
123     my ($noderef, undef) = split /#/, $_[0], 2;
124    
125     $noderef
126     }
127 root 1.1
128     sub _ANY_() { 1 }
129     sub _any_() { \&_ANY_ }
130    
131 root 1.13 sub TRACE() { 1 }
132    
133 root 1.1 sub _inject {
134 root 1.13 warn "$SRCNODE->{noderef} -> @_\n" if TRACE;#d#
135 root 1.5 &{ $PORT{+shift} or return };
136 root 1.1 }
137    
138     sub add_node {
139     my ($noderef) = @_;
140    
141     return $NODE{$noderef}
142     if exists $NODE{$noderef};
143    
144     for (split /,/, $noderef) {
145     return $NODE{$noderef} = $NODE{$_}
146     if exists $NODE{$_};
147     }
148    
149 root 1.10 # new node, check validity
150 root 1.13 my $node;
151 root 1.10
152 root 1.13 if ($noderef =~ /^slave\/.+$/) {
153     $node = new AnyEvent::MP::Node::Slave $noderef;
154    
155     } else {
156     for (split /,/, $noderef) {
157     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
158     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
159    
160     $port > 0
161     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
162 root 1.10
163 root 1.13 AnyEvent::Socket::parse_address $host
164     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
165     }
166 root 1.10
167 root 1.13 # TODO: for indirect sends, use a different class
168     $node = new AnyEvent::MP::Node::Direct $noderef;
169 root 1.10 }
170    
171 root 1.1 $NODE{$_} = $node
172     for $noderef, split /,/, $noderef;
173    
174     $node
175     }
176    
177     sub snd(@) {
178 root 1.13 my ($noderef, $portid) = split /#/, shift, 2;
179    
180     warn "$noderef <- $portid @_\n" if TRACE;#d#
181 root 1.1
182 root 1.5 ($NODE{$noderef} || add_node $noderef)
183 root 1.13 ->send (["$portid", @_]);
184 root 1.5 }
185    
186 root 1.8 sub kil(@) {
187 root 1.13 my ($noderef, $portid) = split /#/, shift, 2;
188 root 1.5
189 root 1.13 length $portid
190     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
191 root 1.10
192 root 1.7 ($NODE{$noderef} || add_node $noderef)
193 root 1.13 ->kill ("$portid", @_);
194 root 1.1 }
195    
196 root 1.10 sub resolve_node($) {
197     my ($noderef) = @_;
198    
199     my $cv = AE::cv;
200     my @res;
201    
202     $cv->begin (sub {
203     my %seen;
204     my @refs;
205     for (sort { $a->[0] <=> $b->[0] } @res) {
206     push @refs, $_->[1] unless $seen{$_->[1]}++
207     }
208     shift->send (join ",", @refs);
209     });
210    
211     $noderef = $DEFAULT_PORT unless length $noderef;
212    
213     my $idx;
214     for my $t (split /,/, $noderef) {
215     my $pri = ++$idx;
216    
217     if ($t =~ /^\d*$/) {
218     require POSIX;
219     my $nodename = (POSIX::uname ())[1];
220    
221     $cv->begin;
222     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
223     for (@_) {
224     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
225     push @res, [
226     $pri += 1e-5,
227     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
228     ];
229     }
230     $cv->end;
231     };
232    
233     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
234     #
235     # for (@ipv4) {
236     # push @res, [
237     # $pri,
238     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
239     # ];
240     # }
241     } else {
242     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
243     or Carp::croak "$t: unparsable transport descriptor";
244    
245     $cv->begin;
246     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
247     for (@_) {
248     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
249     push @res, [
250     $pri += 1e-5,
251     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
252     ];
253     }
254     $cv->end;
255     }
256     }
257     }
258    
259     $cv->end;
260    
261     $cv
262     }
263    
264 root 1.12 sub initialise_node($;@) {
265     my ($noderef, @others) = @_;
266 root 1.1
267 root 1.12 if ($noderef =~ /^slave\/(.*)$/) {
268 root 1.13 $SLAVE = AE::cv;
269 root 1.12 my $name = $1;
270     $name = $UNIQ unless length $name;
271     $noderef = AE::cv;
272     $noderef->send ("slave/$name");
273 root 1.13
274     @others
275     or Carp::croak "seed nodes must be specified for slave nodes";
276    
277 root 1.12 } else {
278 root 1.13 $PUBLIC = 1;
279 root 1.12 $noderef = resolve_node $noderef;
280     }
281    
282     @others = map $_->recv, map +(resolve_node $_), @others;
283 root 1.1
284 root 1.12 $NODE = $noderef->recv;
285 root 1.1
286     for my $t (split /,/, $NODE) {
287     $NODE{$t} = $NODE{""};
288    
289     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
290    
291     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
292     sub {
293     my ($tp) = @_;
294    
295 root 1.5 # TODO: urgs
296     my $node = add_node $tp->{remote_node};
297     $node->{trial}{accept} = $tp;
298 root 1.1 },
299     ;
300     }
301    
302 root 1.13 (add_node $_)->connect for @others;
303    
304     if ($SLAVE) {
305     $SLAVE->recv;
306     $SLAVE = 1;
307     }
308 root 1.1 }
309    
310     #############################################################################
311     # self node code
312    
313 root 1.5 our %node_req = (
314 root 1.13 # internal services
315    
316 root 1.5 # monitoring
317     mon0 => sub { # disable monitoring
318     my $portid = shift;
319     my $node = $SRCNODE;
320     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
321     },
322     mon1 => sub { # enable monitoring
323     my $portid = shift;
324     my $node = $SRCNODE;
325     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
326 root 1.8 $node->send (["", kil => $portid, @_]);
327 root 1.5 });
328     },
329 root 1.7 kil => sub {
330 root 1.8 my $cbs = delete $SRCNODE->{lmon}{+shift}
331 root 1.5 or return;
332    
333 root 1.8 $_->(@_) for @$cbs;
334 root 1.5 },
335 root 1.13 # node changed its name (for slave nodes)
336     iam => sub {
337     $SRCNODE->{noderef} = $_[0];
338     $NODE{$_[0]} = $SRCNODE;
339     },
340    
341     # public services
342 root 1.5
343     # well-known-port lookup
344 root 1.8 lookup => sub {
345     my $name = shift;
346     my $port = $REG{$name};
347     #TODO: check vailidity
348     snd @_, $port;
349 root 1.5 },
350    
351     # relay message to another node / generic echo
352     relay => sub {
353     &snd;
354     },
355    
356     # random garbage
357     eval => sub {
358     my @res = eval shift;
359     snd @_, "$@", @res if @_;
360     },
361     time => sub {
362     snd @_, AE::time;
363     },
364     devnull => sub {
365     #
366     },
367     );
368    
369 root 1.9 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
370 root 1.5 $PORT{""} = sub { &{ $node_req{+shift} or return } };
371 root 1.1
372 root 1.4 =back
373    
374 root 1.1 =head1 SEE ALSO
375    
376     L<AnyEvent::MP>.
377    
378     =head1 AUTHOR
379    
380     Marc Lehmann <schmorp@schmorp.de>
381     http://home.schmorp.de/
382    
383     =cut
384    
385     1
386