ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
Revision: 1.10
Committed: Tue Aug 4 23:16:57 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.9: +88 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Base - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Base;
22
23 use common::sense;
24 use Carp ();
25 use MIME::Base64 ();
26
27 use AE ();
28
29 use AnyEvent::MP::Node;
30 use AnyEvent::MP::Transport;
31
32 use base "Exporter";
33
# Module version and the symbols pushed into every importing package.
our $VERSION = '0.01';
our @EXPORT  = qw(
   %NODE %PORT %PORT_DATA %REG $UNIQ $ID add_node

   NODE $NODE node_of snd kil _any_
   resolve_node
   become_slave become_public
);

# Shared secret; left undefined here and filled in lazily by default_secret.
our $DEFAULT_SECRET;

# Port used whenever a transport descriptor does not specify one.
our $DEFAULT_PORT = "4040";

our $CONNECT_INTERVAL =  5; # new connect every 5s, at least
our $CONNECT_TIMEOUT  = 30; # includes handshake
49 =item $AnyEvent::MP::Base::WARN
50
51 This value is called with an error or warning message, when e.g. a connection
52 could not be created, authorisation failed and so on.
53
54 The default simply logs the message to STDERR.
55
56 =cut
57
# Default warning handler: strips one trailing newline from the message (if
# any) and re-adds exactly one, so warn() never appends "at FILE line N".
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;

   warn "$text\n";
};
63
# nonce $length
#
# Returns a string of exactly $length random octets.
#
# The octets are read from /dev/urandom when it can be opened. Any octets
# that could not be obtained that way (device missing, read error, short
# read) are generated with Perl's rand() instead, which is seeded once per
# process from the time, the pid and a checksum over the output of
# "ps -edalf" and "ipconfig /all".
sub nonce($) {
   my ($want) = @_;
   my $nonce  = "";

   # three-arg open: the path can never be misread as a mode
   if (open my $fh, "<", "/dev/urandom") {
      # sysread may legitimately return fewer octets than requested, so
      # keep appending until the nonce is complete or the read fails
      while (length ($nonce) < $want) {
         my $got = sysread $fh, $nonce, $want - length ($nonce), length ($nonce);
         last unless $got; # undef (error) or 0 (eof)
      }
   }

   my $missing = $want - length $nonce;

   if ($missing > 0) {
      # no (or not enough) kernel randomness available - fall back to rand
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $missing;
   }

   $nonce
}
81
# default_secret
#
# Returns the shared secret, computing it on first use and caching it in
# $DEFAULT_SECRET. The secret is the full content of $HOME/.aemp-secret when
# that file can be opened and read; otherwise it is a fresh 32-octet nonce.
# A value already stored in $DEFAULT_SECRET is returned unchanged.
sub default_secret {
   unless (defined $DEFAULT_SECRET) {
      my $secret;

      # three-arg open, so characters in $HOME are never taken as an open mode
      if (open my $fh, "<", "$ENV{HOME}/.aemp-secret") {
         my $read = sysread $fh, $secret, -s $fh;

         # a failed read must not leave us with a bogus or undefined secret
         undef $secret unless defined $read;
      }

      $DEFAULT_SECRET = defined $secret ? $secret : nonce (32);
   }

   $DEFAULT_SECRET
}
93
# gen_uniq
#
# Builds a short printable cookie from the process id (BER-compressed
# integer) and the current time (32-bit big-endian), base64-encoded on a
# single line with the trailing "=" padding removed.
sub gen_uniq {
   my $packed = pack "wN", $$, time;

   (my $cookie = MIME::Base64::encode_base64 ($packed, "")) =~ s/=+$//;

   $cookie
}
100
our $UNIQ   = gen_uniq ();              # per-process/node unique cookie
our $ID     = "a";                      # NOTE(review): not used in this file - presumably a port id seed, confirm in AnyEvent::MP
our $PUBLIC = 0;                        # set to 1 once become_public has run
our $NODE   = unpack "H*", nonce (16);  # local noderef: 16 random octets as hex until become_public replaces it

our %NODE;               # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %REG; # registered port names

our %LISTENER; # transport descriptor => listening server, filled in by become_public

our $SRCNODE; # holds the sending node during _inject
117
# NODE
#
# Returns the noderef of the local node, i.e. the current value of $NODE.
sub NODE() {
   return $NODE;
}
121
# node_of $portid
#
# Returns the node part of a port id, i.e. everything before the first "#".
# A port id without "#" is returned unchanged.
sub node_of($) {
   my @halves = split /#/, $_[0], 2;

   $halves[0]
}
127
# _ANY_ is a constant true sub; _any_ hands out a reference to it. Because the
# reference is always the same code ref, callers can compare against it.
# NOTE(review): presumably the wildcard marker for message matching - confirm
# against AnyEvent::MP, nothing in this file uses it.
sub _ANY_() { 1 }

