… | |
… | |
142 | A boolean indicating whether this node is a slave node, i.e. does most of it's |
142 | A boolean indicating whether this node is a slave node, i.e. does most of it's |
143 | message sending/receiving through some master node. |
143 | message sending/receiving through some master node. |
144 | |
144 | |
145 | =item $AnyEvent::MP::Kernel::MASTER |
145 | =item $AnyEvent::MP::Kernel::MASTER |
146 | |
146 | |
147 | Defined only in slave mode, in which cas eit contains the noderef of the |
147 | Defined only in slave mode, in which case it contains the noderef of the |
148 | master node. |
148 | master node. |
149 | |
149 | |
150 | =cut |
150 | =cut |
151 | |
151 | |
|
|
152 | our $CONFIG; # this node's configuration |
152 | our $PUBLIC = 0; |
153 | our $PUBLIC = 0; |
153 | our $SLAVE = 0; |
154 | our $SLAVE = ""; |
154 | our $MASTER; # master noderef when $SLAVE |
155 | our $MASTER; # master noderef when $SLAVE |
155 | |
156 | |
156 | our $NODE = asciibits nonce 16; |
157 | our $NODE = asciibits nonce 16; |
|
|
158 | our $NODEID = $NODE; # same as NODE, except slave nodes have no @master part |
157 | our $RUNIQ = $NODE; # remote uniq value |
159 | our $RUNIQ = $NODE; # remote uniq value |
158 | our $UNIQ = gen_uniq; # per-process/node unique cookie |
160 | our $UNIQ = gen_uniq; # per-process/node unique cookie |
159 | our $ID = "a"; |
161 | our $ID = "a"; |
160 | |
162 | |
161 | our %NODE; # node id to transport mapping, or "undef", for local node |
163 | our %NODE; # node id to transport mapping, or "undef", for local node |
… | |
… | |
187 | sub _inject { |
189 | sub _inject { |
188 | warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d# |
190 | warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d# |
189 | &{ $PORT{+shift} or return }; |
191 | &{ $PORT{+shift} or return }; |
190 | } |
192 | } |
191 | |
193 | |
|
|
194 | # this function adds a node-ref, so you can send stuff to it |
|
|
195 | # it is basically the central routing component. |
192 | sub add_node { |
196 | sub add_node { |
193 | my ($noderef) = @_; |
197 | my ($noderef) = @_; |
194 | |
198 | |
195 | return $NODE{$noderef} |
199 | $NODE{$noderef} ||= do { |
196 | if exists $NODE{$noderef}; |
|
|
197 | |
|
|
198 | for (split /,/, $noderef) { |
|
|
199 | return $NODE{$noderef} = $NODE{$_} |
|
|
200 | if exists $NODE{$_}; |
|
|
201 | } |
|
|
202 | |
|
|
203 | # new node, check validity |
200 | # new node, check validity |
204 | my $node; |
201 | my $node; |
205 | |
202 | |
206 | if ($noderef =~ /^slave\/.+$/) { |
203 | if ($noderef =~ /^slave\/.+$/) { |
|
|
204 | # slave node without routing part -> direct connection |
|
|
205 | # only really valid from transports |
207 | $node = new AnyEvent::MP::Node::Indirect $noderef; |
206 | $node = new AnyEvent::MP::Node::Direct $noderef; |
208 | |
207 | |
209 | } else { |
208 | } else { |
|
|
209 | # direct node (or slave node without routing part) |
|
|
210 | |
210 | for (split /,/, $noderef) { |
211 | for (split /,/, $noderef) { |
211 | my ($host, $port) = AnyEvent::Socket::parse_hostport $_ |
212 | my ($host, $port) = AnyEvent::Socket::parse_hostport $_ |
212 | or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; |
213 | or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)"; |
213 | |
214 | |
214 | $port > 0 |
215 | $port > 0 |
215 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"; |
216 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains invalid port)"; |
216 | |
217 | |
217 | AnyEvent::Socket::parse_address $host |
218 | AnyEvent::Socket::parse_address $host |
218 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; |
219 | or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"; |
|
|
220 | } |
|
|
221 | |
|
|
222 | $node = new AnyEvent::MP::Node::Direct $noderef; |
219 | } |
223 | } |
220 | |
224 | |
221 | $node = new AnyEvent::MP::Node::Direct $noderef; |
|
|
222 | } |
|
|
223 | |
|
|
224 | $NODE{$_} = $node |
|
|
225 | for $noderef, split /,/, $noderef; |
|
|
226 | |
|
|
227 | $node |
225 | $node |
|
|
226 | } |
228 | } |
227 | } |
229 | |
228 | |
230 | sub connect_node { |
229 | sub connect_node { |
231 | &add_node->connect; |
230 | &add_node->connect; |
232 | } |
231 | } |
… | |
… | |
255 | =item snd_to_func $node, $func, @args |
254 | =item snd_to_func $node, $func, @args |
256 | |
255 | |
257 | Expects a noderef and a name of a function. Asynchronously tries to call |
256 | Expects a noderef and a name of a function. Asynchronously tries to call |
258 | this function with the given arguments on that node. |
257 | this function with the given arguments on that node. |
259 | |
258 | |
260 | This fucntion can be used to implement C<spawn>-like interfaces. |
259 | This function can be used to implement C<spawn>-like interfaces. |
261 | |
260 | |
262 | =cut |
261 | =cut |
263 | |
262 | |
264 | sub snd_to_func($$;@) { |
263 | sub snd_to_func($$;@) { |
265 | my $noderef = shift; |
264 | my $noderef = shift; |
… | |
… | |
349 | $cv->end; |
348 | $cv->end; |
350 | |
349 | |
351 | $cv |
350 | $cv |
352 | } |
351 | } |
353 | |
352 | |
354 | sub _node_rename { |
|
|
355 | $NODE = shift; |
|
|
356 | |
|
|
357 | my $self = $NODE{""}; |
|
|
358 | $NODE{$NODE} = delete $NODE{$self->{noderef}}; |
|
|
359 | $self->{noderef} = $NODE; |
|
|
360 | } |
|
|
361 | |
|
|
362 | sub initialise_node(@) { |
353 | sub initialise_node(@) { |
363 | my ($noderef, @others) = @_; |
354 | my ($noderef, @others) = @_; |
364 | |
355 | |
365 | my $profile = AnyEvent::MP::Config::find_profile |
356 | $CONFIG = AnyEvent::MP::Config::find_profile |
366 | +(defined $noderef ? $noderef : _nodename); |
357 | +(defined $noderef ? $noderef : _nodename); |
367 | |
358 | |
368 | $noderef = $profile->{noderef} |
359 | $noderef = $CONFIG->{noderef} |
369 | if exists $profile->{noderef}; |
360 | if exists $CONFIG->{noderef}; |
370 | |
361 | |
371 | push @others, @{ $profile->{seeds} }; |
362 | push @others, @{ $CONFIG->{seeds} }; |
|
|
363 | |
|
|
364 | @others = map $_->recv, map +(resolve_node $_), @others; |
372 | |
365 | |
373 | if ($noderef =~ /^slave\/(.*)$/) { |
366 | if ($noderef =~ /^slave\/(.*)$/) { |
374 | $SLAVE = AE::cv; |
|
|
375 | my $name = $1; |
367 | my $name = $1; |
376 | $name = $NODE unless length $name; |
368 | $name = $NODE unless length $name; |
377 | $noderef = AE::cv; |
|
|
378 | $noderef->send ("slave/$name"); |
|
|
379 | |
369 | |
380 | @others |
370 | @others |
381 | or Carp::croak "seed nodes must be specified for slave nodes"; |
371 | or Carp::croak "seed nodes must be specified for slave nodes"; |
382 | |
372 | |
|
|
373 | $SLAVE = 1; |
|
|
374 | $NODE = "slave/$name"; |
|
|
375 | |
383 | } else { |
376 | } else { |
384 | $PUBLIC = 1; |
377 | $PUBLIC = 1; |
385 | $noderef = resolve_node $noderef; |
378 | $NODE = (resolve_node $noderef)->recv; |
386 | } |
379 | } |
387 | |
380 | |
388 | @others = map $_->recv, map +(resolve_node $_), @others; |
381 | $NODE{$NODE} = $NODE{""}; |
|
|
382 | $NODE{$NODE}{noderef} = $NODE; |
389 | |
383 | |
390 | _node_rename $noderef->recv; |
384 | unless ($SLAVE) { |
391 | |
|
|
392 | for my $t (split /,/, $NODE) { |
385 | for my $t (split /,/, $NODE) { |
393 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
386 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
394 | |
387 | |
395 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
388 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
396 | sub { |
389 | sub { |
397 | my ($tp) = @_; |
390 | my ($tp) = @_; |
398 | |
391 | |
399 | # TODO: urgs |
392 | # TODO: urgs |
400 | my $node = add_node $tp->{remote_node}; |
393 | my $node = add_node $tp->{remote_node}; |
401 | $node->{trial}{accept} = $tp; |
394 | $node->{trial}{accept} = $tp; |
402 | }, |
395 | }, |
|
|
396 | ; |
403 | ; |
397 | } |
404 | } |
398 | } |
405 | |
399 | |
406 | for (@others) { |
400 | for (@others) { |
407 | my $node = add_node $_; |
401 | my $node = add_node $_; |
408 | $node->{autoconnect} = 1; |
402 | $node->{autoconnect} = 1; |
409 | $node->connect; |
403 | $node->connect; |
410 | } |
404 | } |
411 | |
405 | |
412 | if ($SLAVE) { |
|
|
413 | my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() }; |
|
|
414 | my $master = $SLAVE->recv; |
|
|
415 | $master |
|
|
416 | or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n"; |
|
|
417 | |
|
|
418 | $MASTER = $master->{noderef}; |
|
|
419 | $master->{autoconnect} = 1; |
|
|
420 | |
|
|
421 | (my $via = $MASTER) =~ s/,/!/g; |
|
|
422 | |
|
|
423 | $NODE .= "\@$via"; |
|
|
424 | _node_rename $NODE; |
|
|
425 | |
|
|
426 | $_->send (["", iam => $NODE]) |
|
|
427 | for values %NODE; |
|
|
428 | |
|
|
429 | $SLAVE = 1; |
|
|
430 | } |
|
|
431 | |
|
|
432 | for (@{ $profile->{services} }) { |
406 | for (@{ $CONFIG->{services} }) { |
433 | if (s/::$//) { |
407 | if (s/::$//) { |
434 | eval "require $_"; |
408 | eval "require $_"; |
435 | die $@ if $@; |
409 | die $@ if $@; |
436 | } else { |
410 | } else { |
437 | (load_func $_)->(); |
411 | (load_func $_)->(); |
438 | } |
412 | } |
439 | } |
413 | } |
|
|
414 | |
|
|
415 | # slave nodes need global |
|
|
416 | require AnyEvent::MP::Global |
|
|
417 | if $SLAVE; |
440 | } |
418 | } |
441 | |
419 | |
442 | ############################################################################# |
420 | ############################################################################# |
443 | # node monitoring and info |
421 | # node monitoring and info |
444 | |
422 | |
… | |
… | |
446 | my %node; |
424 | my %node; |
447 | |
425 | |
448 | @node{values %NODE} = values %NODE; |
426 | @node{values %NODE} = values %NODE; |
449 | |
427 | |
450 | values %node; |
428 | values %node; |
|
|
429 | } |
|
|
430 | |
|
|
431 | sub _public_nodes { |
|
|
432 | grep $_->{noderef} !~ /^slave\//, _uniq_nodes |
451 | } |
433 | } |
452 | |
434 | |
453 | =item node_is_known $noderef |
435 | =item node_is_known $noderef |
454 | |
436 | |
455 | Returns true iff the given node is currently known to the system. |
437 | Returns true iff the given node is currently known to the system. |
… | |
… | |
475 | ? 1 : 0 |
457 | ? 1 : 0 |
476 | } |
458 | } |
477 | |
459 | |
478 | =item known_nodes |
460 | =item known_nodes |
479 | |
461 | |
480 | Returns the noderefs of all nodes connected to this node, including |
462 | Returns the noderefs of all public nodes connected to this node, including |
481 | itself. |
463 | itself. |
482 | |
464 | |
483 | =cut |
465 | =cut |
484 | |
466 | |
485 | sub known_nodes { |
467 | sub known_nodes { |
486 | map $_->{noderef}, _uniq_nodes |
468 | map $_->{noderef}, _public_nodes |
487 | } |
469 | } |
488 | |
470 | |
489 | =item up_nodes |
471 | =item up_nodes |
490 | |
472 | |
491 | Return the noderefs of all nodes that are currently connected (excluding |
473 | Return the noderefs of all public nodes that are currently connected |
492 | the node itself). |
474 | (excluding the node itself). |
493 | |
475 | |
494 | =cut |
476 | =cut |
495 | |
477 | |
496 | sub up_nodes { |
478 | sub up_nodes { |
497 | map $_->{noderef}, grep $_->{transport}, _uniq_nodes |
479 | map $_->{noderef}, grep $_->{transport}, _public_nodes |
498 | } |
480 | } |
499 | |
481 | |
500 | =item $guard = mon_nodes $callback->($noderef, $is_up, @reason) |
482 | =item $guard = mon_nodes $callback->($noderef, $is_up, @reason) |
501 | |
483 | |
502 | Registers a callback that is called each time a node goes up (connection |
484 | Registers a callback that is called each time a node goes up (connection |
… | |
… | |
554 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
536 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
555 | or return; |
537 | or return; |
556 | |
538 | |
557 | $_->(@_) for @$cbs; |
539 | $_->(@_) for @$cbs; |
558 | }, |
540 | }, |
559 | # node changed its name (for slave nodes) |
|
|
560 | iam => sub { |
|
|
561 | # get rid of bogus slave/xxx name, hopefully |
|
|
562 | delete $NODE{$SRCNODE->{noderef}}; |
|
|
563 | |
|
|
564 | # change noderef |
|
|
565 | $SRCNODE->{noderef} = $_[0]; |
|
|
566 | |
|
|
567 | # anchor |
|
|
568 | $NODE{$_[0]} = $SRCNODE; |
|
|
569 | }, |
|
|
570 | |
541 | |
571 | # "public" services - not actually public |
542 | # "public" services - not actually public |
572 | |
543 | |
573 | # relay message to another node / generic echo |
544 | # relay message to another node / generic echo |
574 | snd => \&snd, |
545 | snd => \&snd, |