ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.23
Committed: Thu Aug 27 23:46:33 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.22: +3 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
# Module version and the symbols exported into every importing package.
# NOTE(review): resolve_node is listed below, but no sub of that name is
# defined in the visible part of this file - confirm it is still wanted.
our $VERSION = '0.8';
our @EXPORT  = qw(
   %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
   add_node load_func snd_to_func snd_on eval_on

   NODE $NODE node_of snd kil
   port_is_local
   resolve_node initialise_node
   known_nodes up_nodes mon_nodes node_is_known node_is_up
);

# port used by _resolve when a specification does not name one
our $DEFAULT_PORT = "4040";

# timing parameters
our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Default log sink: removes one trailing newline from the message and
# prints it to STDERR, prefixed by the local time and the numeric level.
our $WARN = sub {
   my ($severity, $text) = @_;

   $text =~ s/\n$//;

   my $stamp = POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time;

   printf STDERR "%s <%d> %s\n", $stamp, $severity, $text;
};
79
# load_func $name
#
# Resolves a fully qualified function name ("Some::Package::func") to a
# code reference. If the function is not defined yet, successively shorter
# package prefixes of the name are require'd until it is.
#
# Returns the code reference, or - when the name cannot be resolved - a
# stub that dies with "unable to resolve '<name>'" when called.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # the function is looked up by its (symbolic) name

   unless (defined &$func) {
      # The name is interpolated into a string eval below, and the node
      # port handler passes message tags to this function, so anything
      # that is not a plain Package::name must never reach the eval.
      $func =~ /\A\w+(?:::\w+)*\z/
         or return sub { die "unable to resolve '$func'" };

      my $pkg = $func;
      do {
         # strip the last name component; nothing left to strip means
         # that no prefix of the name could provide the function
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };
         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
94
# nonce $length
#
# Returns a string of exactly $length random octets. The octets are read
# from /dev/urandom when possible; anything that could not be read from
# there is filled in from perl's own generator, seeded once per process.
sub nonce($) {
   my $want  = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may legitimately return fewer octets than requested, so
      # keep appending until the request is satisfied or reading fails
      while ($want > length ($nonce)) {
         sysread ($fh, $nonce, $want - length ($nonce), length ($nonce))
            or last;
      }
   }

   if ($want > length ($nonce)) {
      # no (or not enough) kernel randomness available - fall back to rand
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), length ($nonce) + 1 .. $want;
   }

   $nonce
}
112
# alnumbits $octets
#
# Encodes an octet string using only the characters 0-9, A-Z and a-z.
# When Math::GMP 2.05 or newer can be loaded, the data is treated as one
# big number and written in base 62. Otherwise it is base64-encoded, the
# padding is dropped and the two non-alphanumeric base64 characters are
# replaced ("/" becomes "s", "+" becomes "p").
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ s/=//g; # base64 can end in up to two padding characters
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
130
# Builds an alphanumeric cookie from the process id, the current time and
# two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   alnumbits ($raw)
}
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a public node, which can create and
138 accept direct connections from other nodes.
139
140 =cut
141
our $CONFIG; # configuration of this node, set by initialise_node

our $RUNIQ = alnumbits (nonce (16)); # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";          # id of this node
our $ID    = "a";

