ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.22
Committed: Thu Aug 27 22:21:09 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.21: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
our $VERSION = '0.8';

# Symbols exported by default, grouped the same way as before.
# NOTE(review): resolve_node is exported, but no sub of that name is visible
# in this file - confirm it is defined elsewhere or drop the export.
our @EXPORT = (
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),
   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),
   qw(resolve_node initialise_node),
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# port used when a transport descriptor does not specify one
our $DEFAULT_PORT = "4040";

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Default log callback: writes "<timestamp> <level> message" to STDERR.
# A single trailing newline on the message is stripped, as one is always
# appended by the format.
our $WARN = sub {
   my ($level, $msg) = @_;

   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79
# Resolve a fully qualified function name to a code reference.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until it is. When no prefix is left to try, a
# code reference that dies with "unable to resolve" is returned instead.
sub load_func($) {
   my $func = $_[0];
   my $pkg  = $func;

   until (defined &$func) {
      # strip the last "::component"; nothing left to strip means failure
      $pkg =~ s/::[^:]+$//
         or return sub { die "unable to resolve '$func'" };

      # failures are ignored on purpose, the loop simply tries the parent
      eval "require $pkg";
   }

   \&$func
}
94
# Return a string of $_[0] random octets.
#
# Octets are read from /dev/urandom when it can be opened. Any octets that
# could not be obtained that way (device missing, read error, short read)
# are generated with rand, which is seeded once per process from the time,
# the pid and a checksum over some process/network listings.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may legitimately return fewer octets than requested,
      # so keep appending until the buffer is full or the read fails
      while (length ($nonce) < $len) {
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # undef on error, 0 on eof
      }
   }

   my $missing = $len - length $nonce;

   if ($missing > 0) {
      # weak fallback for systems without a usable /dev/urandom
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $missing;
   }

   $nonce
}
112
# Encode an octet string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer available, the octets are interpreted as one
# big hexadecimal number and rendered in base 62. Otherwise the data is
# Base64-encoded, the padding is removed and the two non-alphanumeric
# Base64 characters are replaced by letters ("/" by "s", "+" by "p").
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # Base64 pads with up to two "=", and "=" occurs nowhere else in the
      # encoding, so delete every occurrence, not just the first one
      $data =~ tr/=//d;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
130
# Build an alphanumeric cookie from the pid (BER-compressed), the current
# time (32 bit, big-endian) and two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   alnumbits ($raw)
}
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a public node, which can create and
138 accept direct connections from other nodes.
139
140 =cut
141
our $CONFIG;                       # this node's configuration

our $RUNIQ = alnumbits (nonce (16)); # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

our %NODE;                         # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA);           # local ports

our %RMON;                         # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;                         # monitored _local_ ports

our %LISTENER;
our $LISTENER;                     # our listeners, as arrayref

our $SRCNODE;                      # holds the sending node during _inject
159
# Accessor for the current value of the $NODE package variable.
sub NODE() {
   return $NODE;
}
163
# Return the part of a port id in front of the first "#" (the node id).
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
169
# Define TRACE as a compile-time constant (1 or 0) depending on whether the
# PERL_ANYEVENT_MP_TRACE environment variable is set to a true value.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
# Deliver a message to a local port: the first element selects the handler
# in %PORT, which is then called with the remaining elements. Messages for
# ports without a handler are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> @_\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   &$handler; # no parens: pass on what is left of @_
}
180
# Return the node object for the given node id, creating and registering a
# direct node in %NODE first if the id is not known yet. This is what lets
# callers send to nodes they have not talked to before.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
188
# Send a message to a port. The first argument is the port id
# ("nodeid#portid"), the rest is the message. The message is handed to the
# send callback of the destination node object, which is created on demand.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
197
198 =item $is_local = port_is_local $port
199
200 Returns true iff the port is a local port.
201
202 =cut
203
# A port is local when its node id maps to the very same node object that
# is registered under the empty node id.
sub port_is_local($) {
   my @parts  = split /#/, $_[0], 2;
   my $nodeid = $parts[0];

   $NODE{$nodeid} == $NODE{""}
}
209
210 =item snd_to_func $node, $func, @args
211
212 Expects a node ID and a name of a function. Asynchronously tries to call
213 this function with the given arguments on that node.
214
215 This function can be used to implement C<spawn>-like interfaces.
216
217 =cut
218
# Send a node-port message (port id "") consisting of the function name and
# its arguments to the given node, creating the node object on demand.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
225
226 =item snd_on $node, @msg
227
228 Executes C<snd> with the given C<@msg> (which must include the destination
229 port) on the given node.
230
231 =cut
232
# Ask the given node to execute snd with @msg, by sending it a "snd"
# request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
237
238 =item eval_on $node, $string
239
240 Evaluates the given string as Perl expression on the given node.
241
242 =cut
243
# Ask the given node to evaluate a string, by sending it an "eval" request
# followed by the remaining arguments.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd ($node, "eval", @args);
}
248
# Kill a port. The first argument is the port id ("nodeid#portid"), the
# remaining arguments are passed on to the node object's kill method.
# Croaks when the port id has no port part, i.e. names a node port.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   unless (length $portid) {
      Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