sub _any_() {
   \&_ANY_
}
130
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the callback registered for
# $portid in %PORT and calls it with the remaining arguments. Messages for
# unknown ports are silently dropped.
sub _inject {
   my $receiver = $PORT{+shift}
      or return;

   &$receiver; # called with our own @_, which now holds just the message
}
134
# add_node $noderef
#
# Returns the node object for $noderef, creating and registering it in %NODE
# when necessary. A noderef is a comma-separated list of "host:port"
# addresses; a node already known under any one of these addresses is reused
# and additionally registered under the full noderef.
#
# Croaks when a new noderef contains an address that cannot be parsed, has a
# non-numeric port, or has a host part that is not a literal address.
sub add_node {
   my ($noderef) = @_;

   # already known under its full name
   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @addrs = split /,/, $noderef;

   # already known under one of its addresses
   for my $addr (@addrs) {
      next unless exists $NODE{$addr};

      return $NODE{$noderef} = $NODE{$addr};
   }

   # new node, check validity

   for my $addr (@addrs) {
      my ($host, $port) = AnyEvent::Socket::parse_hostport ($addr)
         or Carp::croak ("$noderef: not a resolved node reference ('$addr' not parsable)");

      $port > 0
         or Carp::croak ("$noderef: not a resolved node reference ('$addr' contains non-numeric port)");

      AnyEvent::Socket::parse_address ($host)
         or Carp::croak ("$noderef: not a resolved node reference ('$addr' contains unresolved address)");
   }

   # TODO: for indirect sends, use a different class
   my $node = AnyEvent::MP::Node::Direct->new ($noderef);

   # reachable under the full noderef as well as under every single address
   $NODE{$_} = $node
      for $noderef, @addrs;

   $node
}
167
# snd $portid, @msg
#
# Sends @msg to the given port. The port id is split at the first "#" into
# noderef and port name; the node is taken from %NODE, or created through
# add_node, and is handed [$port, @msg] via its send method.
sub snd(@) {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send ([$port, @_]);
}
174
# kil $portid, @reason
#
# Kills the given port by calling the kill method of its node with the port
# name and @reason. The node is taken from %NODE or created through add_node.
# Croaks when the port id has no port part (a bare noderef or "noderef#"),
# because the node port itself must not be killed.
sub kil(@) {
   my ($noderef, $port) = split /#/, shift, 2;

   Carp::croak ("killing the node port is not allowed, caught")
      unless length $port;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ($port, @_);
}
184
# resolve_node $noderef
#
# Resolves a comma-separated list of transport descriptors and returns a
# condvar that eventually receives the resolved noderef: a comma-separated
# list of "address:port" strings without duplicates, ordered by the position
# of the originating descriptor and then by resolver result order.
#
# A descriptor that is empty or consists only of digits is taken as a port
# on the local host name (POSIX::uname); anything else is parsed as
# "host:port". Where no port is given, "aemp=$DEFAULT_PORT" is passed as the
# service. An empty $noderef is treated as $DEFAULT_PORT.
#
# Croaks on a descriptor that cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $done = AE::cv ();
   my @found; # [priority, "address:port"], in arrival order

   # outermost begin/end pair - this callback runs once every lookup has ended
   $done->begin (sub {
      my ($cv) = @_;
      my (%dup, @uniq);

      for my $hit (sort { $a->[0] <=> $b->[0] } @found) {
         next if $dup{$hit->[1]}++;

         push @uniq, $hit->[1];
      }

      $cv->send (join ",", @uniq);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $position = 0;

   for my $t (split /,/, $noderef) {
      # integer part: descriptor position, fraction: order of resolver results
      my $pri = ++$position;

      my ($name, $service);

      if ($t =~ /^\d*$/) {
         # bare (or missing) port - resolve our own host name
         require POSIX;

         $name    = (POSIX::uname ())[1];
         $service = $t || "aemp=$DEFAULT_PORT";
      } else {
         ($name, $service) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak ("$t: unparsable transport descriptor");
      }

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($name, $service, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($port, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            push @found, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $port),
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
252
# become_public @descriptors
#
# Turns the local node into a public one; does nothing when that has already
# happened. The descriptors are joined with "," and resolved via resolve_node
# (this blocks in ->recv), and the result becomes the new $NODE. For each
# resolved address the self node is registered in %NODE and a listening
# server is created through AnyEvent::MP::Transport::mp_server and stored in
# %LISTENER. The original arguments are passed on to mp_server unchanged.
sub become_public {
   return if $PUBLIC;

   my @args    = @_;
   my $noderef = join ",", @args;

   my $resolved = resolve_node ($noderef);
   $NODE = $resolved->recv;

   for my $t (split /,/, $NODE) {
      # the self node answers under each of its public addresses
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      # called by the server with the transport of each accepted connection
      my $on_accept = sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      };

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, @args, $on_accept);
   }

   $PUBLIC = 1;
}
280
281 #############################################################################
282 # self node code
283
# Requests understood by the node port (the port with the empty name).
# Each handler is called with the request arguments; $SRCNODE is the node
# the request came from.
our %node_req = (
   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      # drop the callback remembered by mon1 and unregister it locally
      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $watcher  = $SRCNODE; # $SRCNODE will differ by the time the callback runs

      # when the local port dies, tell the watching node via its node port
      my $notify = sub {
         $watcher->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $watcher->{rmon}{$portid} = $notify);
   },
   kil => sub {
      # a remote port we monitor has died: run (and forget) its callbacks
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # well-known-port lookup
   lookup => sub {
      my $name = shift;
      my $port = $REG{$name};

      #TODO: check validity
      snd (@_, $port);
   },

   # relay message to another node / generic echo
   relay => sub {
      &snd; # reuses our @_: first element is the destination port
   },

   # random garbage
   eval => sub {
      # NOTE(review): this string-evals code received in a message, so any
      # peer that can send to the node port can run arbitrary code here.
      my @res = eval shift;

      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
);
330
# Register the local node, both under the empty noderef and under its
# current (random) noderef.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port (empty port name) dispatches on the first message element;
# unknown request types are silently ignored.
$PORT{""} = sub {
   my $handler = $node_req{+shift}
      or return;

   &$handler; # called with the remaining message elements in @_
};
333
334 =back
335
336 =head1 SEE ALSO
337
338 L<AnyEvent::MP>.
339
340 =head1 AUTHOR
341
342 Marc Lehmann <schmorp@schmorp.de>
343 http://home.schmorp.de/
344
345 =cut
346
347 1
348