our %NODE;               # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER; # bind address to listener, filled by initialise_node
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
159
# Returns the id of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
163
# node_of $port
#
# Returns the node part of a port id, i.e. everything before the first "#".
sub node_of($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid
}
169
# Define the compile-time constant TRACE: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is set to a true value, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
# Delivers a message to a local port: the first element names the port,
# the remaining elements are passed to the port callback. Messages for
# unknown ports are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> @_\n" if TRACE && @_;#d#

   my $portid  = shift;
   my $handler = $PORT{$portid}
      or return;

   &$handler; # called with the remaining @_
}
180
# Central routing component: returns the node object registered for the
# given node id, creating and registering a direct node object first if
# none exists yet, so that messages can be sent to it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
188
# snd $port, @msg
#
# Splits the port id into node id and port name and hands the message to
# the node object for that node, creating the node object if necessary.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
197
198 =item $is_local = port_is_local $port
199
200 Returns true iff the port is a local port.
201
202 =cut
203
# A port is local when the node object registered for its node id is the
# very object that represents this node (the one stored under "").
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
209
210 =item snd_to_func $node, $func, @args
211
212 Expects a node ID and a name of a function. Asynchronously tries to call
213 this function with the given arguments on that node.
214
215 This function can be used to implement C<spawn>-like interfaces.
216
217 =cut
218
# Sends the function name and its arguments to the node port ("") of the
# given node, creating the node object if necessary.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
225
226 =item snd_on $node, @msg
227
228 Executes C<snd> with the given C<@msg> (which must include the destination
229 port) on the given node.
230
231 =cut
232
# Asks the given node to run its "snd" request with @msg.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
237
238 =item eval_on $node, $string
239
240 Evaluates the given string as Perl expression on the given node.
241
242 =cut
243
# Asks the given node to run its "eval" request with the given arguments.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd ($node, "eval", @args);
}
248
# kil $port, @reason
#
# Kills the given port by calling ->kill on the node object responsible
# for it. Croaks when the port id has no port part, as node ports must
# not be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
258
# Returns the node name of this host, the second field of POSIX::uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
263
# _resolve $spec
#
# Starts the asynchronous resolution of a comma-separated list of
# transport descriptors and returns a condvar. Once all lookups have
# finished, the condvar receives the distinct addresses, formatted by
# AnyEvent::Socket::format_hostport and ordered by the position of their
# descriptor in the list. Croaks on an unparsable descriptor.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @found; # [priority, formatted address] pairs, filled by the lookups

   # runs once every begin has been matched by an end: order the results
   # by priority, drop duplicate addresses and deliver the list
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @addrs);
      for my $hit (sort { $a->[0] <=> $b->[0] } @found) {
         my $addr = $hit->[1];
         push @addrs, $addr unless $seen{$addr}++;
      }

      $done->send (@addrs);
   });

   length $nodeid
      or $nodeid = $DEFAULT_PORT;

   my $position = 0;
   for my $spec (split /,/, $nodeid) {
      my $pri = ++$position;

      # an empty or all-digit descriptor stands for this host, with the
      # digits (if any) used as port
      if ($spec =~ /^\d*$/) {
         $spec = length ($spec) ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak "$spec: unparsable transport descriptor";

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $target (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

            # a tiny increment keeps the results of one descriptor in
            # the order they were returned, behind the descriptor's rank
            $pri += 1e-5;
            push @found, [
               $pri,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
            ];
         }
         $cv->end;
      });
   }

   $cv->end;

   $cv
}
308
# initialise_node [$profile]
#
# Configures this node from the given profile (default: the host's node
# name): determines the node id, starts a listener on every bind address,
# hands the resolved seed addresses to AnyEvent::MP::Global and finally
# loads or calls the configured services.
sub initialise_node(;$%) {
   my ($profile) = @_;

   defined $profile
      or $profile = _nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);

   # a configured node id wins, the profile "anon/" keeps the current
   # node id, any other profile name becomes the node id
   if (exists $CONFIG->{nodeid}) {
      $NODE = $CONFIG->{nodeid};
   } elsif ($profile ne "anon/") {
      $NODE = $profile;
   }

   # register the self node under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || [$NODE];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   my @bind_cvs = map +(_resolve ($_)), @$binds;
   for my $bind_cv (@bind_cvs) {
      for my $bind ($bind_cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   my @seed_cvs = map +(_resolve ($_)), @$seeds;
   AnyEvent::MP::Global::set_seeds (map $_->recv, @seed_cvs);

   for (@{ $CONFIG->{services} }) {
      # a trailing "::" (stripped in place) marks a module to load,
      # everything else names a function to call
      if (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         load_func ($_)->();
      }
   }
}
358
359 #############################################################################
360 # node monitoring and info
361
# Returns every node object in %NODE exactly once, even when it is
# registered under several ids.
sub _uniq_nodes {
   my @nodes = values %NODE;

   # keyed by the stringified reference, so aliases collapse into one entry
   my %uniq;
   @uniq{@nodes} = @nodes;

   values %uniq;
}
369
# Currently simply forwards to _uniq_nodes, passing on the caller's @_.
sub _public_nodes {
   return &_uniq_nodes;
}
373
374 =item node_is_known $nodeid
375
376 Returns true iff the given node is currently known to the system.
377
378 =cut
379
# True iff an entry for the given node id exists in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
383
384 =item node_is_up $nodeid
385
386 Returns true if the given node is "up", that is, the kernel thinks it has
387 a working connection to it.
388
389 If the node is known but not currently connected, returns C<0>. If the
390 node is not known, returns C<undef>.
391
392 =cut
393
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef) when the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
398
399 =item known_nodes
400
401 Returns the node IDs of all public nodes connected to this node, including
402 itself.
403
404 =cut
405
# Returns the ids of all node objects reported by _public_nodes.
sub known_nodes {
   my @nodes = _public_nodes ();

   map $_->{id}, @nodes
}
409
410 =item up_nodes
411
412 Return the node IDs of all public nodes that are currently connected
413 (excluding the node itself).
414
415 =cut
416
# Returns the ids of those node objects reported by _public_nodes that
# currently have a transport.
sub up_nodes {
   my @connected = grep $_->{transport}, _public_nodes ();

   map $_->{id}, @connected
}
420
421 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
422
423 Registers a callback that is called each time a node goes up (connection
424 is established) or down (connection is lost).
425
426 Node up messages can only be followed by node down messages for the same
427 node, and vice versa.
428
429 The function returns an optional guard which can be used to de-register
430 the monitoring callback again.
431
432 =cut
433
434 our %MON_NODES;
435
# $guard = mon_nodes $callback
#
# Registers $callback in %MON_NODES, keyed by the numeric value of the
# code reference. Unless called in void context, returns a guard object
# that removes the registration again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would never hand a guard to "$guard = mon_nodes ..." callers
   (defined wantarray) && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
}
443
# _inject_nodeevent $node, $is_up, @reason
#
# Tells every callback in %MON_NODES that the node went up or down. A
# dying callback is reported through $WARN at level 1 and does not stop
# the remaining callbacks. Afterwards the event is logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $watcher (values %MON_NODES) {
      next if eval { $watcher->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
454
455 #############################################################################
456 # self node code
457
# Requests understood by the node port (""), keyed by request tag. Each
# handler is called with the remaining message elements in @_, while
# $SRCNODE holds the node object of the sender.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my $portid = shift;
      my $peer   = $SRCNODE;

      my $cb = delete $peer->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # enable monitoring
      my $portid = shift;
      my $peer   = $SRCNODE;

      # forwards the kill reason of the local port to the monitoring node
      my $cb = $peer->{rmon}{$portid} = sub {
         $peer->send (["", kil => $portid, @_]);
      };
      $NODE{""}->monitor ($portid, $cb);
   },
   kil => sub {
      # run (and forget) the callbacks registered for the killed port
      my $portid = shift;
      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # NOTE(review): executes a string taken from the message as perl code
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # discards the message
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
515
# create the object representing this node and register it both under ""
# and under the current node id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# handler of the node port (""): the first message element selects the
# request; tags missing from %node_req are resolved with load_func and
# cached there. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the remaining @_
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
522
523 =back
524
525 =head1 SEE ALSO
526
527 L<AnyEvent::MP>.
528
529 =head1 AUTHOR
530
531 Marc Lehmann <schmorp@schmorp.de>
532 http://home.schmorp.de/
533
534 =cut
535
536 1
537