ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.30
Committed: Sun Aug 30 13:22:46 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.29: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '0.9';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil
44 port_is_local
45 resolve_node initialise_node
46 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 );
48
49 our $DEFAULT_PORT = "4040";
50
51 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52 our $NETWORK_LATENCY = 3; # activity timeout
53 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a
58 connection could not be created, authorisation failed and so on.
59
60 It I<must not> block or send messages -queue it and use an idle watcher if
61 you need to do any of these things.
62
63 C<$level> sould be C<0> for messages ot be logged always, C<1> for
64 unexpected messages and errors, C<2> for warnings, C<7> for messages about
65 node connectivity and services, C<8> for debugging messages and C<9> for
66 tracing messages.
67
68 The default simply logs the message to STDERR.
69
70 =cut
71
72 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
73
74 our $WARN = sub {
75 return if $WARNLEVEL < $_[0];
76
77 my ($level, $msg) = @_;
78
79 $msg =~ s/\n$//;
80
81 printf STDERR "%s <%d> %s\n",
82 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83 $level,
84 $msg;
85 };
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
94 sub load_func($) {
95 my $func = $_[0];
96
97 unless (defined &$func) {
98 my $pkg = $func;
99 do {
100 $pkg =~ s/::[^:]+$//
101 or return sub { die "unable to resolve '$func'" };
102 eval "require $pkg";
103 } until defined &$func;
104 }
105
106 \&$func
107 }
108
109 sub nonce($) {
110 my $nonce;
111
112 if (open my $fh, "</dev/urandom") {
113 sysread $fh, $nonce, $_[0];
114 } else {
115 # shit...
116 our $nonce_init;
117 unless ($nonce_init++) {
118 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
119 }
120
121 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
122 }
123
124 $nonce
125 }
126
127 sub alnumbits($) {
128 my $data = $_[0];
129
130 if (eval "use Math::GMP 2.05; 1") {
131 $data = Math::GMP::get_str_gmp (
132 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
133 62
134 );
135 } else {
136 $data = MIME::Base64::encode_base64 $data, "";
137 $data =~ s/=//;
138 $data =~ s/\//s/g;
139 $data =~ s/\+/p/g;
140 }
141
142 $data
143 }
144
145 sub gen_uniq {
146 alnumbits pack "wNa*", $$, time, nonce 2
147 }
148
149 our $CONFIG; # this node's configuration
150
151 our $RUNIQ = alnumbits nonce 16;; # remote uniq value
152 our $UNIQ = gen_uniq; # per-process/node unique cookie
153 our $NODE = "anon/$RUNIQ";
154 our $ID = "a";
155
156 our %NODE; # node id to transport mapping, or "undef", for local node
157 our (%PORT, %PORT_DATA); # local ports
158
159 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
160 our %LMON; # monitored _local_ ports
161
162 our %LISTENER;
163 our $LISTENER; # our listeners, as arrayref
164
165 our $SRCNODE; # holds the sending node during _inject
166
167 sub NODE() {
168 $NODE
169 }
170
171 sub node_of($) {
172 my ($node, undef) = split /#/, $_[0], 2;
173
174 $node
175 }
176
177 BEGIN {
178 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
179 ? sub () { 1 }
180 : sub () { 0 };
181 }
182
183 sub _inject {
184 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
185 &{ $PORT{+shift} or return };
186 }
187
188 # this function adds a node-ref, so you can send stuff to it
189 # it is basically the central routing component.
190 sub add_node {
191 my ($node) = @_;
192
193 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
194 }
195
196 sub snd(@) {
197 my ($nodeid, $portid) = split /#/, shift, 2;
198
199 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
200
201 ($NODE{$nodeid} || add_node $nodeid)
202 ->{send} (["$portid", @_]);
203 }
204
205 =item $is_local = port_is_local $port
206
207 Returns true iff the port is a local port.
208
209 =cut
210
211 sub port_is_local($) {
212 my ($nodeid, undef) = split /#/, $_[0], 2;
213
214 $NODE{$nodeid} == $NODE{""}
215 }
216
217 =item snd_to_func $node, $func, @args
218
219 Expects a node ID and a name of a function. Asynchronously tries to call
220 this function with the given arguments on that node.
221
222 This function can be used to implement C<spawn>-like interfaces.
223
224 =cut
225
226 sub snd_to_func($$;@) {
227 my $nodeid = shift;
228
229 ($NODE{$nodeid} || add_node $nodeid)
230 ->send (["", @_]);
231 }
232
233 =item snd_on $node, @msg
234
235 Executes C<snd> with the given C<@msg> (which must include the destination
236 port) on the given node.
237
238 =cut
239
240 sub snd_on($@) {
241 my $node = shift;
242 snd $node, snd => @_;
243 }
244
245 =item eval_on $node, $string[, @reply]
246
247 Evaluates the given string as Perl expression on the given node. When
248 @reply is specified, then it is used to construct a reply message with
249 C<"$@"> and any results from the eval appended.
250
251 =cut
252
253 sub eval_on($$;@) {
254 my $node = shift;
255 snd $node, eval => @_;
256 }
257
258 sub kil(@) {
259 my ($nodeid, $portid) = split /#/, shift, 2;
260
261 length $portid
262 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
263
264 ($NODE{$nodeid} || add_node $nodeid)
265 ->kill ("$portid", @_);
266 }
267
268 sub _nodename {
269 require POSIX;
270 (POSIX::uname ())[1]
271 }
272
273 sub _resolve($) {
274 my ($nodeid) = @_;
275
276 my $cv = AE::cv;
277 my @res;
278
279 $cv->begin (sub {
280 my %seen;
281 my @refs;
282 for (sort { $a->[0] <=> $b->[0] } @res) {
283 push @refs, $_->[1] unless $seen{$_->[1]}++
284 }
285 shift->send (@refs);
286 });
287
288 $nodeid = $DEFAULT_PORT unless length $nodeid;
289
290 my $idx;
291 for my $t (split /,/, $nodeid) {
292 my $pri = ++$idx;
293
294 $t = length $t ? _nodename . ":$t" : _nodename
295 if $t =~ /^\d*$/;
296
297 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
298 or Carp::croak "$t: unparsable transport descriptor";
299
300 $cv->begin;
301 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
302 for (@_) {
303 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
304 push @res, [
305 $pri += 1e-5,
306 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
307 ];
308 }
309 $cv->end;
310 };
311 }
312
313 $cv->end;
314
315 $cv
316 }
317
318 sub initialise_node(;$%) {
319 my ($profile) = @_;
320
321 $profile = _nodename
322 unless defined $profile;
323
324 $CONFIG = AnyEvent::MP::Config::find_profile $profile;
325
326 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
327 $NODE = $node
328 unless $node eq "anon/";
329
330 $NODE{$NODE} = $NODE{""};
331 $NODE{$NODE}{id} = $NODE;
332
333 my $seeds = $CONFIG->{seeds};
334 my $binds = $CONFIG->{binds};
335
336 $binds ||= [$NODE];
337
338 $WARN->(8, "node $NODE starting up.");
339
340 $LISTENER = [];
341 %LISTENER = ();
342
343 for (map _resolve $_, @$binds) {
344 for my $bind ($_->recv) {
345 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
346 or Carp::croak "$bind: unparsable local bind address";
347
348 $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port;
349 push @$LISTENER, $bind;
350 }
351 }
352
353 # the global service is mandatory currently
354 require AnyEvent::MP::Global;
355
356 # connect to all seednodes
357 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
358
359 for (@{ $CONFIG->{services} }) {
360 if (s/::$//) {
361 eval "require $_";
362 die $@ if $@;
363 } else {
364 (load_func $_)->();
365 }
366 }
367 }
368
369 #############################################################################
370 # node monitoring and info
371
372 =item node_is_known $nodeid
373
374 Returns true iff the given node is currently known to the system.
375
376 =cut
377
378 sub node_is_known($) {
379 exists $NODE{$_[0]}
380 }
381
382 =item node_is_up $nodeid
383
384 Returns true if the given node is "up", that is, the kernel thinks it has
385 a working connection to it.
386
387 If the node is known but not currently connected, returns C<0>. If the
388 node is not known, returns C<undef>.
389
390 =cut
391
392 sub node_is_up($) {
393 ($NODE{$_[0]} or return)->{transport}
394 ? 1 : 0
395 }
396
397 =item known_nodes
398
399 Returns the node IDs of all nodes currently known to this node, including
400 itself and nodes not currently connected.
401
402 =cut
403
404 sub known_nodes {
405 map $_->{id}, values %NODE
406 }
407
408 =item up_nodes
409
410 Return the node IDs of all nodes that are currently connected (excluding
411 the node itself).
412
413 =cut
414
415 sub up_nodes {
416 map $_->{id}, grep $_->{transport}, values %NODE
417 }
418
419 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
420
421 Registers a callback that is called each time a node goes up (a connection
422 is established) or down (the connection is lost).
423
424 Node up messages can only be followed by node down messages for the same
425 node, and vice versa.
426
427 Note that monitoring a node is usually better done by monitoring it's node
428 port. This function is mainly of interest to modules that are concerned
429 about the network topology and low-level connection handling.
430
431 Callbacks I<must not> block and I<should not> send any messages.
432
433 The function returns an optional guard which can be used to unregister
434 the monitoring callback again.
435
436 =cut
437
438 our %MON_NODES;
439
440 sub mon_nodes($) {
441 my ($cb) = @_;
442
443 $MON_NODES{$cb+0} = $cb;
444
445 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
446 }
447
448 sub _inject_nodeevent($$;@) {
449 my ($node, $up, @reason) = @_;
450
451 for my $cb (values %MON_NODES) {
452 eval { $cb->($node->{id}, $up, @reason); 1 }
453 or $WARN->(1, $@);
454 }
455
456 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
457 }
458
459 #############################################################################
460 # self node code
461
462 our %node_req = (
463 # internal services
464
465 # monitoring
466 mon0 => sub { # stop monitoring a port
467 my $portid = shift;
468 my $node = $SRCNODE;
469 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
470 },
471 mon1 => sub { # start monitoring a port
472 my $portid = shift;
473 my $node = $SRCNODE;
474 Scalar::Util::weaken $node; #TODO# ugly
475 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
476 $node->send (["", kil => $portid, @_])
477 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disocnnect
478 });
479 },
480 kil => sub {
481 my $cbs = delete $SRCNODE->{lmon}{+shift}
482 or return;
483
484 $_->(@_) for @$cbs;
485 },
486
487 # "public" services - not actually public
488
489 # relay message to another node / generic echo
490 snd => \&snd,
491 snd_multiple => sub {
492 snd @$_ for @_
493 },
494
495 # informational
496 info => sub {
497 snd @_, $NODE;
498 },
499 known_nodes => sub {
500 snd @_, known_nodes;
501 },
502 up_nodes => sub {
503 snd @_, up_nodes;
504 },
505
506 # random utilities
507 eval => sub {
508 my @res = eval shift;
509 snd @_, "$@", @res if @_;
510 },
511 time => sub {
512 snd @_, AE::time;
513 },
514 devnull => sub {
515 #
516 },
517 "" => sub {
518 # empty messages are keepalives or similar devnull-applications
519 },
520 );
521
522 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
523 $PORT{""} = sub {
524 my $tag = shift;
525 eval { &{ $node_req{$tag} ||= load_func $tag } };
526 $WARN->(2, "error processing node message: $@") if $@;
527 };
528
529 =back
530
531 =head1 SEE ALSO
532
533 L<AnyEvent::MP>.
534
535 =head1 AUTHOR
536
537 Marc Lehmann <schmorp@schmorp.de>
538 http://home.schmorp.de/
539
540 =cut
541
542 1
543