ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.28
Committed: Sat Aug 29 16:48:18 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_9
Changes since 1.27: +1 -1 lines
Log Message:
0.9

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
our $VERSION = '0.9';

# Everything below is exported by default (the package inherits from Exporter).
# NOTE(review): resolve_node is listed here but no sub of that name is defined
# in this file as far as visible - confirm whether the export is stale.
our @EXPORT = (
   # shared kernel state
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   # low-level helpers
   qw(add_node load_func snd_to_func snd_on eval_on),
   # node/port basics
   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),
   qw(resolve_node initialise_node),
   # node monitoring and info
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# port used when a transport descriptor does not name one
our $DEFAULT_PORT = "4040";

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a
58 connection could not be created, authorisation failed and so on.
59
60 It I<must not> block or send messages - queue it and use an idle watcher if
61 you need to do any of these things.
62
63 C<$level> should be C<0> for messages to be logged always, C<1> for
64 unexpected messages and errors, C<2> for warnings, C<7> for messages about
65 node connectivity and services, C<8> for debugging messages and C<9> for
66 tracing messages.
67
68 The default simply logs the message to STDERR.
69
70 =cut
71
# Default log sink: writes "<local timestamp> <level> message" to STDERR.
# A single trailing newline on the message is dropped so every record ends
# in exactly one newline.
our $WARN = sub {
   my ($level, $msg) = @_;

   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
82
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference, loading the package that should contain it when necessary.
#
# If the function is not defined yet, the trailing "::component" is stripped
# repeatedly and each remaining prefix is require'd until the function shows
# up. When no prefix is left, a code reference that dies with
# "unable to resolve '<name>'" is returned instead - this function itself
# never dies.
#
# The name can come straight from a node message (the node port handler calls
# load_func on unknown tags), so only prefixes that look like a plain package
# name are ever passed to the string eval; anything else is skipped.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # the name is deliberately used as a symbolic reference

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # refuse to interpolate anything but a well-formed package name,
         # otherwise "1; evil ();::x" would execute evil ()
         eval "require $pkg"
            if $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/;
      } until defined &$func;
   }

   \&$func
}
97
# Returns a string of exactly $_[0] random octets.
#
# /dev/urandom is the preferred source. sysread may deliver fewer octets than
# requested (or fail), so it is called in a loop until the nonce is complete;
# whatever is still missing afterwards - including the case where the device
# cannot be opened at all - is filled from perl's rand, which is seeded once
# per process from the time, the pid and some process/network listing output.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      while (length ($nonce) < $len) {
         # append at the current end; stop on error (undef) or eof (0)
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last;
      }
   }

   if (length ($nonce) < $len) {
      # no (or not enough) kernel randomness available - weak fallback
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
115
# Converts a binary string into a purely alphanumeric one, for use in IDs.
#
# With Math::GMP 2.05 or newer installed the octets are treated as one big
# number and written in base 62. Otherwise base64 is used: the "=" padding
# is removed and "/" and "+" are replaced by "s" and "p" (so this fallback
# is not reversible).
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
               );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with one OR two "=", all of which must go
      $data =~ s/=+\z//;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
133
# Builds a fresh alphanumeric unique token from the process id (BER
# compressed), the current time (32 bit, big endian) and two random octets.
sub gen_uniq {
   my $raw = pack ("wNa*", $$, time, nonce (2));

   alnumbits ($raw)
}
137
our $CONFIG;                          # this node's configuration

our $RUNIQ = alnumbits (nonce (16));  # remote uniq value
our $UNIQ  = gen_uniq ();             # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";           # id of the local node, anonymous until configured
our $ID    = "a";

our %NODE;                            # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA);              # local ports

our %RMON;                            # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;                            # monitored _local_ ports

our %LISTENER;                        # bind address => listening server
our $LISTENER;                        # our listeners, as arrayref

