ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.12
Committed: Fri Aug 14 23:19:20 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_7
Changes since 1.11: +1 -1 lines
Log Message:
0.7

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our $VERSION = '0.7';
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID add_node load_func snd_to_func
40
41 NODE $NODE node_of snd kil
42 resolve_node initialise_node
43 );
44
# Port that resolve_node falls back to when a noderef is empty, and the
# default it passes along for the "aemp" service when parsing host:port.
45 our $DEFAULT_PORT = "4040";
46
# Timing parameters; $MONITOR_TIMEOUT is handed to AE::timer as a delay in
# initialise_node.
47 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48 our $NETWORK_LATENCY = 3; # activity timeout
49 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50
51 =item $AnyEvent::MP::Kernel::WARN
52
53 This value is called with an error or warning message, when e.g. a connection
54 could not be created, authorisation failed and so on.
55
56 The default simply logs the message to STDERR.
57
58 =cut
59
# Default warning handler: takes the message as its only argument, removes
# one trailing newline (if any) and emits the text through warn with exactly
# one newline appended.
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;
   warn "$text\n";
};
65
# Resolves a fully qualified function name ("Some::Package::func") into a
# code reference, loading modules on demand.
#
# If the function is not defined yet, the trailing "::component" is stripped
# repeatedly and each resulting package name is require'd until the function
# exists. Load errors are ignored; the next shorter package name is tried.
#
# Returns a code reference. When the name cannot be resolved, the returned
# code reference dies with "unable to resolve '<name>'" when called.
#
# The name is also supplied by the node-port handler for tags of incoming
# node messages, so it must never reach a string eval: only syntactically
# valid package names are loaded, and they are loaded by file name.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # $func is deliberately used as a symbolic reference

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # refuse anything that is not a plain package name
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve '$func'" };

         # equivalent to "require Some::Package", without a string eval
         (my $file = "$pkg.pm") =~ s{::}{/}g;
         eval { require $file };
      } until defined &$func;
   }

   \&$func
}
80
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. sysread may
# deliver fewer octets than requested, so reading continues until the
# requested length is reached. If the device is unavailable or runs short,
# the remainder is filled from perl's rand, which is seeded once per process
# from the time, the pid and the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      while (length ($nonce) < $len) {
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # error or eof: use the fallback for the rest
      }
   }

   if (length ($nonce) < $len) {
      # no usable /dev/urandom, make do with rand
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), length ($nonce) + 1 .. $len;
   }

   $nonce
}
98
# Converts a binary string into a compact string of ASCII letters and
# digits.
#
# With Math::GMP 2.05 or newer available, the data is treated as one big
# hexadecimal number and printed in base 62. Otherwise it is base64-encoded,
# all "=" padding is removed and the two non-alphanumeric base64 characters
# are mapped to letters ("/" to "s", "+" to "p").
sub asciibits($) {
   my $data = $_[0];

   # probe for Math::GMP once per process, not on every call
   our $asciibits_have_gmp;
   $asciibits_have_gmp = (eval "use Math::GMP 2.05; 1") ? 1 : 0
      unless defined $asciibits_have_gmp;

   if ($asciibits_have_gmp) {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ tr/=//d;     # base64 can end in one OR two padding characters
      $data =~ tr{/+}{sp};
   }

   $data
}
116
# Builds a cookie from the process id, the current time and two random
# octets (packed as BER integer, 32 bit big-endian integer and raw bytes)
# and returns it in asciibits encoding.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
120
121 =item $AnyEvent::MP::Kernel::PUBLIC
122
123 A boolean indicating whether this is a full/public node, which can create
124 and accept direct connections from other nodes.
125
126 =item $AnyEvent::MP::Kernel::SLAVE
127
128 A boolean indicating whether this node is a slave node, i.e. does most of its
129 message sending/receiving through some master node.
130
131 =item $AnyEvent::MP::Kernel::MASTER
132
133 Defined only in slave mode, in which case it contains the noderef of the
134 master node.
135
136 =cut
137
# Node mode. Both flags start out false; initialise_node sets $PUBLIC to 1
# for non-slave nodes, and uses $SLAVE first as a condition variable and
# finally as the flag 1 for slave nodes.
138 our $PUBLIC = 0;
139 our $SLAVE = 0;
140 our $MASTER; # master noderef when $SLAVE
141
# Identity of this node. $NODE starts as a random id and is replaced by the
# resolved noderef in initialise_node.
142 our $NODE = asciibits nonce 16;
143 our $RUNIQ = $NODE; # remote uniq value
144 our $UNIQ = gen_uniq; # per-process/node unique cookie
145 our $ID = "a";
146
# Registries. $NODE{""} is the entry for the local node; $PORT{""} is the
# handler for messages addressed to the node itself.
147 our %NODE; # node id to transport mapping, or "undef", for local node
148 our (%PORT, %PORT_DATA); # local ports
149
150 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
151 our %LMON; # monitored _local_ ports
152
# Listening servers created by initialise_node, keyed by "host:port".
153 our %LISTENER;
154
155 our $SRCNODE; # holds the sending node during _inject
156
# Accessor for $NODE, the noderef of the local node; takes no arguments.
sub NODE() {
   return $NODE;
}
160
# Returns the noderef part of a port id, i.e. everything before the first
# "#". A string without "#" is returned unchanged; the empty string yields
# undef.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
166
# Debug switch: when changed to a true value, _inject and snd log every
# received and sent message via warn.
167 sub TRACE() { 0 }
168
# Delivers a message to a local port: the first argument is the port id, the
# remaining arguments are passed on to the callback registered for it in
# %PORT. Messages for unknown ports are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $receiver = $PORT{+shift}
      or return;

   &$receiver; # called with the remaining @_
}
173
# Returns the node object for the given noderef, creating and registering it
# in %NODE when it is not known yet.
#
# A noderef is a comma-separated list of transport addresses. If any single
# address is already registered, that node is reused and also stored under
# the full noderef. Noderefs of the form "slave/..." become Indirect nodes.
# Everything else must consist of parsable host:port pairs with a positive
# port and a resolved address, otherwise this croaks; such noderefs become
# Direct nodes.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @addrs = split /,/, $noderef;

   for my $addr (@addrs) {
      return $NODE{$noderef} = $NODE{$addr}
         if exists $NODE{$addr};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for my $addr (@addrs) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($addr)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' not parsable)";

         Carp::croak "$noderef: not a resolved node reference ('$addr' contains non-numeric port)"
            unless $port > 0;

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' contains unresolved address)";
      }

      # TODO: for indirect sends, use a different class
      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef and under every single address
   $NODE{$_} = $node
      for $noderef, @addrs;

   $node
}
212
# Sends a message to a port. The first argument is the port id
# ("noderef#portid"), the remaining arguments form the message. The node
# object is looked up in %NODE (or created via add_node) and the message is
# handed to the code reference stored in its {send} slot as
# [portid, message...].
sub snd(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send} (["$portid", @_]);
}
221
222 =item snd_to_func $noderef, $func, @args
223
224 Expects a noderef and a name of a function. Asynchronously tries to call
225 this function with the given arguments on that node.
226
227 This function can be used to implement C<spawn>-like interfaces.
228
229 =cut
230
# Sends the remaining arguments (function name followed by its arguments) to
# the node port of the given node, i.e. as a message for the empty port id.
sub snd_to_func {
   my ($noderef, @call) = @_;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["", @call]);
}
237
# Kills the port given as first argument ("noderef#portid"), passing the
# remaining arguments on as the reason. Croaks when the port id part is
# empty, because the node port itself must not be killed.
sub kil(@) {
   my $target = shift;
   my ($noderef, $portid) = split /#/, $target, 2;

   Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
247
# Returns the nodename field (second element) of POSIX::uname. POSIX is
# loaded lazily on first use.
sub _nodename {
   require POSIX;

   return (POSIX::uname ())[1];
}
252
# Resolves a noderef asynchronously and returns a condition variable that
# receives the resolved noderef.
#
# The noderef is a comma-separated list of transport descriptors; an empty
# noderef stands for $DEFAULT_PORT. A descriptor made only of digits is taken
# as a port on the local nodename, an empty descriptor as the local nodename.
# Each descriptor is resolved with AnyEvent::Socket::resolve_sockaddr. The
# results are ordered by descriptor position and, within one descriptor, by
# result order; duplicates are dropped and the rest is joined with ",".
# Croaks when a descriptor cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $done = AE::cv;
   my @found; # [priority, "host:port"]

   # runs once every resolver below and the final ->end have completed
   $done->begin (sub {
      my ($cv) = @_;
      my (%dup, @ordered);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         next if $dup{$entry->[1]}++;
         push @ordered, $entry->[1];
      }

      $cv->send (join ",", @ordered);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $idx;
   for my $spec (split /,/, $noderef) {
      my $pri = ++$idx;

      if ($spec =~ /^\d*$/) {
         $spec = length $spec ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak "$spec: unparsable transport descriptor";

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $target (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

            # small increments keep the results of one descriptor in order
            # without overtaking the next descriptor
            $pri += 1e-5;
            push @found, [
               $pri,
               AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($addr),
                  $service,
               ),
            ];
         }
         $done->end;
      });
   }

   $done->end;

   $done
}
297
# Configures this process as a node and brings it online.
#
# Arguments: the noderef for this node (optional) followed by any number of
# seed noderefs. Blocks on condition variables while names are resolved and,
# in slave mode, while waiting for a master. Croaks when a slave node has no
# seeds or when no master could be found.
298 sub initialise_node(@) {
299 my ($noderef, @others) = @_;
300
# look up the configuration profile, keyed by the given noderef or, when
# none was given, by the local nodename
301 my $profile = AnyEvent::MP::Config::find_profile
302 +(defined $noderef ? $noderef : _nodename);
303
# a noderef stored in the profile takes precedence over the argument
304 $noderef = $profile->{noderef}
305 if exists $profile->{noderef};
306
# seeds from the profile are used in addition to the ones passed in
307 push @others, @{ $profile->{seeds} };
308
309 if ($noderef =~ /^slave\/(.*)$/) {
# slave mode: $SLAVE temporarily holds a condition variable that is waited
# on further down; the name defaults to the current (random) $NODE
310 $SLAVE = AE::cv;
311 my $name = $1;
312 $name = $NODE unless length $name;
# wrap the final name in an already-signalled condition variable so that
# both branches can be finished with ->recv below
313 $noderef = AE::cv;
314 $noderef->send ("slave/$name");
315
316 @others
317 or Carp::croak "seed nodes must be specified for slave nodes";
318
319 } else {
# public mode: the noderef is resolved into host:port pairs
320 $PUBLIC = 1;
321 $noderef = resolve_node $noderef;
322 }
323
# start resolving all seeds, then wait for each result in turn
324 @others = map $_->recv, map +(resolve_node $_), @others;
325
326 $NODE = $noderef->recv;
327
# make every transport address of this node an alias for the local node
# and start a listener on it
328 for my $t (split /,/, $NODE) {
329 $NODE{$t} = $NODE{""};
330
331 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
332
333 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
334 sub {
335 my ($tp) = @_;
336
# remember the accepted transport on the node object of the remote side
337 # TODO: urgs
338 my $node = add_node $tp->{remote_node};
339 $node->{trial}{accept} = $tp;
340 },
341 ;
342 }
343
# initiate connections to all seed nodes
344 (add_node $_)->connect for @others;
345
346 if ($SLAVE) {
# wait for the master noderef; after $MONITOR_TIMEOUT the timer signals the
# condition variable without a value, which makes recv return undef
# NOTE(review): the code that sends the master noderef is not in this
# section - presumably the transport or node code; confirm
347 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
348 $MASTER = $SLAVE->recv;
349 defined $MASTER
350 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
351
# the final slave noderef is "slave/NAME@MASTER", with the commas of the
# master noderef replaced by "!"
352 (my $via = $MASTER) =~ s/,/!/g;
353
354 $NODE .= "\@$via";
355 $NODE{$NODE} = $NODE{""};
356
# announce the new name to every node object in %NODE
357 $_->send (["", iam => $NODE])
358 for values %NODE;
359
360 $SLAVE = 1;
361 }
362
# finally call every service function listed in the profile
363 (load_func $_)->()
364 for @{ $profile->{services} };
365 }
366
367 #############################################################################
368 # node monitoring and info
369
# Returns every node object stored in %NODE exactly once. The same object is
# usually registered under several keys, so it is de-duplicated through a
# hash keyed by the stringified object.
sub _uniq_nodes {
   my %uniq;

   $uniq{$_} = $_
      for values %NODE;

   values %uniq;
}
377
378 =item known_nodes
379
380 Returns the noderefs of all nodes known to this node, whether or not they are currently connected.
381
382 =cut
383
# Returns the noderef of every distinct node object registered in %NODE.
sub known_nodes {
   map { $_->{noderef} } _uniq_nodes ()
}
387
388 =item up_nodes
389
390 Return the noderefs of all nodes that are currently connected (excluding
391 the node itself).
392
393 =cut
394
# Returns the noderef of every distinct node object in %NODE that has a true
# {transport} slot.
sub up_nodes {
   map { $_->{noderef} }
      grep { $_->{transport} }
         _uniq_nodes ()
}
398
399 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
400
401 Registers a callback that is called each time a node goes up (connection
402 is established) or down (connection is lost).
403
404 Node up messages can only be followed by node down messages for the same
405 node, and vice versa.
406
407 The function returns an optional guard which can be used to de-register
408 the monitoring callback again.
409
410 =cut
411
our %MON_NODES; # registered callbacks, keyed by the numeric value of the code reference

