ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.25
Committed: Fri Aug 28 20:57:42 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.24: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
our $VERSION = '0.8';

# symbols exported by default, grouped the same way as before
our @EXPORT = (
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),

   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),
   qw(resolve_node initialise_node),
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

our $DEFAULT_PORT     = "4040";

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Default log callback: writes "<timestamp> <level> message" to STDERR.
# $level is printed numerically, $msg loses one trailing newline (if any)
# because the format supplies its own.
our $WARN = sub {
   my ($level, $msg) = @_;

   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79
# Resolves a fully qualified function name ("Some::Pkg::func") to a code
# reference, loading modules on demand.
#
# If the function is not defined yet, the package part is shortened one
# "::component" at a time and each candidate is require'd until the
# function shows up. When nothing is left to try, a code reference that
# dies with "unable to resolve '...'" is returned instead, so the error
# only surfaces when the function is actually called.
#
# The name may originate from a node message, so a candidate is only
# handed to require when it is a syntactically plain package name, and it
# is loaded via the filename form of require rather than through a string
# eval - otherwise a crafted name could execute arbitrary code.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # anything but Word(::Word)* cannot be a loadable package
         $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve '$func'" };

         (my $file = "$pkg.pm") =~ s{::}{/}g;

         # load failures are ignored on purpose, the next shorter
         # package name is tried in that case
         eval { require $file };
      } until defined &$func;
   }

   \&$func
}
94
# Returns a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when that can be opened. Because
# sysread may fail or return fewer octets than requested, reading is
# repeated until enough data has arrived. When the device is unavailable
# or does not deliver the full amount, the result is generated with
# perl's rand instead, which is seeded once per process from the time,
# the pid and the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      while (length ($nonce) < $len) {
         # append at the current end; stop on read error or EOF
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last;
      }
   }

   if (length ($nonce) != $len) {
      # no usable kernel random source, fall back to the (weak) PRNG
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce = join "", map +(chr rand 256), 1 .. $len;
   }

   $nonce
}
112
# Encodes an octet string using alphanumeric characters only.
#
# When Math::GMP 2.05 or newer can be loaded, the octets are converted
# (via their hex representation) with get_str_gmp and base 62. Otherwise
# the data is base64 encoded, the padding is removed and the two
# non-alphanumeric base64 characters are replaced ("/" by "s", "+" by
# "p").
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+$//;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
130
# Builds an alphanumeric cookie out of the process id, the current time
# and two random octets.
sub gen_uniq {
   my @fields = ($$, time, nonce (2));

   alnumbits (pack "wNa*", @fields)
}
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a public node, which can create and
138 accept direct connections from other nodes.
139
140 =cut
141
our $CONFIG;    # this node's configuration

our $RUNIQ = alnumbits (nonce (16)); # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON;      # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;      # monitored _local_ ports

our %LISTENER;
our $LISTENER;  # our listeners, as arrayref