our $SRCNODE;                         # holds the sending node during _inject
155
# Accessor for the current id of the local node ($NODE).
sub NODE() {
   return $NODE;
}
159
# Extracts the node id from a port id of the form "nodeid#portid": everything
# in front of the first "#", or the whole string when there is none.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
165
# Compile-time switch: TRACE is a constant sub that is true iff the
# PERL_ANYEVENT_MP_TRACE environment variable was true when the module was
# compiled.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
171
# Delivers a message (port id followed by the message elements) to a local
# port: looks the port up in %PORT and invokes its callback with the remaining
# elements. Messages for ports that do not exist are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $portid = shift;
   my $cb = $PORT{$portid}
      or return;

   &$cb; # reuses the current @_, i.e. the message without the port id
}
176
# Central routing component: returns the node object registered for the given
# node id, creating (and registering) a direct node first when the id has not
# been seen before, so that messages can be sent to it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
184
# Sends a message: the first argument is the destination port id
# ("nodeid#portid"), everything after it is the message. The node is added
# on demand when it is not known yet.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the node object carries its send function in the {send} slot
   $node->{send} (["$portid", @_]);
}
193
194 =item $is_local = port_is_local $port
195
196 Returns true iff the port is a local port.
197
198 =cut
199
# A port is local when the node part of its id maps to the very same node
# object as the local node, which is registered under the empty id.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{$parts[0]} == $NODE{""}
}
205
206 =item snd_to_func $node, $func, @args
207
208 Expects a node ID and a name of a function. Asynchronously tries to call
209 this function with the given arguments on that node.
210
211 This function can be used to implement C<spawn>-like interfaces.
212
213 =cut
214
# Sends (function name, arguments) to the node port - the port with the empty
# id - of the given node, adding the node on demand.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
221
222 =item snd_on $node, @msg
223
224 Executes C<snd> with the given C<@msg> (which must include the destination
225 port) on the given node.
226
227 =cut
228
# Asks the given node to run snd with @msg, by sending it a "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
233
234 =item eval_on $node, $string
235
236 Evaluates the given string as a Perl expression on the given node.
237
238 =cut
239
# Asks the given node to evaluate a string, by sending it an "eval" request;
# any further arguments are passed along with the request.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd ($node, "eval", @args);
}
244
# Kills the given port ("nodeid#portid"); remaining arguments are handed to
# the node's kill method as the reason. Croaks when the port part is empty,
# because that would address the node port itself.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   unless (length $portid) {
      Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught");
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
254
# Returns the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
259
# Resolves a comma separated list of transport descriptors into "host:port"
# strings. Returns a condvar that receives the resolved addresses: ordered
# by position of the descriptor in the list, then by resolver result order,
# with duplicates removed.
#
# An empty list means $DEFAULT_PORT. A descriptor consisting only of digits
# is a port on the local nodename, an empty descriptor is the local nodename
# itself. Croaks when a descriptor cannot be parsed.
sub _resolve($) {
   my ($spec) = @_;

   my $done = AE::cv ();
   my @found; # [priority, "host:port"] pairs, in completion order

   # runs once every lookup has finished
   $done->begin (sub {
      my %seen;
      my @addrs =
         grep !$seen{$_}++,
         map $_->[1],
         sort { $a->[0] <=> $b->[0] } @found;

      $_[0]->send (@addrs);
   });

   $spec = $DEFAULT_PORT unless length $spec;

   my $count;
   for my $desc (split /,/, $spec) {
      my $pri = ++$count;

      if ($desc =~ /^\d*$/) {
         $desc = length $desc
            ? _nodename () . ":$desc"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($desc, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$desc: unparsable transport descriptor");

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            # a tiny increment keeps the resolver's order within one descriptor
            $pri += 1e-5;

            push @found, [
               $pri,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
304
# Configures and starts the local node.
#
# $profile names the configuration profile to load and defaults to the local
# nodename. The node id comes from the profile's "nodeid" entry, or is the
# profile name itself; the special value "anon/" keeps the random anonymous
# id. Afterwards a listener is created for every address the "binds" entries
# (default: the node id) resolve to, the global service is loaded and given
# the resolved "seeds", and all configured "services" are started: entries
# ending in "::" are modules to load, everything else is a function to call.
#
# Dies when a bind address cannot be parsed or a service module fails to
# load. Blocks (via ->recv) while addresses are being resolved.
sub initialise_node(;$%) {
   my ($profile) = @_;

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # the self node becomes reachable under its real id as well
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   for my $cv (map _resolve ($_), @$binds) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $entry (@{ $CONFIG->{services} }) {
      # work on a copy: the loop variable aliases the configuration, and
      # stripping the "::" in place would turn "Foo::" into the function
      # name "Foo" for anybody looking at the services list afterwards
      my $service = $entry;

      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
355
356 #############################################################################
357 # node monitoring and info
358
359 =item node_is_known $nodeid
360
361 Returns true iff the given node is currently known to the system.
362
363 =cut
364
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
368
369 =item node_is_up $nodeid
370
371 Returns true if the given node is "up", that is, the kernel thinks it has
372 a working connection to it.
373
374 If the node is known but not currently connected, returns C<0>. If the
375 node is not known, returns C<undef>.
376
377 =cut
378
# Tri-state status of a node: nothing (undef/empty list) when the node id is
# not in %NODE, 1 when the node has a transport, 0 otherwise.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
383
384 =item known_nodes
385
386 Returns the node IDs of all nodes currently known to this node, including
387 itself and nodes not currently connected.
388
389 =cut
390
# Lists the {id} of every node object stored in %NODE, one per key.
# NOTE(review): the self node is registered under more than one key (the
# empty id and its real id), so its id can show up more than once - confirm
# that callers are fine with that.
sub known_nodes {
   my @ids;

   push @ids, $_->{id}
      for values %NODE;

   @ids
}
394
395 =item up_nodes
396
397 Return the node IDs of all nodes that are currently connected (excluding
398 the node itself).
399
400 =cut
401
# Lists the {id} of every node object in %NODE that has a true {transport}.
# NOTE(review): the self node is presumably excluded only because it has no
# transport - confirm.
sub up_nodes {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
405
406 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
407
408 Registers a callback that is called each time a node goes up (a connection
409 is established) or down (the connection is lost).
410
411 Node up messages can only be followed by node down messages for the same
412 node, and vice versa.
413
414 Note that monitoring a node is usually better done by monitoring its node
415 port. This function is mainly of interest to modules that are concerned
416 about the network topology and low-level connection handling.
417
418 Callbacks I<must not> block and I<should not> send any messages.
419
420 The function returns an optional guard which can be used to unregister
421 the monitoring callback again.
422
423 =cut
424
our %MON_NODES; # registered node up/down callbacks, keyed by their address

# Registers $cb to be called as $cb->($nodeid, $is_up, @reason) whenever a
# node goes up or down.
#
# In any non-void context a guard object is returned that unregisters the
# callback when it is destroyed. In void context no guard is created (it
# would be destroyed, and the callback removed, immediately) and the callback
# stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # "defined wantarray", not "wantarray": the latter is false in scalar
   # context, which is exactly how "$guard = mon_nodes ..." calls us
   defined wantarray
      and AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
}
434
# Reports a node going up or down: calls every callback registered in
# %MON_NODES with (node id, up flag, reason), logging - not propagating -
# exceptions they throw at level 1, then logs the event itself at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
445
446 #############################################################################
447 # self node code
448
# Requests understood by the node port (the port with the empty id), keyed by
# the first message element. Handlers receive the rest of the message in @_;
# $SRCNODE is the node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      my $node = $SRCNODE;
      Scalar::Util::weaken ($node); #TODO# ugly

      # when the port dies, forward the kill to the monitoring node, as long
      # as that node still exists and is connected
      my $cb = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = $cb);
   },
   kil => sub { # a port we monitor on the sending node was killed
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd          => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational - the request is the reply port plus leading reply elements
   info        => sub { snd (@_, $NODE) },
   known_nodes => sub { snd (@_, known_nodes ()) },
   up_nodes    => sub { snd (@_, up_nodes ()) },

   # random garbage
   eval => sub {
      # NOTE(review): string-evals code received in a message - safe only as
      # long as every node that can reach this port is trusted
      my @res = eval shift;
      snd @_, "$@", @res if @_; # reply only when a reply port was given
   },
   time => sub { snd (@_, AE::time ()) },
   devnull => sub {
      # discards everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
508
# The local node is reachable both under its own id and under the empty id.
$NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);
$NODE{""}    = $NODE{$NODE};

# The node port: the first message element selects a handler from %node_req.
# Unknown tags are resolved as function names via load_func and cached in the
# table. Errors raised by a handler are logged at level 2, never propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the remaining message elements in @_
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
515
516 =back
517
518 =head1 SEE ALSO
519
520 L<AnyEvent::MP>.
521
522 =head1 AUTHOR
523
524 Marc Lehmann <schmorp@schmorp.de>
525 http://home.schmorp.de/
526
527 =cut
528
529 1
530