# Registers $cb to be called by _inject_nodeevent for every node event.
#
# When the caller uses the return value (scalar or list context), a guard
# object is returned; invoking or destroying it removes the callback again.
# In void context no guard is created and the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray alone is false in scalar context, which would deny a guard to
   # the documented "$guard = mon_nodes ..." form; only void context (undef)
   # must skip it. The explicit "sub" keeps this call valid whether or not
   # the prototype of guard is known at compile time.
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
421
# Notifies all callbacks registered in %MON_NODES about a node event. The
# first argument is the node object; the callbacks receive its noderef
# followed by the remaining arguments. A dying callback is reported through
# $WARN and does not stop the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, @rest) = @_;

   my @event = ($node->{noderef}, @rest);

   for my $cb (values %MON_NODES) {
      next if eval { $cb->(@event); 1 };

      $WARN->($@);
   }
}
432
433 #############################################################################
434 # self node code
435
# Dispatch table for messages addressed to the node itself (the port with
# the empty id): maps the message tag to the handler that receives the rest
# of the message in @_. Tags not listed here are resolved via load_func by
# the $PORT{""} handler and cached in this hash. $SRCNODE is the node object
# of the sender while a handler runs.
436 our %node_req = (
437 # internal services
438
439 # monitoring
# mon0 $portid - removes the monitor callback that the sending node had
# installed on the local port $portid
440 mon0 => sub { # disable monitoring
441 my $portid = shift;
442 my $node = $SRCNODE;
443 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
444 },
# mon1 $portid - monitors the local port $portid on behalf of the sending
# node; the callback, remembered in the sender's {rmon} slot, sends a "kil"
# node message with the port id and its own arguments back to the sender
445 mon1 => sub { # enable monitoring
446 my $portid = shift;
447 my $node = $SRCNODE;
448 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
449 $node->send (["", kil => $portid, @_]);
450 });
451 },
# kil $portid, @reason - removes the callbacks stored under the sender's
# {lmon}{$portid} and calls each of them with @reason
452 kil => sub {
453 my $cbs = delete $SRCNODE->{lmon}{+shift}
454 or return;
455
456 $_->(@_) for @$cbs;
457 },
458 # node changed its name (for slave nodes)
459 iam => sub {
460 # get rid of bogus slave/xxx name, hopefully
461 delete $NODE{$SRCNODE->{noderef}};
462
463 # change noderef
464 $SRCNODE->{noderef} = $_[0];
465
466 # anchor
467 $NODE{$_[0]} = $SRCNODE;
468 },
469
470 # public services
471
472 # relay message to another node / generic echo
# the whole argument list is passed on to snd unchanged, so the first
# argument is the destination port
473 relay => sub {
474 &snd;
475 },
# each argument is an array reference holding the arguments of one snd call
476 relay_multiple => sub {
477 snd @$_ for @_
478 },
479
480 # informational
# each of these treats its arguments as a reply port plus leading message
# elements and appends the requested data
481 info => sub {
482 snd @_, $NODE;
483 },
484 known_nodes => sub {
485 snd @_, known_nodes;
486 },
487 up_nodes => sub {
488 snd @_, up_nodes;
489 },
490
491 # random garbage
# eval $code[, reply...] - evaluates $code as perl source and, when a reply
# destination is given, sends back the error string followed by the results
# NOTE(review): this runs arbitrary code supplied by whichever node sent the
# message - confirm that only trusted nodes can reach the node port
492 eval => sub {
493 my @res = eval shift;
494 snd @_, "$@", @res if @_;
495 },
# replies with the current event loop time
496 time => sub {
497 snd @_, AE::time;
498 },
# accepts and ignores any message
499 devnull => sub {
500 #
501 },
502 );
503
# Register the object for the local node under the empty id and under the
# initial $NODE value.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# Handler for the node port (empty port id): the first message element is
# the tag, which is looked up in %node_req or, failing that, resolved via
# load_func and cached there. The handler is called with the rest of the
# message; any exception is reported through $WARN.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the remaining @_
   };

   $WARN->("error processing node message: $@") if $@;
};
510
511 =back
512
513 =head1 SEE ALSO
514
515 L<AnyEvent::MP>.
516
517 =head1 AUTHOR
518
519 Marc Lehmann <schmorp@schmorp.de>
520 http://home.schmorp.de/
521
522 =cut
523
524 1
525