ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.2
Committed: Wed Aug 12 22:03:15 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.1: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - basis for AnyEvent::MP and Coro::MP
4
5 =head1 SYNOPSIS
6
7 # use AnyEvent::MP or Coro::MP instead
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 =head1 GLOBALS
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::MP::Kernel;
22
23 use common::sense;
24 use Carp ();
25 use MIME::Base64 ();
26
27 use AE ();
28
29 use AnyEvent::MP::Node;
30 use AnyEvent::MP::Transport;
31
32 use base "Exporter";
33
34 our $VERSION = '0.4';
35 our @EXPORT = qw(
36 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
37
38 NODE $NODE node_of snd kil _any_
39 resolve_node initialise_node
40 );
41
# Port used when a transport descriptor does not name one.
our $DEFAULT_PORT     = "4040";

# Timing knobs, in seconds.
our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
47
48 =item $AnyEvent::MP::Kernel::WARN
49
50 This value is called with an error or warning message, when e.g. a connection
51 could not be created, authorisation failed and so on.
52
53 The default simply logs the message to STDERR.
54
55 =cut
56
# Default warning sink: strips one trailing newline from the message and
# hands it to warn with exactly one newline appended (which also keeps
# perl from adding its own "at FILE line N" suffix).
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;
   warn $text, "\n";
};
62
# nonce $length
#
# Returns a string of $length random octets.
#
# /dev/urandom is used when it can be opened AND delivers the full number
# of octets asked for. On any failure (no such device, read error, short
# read) the perl PRNG is used instead, seeded once per process from the
# time, the pid and a checksum over some process/network listings.
sub nonce($) {
   my $len = $_[0];

   # three-arg open: the mode can never be confused with the file name
   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $nonce;
      my $got = sysread $fh, $nonce, $len;

      # only trust the result if we really received $len octets
      return $nonce
         if defined $got && $got == $len;
   }

   # no usable random device, fall back to rand
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
80
# asciibits $octets
#
# Encodes a binary string into a string made of ASCII letters and digits
# only.
#
# With Math::GMP >= 2.05 available, the octets are read as one big
# hexadecimal number and printed in base 62. Otherwise the data is
# base64-encoded and the three non-alphanumeric base64 characters are
# dealt with: "/" becomes "s", "+" becomes "p" and ALL "=" padding is
# dropped (base64 output can end in "==", so removing a single "=" is
# not enough).
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # with /d, "=" has no counterpart in the replacement list and is deleted
      $data =~ tr{/+=}{sp}d;
   }

   $data
}
98
# Builds a unique-ish ASCII cookie from the process id (BER-compressed),
# the current time (32 bit, network order) and two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
102
our $PUBLIC = 0;
our $SLAVE  = 0;
our $MASTER;                       # master noderef when $SLAVE

our $NODE   = asciibits (nonce (16)); # random until initialise_node assigns the real noderef
our $RUNIQ  = $NODE;               # remote uniq value
our $UNIQ   = gen_uniq ();         # per-process/node unique cookie
our $ID     = "a";

our %NODE;                         # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA);           # local ports

our %RMON;                         # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON;                         # monitored _local_ ports

our %LISTENER;                     # transport descriptor => listening server (filled by initialise_node)

our $SRCNODE;                      # holds the sending node during _inject
121
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
   return $NODE;
}
125
# node_of $port
#
# Returns the part of a port id in front of the first "#", i.e. the
# noderef. Yields undef for an empty string.
sub node_of($) {
   my ($port) = @_;

   my ($noderef) = split /#/, $port, 2;

   return $noderef;
}
131
# Marker sub; only its code reference is of interest (see _any_).
# The body stays a plain constant so perl can inline it.
sub _ANY_() { 1 }

# Returns the code reference of the _ANY_ marker sub.
sub _any_() {
   return \&_ANY_;
}