258
# Return the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
263
# Resolve a comma-separated list of transport descriptors into socket
# addresses.
#
# Returns a condvar which receives the formatted "host:port" strings, with
# duplicates removed, ordered by position of the descriptor in the list and
# then by the order the resolver returned them. An empty argument stands for
# the default port; a descriptor consisting only of digits (or nothing) is
# taken as a port on the local nodename. Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the callbacks below

   # the group callback fires once every begin has been matched by an end
   $cv->begin (sub {
      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @res;

      $_[0]->send (@refs);
   });

   length $nodeid
      or $nodeid = $DEFAULT_PORT;

   my $count;
   for my $spec (split /,/, $nodeid) {
      my $pri = ++$count;

      if ($spec =~ /^\d*$/) {
         $spec = length $spec ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak "$spec: unparsable transport descriptor";

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            # a tiny increment keeps resolver order within one descriptor
            $pri += 1e-5;

            push @res, [
               $pri,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
            ];
         }

         $cv->end;
      });
   }

   # matches the initial begin above
   $cv->end;

   $cv
}
308
# Initialise this node from a configuration profile.
#
# $profile defaults to the local nodename. The profile is looked up with
# AnyEvent::MP::Config::find_profile and determines the node id (an explicit
# "nodeid" entry wins, the profile "anon/" keeps the current anonymous id,
# anything else uses the profile name itself). The function then
#
#  - registers the self node under the new id,
#  - starts a listener on every address the "binds" entries (default: the
#    node id) resolve to, recording them in %LISTENER and @$LISTENER,
#  - loads AnyEvent::MP::Global and passes it the resolved seed addresses,
#  - starts the configured services: an entry ending in "::" is require'd
#    as a module, any other entry is called as a function.
#
# Croaks on unparsable bind addresses, dies when a service module fails to
# load.
sub initialise_node(;$%) {
   my ($profile) = @_;

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);
   $NODE   = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid}
           : $profile eq "anon/"      ? $NODE
           :                            $profile;

   # make the self node reachable under its new id as well
   $NODE{$NODE}     = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   # start all resolutions first, then wait for them one by one
   for my $cv (map _resolve ($_), @$binds) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      # work on a copy: the loop variable aliases the configuration entry,
      # and stripping the "::" marker in place would permanently turn a
      # module entry into a function entry
      my $name = $service;

      if ($name =~ s/::$//) {
         eval "require $name";
         die $@ if $@;
      } else {
         (load_func ($name))->();
      }
   }
}
355
356 #############################################################################
357 # node monitoring and info
358
# Return every distinct node object in %NODE exactly once. The same object
# can be registered under several ids, so the objects are de-duplicated by
# their address.
sub _uniq_nodes {
   my %by_addr;

   $by_addr{$_} = $_
      for values %NODE;

   values %by_addr;
}
366
# Currently the same set as _uniq_nodes; arguments are passed through.
sub _public_nodes {
   _uniq_nodes (@_)
}
370
371 =item node_is_known $nodeid
372
373 Returns true iff the given node is currently known to the system.
374
375 =cut
376
# True when %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
380
381 =item node_is_up $nodeid
382
383 Returns true if the given node is "up", that is, the kernel thinks it has
384 a working connection to it.
385
386 If the node is known but not currently connected, returns C<0>. If the
387 node is not known, returns C<undef>.
388
389 =cut
390
# Returns 1 when the node object has a transport, 0 when it has none, and
# nothing (undef in scalar context) when the node id is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
395
396 =item known_nodes
397
398 Returns the node IDs of all public nodes connected to this node, including
399 itself.
400
401 =cut
402
# Return the ids of all node objects returned by _public_nodes.
sub known_nodes {
   map { $_->{id} } _public_nodes ()
}
406
407 =item up_nodes
408
409 Return the node IDs of all public nodes that are currently connected
410 (excluding the node itself).
411
412 =cut
413
# Return the ids of those node objects from _public_nodes that currently
# have a transport.
sub up_nodes {
   my @connected = grep { $_->{transport} } _public_nodes ();

   map { $_->{id} } @connected
}
417
418 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
419
420 Registers a callback that is called each time a node goes up (connection
421 is established) or down (connection is lost).
422
423 Node up messages can only be followed by node down messages for the same
424 node, and vice versa.
425
426 The function returns an optional guard which can be used to de-register
427 the monitoring callback again.
428
429 =cut
430
# registered node monitoring callbacks, keyed by the callback's address
our %MON_NODES;

# Register $cb to be called as $cb->($nodeid, $is_up, @reason) by
# _inject_nodeevent.
#
# When the return value is used (scalar or list context), a guard object is
# returned that removes the callback again once it is destroyed. In void
# context no guard is created and the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would deny "my $guard = mon_nodes ..." its guard - only void
   # context (undef) must skip guard creation
   return unless defined wantarray;

   AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
}
440
# Notify all callbacks in %MON_NODES that $node went up or down, then log
# the event at level 7. A dying callback is reported through $WARN at
# level 1 and does not keep the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
451
452 #############################################################################
453 # self node code
454
# Handlers for messages sent to the node port (port id ""), keyed by the
# message tag. Each handler receives the remaining message elements in @_;
# $SRCNODE holds the sending node while a handler runs.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;

      # copy: the callback below runs later, when $SRCNODE has changed
      my $node = $SRCNODE;

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         $node->send (["", kil => $portid, @_]);
      });
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # NOTE(review): this evaluates a string received in a message, so any
      # node that can send to this node port can run arbitrary code here
      my @res = eval shift;

      # reply with the error string and the results if a reply port was given
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # discard
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
512
# Register the self node under both the empty id and its real id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# Handler for the node port: the first message element is the request tag.
# Tags without an entry in %node_req are resolved as function names via
# load_func and cached there. Errors are logged at level 2, not propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = ($node_req{$tag} ||= load_func ($tag));
      &$handler; # no parens: pass on the remaining message elements
   };

   if ($@) {
      $WARN->(2, "error processing node message: $@");
   }
};
519
520 =back
521
522 =head1 SEE ALSO
523
524 L<AnyEvent::MP>.
525
526 =head1 AUTHOR
527
528 Marc Lehmann <schmorp@schmorp.de>
529 http://home.schmorp.de/
530
531 =cut
532
533 1
534