ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.7
Committed: Thu Aug 13 15:29:59 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.6: +25 -43 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our $VERSION = '0.6';
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
40
41 NODE $NODE node_of snd kil _any_
42 resolve_node initialise_node
43 );
44
45 our $DEFAULT_PORT = "4040";
46
47 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48 our $NETWORK_LATENCY = 3; # activity timeout
49 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50
51 =item $AnyEvent::MP::Kernel::WARN
52
53 This value is called with an error or warning message, when e.g. a connection
54 could not be created, authorisation failed and so on.
55
56 The default simply logs the message to STDERR.
57
58 =cut
59
60 our $WARN = sub {
61 my $msg = $_[0];
62 $msg =~ s/\n$//;
63 warn "$msg\n";
64 };
65
66 sub load_func($) {
67 my $func = $_[0];
68
69 unless (defined &$func) {
70 my $pkg = $func;
71 do {
72 $pkg =~ s/::[^:]+$//
73 or return sub { die "unable to resolve '$func'" };
74 eval "require $pkg";
75 } until defined &$func;
76 }
77
78 \&$func
79 }
80
81 sub nonce($) {
82 my $nonce;
83
84 if (open my $fh, "</dev/urandom") {
85 sysread $fh, $nonce, $_[0];
86 } else {
87 # shit...
88 our $nonce_init;
89 unless ($nonce_init++) {
90 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
91 }
92
93 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
94 }
95
96 $nonce
97 }
98
99 sub asciibits($) {
100 my $data = $_[0];
101
102 if (eval "use Math::GMP 2.05; 1") {
103 $data = Math::GMP::get_str_gmp (
104 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
105 62
106 );
107 } else {
108 $data = MIME::Base64::encode_base64 $data, "";
109 $data =~ s/=//;
110 $data =~ s/\//s/g;
111 $data =~ s/\+/p/g;
112 }
113
114 $data
115 }
116
117 sub gen_uniq {
118 asciibits pack "wNa*", $$, time, nonce 2
119 }
120
121 =item $AnyEvent::MP::Kernel::PUBLIC
122
123 A boolean indicating whether this is a full/public node, which can create
124 and accept direct connections form othe rnodes.
125
126 =item $AnyEvent::MP::Kernel::SLAVE
127
128 A boolean indicating whether this node is a slave node, i.e. does most of it's
129 message sending/receiving through some master node.
130
131 =item $AnyEvent::MP::Kernel::MASTER
132
133 Defined only in slave mode, in which cas eit contains the noderef of the
134 master node.
135
136 =cut
137
138 our $PUBLIC = 0;
139 our $SLAVE = 0;
140 our $MASTER; # master noderef when $SLAVE
141
142 our $NODE = asciibits nonce 16;
143 our $RUNIQ = $NODE; # remote uniq value
144 our $UNIQ = gen_uniq; # per-process/node unique cookie
145 our $ID = "a";
146
147 our %NODE; # node id to transport mapping, or "undef", for local node
148 our (%PORT, %PORT_DATA); # local ports
149
150 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
151 our %LMON; # monitored _local_ ports
152
153 our %LISTENER;
154
155 our $SRCNODE; # holds the sending node during _inject
156
157 sub NODE() {
158 $NODE
159 }
160
161 sub node_of($) {
162 my ($noderef, undef) = split /#/, $_[0], 2;
163
164 $noderef
165 }
166
167 sub _ANY_() { 1 }
168 sub _any_() { \&_ANY_ }
169
170 sub TRACE() { 0 }
171
172 sub _inject {
173 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
174 &{ $PORT{+shift} or return };
175 }
176
177 sub add_node {
178 my ($noderef) = @_;
179
180 return $NODE{$noderef}
181 if exists $NODE{$noderef};
182
183 for (split /,/, $noderef) {
184 return $NODE{$noderef} = $NODE{$_}
185 if exists $NODE{$_};
186 }
187
188 # new node, check validity
189 my $node;
190
191 if ($noderef =~ /^slave\/.+$/) {
192 $node = new AnyEvent::MP::Node::Indirect $noderef;
193
194 } else {
195 for (split /,/, $noderef) {
196 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
197 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
198
199 $port > 0
200 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
201
202 AnyEvent::Socket::parse_address $host
203 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
204 }
205
206 # TODO: for indirect sends, use a different class
207 $node = new AnyEvent::MP::Node::Direct $noderef;
208 }
209
210 $NODE{$_} = $node
211 for $noderef, split /,/, $noderef;
212
213 $node
214 }
215
216 sub snd(@) {
217 my ($noderef, $portid) = split /#/, shift, 2;
218
219 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
220
221 ($NODE{$noderef} || add_node $noderef)
222 ->{send} (["$portid", @_]);
223 }
224
225 sub kil(@) {
226 my ($noderef, $portid) = split /#/, shift, 2;
227
228 length $portid
229 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
230
231 ($NODE{$noderef} || add_node $noderef)
232 ->kill ("$portid", @_);
233 }
234
235 sub _nodename {
236 require POSIX;
237 (POSIX::uname ())[1]
238 }
239
240 sub resolve_node($) {
241 my ($noderef) = @_;
242
243 my $cv = AE::cv;
244 my @res;
245
246 $cv->begin (sub {
247 my %seen;
248 my @refs;
249 for (sort { $a->[0] <=> $b->[0] } @res) {
250 push @refs, $_->[1] unless $seen{$_->[1]}++
251 }
252 shift->send (join ",", @refs);
253 });
254
255 $noderef = $DEFAULT_PORT unless length $noderef;
256
257 my $idx;
258 for my $t (split /,/, $noderef) {
259 my $pri = ++$idx;
260
261 $t = length $t ? _nodename . ":$t" : _nodename
262 if $t =~ /^\d*$/;
263
264 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
265 or Carp::croak "$t: unparsable transport descriptor";
266
267 $cv->begin;
268 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
269 for (@_) {
270 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
271 push @res, [
272 $pri += 1e-5,
273 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
274 ];
275 }
276 $cv->end;
277 };
278 }
279
280 $cv->end;
281
282 $cv
283 }
284
285 sub initialise_node(@) {
286 my ($noderef, @others) = @_;
287
288 my $profile = AnyEvent::MP::Config::find_profile
289 +(defined $noderef ? $noderef : _nodename);
290
291 $noderef = $profile->{noderef}
292 if exists $profile->{noderef};
293
294 @others = @{ $profile->{seeds} };
295
296 if ($noderef =~ /^slave\/(.*)$/) {
297 $SLAVE = AE::cv;
298 my $name = $1;
299 $name = $NODE unless length $name;
300 $noderef = AE::cv;
301 $noderef->send ("slave/$name");
302
303 @others
304 or Carp::croak "seed nodes must be specified for slave nodes";
305
306 } else {
307 $PUBLIC = 1;
308 $noderef = resolve_node $noderef;
309 }
310
311 @others = map $_->recv, map +(resolve_node $_), @others;
312
313 $NODE = $noderef->recv;
314
315 for my $t (split /,/, $NODE) {
316 $NODE{$t} = $NODE{""};
317
318 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
319
320 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
321 sub {
322 my ($tp) = @_;
323
324 # TODO: urgs
325 my $node = add_node $tp->{remote_node};
326 $node->{trial}{accept} = $tp;
327 },
328 ;
329 }
330
331 (add_node $_)->connect for @others;
332
333 if ($SLAVE) {
334 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
335 $MASTER = $SLAVE->recv;
336 defined $MASTER
337 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
338
339 (my $via = $MASTER) =~ s/,/!/g;
340
341 $NODE .= "\@$via";
342 $NODE{$NODE} = $NODE{""};
343
344 $_->send (["", iam => $NODE])
345 for values %NODE;
346
347 $SLAVE = 1;
348 }
349
350 (load_func $_)->()
351 for @{ $profile->{services} };
352 }
353
354 #############################################################################
355 # node monitoring and info
356
357 sub _uniq_nodes {
358 my %node;
359
360 @node{values %NODE} = values %NODE;
361
362 values %node;
363 }
364
365 =item known_nodes
366
367 Returns the noderefs of all nodes connected to this node.
368
369 =cut
370
371 sub known_nodes {
372 map $_->{noderef}, _uniq_nodes
373 }
374
375 =item up_nodes
376
377 Return the noderefs of all nodes that are currently connected (excluding
378 the node itself).
379
380 =cut
381
382 sub up_nodes {
383 map $_->{noderef}, grep $_->{transport}, _uniq_nodes
384 }
385
386 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
387
388 Registers a callback that is called each time a node goes up (connection
389 is established) or down (connection is lost).
390
391 Node up messages can only be followed by node down messages for the same
392 node, and vice versa.
393
394 The fucntino returns an optional guard which can be used to de-register
395 the monitoring callback again.
396
397 =cut
398
399 our %MON_NODES;
400
401 sub mon_nodes($) {
402 my ($cb) = @_;
403
404 $MON_NODES{$cb+0} = $cb;
405
406 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
407 }
408
409 sub _inject_nodeevent($$;@) {
410 my ($node, @args) = @_;
411
412 unshift @args, $node->{noderef};
413
414 for my $cb (values %MON_NODES) {
415 eval { $cb->(@args); 1 }
416 or $WARN->($@);
417 }
418 }
419
420 #############################################################################
421 # self node code
422
423 our %node_req = (
424 # internal services
425
426 # monitoring
427 mon0 => sub { # disable monitoring
428 my $portid = shift;
429 my $node = $SRCNODE;
430 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
431 },
432 mon1 => sub { # enable monitoring
433 my $portid = shift;
434 my $node = $SRCNODE;
435 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
436 $node->send (["", kil => $portid, @_]);
437 });
438 },
439 kil => sub {
440 my $cbs = delete $SRCNODE->{lmon}{+shift}
441 or return;
442
443 $_->(@_) for @$cbs;
444 },
445 # node changed its name (for slave nodes)
446 iam => sub {
447 $SRCNODE->{noderef} = $_[0];
448 $NODE{$_[0]} = $SRCNODE;
449 },
450
451 # public services
452
453 # relay message to another node / generic echo
454 relay => sub {
455 &snd;
456 },
457 relay_multiple => sub {
458 snd @$_ for @_
459 },
460
461 # informational
462 info => sub {
463 snd @_, $NODE;
464 },
465 known_nodes => sub {
466 snd @_, known_nodes;
467 },
468 up_nodes => sub {
469 snd @_, up_nodes;
470 },
471
472 # random garbage
473 eval => sub {
474 my @res = eval shift;
475 snd @_, "$@", @res if @_;
476 },
477 time => sub {
478 snd @_, AE::time;
479 },
480 devnull => sub {
481 #
482 },
483 );
484
485 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
486 $PORT{""} = sub {
487 my $tag = shift;
488 eval { &{ $node_req{$tag} ||= load_func $tag } };
489 $WARN->("error processing node message: $@") if $@;
490 };
491
492 =back
493
494 =head1 SEE ALSO
495
496 L<AnyEvent::MP>.
497
498 =head1 AUTHOR
499
500 Marc Lehmann <schmorp@schmorp.de>
501 http://home.schmorp.de/
502
503 =cut
504
505 1
506