# Compile-time switch for the RCV/SND trace warnings, 0 turns them off.
# Kept as a plain constant so the "if TRACE" statements are folded away.
sub TRACE() { 0 }
136
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the callback registered
# for $portid in %PORT and calls it with the remaining arguments.
# Messages for unknown ports are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $handler = $PORT{+shift}
      or return;

   &$handler; # called with the current @_, i.e. the message without the port id
}
141
# add_node $noderef
#
# Returns the node object for $noderef, creating and registering it in
# %NODE when it is not known yet.
#
# A noderef is either "slave/<name>" (indirect node) or a comma-separated
# list of resolved host:port addresses (direct node). If any single
# address of the list is already registered, that node object is reused
# and additionally registered under the full noderef.
#
# Croaks when an address is not parsable, has no positive numeric port or
# does not contain a resolved address.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @addrs = split /,/, $noderef;

   # known under one of its addresses?
   for my $addr (@addrs) {
      return $NODE{$noderef} = $NODE{$addr}
         if exists $NODE{$addr};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for my $addr (@addrs) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($addr)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' not parsable)";

         $port > 0
            or Carp::croak "$noderef: not a resolved node reference ('$addr' contains non-numeric port)";

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' contains unresolved address)";
      }

      # TODO: for indirect sends, use a different class
      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef and under every single address
   $NODE{$_} = $node
      for $noderef, @addrs;

   $node
}
180
# snd $port, @msg
#
# Sends @msg to $port ("noderef#portid"). The node object is taken from
# %NODE, or created via add_node, and the message is handed to the code
# reference stored in its {send} slot as [portid, @msg].
sub snd(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send} (["$portid", @_]);
}
189
# kil $port, @reason
#
# Kills the port $port ("noderef#portid") by calling ->kill (portid,
# @reason) on its node object. Croaks when the port id part is empty,
# as a node port must not be killed.
sub kil(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   length $portid
      or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
199
# resolve_node $noderef
#
# Starts resolving an unresolved noderef - a comma-separated list of
# transport descriptors - and returns a condvar that will receive the
# resolved noderef: a comma-separated list of host:port strings, ordered
# by the position of the descriptor they came from and free of
# duplicates.
#
# A descriptor that is empty or consists of digits only is taken as a
# port on the local host name (POSIX::uname); anything else is parsed
# as host[:port]. A missing port defaults to the "aemp" service with
# $DEFAULT_PORT as fallback. An empty $noderef means $DEFAULT_PORT.
#
# Croaks on an unparsable transport descriptor.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs collected by the resolvers

   # group callback, runs once every resolver (and we ourselves) called ->end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (join ",", @refs);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   # starts one asynchronous lookup and files its results under $pri;
   # shared by both kinds of transport descriptors
   my $resolve = sub {
      my ($pri, $host, $service) = @_;

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $service, "tcp", 0, undef, sub {
         for (@_) {
            my ($port, $addr) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
            push @res, [
               $pri += 1e-5, # keeps the resolver's own result order within one descriptor
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $port)
            ];
         }
         $cv->end;
      });
   };

   my $idx;
   for my $t (split /,/, $noderef) {
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         # port only (or nothing): resolve our own host name
         require POSIX;
         my $nodename = (POSIX::uname ())[1];

         $resolve->($pri, $nodename, $t || "aemp=$DEFAULT_PORT");
      } else {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak "$t: unparsable transport descriptor";

         $resolve->($pri, $host, $port);
      }
   }

   $cv->end;

   $cv
}
267
# initialise_node $noderef, @seednodes
#
# Gives the local node its final identity and connects it to the seeds.
#
# "slave/<name>" (name defaults to the current $NODE) puts the node into
# slave mode, which requires at least one seed node. Anything else makes
# the node public and is resolved with resolve_node. For every address of
# the resulting noderef a listener is started. In slave mode the call
# then blocks until a master is known or $MONITOR_TIMEOUT seconds have
# passed, appends "@<master>" to $NODE and announces the new name to all
# known nodes.
#
# Croaks when a slave node has no seeds or cannot find a master.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   my $own; # condvar that delivers our own noderef

   if ($noderef =~ /^slave\/(.*)$/) {
      $SLAVE = AE::cv;

      my $name = length ($1) ? $1 : $NODE;

      $own = AE::cv;
      $own->send ("slave/$name");

      @others
         or Carp::croak "seed nodes must be specified for slave nodes";

   } else {
      $PUBLIC = 1;
      $own = resolve_node ($noderef);
   }

   # start all seed lookups first, then wait for each of them
   my @pending = map { resolve_node ($_) } @others;
   @others = map { $_->recv } @pending;

   $NODE = $own->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      });
   }

   add_node ($_)->connect
      for @others;

   if ($SLAVE) {
      # give up waiting for a master after $MONITOR_TIMEOUT seconds
      my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });

      $MASTER = $SLAVE->recv;
      defined $MASTER
         or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";

      (my $via = $MASTER) =~ s/,/!/g;

      $NODE .= "\@$via";
      $NODE{$NODE} = $NODE{""};

      # tell everybody about our new name
      $_->send (["", iam => $NODE])
         for values %NODE;

      $SLAVE = 1;
   }
}
325
326 #############################################################################
327 # self node code
328
# load_func $funcname
#
# Returns a code reference for the fully qualified function name
# $funcname. If the function is not defined yet, the packages it could
# live in are loaded, longest package name first ("A::B::c" tries
# "A::B", then "A"), until the function shows up.
#
# When the function cannot be found, a code reference that dies with
# "unable to resolve '$funcname'" is returned instead.
#
# The name ends up inside a string eval and may originate from a message
# tag, so anything that is not a plain Package::function identifier is
# treated as unresolvable without evaluating it.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $unresolved = sub { die "unable to resolve '$func'" };

      # never let arbitrary text reach the string eval below
      $func =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)+\z/
         or return $unresolved;

      my $pkg = $func;
      do {
         # strip the last name component; nothing left to strip means failure
         $pkg =~ s/::[^:]+$//
            or return $unresolved;
         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
343
# Services offered by the node port (port id ""), keyed by message tag.
# Each handler receives the message without its tag; $SRCNODE holds the
# node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $node = $SRCNODE; # remember the requester, $SRCNODE changes with every message

      # report the death of the port back to the requesting node
      my $on_kill = sub {
         $node->send (["", kil => $portid, @_]);
      };

      $node->{rmon}{$portid} = $on_kill;
      $NODE{""}->monitor ($portid, $on_kill);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my $name = $_[0];

      $SRCNODE->{noderef} = $name;
      $NODE{$name} = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   relay => sub {
      &snd; # pass our @_ on unchanged
   },
   relay_multiple => sub {
      snd (@$_) for @_;
   },

   # random garbage
   eval => sub {
      my @res = eval shift;
      snd (@_, "$@", @res) if @_; # reply only when a reply port was given
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      # swallows the message
   },
);
394
# The local node is registered both under "" and under its current noderef.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port: dispatches on the message tag, first via %node_req and
# otherwise via load_func (whose result is cached in %node_req). Errors
# raised by a handler are reported through $WARN instead of propagating.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the current @_, i.e. the message without its tag
   };

   $WARN->("error processing node message: $@") if $@;
};
401
402 =back
403
404 =head1 SEE ALSO
405
406 L<AnyEvent::MP>.
407
408 =head1 AUTHOR
409
410 Marc Lehmann <schmorp@schmorp.de>
411 http://home.schmorp.de/
412
413 =cut
414
415 1
416