ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.6
Committed: Thu Aug 13 03:34:24 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.5: +25 -17 lines
Log Message:
services...

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our $VERSION = '0.6';
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
40
41 NODE $NODE node_of snd kil _any_
42 resolve_node initialise_node
43 );
44
45 our $DEFAULT_PORT = "4040";
46
47 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48 our $NETWORK_LATENCY = 3; # activity timeout
49 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50
51 =item $AnyEvent::MP::Kernel::WARN
52
53 This value is called with an error or warning message, when e.g. a connection
54 could not be created, authorisation failed and so on.
55
56 The default simply logs the message to STDERR.
57
58 =cut
59
60 our $WARN = sub {
61 my $msg = $_[0];
62 $msg =~ s/\n$//;
63 warn "$msg\n";
64 };
65
66 sub load_func($) {
67 my $func = $_[0];
68
69 unless (defined &$func) {
70 my $pkg = $func;
71 do {
72 $pkg =~ s/::[^:]+$//
73 or return sub { die "unable to resolve '$func'" };
74 eval "require $pkg";
75 } until defined &$func;
76 }
77
78 \&$func
79 }
80
81 sub nonce($) {
82 my $nonce;
83
84 if (open my $fh, "</dev/urandom") {
85 sysread $fh, $nonce, $_[0];
86 } else {
87 # shit...
88 our $nonce_init;
89 unless ($nonce_init++) {
90 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
91 }
92
93 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
94 }
95
96 $nonce
97 }
98
99 sub asciibits($) {
100 my $data = $_[0];
101
102 if (eval "use Math::GMP 2.05; 1") {
103 $data = Math::GMP::get_str_gmp (
104 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
105 62
106 );
107 } else {
108 $data = MIME::Base64::encode_base64 $data, "";
109 $data =~ s/=//;
110 $data =~ s/\//s/g;
111 $data =~ s/\+/p/g;
112 }
113
114 $data
115 }
116
117 sub gen_uniq {
118 asciibits pack "wNa*", $$, time, nonce 2
119 }
120
121 =item $AnyEvent::MP::Kernel::PUBLIC
122
123 A boolean indicating whether this is a full/public node, which can create
124 and accept direct connections form othe rnodes.
125
126 =item $AnyEvent::MP::Kernel::SLAVE
127
128 A boolean indicating whether this node is a slave node, i.e. does most of it's
129 message sending/receiving through some master node.
130
131 =item $AnyEvent::MP::Kernel::MASTER
132
133 Defined only in slave mode, in which cas eit contains the noderef of the
134 master node.
135
136 =cut
137
138 our $PUBLIC = 0;
139 our $SLAVE = 0;
140 our $MASTER; # master noderef when $SLAVE
141
142 our $NODE = asciibits nonce 16;
143 our $RUNIQ = $NODE; # remote uniq value
144 our $UNIQ = gen_uniq; # per-process/node unique cookie
145 our $ID = "a";
146
147 our %NODE; # node id to transport mapping, or "undef", for local node
148 our (%PORT, %PORT_DATA); # local ports
149
150 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
151 our %LMON; # monitored _local_ ports
152
153 our %LISTENER;
154
155 our $SRCNODE; # holds the sending node during _inject
156
157 sub NODE() {
158 $NODE
159 }
160
161 sub node_of($) {
162 my ($noderef, undef) = split /#/, $_[0], 2;
163
164 $noderef
165 }
166
167 sub _ANY_() { 1 }
168 sub _any_() { \&_ANY_ }
169
170 sub TRACE() { 0 }
171
172 sub _inject {
173 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
174 &{ $PORT{+shift} or return };
175 }
176
177 sub add_node {
178 my ($noderef) = @_;
179
180 return $NODE{$noderef}
181 if exists $NODE{$noderef};
182
183 for (split /,/, $noderef) {
184 return $NODE{$noderef} = $NODE{$_}
185 if exists $NODE{$_};
186 }
187
188 # new node, check validity
189 my $node;
190
191 if ($noderef =~ /^slave\/.+$/) {
192 $node = new AnyEvent::MP::Node::Indirect $noderef;
193
194 } else {
195 for (split /,/, $noderef) {
196 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
197 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
198
199 $port > 0
200 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
201
202 AnyEvent::Socket::parse_address $host
203 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
204 }
205
206 # TODO: for indirect sends, use a different class
207 $node = new AnyEvent::MP::Node::Direct $noderef;
208 }
209
210 $NODE{$_} = $node
211 for $noderef, split /,/, $noderef;
212
213 $node
214 }
215
216 sub snd(@) {
217 my ($noderef, $portid) = split /#/, shift, 2;
218
219 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
220
221 ($NODE{$noderef} || add_node $noderef)
222 ->{send} (["$portid", @_]);
223 }
224
225 sub kil(@) {
226 my ($noderef, $portid) = split /#/, shift, 2;
227
228 length $portid
229 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
230
231 ($NODE{$noderef} || add_node $noderef)
232 ->kill ("$portid", @_);
233 }
234
235 sub resolve_node($) {
236 my ($noderef) = @_;
237
238 my $cv = AE::cv;
239 my @res;
240
241 $cv->begin (sub {
242 my %seen;
243 my @refs;
244 for (sort { $a->[0] <=> $b->[0] } @res) {
245 push @refs, $_->[1] unless $seen{$_->[1]}++
246 }
247 shift->send (join ",", @refs);
248 });
249
250 $noderef = $DEFAULT_PORT unless length $noderef;
251
252 my $idx;
253 for my $t (split /,/, $noderef) {
254 my $pri = ++$idx;
255
256 if ($t =~ /^\d*$/) {
257 require POSIX;
258 my $nodename = (POSIX::uname ())[1];
259
260 $cv->begin;
261 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
262 for (@_) {
263 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
264 push @res, [
265 $pri += 1e-5,
266 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
267 ];
268 }
269 $cv->end;
270 };
271
272 # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
273 #
274 # for (@ipv4) {
275 # push @res, [
276 # $pri,
277 # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
278 # ];
279 # }
280 } else {
281 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
282 or Carp::croak "$t: unparsable transport descriptor";
283
284 $cv->begin;
285 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
286 for (@_) {
287 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
288 push @res, [
289 $pri += 1e-5,
290 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
291 ];
292 }
293 $cv->end;
294 }
295 }
296 }
297
298 $cv->end;
299
300 $cv
301 }
302
303 sub initialise_node(@) {
304 my ($noderef, @others) = @_;
305
306 my $profile = defined $noderef ? {} : \%AnyEvent::MP::Config::CFG;
307
308 $noderef = $profile->{noderef}
309 unless defined $noderef;
310
311 @others = @{ $profile->{seeds} }
312 unless @others;
313
314 if ($noderef =~ /^slave\/(.*)$/) {
315 $SLAVE = AE::cv;
316 my $name = $1;
317 $name = $NODE unless length $name;
318 $noderef = AE::cv;
319 $noderef->send ("slave/$name");
320
321 @others
322 or Carp::croak "seed nodes must be specified for slave nodes";
323
324 } else {
325 $PUBLIC = 1;
326 $noderef = resolve_node $noderef;
327 }
328
329 @others = map $_->recv, map +(resolve_node $_), @others;
330
331 $NODE = $noderef->recv;
332
333 for my $t (split /,/, $NODE) {
334 $NODE{$t} = $NODE{""};
335
336 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
337
338 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
339 sub {
340 my ($tp) = @_;
341
342 # TODO: urgs
343 my $node = add_node $tp->{remote_node};
344 $node->{trial}{accept} = $tp;
345 },
346 ;
347 }
348
349 (add_node $_)->connect for @others;
350
351 if ($SLAVE) {
352 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
353 $MASTER = $SLAVE->recv;
354 defined $MASTER
355 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
356
357 (my $via = $MASTER) =~ s/,/!/g;
358
359 $NODE .= "\@$via";
360 $NODE{$NODE} = $NODE{""};
361
362 $_->send (["", iam => $NODE])
363 for values %NODE;
364
365 $SLAVE = 1;
366 }
367
368 (load_func $_)->()
369 for @{ $profile->{services} };
370 }
371
372 #############################################################################
373 # node monitoring and info
374
375 sub _uniq_nodes {
376 my %node;
377
378 @node{values %NODE} = values %NODE;
379
380 values %node;
381 }
382
383 =item known_nodes
384
385 Returns the noderefs of all nodes connected to this node.
386
387 =cut
388
389 sub known_nodes {
390 map $_->{noderef}, _uniq_nodes
391 }
392
393 =item up_nodes
394
395 Return the noderefs of all nodes that are currently connected (excluding
396 the node itself).
397
398 =cut
399
400 sub up_nodes {
401 map $_->{noderef}, grep $_->{transport}, _uniq_nodes
402 }
403
404 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
405
406 Registers a callback that is called each time a node goes up (connection
407 is established) or down (connection is lost).
408
409 Node up messages can only be followed by node down messages for the same
410 node, and vice versa.
411
412 The fucntino returns an optional guard which can be used to de-register
413 the monitoring callback again.
414
415 =cut
416
417 our %MON_NODES;
418
419 sub mon_nodes($) {
420 my ($cb) = @_;
421
422 $MON_NODES{$cb+0} = $cb;
423
424 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
425 }
426
427 sub _inject_nodeevent($$;@) {
428 my ($node, @args) = @_;
429
430 unshift @args, $node->{noderef};
431
432 for my $cb (values %MON_NODES) {
433 eval { $cb->(@args); 1 }
434 or $WARN->($@);
435 }
436 }
437
438 #############################################################################
439 # self node code
440
441 our %node_req = (
442 # internal services
443
444 # monitoring
445 mon0 => sub { # disable monitoring
446 my $portid = shift;
447 my $node = $SRCNODE;
448 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
449 },
450 mon1 => sub { # enable monitoring
451 my $portid = shift;
452 my $node = $SRCNODE;
453 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
454 $node->send (["", kil => $portid, @_]);
455 });
456 },
457 kil => sub {
458 my $cbs = delete $SRCNODE->{lmon}{+shift}
459 or return;
460
461 $_->(@_) for @$cbs;
462 },
463 # node changed its name (for slave nodes)
464 iam => sub {
465 $SRCNODE->{noderef} = $_[0];
466 $NODE{$_[0]} = $SRCNODE;
467 },
468
469 # public services
470
471 # relay message to another node / generic echo
472 relay => sub {
473 &snd;
474 },
475 relay_multiple => sub {
476 snd @$_ for @_
477 },
478
479 # informational
480 info => sub {
481 snd @_, $NODE;
482 },
483 known_nodes => sub {
484 snd @_, known_nodes;
485 },
486 up_nodes => sub {
487 snd @_, up_nodes;
488 },
489
490 # random garbage
491 eval => sub {
492 my @res = eval shift;
493 snd @_, "$@", @res if @_;
494 },
495 time => sub {
496 snd @_, AE::time;
497 },
498 devnull => sub {
499 #
500 },
501 );
502
503 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
504 $PORT{""} = sub {
505 my $tag = shift;
506 eval { &{ $node_req{$tag} ||= load_func $tag } };
507 $WARN->("error processing node message: $@") if $@;
508 };
509
510 =back
511
512 =head1 SEE ALSO
513
514 L<AnyEvent::MP>.
515
516 =head1 AUTHOR
517
518 Marc Lehmann <schmorp@schmorp.de>
519 http://home.schmorp.de/
520
521 =cut
522
523 1
524