ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP/Kernel.pm (file contents):
Revision 1.17 by root, Sun Aug 16 02:55:17 2009 UTC vs.
Revision 1.18 by root, Sun Aug 16 05:02:24 2009 UTC

36use base "Exporter"; 36use base "Exporter";
37 37
38our $VERSION = '0.7'; 38our $VERSION = '0.7';
39our @EXPORT = qw( 39our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID 40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 connect_node add_node load_func snd_to_func 41 connect_node add_node load_func snd_to_func snd_on eval_on
42 42
43 NODE $NODE node_of snd kil 43 NODE $NODE node_of snd kil
44 port_is_local 44 port_is_local
45 resolve_node initialise_node 45 resolve_node initialise_node
46 known_nodes up_nodes mon_nodes node_is_known node_is_up 46 known_nodes up_nodes mon_nodes node_is_known node_is_up
183 ? sub () { 1 } 183 ? sub () { 1 }
184 : sub () { 0 }; 184 : sub () { 0 };
185} 185}
186 186
187sub _inject { 187sub _inject {
188 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d# 188 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d#
189 &{ $PORT{+shift} or return }; 189 &{ $PORT{+shift} or return };
190} 190}
191 191
192sub add_node { 192sub add_node {
193 my ($noderef) = @_; 193 my ($noderef) = @_;
250 my ($noderef, undef) = split /#/, $_[0], 2; 250 my ($noderef, undef) = split /#/, $_[0], 2;
251 251
252 $NODE{$noderef} == $NODE{""} 252 $NODE{$noderef} == $NODE{""}
253} 253}
254 254
255=item snd_to_func $noderef, $func, @args 255=item snd_to_func $node, $func, @args
256 256
257Expects a noderef and a name of a function. Asynchronously tries to call 257Expects a noderef and a name of a function. Asynchronously tries to call
258this function with the given arguments on that node. 258this function with the given arguments on that node.
259 259
260This fucntion can be used to implement C<spawn>-like interfaces. 260This fucntion can be used to implement C<spawn>-like interfaces.
261 261
262=cut 262=cut
263 263
264sub snd_to_func { 264sub snd_to_func($$;@) {
265 my $noderef = shift; 265 my $noderef = shift;
266 266
267 ($NODE{$noderef} || add_node $noderef) 267 ($NODE{$noderef} || add_node $noderef)
268 ->send (["", @_]); 268 ->send (["", @_]);
269}
270
271=item snd_on $node, @msg
272
273Executes C<snd> with the given C<@msg> (which must include the destination
274port) on the given node.
275
276=cut
277
278sub snd_on($@) {
279 my $node = shift;
280 snd $node, snd => @_;
281}
282
283=item eval_on $node, $string
284
285Evaluates the given string as Perl expression on the given node.
286
287=cut
288
289sub eval_on($@) {
290 my $node = shift;
291 snd $node, eval => @_;
269} 292}
270 293
271sub kil(@) { 294sub kil(@) {
272 my ($noderef, $portid) = split /#/, shift, 2; 295 my ($noderef, $portid) = split /#/, shift, 2;
273 296
326 $cv->end; 349 $cv->end;
327 350
328 $cv 351 $cv
329} 352}
330 353
354sub _node_rename {
355 $NODE = shift;
356
357 my $self = $NODE{""};
358 $NODE{$NODE} = delete $NODE{$self->{noderef}};
359 $self->{noderef} = $NODE;
360}
361
331sub initialise_node(@) { 362sub initialise_node(@) {
332 my ($noderef, @others) = @_; 363 my ($noderef, @others) = @_;
333 364
334 my $profile = AnyEvent::MP::Config::find_profile 365 my $profile = AnyEvent::MP::Config::find_profile
335 +(defined $noderef ? $noderef : _nodename); 366 +(defined $noderef ? $noderef : _nodename);
354 $noderef = resolve_node $noderef; 385 $noderef = resolve_node $noderef;
355 } 386 }
356 387
357 @others = map $_->recv, map +(resolve_node $_), @others; 388 @others = map $_->recv, map +(resolve_node $_), @others;
358 389
359 $NODE = $noderef->recv; 390 _node_rename $noderef->recv;
360
361 $NODE{$NODE} = $NODE{""};
362 391
363 for my $t (split /,/, $NODE) { 392 for my $t (split /,/, $NODE) {
364 $NODE{$t} = $NODE{""};
365
366 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 393 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
367 394
368 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 395 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
369 sub { 396 sub {
370 my ($tp) = @_; 397 my ($tp) = @_;
392 $master->{autoconnect} = 1; 419 $master->{autoconnect} = 1;
393 420
394 (my $via = $MASTER) =~ s/,/!/g; 421 (my $via = $MASTER) =~ s/,/!/g;
395 422
396 $NODE .= "\@$via"; 423 $NODE .= "\@$via";
397 $NODE{$NODE} = $NODE{""}; 424 _node_rename $NODE;
398 425
399 $_->send (["", iam => $NODE]) 426 $_->send (["", iam => $NODE])
400 for values %NODE; 427 for values %NODE;
401 428
402 $SLAVE = 1; 429 $SLAVE = 1;
448 ? 1 : 0 475 ? 1 : 0
449} 476}
450 477
451=item known_nodes 478=item known_nodes
452 479
453Returns the noderefs of all nodes connected to this node. 480Returns the noderefs of all nodes connected to this node, including
481itself.
454 482
455=cut 483=cut
456 484
457sub known_nodes { 485sub known_nodes {
458 map $_->{noderef}, _uniq_nodes 486 map $_->{noderef}, _uniq_nodes
538 566
539 # anchor 567 # anchor
540 $NODE{$_[0]} = $SRCNODE; 568 $NODE{$_[0]} = $SRCNODE;
541 }, 569 },
542 570
543 # public services 571 # "public" services - not actually public
544 572
545 # relay message to another node / generic echo 573 # relay message to another node / generic echo
546 snd => \&snd, 574 snd => \&snd,
547 snd_multi => sub { 575 snd_multi => sub {
548 snd @$_ for @_ 576 snd @$_ for @_
573 "" => sub { 601 "" => sub {
574 # empty messages are sent by monitoring 602 # empty messages are sent by monitoring
575 }, 603 },
576); 604);
577 605
578$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE; 606$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
579$PORT{""} = sub { 607$PORT{""} = sub {
580 my $tag = shift; 608 my $tag = shift;
581 eval { &{ $node_req{$tag} ||= load_func $tag } }; 609 eval { &{ $node_req{$tag} ||= load_func $tag } };
582 $WARN->(2, "error processing node message: $@") if $@; 610 $WARN->(2, "error processing node message: $@") if $@;
583}; 611};

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines