ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.19 by root, Wed Aug 19 05:57:14 2009 UTC vs.
Revision 1.20 by root, Thu Aug 27 07:12:48 2009 UTC

142A boolean indicating whether this node is a slave node, i.e. does most of it's 142A boolean indicating whether this node is a slave node, i.e. does most of it's
143message sending/receiving through some master node. 143message sending/receiving through some master node.
144 144
145=item $AnyEvent::MP::Kernel::MASTER 145=item $AnyEvent::MP::Kernel::MASTER
146 146
147Defined only in slave mode, in which cas eit contains the noderef of the 147Defined only in slave mode, in which case it contains the noderef of the
148master node. 148master node.
149 149
150=cut 150=cut
151 151
152our $CONFIG; # this node's configuration
152our $PUBLIC = 0; 153our $PUBLIC = 0;
153our $SLAVE = 0; 154our $SLAVE = "";
154our $MASTER; # master noderef when $SLAVE 155our $MASTER; # master noderef when $SLAVE
155 156
156our $NODE = asciibits nonce 16; 157our $NODE = asciibits nonce 16;
158our $NODEID = $NODE; # same as NODE, except slave nodes have no @master part
157our $RUNIQ = $NODE; # remote uniq value 159our $RUNIQ = $NODE; # remote uniq value
158our $UNIQ = gen_uniq; # per-process/node unique cookie 160our $UNIQ = gen_uniq; # per-process/node unique cookie
159our $ID = "a"; 161our $ID = "a";
160 162
161our %NODE; # node id to transport mapping, or "undef", for local node 163our %NODE; # node id to transport mapping, or "undef", for local node
187sub _inject { 189sub _inject {
188 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d# 190 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d#
189 &{ $PORT{+shift} or return }; 191 &{ $PORT{+shift} or return };
190} 192}
191 193
194# this function adds a node-ref, so you can send stuff to it
195# it is basically the central routing component.
192sub add_node { 196sub add_node {
193 my ($noderef) = @_; 197 my ($noderef) = @_;
194 198
195 return $NODE{$noderef} 199 $NODE{$noderef} ||= do {
196 if exists $NODE{$noderef};
197
198 for (split /,/, $noderef) {
199 return $NODE{$noderef} = $NODE{$_}
200 if exists $NODE{$_};
201 }
202
203 # new node, check validity 200 # new node, check validity
204 my $node; 201 my $node;
205 202
206 if ($noderef =~ /^slave\/.+$/) { 203 if ($noderef =~ /^slave\/.+$/) {
204 # slave node without routing part -> direct connection
205 # only really valid from transports
207 $node = new AnyEvent::MP::Node::Indirect $noderef; 206 $node = new AnyEvent::MP::Node::Direct $noderef;
208 207
209 } else { 208 } else {
209 # direct node (or slave node without routing part)
210
210 for (split /,/, $noderef) { 211 for (split /,/, $noderef) {
211 my ($host, $port) = AnyEvent::Socket::parse_hostport $_ 212 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
212 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; 213 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
213 214
214 $port > 0 215 $port > 0
215 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"; 216 or Carp::croak "$noderef: not a resolved node reference ('$_' contains invalid port)";
216 217
217 AnyEvent::Socket::parse_address $host 218 AnyEvent::Socket::parse_address $host
218 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; 219 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
220 }
221
222 $node = new AnyEvent::MP::Node::Direct $noderef;
219 } 223 }
220 224
221 $node = new AnyEvent::MP::Node::Direct $noderef;
222 }
223
224 $NODE{$_} = $node
225 for $noderef, split /,/, $noderef;
226
227 $node 225 $node
226 }
228} 227}
229 228
230sub connect_node { 229sub connect_node {
231 &add_node->connect; 230 &add_node->connect;
232} 231}
255=item snd_to_func $node, $func, @args 254=item snd_to_func $node, $func, @args
256 255
257Expects a noderef and a name of a function. Asynchronously tries to call 256Expects a noderef and a name of a function. Asynchronously tries to call
258this function with the given arguments on that node. 257this function with the given arguments on that node.
259 258
260This fucntion can be used to implement C<spawn>-like interfaces. 259This function can be used to implement C<spawn>-like interfaces.
261 260
262=cut 261=cut
263 262
264sub snd_to_func($$;@) { 263sub snd_to_func($$;@) {
265 my $noderef = shift; 264 my $noderef = shift;
349 $cv->end; 348 $cv->end;
350 349
351 $cv 350 $cv
352} 351}
353 352
354sub _node_rename {
355 $NODE = shift;
356
357 my $self = $NODE{""};
358 $NODE{$NODE} = delete $NODE{$self->{noderef}};
359 $self->{noderef} = $NODE;
360}
361
362sub initialise_node(@) { 353sub initialise_node(@) {
363 my ($noderef, @others) = @_; 354 my ($noderef, @others) = @_;
364 355
365 my $profile = AnyEvent::MP::Config::find_profile 356 $CONFIG = AnyEvent::MP::Config::find_profile
366 +(defined $noderef ? $noderef : _nodename); 357 +(defined $noderef ? $noderef : _nodename);
367 358
368 $noderef = $profile->{noderef} 359 $noderef = $CONFIG->{noderef}
369 if exists $profile->{noderef}; 360 if exists $CONFIG->{noderef};
370 361
371 push @others, @{ $profile->{seeds} }; 362 push @others, @{ $CONFIG->{seeds} };
363
364 @others = map $_->recv, map +(resolve_node $_), @others;
372 365
373 if ($noderef =~ /^slave\/(.*)$/) { 366 if ($noderef =~ /^slave\/(.*)$/) {
374 $SLAVE = AE::cv;
375 my $name = $1; 367 my $name = $1;
376 $name = $NODE unless length $name; 368 $name = $NODE unless length $name;
377 $noderef = AE::cv;
378 $noderef->send ("slave/$name");
379 369
380 @others 370 @others
381 or Carp::croak "seed nodes must be specified for slave nodes"; 371 or Carp::croak "seed nodes must be specified for slave nodes";
382 372
373 $SLAVE = 1;
374 $NODE = "slave/$name";
375
383 } else { 376 } else {
384 $PUBLIC = 1; 377 $PUBLIC = 1;
385 $noderef = resolve_node $noderef; 378 $NODE = (resolve_node $noderef)->recv;
386 } 379 }
387 380
388 @others = map $_->recv, map +(resolve_node $_), @others; 381 $NODE{$NODE} = $NODE{""};
382 $NODE{$NODE}{noderef} = $NODE;
389 383
390 _node_rename $noderef->recv; 384 unless ($SLAVE) {
391
392 for my $t (split /,/, $NODE) { 385 for my $t (split /,/, $NODE) {
393 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 386 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
394 387
395 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 388 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
396 sub { 389 sub {
397 my ($tp) = @_; 390 my ($tp) = @_;
398 391
399 # TODO: urgs 392 # TODO: urgs
400 my $node = add_node $tp->{remote_node}; 393 my $node = add_node $tp->{remote_node};
401 $node->{trial}{accept} = $tp; 394 $node->{trial}{accept} = $tp;
402 }, 395 },
396 ;
403 ; 397 }
404 } 398 }
405 399
406 for (@others) { 400 for (@others) {
407 my $node = add_node $_; 401 my $node = add_node $_;
408 $node->{autoconnect} = 1; 402 $node->{autoconnect} = 1;
409 $node->connect; 403 $node->connect;
410 } 404 }
411 405
412 if ($SLAVE) {
413 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
414 my $master = $SLAVE->recv;
415 $master
416 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
417
418 $MASTER = $master->{noderef};
419 $master->{autoconnect} = 1;
420
421 (my $via = $MASTER) =~ s/,/!/g;
422
423 $NODE .= "\@$via";
424 _node_rename $NODE;
425
426 $_->send (["", iam => $NODE])
427 for values %NODE;
428
429 $SLAVE = 1;
430 }
431
432 for (@{ $profile->{services} }) { 406 for (@{ $CONFIG->{services} }) {
433 if (s/::$//) { 407 if (s/::$//) {
434 eval "require $_"; 408 eval "require $_";
435 die $@ if $@; 409 die $@ if $@;
436 } else { 410 } else {
437 (load_func $_)->(); 411 (load_func $_)->();
438 } 412 }
439 } 413 }
414
415 # slave nodes need global
416 require AnyEvent::MP::Global
417 if $SLAVE;
440} 418}
441 419
442############################################################################# 420#############################################################################
443# node monitoring and info 421# node monitoring and info
444 422
446 my %node; 424 my %node;
447 425
448 @node{values %NODE} = values %NODE; 426 @node{values %NODE} = values %NODE;
449 427
450 values %node; 428 values %node;
429}
430
431sub _public_nodes {
432 grep $_->{noderef} !~ /^slave\//, _uniq_nodes
451} 433}
452 434
453=item node_is_known $noderef 435=item node_is_known $noderef
454 436
455Returns true iff the given node is currently known to the system. 437Returns true iff the given node is currently known to the system.
475 ? 1 : 0 457 ? 1 : 0
476} 458}
477 459
478=item known_nodes 460=item known_nodes
479 461
480Returns the noderefs of all nodes connected to this node, including 462Returns the noderefs of all public nodes connected to this node, including
481itself. 463itself.
482 464
483=cut 465=cut
484 466
485sub known_nodes { 467sub known_nodes {
486 map $_->{noderef}, _uniq_nodes 468 map $_->{noderef}, _public_nodes
487} 469}
488 470
489=item up_nodes 471=item up_nodes
490 472
491Return the noderefs of all nodes that are currently connected (excluding 473Return the noderefs of all public nodes that are currently connected
492the node itself). 474(excluding the node itself).
493 475
494=cut 476=cut
495 477
496sub up_nodes { 478sub up_nodes {
497 map $_->{noderef}, grep $_->{transport}, _uniq_nodes 479 map $_->{noderef}, grep $_->{transport}, _public_nodes
498} 480}
499 481
500=item $guard = mon_nodes $callback->($noderef, $is_up, @reason) 482=item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
501 483
502Registers a callback that is called each time a node goes up (connection 484Registers a callback that is called each time a node goes up (connection
554 my $cbs = delete $SRCNODE->{lmon}{+shift} 536 my $cbs = delete $SRCNODE->{lmon}{+shift}
555 or return; 537 or return;
556 538
557 $_->(@_) for @$cbs; 539 $_->(@_) for @$cbs;
558 }, 540 },
559 # node changed its name (for slave nodes)
560 iam => sub {
561 # get rid of bogus slave/xxx name, hopefully
562 delete $NODE{$SRCNODE->{noderef}};
563
564 # change noderef
565 $SRCNODE->{noderef} = $_[0];
566
567 # anchor
568 $NODE{$_[0]} = $SRCNODE;
569 },
570 541
571 # "public" services - not actually public 542 # "public" services - not actually public
572 543
573 # relay message to another node / generic echo 544 # relay message to another node / generic echo
574 snd => \&snd, 545 snd => \&snd,

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines