ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.20
Committed: Thu Aug 27 07:12:48 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.19: +68 -97 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '0.8';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 connect_node add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil
44 port_is_local
45 resolve_node initialise_node
46 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 );
48
49 our $DEFAULT_PORT = "4040";
50
51 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52 our $NETWORK_LATENCY = 3; # activity timeout
53 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Default log sink: prints "<timestamp> <level> message" to STDERR.
#   $level - numeric level of the message
#   $msg   - message text; a single trailing newline is removed first
our $WARN = sub {
   my ($severity, $text) = @_;

   $text =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $severity, $text;
};
79
# Resolves a fully qualified function name to a code reference.
#
# If the function is not defined yet, the package part is shortened one
# "::component" at a time and each candidate package is require'd until
# the function shows up. When no package part is left, a code reference
# that dies with "unable to resolve '<name>'" is returned instead.
#
# NOTE(review): the package name is interpolated into a string eval, and the
# node port handler in this file passes message tags to this function -
# confirm that only trusted peers can reach it.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # the function is looked up by name

   return \&$func
      if defined &$func;

   my $pkg = $func;

   while (!defined &$func) {
      # drop the last path component; give up when nothing is left to drop
      unless ($pkg =~ s/::[^:]+$//) {
         return sub { die "unable to resolve '$func'" };
      }

      eval "require $pkg";
   }

   \&$func
}
94
# Returns a string of exactly $_[0] random octets.
#
# Reads from /dev/urandom when it can be opened. sysread may return fewer
# bytes than requested, so reading is repeated until the requested length
# is reached. If the device is missing or a read fails, the nonce is built
# from perl's rand instead, which is seeded once per process from the time,
# the pid and a checksum over the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      while ($len > length $nonce) {
         my $got = sysread $fh, $nonce, $len - (length $nonce), length $nonce;
         last unless $got; # read error or unexpected EOF
      }

      return $nonce
         if $len == length $nonce;
   }

   # no usable random device: fall back to the (much weaker) builtin generator
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
112
# Converts a string of octets into a purely alphanumeric string.
#
# With Math::GMP 2.05 or newer available, the octets are treated as one big
# hexadecimal number and rendered in base 62. Otherwise the data is base64
# encoded, all "=" padding is removed and the two non-alphanumeric base64
# characters are replaced ("/" becomes "s", "+" becomes "p").
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", so every one of them must be removed
      $data =~ tr/=//d;
      $data =~ tr{/+}{sp};
   }

   $data
}
130
# Builds an identifier from the process id, the current time and two random
# octets, encoded with asciibits.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a full/public node, which can create
138 and accept direct connections from other nodes.
139
140 =item $AnyEvent::MP::Kernel::SLAVE
141
142 A boolean indicating whether this node is a slave node, i.e. does most of its
143 message sending/receiving through some master node.
144
145 =item $AnyEvent::MP::Kernel::MASTER
146
147 Defined only in slave mode, in which case it contains the noderef of the
148 master node.
149
150 =cut
151
152 our $CONFIG; # this node's configuration (profile looked up by initialise_node)
153 our $PUBLIC = 0; # set to 1 by initialise_node for non-slave nodes
154 our $SLAVE = ""; # set to 1 by initialise_node when the noderef starts with "slave/"
155 our $MASTER; # master noderef when $SLAVE
156
157 our $NODE = asciibits nonce 16; # random provisional noderef, overwritten by initialise_node
158 our $NODEID = $NODE; # same as NODE, except slave nodes have no @master part
159 our $RUNIQ = $NODE; # remote uniq value
160 our $UNIQ = gen_uniq; # per-process/node unique cookie
161 our $ID = "a"; # NOTE(review): exported, but not otherwise used in this file - purpose to be confirmed
162
163 our %NODE; # node id to transport mapping, or "undef", for local node
164 our (%PORT, %PORT_DATA); # local ports
165
166 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
167 our %LMON; # monitored _local_ ports
168
169 our %LISTENER; # transport descriptor ("host:port") => value returned by mp_server, filled by initialise_node
170
171 our $SRCNODE; # holds the sending node during _inject
172
# Returns the noderef of the local node, i.e. the current value of $NODE.
sub NODE() {
   return $NODE;
}
176
# Returns the noderef part of a port id, i.e. everything before the first "#".
# A port id without "#" is returned unchanged.
sub node_of($) {
   my $noderef = (split /#/, $_[0], 2)[0];

   $noderef
}
182
# TRACE is a compile-time constant: 1 when the PERL_ANYEVENT_MP_TRACE
# environment variable holds a true value, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
188
# Delivers a message to a local port: the first argument selects the handler
# in %PORT, which is then called with the remaining arguments. Messages for
# ports without a handler are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   &$handler; # called with the remaining @_
}
193
# Registers a noderef so messages can be sent to it, and returns its node
# object; this is basically the central routing component.
#
# An already registered noderef simply returns the existing node object.
# A "slave/..." noderef is accepted as is (only really valid when it comes
# from a transport). Any other noderef must be a comma separated list of
# resolved "address:port" pairs, otherwise this function croaks.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if $NODE{$noderef};

   unless ($noderef =~ /^slave\/.+$/) {
      # direct node: every listed endpoint must be a fully resolved address
      for my $addr (split /,/, $noderef) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($addr)
            or Carp::croak ("$noderef: not a resolved node reference ('$addr' not parsable)");

         $port > 0
            or Carp::croak ("$noderef: not a resolved node reference ('$addr' contains invalid port)");

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak ("$noderef: not a resolved node reference ('$addr' contains unresolved address)");
      }
   }

   $NODE{$noderef} = AnyEvent::MP::Node::Direct->new ($noderef)
}
228
# Registers the given noderef (see add_node) and asks the resulting node
# object to connect.
sub connect_node {
   my $node = add_node (@_);

   $node->connect;
}
232
# Sends a message to a port. The first argument is the port id
# ("noderef#portid"), the remaining arguments form the message. The node is
# registered on the fly when it is not known yet.
sub snd(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send}->(["$portid", @_]);
}
241
242 =item $is_local = port_is_local $port
243
244 Returns true iff the port is a local port.
245
246 =cut
247
# True when the node part of the given port id maps to the same node object
# as the local node ($NODE{""}).
sub port_is_local($) {
   my $noderef = (split /#/, $_[0], 2)[0];

   $NODE{$noderef} == $NODE{""}
}
253
254 =item snd_to_func $node, $func, @args
255
256 Expects a noderef and a name of a function. Asynchronously tries to call
257 this function with the given arguments on that node.
258
259 This function can be used to implement C<spawn>-like interfaces.
260
261 =cut
262
# Sends the function name and its arguments to the node port (port id "")
# of the given node, registering the node first when it is unknown.
sub snd_to_func($$;@) {
   my $noderef = shift;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["", @_]);
}
269
270 =item snd_on $node, @msg
271
272 Executes C<snd> with the given C<@msg> (which must include the destination
273 port) on the given node.
274
275 =cut
276
# Asks the given node to execute snd with @msg, by sending it a "snd"
# request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
281
282 =item eval_on $node, $string
283
284 Evaluates the given string as Perl expression on the given node.
285
286 =cut
287
# Asks the given node to evaluate a string of Perl code, by sending it an
# "eval" request; any further arguments are passed along with the request.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd ($node, "eval", @args);
}
292
# Kills a port: the first argument is the port id ("noderef#portid"), the
# remaining arguments are handed to the node's kill method. Croaks when the
# port id has no port part, as the node port itself must not be killed.
sub kil(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   Carp::croak ("$noderef#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
302
# Returns the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
307
# Resolves a noderef specification into a condvar. Once all lookups have
# finished, the condvar receives the comma separated list of resolved
# "address:port" pairs.
#
# The specification is a comma separated list of transport descriptors. An
# empty specification means $DEFAULT_PORT. A descriptor that is empty or
# consists only of digits is taken as a port on the local nodename.
# Results keep the order of the descriptors, and duplicates are removed.
# Croaks on a descriptor that cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $done = AE::cv ();
   my @found; # [priority, "address:port"], filled by the resolver callbacks

   # the group callback runs after every begin has been matched by an end
   $done->begin (sub {
      my ($cv) = @_;
      my (%dup, @uniq);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         next if $dup{$entry->[1]}++;
         push @uniq, $entry->[1];
      }

      $cv->send (join ",", @uniq);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $idx;
   for my $t (split /,/, $noderef) {
      my $pri = ++$idx;

      # a bare port number (or nothing at all) refers to the local host
      if ($t =~ /^\d*$/) {
         $t = length $t ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$t: unparsable transport descriptor");

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $res (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($res->[3]);

            # the small increment keeps results of one descriptor in order
            $pri += 1e-5;

            push @found, [
               $pri,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
352
# Configures and starts the local node.
#
#   $noderef - profile name / noderef of this node; defaults to the local
#              nodename, and is overridden by a "noderef" entry in the profile
#   @others  - additional seed nodes, on top of the "seeds" of the profile
#
# A "slave/NAME" noderef turns the node into a slave node (which requires at
# least one seed node). Any other noderef is resolved and a listener is
# started for each resulting endpoint. After that all seed nodes are
# connected and the services listed in the profile are started.
sub initialise_node(@) {
   my ($noderef, @seeds) = @_;

   my $profile = defined $noderef ? $noderef : _nodename ();
   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);

   if (exists $CONFIG->{noderef}) {
      $noderef = $CONFIG->{noderef};
   }

   push @seeds, @{ $CONFIG->{seeds} };

   # start all resolver requests first, then wait for each result in turn
   my @pending;
   push @pending, resolve_node ($_) for @seeds;
   @seeds = map $_->recv, @pending;

   if ($noderef =~ /^slave\/(.*)$/) {
      my $name = length $1 ? $1 : $NODE;

      Carp::croak ("seed nodes must be specified for slave nodes")
         unless @seeds;

      $SLAVE = 1;
      $NODE  = "slave/$name";
   } else {
      $PUBLIC = 1;
      $NODE   = resolve_node ($noderef)->recv;
   }

   # make the self node reachable under its final noderef
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{noderef} = $NODE;

   unless ($SLAVE) {
      for my $t (split /,/, $NODE) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

         $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, sub {
            my ($tp) = @_;

            # TODO: urgs
            my $node = add_node ($tp->{remote_node});
            $node->{trial}{accept} = $tp;
         });
      }
   }

   for my $seed (@seeds) {
      my $node = add_node ($seed);
      $node->{autoconnect} = 1;
      $node->connect;
   }

   # a trailing "::" marks a module to load, anything else is a function to call
   for my $service (@{ $CONFIG->{services} }) {
      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         load_func ($service)->();
      }
   }

   # slave nodes need global
   require AnyEvent::MP::Global
      if $SLAVE;
}
419
420 #############################################################################
421 # node monitoring and info
422
# Returns every node object stored in %NODE exactly once, even when it is
# registered under several keys.
sub _uniq_nodes {
   my %by_ref;

   $by_ref{$_} = $_ for values %NODE;

   values %by_ref;
}
430
# Returns all distinct node objects whose noderef does not start with
# "slave/".
sub _public_nodes {
   grep { $_->{noderef} !~ /^slave\// } _uniq_nodes ()
}
434
435 =item node_is_known $noderef
436
437 Returns true iff the given node is currently known to the system.
438
439 =cut
440
# True when %NODE has an entry for the given noderef.
sub node_is_known($) {
   my ($noderef) = @_;

   exists $NODE{$noderef}
}
444
445 =item node_is_up $noderef
446
447 Returns true if the given node is "up", that is, the kernel thinks it has
448 a working connection to it.
449
450 If the node is known but not currently connected, returns C<0>. If the
451 node is not known, returns C<undef>.
452
453 =cut
454
# Returns 1 when the node has a transport, 0 when it is known but has none,
# and nothing (undef / empty list) when the noderef is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
459
460 =item known_nodes
461
462 Returns the noderefs of all public nodes known to this node, including
463 itself.
464
465 =cut
466
# Returns the noderefs of all public (non-slave) nodes found in %NODE.
sub known_nodes {
   map { $_->{noderef} } _public_nodes ()
}
470
471 =item up_nodes
472
473 Return the noderefs of all public nodes that are currently connected
474 (excluding the node itself).
475
476 =cut
477
# Returns the noderefs of all public (non-slave) nodes that currently have
# a transport.
sub up_nodes {
   map { $_->{noderef} }
      grep { $_->{transport} }
         _public_nodes ()
}
481
482 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
483
484 Registers a callback that is called each time a node goes up (connection
485 is established) or down (connection is lost).
486
487 Node up messages can only be followed by node down messages for the same
488 node, and vice versa.
489
490 The function returns an optional guard which can be used to de-register
491 the monitoring callback again.
492
493 =cut
494
495 our %MON_NODES;
496
# Registers $cb to be called with ($noderef, $is_up, @reason) whenever a
# node goes up or down.
#
# When called in non-void context (scalar or list), returns a guard object
# that de-registers the callback once it is destroyed. In void context no
# guard is created and the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   # the numeric address of the code reference serves as registration key
   $MON_NODES{$cb+0} = $cb;

   # wantarray is false, but defined, in scalar context - testing definedness
   # makes sure "$guard = mon_nodes ..." actually receives a guard
   return unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
504
# Tells all callbacks registered via mon_nodes that $node went up or down,
# then logs the event at level 7. A callback that dies is reported through
# $WARN at level 1 and does not keep the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up" : "down";

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{noderef}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   $WARN->(7, "$node->{noderef} is " . $state . " (@reason)");
}
515
516 #############################################################################
517 # self node code
518
# Request handlers of the node port: maps the tag of a node message to the
# code that handles it. $SRCNODE refers to the node that sent the message.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $node = $SRCNODE; # remember the requesting node for the callback

      # tell the requesting node when the monitored port goes away
      my $notify = sub {
         $node->send (["", kil => $portid, @_]);
      };

      $node->{rmon}{$portid} = $notify;
      $NODE{""}->monitor ($portid, $notify);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage

   # NOTE(review): evaluates a string taken from a node message - confirm
   # that only trusted peers can send node messages
   eval => sub {
      my @res = eval shift;

      if (@_) {
         snd (@_, "$@", @res);
      }
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      # discards the message
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
576
# create the self node and register it under its noderef and under ""
$NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);
$NODE{""}    = $NODE{$NODE};

# the node port: the first message element selects the request handler, the
# rest of the message becomes its arguments. Unknown tags are resolved as
# function names via load_func and cached in %node_req.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the remaining @_
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
583
584 =back
585
586 =head1 SEE ALSO
587
588 L<AnyEvent::MP>.
589
590 =head1 AUTHOR
591
592 Marc Lehmann <schmorp@schmorp.de>
593 http://home.schmorp.de/
594
595 =cut
596
597 1
598