ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.11
Committed: Tue Aug 4 23:35:51 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.10: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9 root 1.3 =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.4 =head1 GLOBALS
16    
17     =over 4
18    
19 root 1.1 =cut
20    
21     package AnyEvent::MP::Base;
22    
23     use common::sense;
24     use Carp ();
25 root 1.6 use MIME::Base64 ();
26 root 1.1
27     use AE ();
28    
29 root 1.6 use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32 root 1.1 use base "Exporter";
33    
34     our $VERSION = '0.01';
35 root 1.5 our @EXPORT = qw(
36 root 1.8 %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node
37    
38     NODE $NODE node_of snd kil _any_
39 root 1.10 resolve_node
40 root 1.5 become_slave become_public
41     );
42 root 1.1
# Tunables. $DEFAULT_SECRET stays undefined here; default_secret fills it
# in the first time it is called.
our $DEFAULT_SECRET;
our $DEFAULT_PORT     = "4040";

our $CONNECT_INTERVAL = 5;  # new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # includes handshake
48    
49 root 1.4 =item $AnyEvent::MP::Base::WARN
50    
51     This code reference is called with an error or warning message, e.g. when a
52     connection could not be created or authorisation failed.
53    
54     The default simply logs the message to STDERR.
55    
56     =cut
57    
# Default warning handler: strips one trailing newline from the message
# (if present) and hands the text, newline-terminated, to warn.
our $WARN = sub {
    (my $text = $_[0]) =~ s/\n$//;
    warn "$text\n";
};
63    
# Return a string of exactly $_[0] random bytes.
#
# Bytes are read from /dev/urandom when it can be opened. If the device is
# unavailable, or a read fails or ends before enough bytes were delivered,
# the missing bytes are generated with rand(). rand() is seeded at most
# once (guarded by $nonce_init) from the time, the pid and a checksum over
# the output of some system commands.
sub nonce($) {
    my ($len) = @_;
    my $nonce = "";

    if (open my $fh, "<", "/dev/urandom") {
        # sysread may return fewer bytes than requested, so keep appending
        # at the current end of the buffer until we have enough.
        while (length($nonce) < $len) {
            my $got = sysread $fh, $nonce, $len - length($nonce), length($nonce);
            last unless $got; # undef: read error, 0: end of file
        }
    }

    if (length($nonce) < $len) {
        # no (or not enough) data from the kernel, fall back to rand()
        our $nonce_init;
        unless ($nonce_init++) {
            srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
        }

        $nonce .= join "", map +(chr rand 256), 1 .. $len - length($nonce);
    }

    $nonce
}
81    
# Return the default shared secret, initialising $DEFAULT_SECRET on first
# use.
#
# The secret is the full content of $ENV{HOME}/.aemp-secret when that file
# can be opened and read; otherwise a fresh 32 byte nonce is used. Once
# $DEFAULT_SECRET is defined it is returned unchanged.
sub default_secret {
    unless (defined $DEFAULT_SECRET) {
        # three-argument open: the path is never interpreted as a mode
        if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
            my $secret;
            $DEFAULT_SECRET = $secret
                if defined sysread $fh, $secret, -s $fh;
        }

        # no secret file, or reading it failed
        $DEFAULT_SECRET = nonce(32)
            unless defined $DEFAULT_SECRET;
    }

    $DEFAULT_SECRET
}
93    
# Build a short printable cookie from the process id and the current time:
# both are packed ("w" BER integer, "N" 32 bit big-endian), base64 encoded
# without line breaks, and the trailing "=" padding is removed.
sub gen_uniq {
    my $cookie = MIME::Base64::encode_base64((pack "wN", $$, time), "");
    $cookie =~ s/=+$//;

    return $cookie;
}
100    
# Per-process state.
our $UNIQ   = gen_uniq();            # per-process/node unique cookie
our $ID     = "a";
our $PUBLIC = 0;                     # set to 1 by become_public
our $NODE   = unpack "H*", nonce(16); # initial node id: 16 random bytes as hex

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON;      # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON;      # monitored _local_ ports

our %REG;       # registered port names

our %LISTENER;  # servers created by become_public, keyed by "host:port"

