… | |
… | |
36 | use base "Exporter"; |
36 | use base "Exporter"; |
37 | |
37 | |
38 | our $VERSION = '0.7'; |
38 | our $VERSION = '0.7'; |
39 | our @EXPORT = qw( |
39 | our @EXPORT = qw( |
40 | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID |
40 | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID |
41 | connect_node add_node load_func snd_to_func |
41 | connect_node add_node load_func snd_to_func snd_on eval_on |
42 | |
42 | |
43 | NODE $NODE node_of snd kil |
43 | NODE $NODE node_of snd kil |
44 | port_is_local |
44 | port_is_local |
45 | resolve_node initialise_node |
45 | resolve_node initialise_node |
46 | known_nodes up_nodes mon_nodes node_is_known node_is_up |
46 | known_nodes up_nodes mon_nodes node_is_known node_is_up |
… | |
… | |
183 | ? sub () { 1 } |
183 | ? sub () { 1 } |
184 | : sub () { 0 }; |
184 | : sub () { 0 }; |
185 | } |
185 | } |
186 | |
186 | |
187 | sub _inject { |
187 | sub _inject { |
188 | warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d# |
188 | warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d# |
189 | &{ $PORT{+shift} or return }; |
189 | &{ $PORT{+shift} or return }; |
190 | } |
190 | } |
191 | |
191 | |
192 | sub add_node { |
192 | sub add_node { |
193 | my ($noderef) = @_; |
193 | my ($noderef) = @_; |
… | |
… | |
250 | my ($noderef, undef) = split /#/, $_[0], 2; |
250 | my ($noderef, undef) = split /#/, $_[0], 2; |
251 | |
251 | |
252 | $NODE{$noderef} == $NODE{""} |
252 | $NODE{$noderef} == $NODE{""} |
253 | } |
253 | } |
254 | |
254 | |
255 | =item snd_to_func $noderef, $func, @args |
255 | =item snd_to_func $node, $func, @args |
256 | |
256 | |
257 | Expects a noderef and a name of a function. Asynchronously tries to call |
257 | Expects a noderef and a name of a function. Asynchronously tries to call |
258 | this function with the given arguments on that node. |
258 | this function with the given arguments on that node. |
259 | |
259 | |
260 | This fucntion can be used to implement C<spawn>-like interfaces. |
260 | This fucntion can be used to implement C<spawn>-like interfaces. |
261 | |
261 | |
262 | =cut |
262 | =cut |
263 | |
263 | |
264 | sub snd_to_func { |
264 | sub snd_to_func($$;@) { |
265 | my $noderef = shift; |
265 | my $noderef = shift; |
266 | |
266 | |
267 | ($NODE{$noderef} || add_node $noderef) |
267 | ($NODE{$noderef} || add_node $noderef) |
268 | ->send (["", @_]); |
268 | ->send (["", @_]); |
|
|
269 | } |
|
|
270 | |
|
|
271 | =item snd_on $node, @msg |
|
|
272 | |
|
|
273 | Executes C<snd> with the given C<@msg> (which must include the destination |
|
|
274 | port) on the given node. |
|
|
275 | |
|
|
276 | =cut |
|
|
277 | |
|
|
278 | sub snd_on($@) { |
|
|
279 | my $node = shift; |
|
|
280 | snd $node, snd => @_; |
|
|
281 | } |
|
|
282 | |
|
|
283 | =item eval_on $node, $string |
|
|
284 | |
|
|
285 | Evaluates the given string as Perl expression on the given node. |
|
|
286 | |
|
|
287 | =cut |
|
|
288 | |
|
|
289 | sub eval_on($@) { |
|
|
290 | my $node = shift; |
|
|
291 | snd $node, eval => @_; |
269 | } |
292 | } |
270 | |
293 | |
271 | sub kil(@) { |
294 | sub kil(@) { |
272 | my ($noderef, $portid) = split /#/, shift, 2; |
295 | my ($noderef, $portid) = split /#/, shift, 2; |
273 | |
296 | |
… | |
… | |
326 | $cv->end; |
349 | $cv->end; |
327 | |
350 | |
328 | $cv |
351 | $cv |
329 | } |
352 | } |
330 | |
353 | |
|
|
354 | sub _node_rename { |
|
|
355 | $NODE = shift; |
|
|
356 | |
|
|
357 | my $self = $NODE{""}; |
|
|
358 | $NODE{$NODE} = delete $NODE{$self->{noderef}}; |
|
|
359 | $self->{noderef} = $NODE; |
|
|
360 | } |
|
|
361 | |
331 | sub initialise_node(@) { |
362 | sub initialise_node(@) { |
332 | my ($noderef, @others) = @_; |
363 | my ($noderef, @others) = @_; |
333 | |
364 | |
334 | my $profile = AnyEvent::MP::Config::find_profile |
365 | my $profile = AnyEvent::MP::Config::find_profile |
335 | +(defined $noderef ? $noderef : _nodename); |
366 | +(defined $noderef ? $noderef : _nodename); |
… | |
… | |
354 | $noderef = resolve_node $noderef; |
385 | $noderef = resolve_node $noderef; |
355 | } |
386 | } |
356 | |
387 | |
357 | @others = map $_->recv, map +(resolve_node $_), @others; |
388 | @others = map $_->recv, map +(resolve_node $_), @others; |
358 | |
389 | |
359 | $NODE = $noderef->recv; |
390 | _node_rename $noderef->recv; |
360 | |
|
|
361 | $NODE{$NODE} = $NODE{""}; |
|
|
362 | |
391 | |
363 | for my $t (split /,/, $NODE) { |
392 | for my $t (split /,/, $NODE) { |
364 | $NODE{$t} = $NODE{""}; |
|
|
365 | |
|
|
366 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
393 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
367 | |
394 | |
368 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
395 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
369 | sub { |
396 | sub { |
370 | my ($tp) = @_; |
397 | my ($tp) = @_; |
… | |
… | |
392 | $master->{autoconnect} = 1; |
419 | $master->{autoconnect} = 1; |
393 | |
420 | |
394 | (my $via = $MASTER) =~ s/,/!/g; |
421 | (my $via = $MASTER) =~ s/,/!/g; |
395 | |
422 | |
396 | $NODE .= "\@$via"; |
423 | $NODE .= "\@$via"; |
397 | $NODE{$NODE} = $NODE{""}; |
424 | _node_rename $NODE; |
398 | |
425 | |
399 | $_->send (["", iam => $NODE]) |
426 | $_->send (["", iam => $NODE]) |
400 | for values %NODE; |
427 | for values %NODE; |
401 | |
428 | |
402 | $SLAVE = 1; |
429 | $SLAVE = 1; |
… | |
… | |
448 | ? 1 : 0 |
475 | ? 1 : 0 |
449 | } |
476 | } |
450 | |
477 | |
451 | =item known_nodes |
478 | =item known_nodes |
452 | |
479 | |
453 | Returns the noderefs of all nodes connected to this node. |
480 | Returns the noderefs of all nodes connected to this node, including |
|
|
481 | itself. |
454 | |
482 | |
455 | =cut |
483 | =cut |
456 | |
484 | |
457 | sub known_nodes { |
485 | sub known_nodes { |
458 | map $_->{noderef}, _uniq_nodes |
486 | map $_->{noderef}, _uniq_nodes |
… | |
… | |
538 | |
566 | |
539 | # anchor |
567 | # anchor |
540 | $NODE{$_[0]} = $SRCNODE; |
568 | $NODE{$_[0]} = $SRCNODE; |
541 | }, |
569 | }, |
542 | |
570 | |
543 | # public services |
571 | # "public" services - not actually public |
544 | |
572 | |
545 | # relay message to another node / generic echo |
573 | # relay message to another node / generic echo |
546 | snd => \&snd, |
574 | snd => \&snd, |
547 | snd_multi => sub { |
575 | snd_multi => sub { |
548 | snd @$_ for @_ |
576 | snd @$_ for @_ |
… | |
… | |
573 | "" => sub { |
601 | "" => sub { |
574 | # empty messages are sent by monitoring |
602 | # empty messages are sent by monitoring |
575 | }, |
603 | }, |
576 | ); |
604 | ); |
577 | |
605 | |
578 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE; |
606 | $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; |
579 | $PORT{""} = sub { |
607 | $PORT{""} = sub { |
580 | my $tag = shift; |
608 | my $tag = shift; |
581 | eval { &{ $node_req{$tag} ||= load_func $tag } }; |
609 | eval { &{ $node_req{$tag} ||= load_func $tag } }; |
582 | $WARN->(2, "error processing node message: $@") if $@; |
610 | $WARN->(2, "error processing node message: $@") if $@; |
583 | }; |
611 | }; |