our $SRCNODE;   # holds the sending node during _inject
159
# Returns the current value of $NODE, the ID of the local node.
sub NODE() {
   return $NODE;
}
163
# Returns the node part of a port ID, i.e. everything in front of the
# first "#". An empty port ID yields undef.
sub node_of($) {
   my $node = (split /#/, $_[0], 2)[0];

   $node
}
169
# TRACE is a compile-time constant: 1 when the PERL_ANYEVENT_MP_TRACE
# environment variable is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
# Delivers a message to a local port: the first argument selects the
# callback in %PORT, which is then invoked with the remaining arguments.
# Messages for ports without a callback are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # the & form without parens hands over the current (shifted) @_
   &$handler
}
180
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
# an already registered node is returned unchanged, otherwise a new
# AnyEvent::MP::Node::Direct object is created and remembered in %NODE.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
188
# Sends a message to a port. The first argument is the port ID
# ("nodeid#portid"), the rest is the message. Unknown nodes are
# registered via add_node first.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # calls the code reference stored in the node's "send" slot directly
   $node->{send}->(["$portid", @_]);
}
197
198 =item $is_local = port_is_local $port
199
200 Returns true iff the port is a local port.
201
202 =cut
203
# A port is local when its node part maps to the same node object as
# the "" entry of %NODE.
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
209
210 =item snd_to_func $node, $func, @args
211
212 Expects a node ID and a name of a function. Asynchronously tries to call
213 this function with the given arguments on that node.
214
215 This function can be used to implement C<spawn>-like interfaces.
216
217 =cut
218
# Sends the function name and arguments to the node port (port ID "")
# of the given node, registering the node first if it is unknown.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
225
226 =item snd_on $node, @msg
227
228 Executes C<snd> with the given C<@msg> (which must include the destination
229 port) on the given node.
230
231 =cut
232
# Asks the given node to run its "snd" request with @msg.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
237
238 =item eval_on $node, $string
239
240 Evaluates the given string as a Perl expression on the given node.
241
242 =cut
243
# Asks the given node to run its "eval" request with the remaining
# arguments.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd ($node, eval => @args);
}
248
# Kills the port given as first argument, passing the remaining
# arguments on to the node's kill method. Croaks when the port ID has
# no port part, as node ports must not be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
258
# Returns the second field reported by POSIX::uname (the node name).
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
263
# Resolves a comma separated list of transport descriptors into
# "host:port" strings and returns a condvar that receives them.
#
# An empty argument stands for $DEFAULT_PORT. A descriptor consisting
# only of digits is taken as a port on the local node name, an empty one
# as the local node name itself. Results are ordered by the position of
# their descriptor (and, within one descriptor, in the order the resolver
# delivered them), duplicates are dropped. Croaks on descriptors that
# cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $done = AE::cv;
   my @found; # [priority, "host:port"]

   # runs once all lookups (and the final ->end below) have finished
   $done->begin (sub {
      my ($cv) = @_;
      my (%seen, @addrs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         next if $seen{$entry->[1]}++;
         push @addrs, $entry->[1];
      }

      $cv->send (@addrs);
   });

   $nodeid = $DEFAULT_PORT
      unless length $nodeid;

   my $index = 0;
   for my $spec (split /,/, $nodeid) {
      my $pri = ++$index;

      if ($spec =~ /^\d*$/) {
         $spec = length ($spec)
            ? _nodename () . ":$spec"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$spec: unparsable transport descriptor");

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $addr (@_) {
            my ($service, $ip) = AnyEvent::Socket::unpack_sockaddr ($addr->[3]);

            push @found, [
               # tiny increments keep the resolver's order within a descriptor
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $service),
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
308
# Configures and starts the local node.
#
# $profile names the configuration profile to use and defaults to the
# local node name. The profile is looked up with
# AnyEvent::MP::Config::find_profile and stored in $CONFIG. The node ID
# is taken from the profile's "nodeid" entry (or the profile name) unless
# that is "anon/", in which case the current $NODE is kept.
#
# Afterwards the function creates a listener for every resolved bind
# address ("binds", defaulting to the node ID), loads
# AnyEvent::MP::Global, hands it the resolved "seeds" and finally starts
# the configured "services": entries ending in "::" are require'd as
# modules, all others are resolved with load_func and called.
#
# Croaks when a bind address cannot be parsed and dies when a service
# module fails to load.
sub initialise_node(;$%) {
   my ($profile) = @_;

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # make the self node reachable under its (possibly new) ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one by one
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map { _resolve ($_) } @$seeds);

   for (@{ $CONFIG->{services} }) {
      # work on a copy: $_ aliases the configuration entry, and stripping
      # the "::" in place would turn a module entry into a function name
      # for every later user of the configuration
      my $service = $_;

      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
359
360 #############################################################################
361 # node monitoring and info
362
# Returns the values of %NODE with duplicates removed. Entries are
# keyed by their stringified value, so a node object stored under
# several IDs is returned only once.
sub _uniq_nodes {
   my %by_ref = map +("$_" => $_), values %NODE;

   values %by_ref
}
370
# Currently the same as _uniq_nodes, which is called with the caller's
# argument list.
sub _public_nodes {
   return &_uniq_nodes;
}
374
375 =item node_is_known $nodeid
376
377 Returns true iff the given node is currently known to the system.
378
379 =cut
380
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
384
385 =item node_is_up $nodeid
386
387 Returns true if the given node is "up", that is, the kernel thinks it has
388 a working connection to it.
389
390 If the node is known but not currently connected, returns C<0>. If the
391 node is not known, returns C<undef>.
392
393 =cut
394
# Returns 1 when the node has a transport, 0 when it is known without
# one, and nothing (undef/empty list) when the node is unknown.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
399
400 =item known_nodes
401
402 Returns the node IDs of all public nodes connected to this node, including
403 itself.
404
405 =cut
406
# Returns the "id" of every node returned by _public_nodes.
sub known_nodes {
   map { $_->{id} } _public_nodes ()
}
410
411 =item up_nodes
412
413 Return the node IDs of all public nodes that are currently connected
414 (excluding the node itself).
415
416 =cut
417
# Returns the "id" of every node from _public_nodes that has a transport.
sub up_nodes {
   map { $_->{id} }
      grep { $_->{transport} }
         _public_nodes ()
}
421
422 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
423
424 Registers a callback that is called each time a node goes up (connection
425 is established) or down (connection is lost).
426
427 Node up messages can only be followed by node down messages for the same
428 node, and vice versa.
429
430 The function returns an optional guard which can be used to de-register
431 the monitoring callback again.
432
433 =cut
434
# registered node monitoring callbacks, keyed by the numeric value
# (address) of the code reference
our %MON_NODES;

# Registers $cb to be called as $cb->($nodeid, $is_up, @reason) by
# _inject_nodeevent.
#
# Unless called in void context, returns a guard (created with
# AnyEvent::Util::guard) that removes the callback again. The context
# has to be tested with "defined wantarray": plain wantarray is false
# in scalar context, which would deny the guard to the documented
# "$guard = mon_nodes ..." form of the call.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
444
# Reports a node up/down event: calls every callback in %MON_NODES with
# the node ID, the up flag and the reason, then logs the event at level
# 7. A dying callback is logged at level 1 and does not keep the others
# from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
455
456 #############################################################################
457 # self node code
458
# Requests understood by the node port (port ID ""), keyed by the first
# element of the message; the handlers receive the remaining elements.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;
      my $node = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $node = $SRCNODE;

      # the callback forwards its arguments to the monitoring node
      my $notify = sub {
         $node->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = $notify);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # NOTE(review): evaluates a string received in a node message, so
      # whoever can send to the node port can run arbitrary code here
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
516
# the local node is reachable both under its ID and under ""
$NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);
$NODE{""}    = $NODE{$NODE};

# the node port: dispatches on the first message element, resolving (and
# caching) unknown tags as function names via load_func. errors raised
# by a handler are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # & form without parens: the handler sees the remaining @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@")
      if $@;
};
523
524 =back
525
526 =head1 SEE ALSO
527
528 L<AnyEvent::MP>.
529
530 =head1 AUTHOR
531
532 Marc Lehmann <schmorp@schmorp.de>
533 http://home.schmorp.de/
534
535 =cut
536
537 1
538