our $SRCNODE;   # holds the sending node during _inject
117    
# Accessor for the current value of the package variable $NODE.
sub NODE() {
    return $NODE;
}
121    
# Return the node part of a port id, i.e. everything before the first "#".
# A string without "#" is returned unchanged; the empty string yields undef.
sub node_of($) {
    my ($node_part) = split /#/, $_[0], 2;

    return $node_part;
}
127 root 1.1
# _ANY_ is a marker sub that simply returns 1.
sub _ANY_() { 1 }

# _any_ returns a reference to _ANY_ (always the same code reference).
sub _any_() {
    return \&_ANY_;
}
130    
# Deliver a message to a local port: the first argument selects the
# callback in %PORT, which is then invoked with the remaining arguments.
# Returns nothing when no callback is stored under that port id.
sub _inject {
    my $handler = $PORT{+shift}
        or return;

    # "&$handler;" passes our (already shifted) @_ on to the callback
    &$handler;
}
134    
# Return the node object for $noderef, creating and registering it when
# it is not known yet.
#
# $noderef is a comma-separated list of "host:port" addresses. If the
# full reference or any single address is already present in %NODE, the
# existing object is returned (and, in the latter case, also registered
# under the full reference). Otherwise every address must parse as a
# literal address with a numeric, positive port - croaks if not - and a
# new AnyEvent::MP::Node::Direct is stored under the full reference and
# under each address.
sub add_node {
    my ($noderef) = @_;

    return $NODE{$noderef}
        if exists $NODE{$noderef};

    my @addrs = split /,/, $noderef;

    for my $addr (@addrs) {
        return $NODE{$noderef} = $NODE{$addr}
            if exists $NODE{$addr};
    }

    # new node, check validity

    for my $addr (@addrs) {
        my ($host, $port) = AnyEvent::Socket::parse_hostport($addr)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' not parsable)";

        # require digits only, so that e.g. "80abc" is not silently
        # accepted because it happens to be > 0 in numeric context
        $port =~ /^[0-9]+$/ && $port > 0
            or Carp::croak "$noderef: not a resolved node reference ('$addr' contains non-numeric port)";

        AnyEvent::Socket::parse_address($host)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' contains unresolved address)";
    }

    # TODO: for indirect sends, use a different class
    my $node = AnyEvent::MP::Node::Direct->new($noderef);

    $NODE{$_} = $node
        for $noderef, @addrs;

    $node
}
167    
# Send a message to a port. The first argument is the port id
# ("noderef#port"), the remaining arguments form the message. The node
# object is taken from %NODE or created via add_node, and its send method
# is called with [port, message...].
sub snd(@) {
    my ($noderef, $portname) = split /#/, shift, 2;

    my $node = $NODE{$noderef} || add_node($noderef);

    $node->send([$portname, @_]);
}
174    
# Kill a port. The first argument is the port id ("noderef#port"), any
# further arguments are passed on to the node's kill method.
#
# Croaks when the port id has no (or an empty) port part, as killing a
# whole node is not allowed.
sub kil(@) {
    my ($noderef, $port) = split /#/, shift, 2;

    length $port
        or Carp::croak "$noderef: killing a node port is not allowed, caught";

    ($NODE{$noderef} || add_node($noderef))
        ->kill($port, @_);
}
185    
# Resolve a comma-separated list of transport descriptors into a node
# reference. Returns a condvar that receives the resulting comma-separated
# "host:port" list once all lookups have finished.
#
# An empty or purely numeric descriptor means "this host" (the nodename
# reported by POSIX::uname) on the given port, or on the "aemp" service
# defaulting to $DEFAULT_PORT. Anything else is parsed as host[:port].
# Results keep the order of the descriptors; duplicates are dropped.
# Croaks on an unparsable descriptor.
sub resolve_node($) {
    my ($noderef) = @_;

    my $cv = AE::cv;
    my @res; # [priority, "host:port"] pairs collected by the resolvers

    # runs once every begin has been matched by an end
    $cv->begin(sub {
        my (%seen, @refs);

        for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
            push @refs, $entry->[1]
                unless $seen{$entry->[1]}++;
        }

        shift->send(join ",", @refs);
    });

    $noderef = $DEFAULT_PORT unless length $noderef;

    my $idx;
    for my $t (split /,/, $noderef) {
        my $pri = ++$idx;
        my ($host, $port);

        if ($t =~ /^\d*$/) {
            # port only (or nothing): use our own nodename
            require POSIX;
            $host = (POSIX::uname())[1];
            $port = $t || "aemp=$DEFAULT_PORT";
        } else {
            ($host, $port) = AnyEvent::Socket::parse_hostport($t, "aemp=$DEFAULT_PORT")
                or Carp::croak "$t: unparsable transport descriptor";
        }

        $cv->begin;
        AnyEvent::Socket::resolve_sockaddr($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
                my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr($target->[3]);

                # the small increment keeps results of one descriptor in order
                push @res, [
                    $pri += 1e-5,
                    AnyEvent::Socket::format_hostport(AnyEvent::Socket::format_address($addr), $service),
                ];
            }

            $cv->end;
        });
    }

    $cv->end;

    $cv
}
253    
# Make this node reachable: resolves the given transport descriptors into
# the new value of $NODE (blocking on the resolver condvar), registers the
# local node object under each resulting address and starts one
# mp_server per address, remembered in %LISTENER. Does nothing when
# $PUBLIC is already set; sets $PUBLIC to 1 when done.
sub become_public {
    return if $PUBLIC;

    my @args    = @_;
    my $noderef = join ",", @args;

    $NODE = resolve_node($noderef)->recv;

    for my $t (split /,/, $NODE) {
        $NODE{$t} = $NODE{""};

        my ($host, $port) = AnyEvent::Socket::parse_hostport($t);

        # called with the transport object of an accepted connection
        my $on_accept = sub {
            my ($tp) = @_;

            # TODO: urgs
            my $node = add_node($tp->{remote_node});
            $node->{trial}{accept} = $tp;
        };

        $LISTENER{$t} = AnyEvent::MP::Transport::mp_server($host, $port, @args, $on_accept);
    }

    $PUBLIC = 1;
}
281    
282     #############################################################################
283     # self node code
284    
# Requests understood by the node itself, i.e. messages sent to the port
# with the empty name. Each handler is called with the message arguments
# following the request name; $SRCNODE is read where the handler needs
# the sending node.
our %node_req = (
    # monitoring
    mon0 => sub { # disable monitoring
        my $portid = shift;
        my $node   = $SRCNODE;
        $NODE{""}->unmonitor($portid, delete $node->{rmon}{$portid});
    },
    mon1 => sub { # enable monitoring
        my $portid = shift;
        my $node   = $SRCNODE; # copy: $SRCNODE may differ when the callback runs
        $NODE{""}->monitor($portid, $node->{rmon}{$portid} = sub {
            $node->send(["", kil => $portid, @_]);
        });
    },
    kil => sub {
        my $cbs = delete $SRCNODE->{lmon}{+shift}
            or return;

        $_->(@_) for @$cbs;
    },

    # well-known-port lookup
    lookup => sub {
        my $name = shift;
        my $port = $REG{$name};
        #TODO: check validity
        snd(@_, $port);
    },

    # relay message to another node / generic echo
    relay => sub {
        snd(@_);
    },

    # random garbage
    eval => sub {
        # NOTE(review): this string-evals code taken from a message, so
        # whoever can send to the node port can run arbitrary perl here.
        my @res = eval shift;
        snd(@_, "$@", @res) if @_;
    },
    time => sub {
        snd(@_, AE::time);
    },
    devnull => sub {
        #
    },
);

# the local node is reachable both under "" and under its node id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new(noderef => $NODE);

# the node port: dispatch on the request name, silently ignoring unknown ones
$PORT{""} = sub {
    my $handler = $node_req{+shift}
        or return;

    &$handler; # passes the remaining @_ on to the handler
};
334 root 1.1
335 root 1.4 =back
336    
337 root 1.1 =head1 SEE ALSO
338    
339     L<AnyEvent::MP>.
340    
341     =head1 AUTHOR
342    
343     Marc Lehmann <schmorp@schmorp.de>
344     http://home.schmorp.de/
345    
346     =cut
347    
348     1
349