… | |
… | |
294 | length $portid |
294 | length $portid |
295 | or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; |
295 | or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; |
296 | |
296 | |
297 | ($NODE{$nodeid} || add_node $nodeid) |
297 | ($NODE{$nodeid} || add_node $nodeid) |
298 | ->kill ("$portid", @_); |
298 | ->kill ("$portid", @_); |
299 | } |
|
|
300 | |
|
|
301 | # actual code that kills a port |
|
|
302 | sub _kill { |
|
|
303 | my $port = shift; |
|
|
304 | |
|
|
305 | delete $PORT{$port} |
|
|
306 | or return; # killing nonexistent ports is O.K. |
|
|
307 | delete $PORT_DATA{$port}; |
|
|
308 | |
|
|
309 | my $mon = delete $LMON{$port} |
|
|
310 | or !@_ |
|
|
311 | or $WARN->(2, "unmonitored local port $port died with reason: @_"); |
|
|
312 | |
|
|
313 | $_->(@_) for values %$mon; |
|
|
314 | } |
299 | } |
315 | |
300 | |
316 | sub _nodename { |
301 | sub _nodename { |
317 | require POSIX; |
302 | require POSIX; |
318 | (POSIX::uname ())[1] |
303 | (POSIX::uname ())[1] |
… | |
… | |
572 | } |
557 | } |
573 | |
558 | |
574 | ############################################################################# |
559 | ############################################################################# |
575 | # self node code |
560 | # self node code |
576 | |
561 | |
|
|
562 | sub _kill { |
|
|
563 | my $port = shift; |
|
|
564 | |
|
|
565 | delete $PORT{$port} |
|
|
566 | or return; # killing nonexistent ports is O.K. |
|
|
567 | delete $PORT_DATA{$port}; |
|
|
568 | |
|
|
569 | my $mon = delete $LMON{$port} |
|
|
570 | or !@_ |
|
|
571 | or $WARN->(2, "unmonitored local port $port died with reason: @_"); |
|
|
572 | |
|
|
573 | $_->(@_) for values %$mon; |
|
|
574 | } |
|
|
575 | |
|
|
576 | sub _monitor { |
|
|
577 | return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]") |
|
|
578 | unless exists $PORT{$_[1]}; |
|
|
579 | |
|
|
580 | $LMON{$_[1]}{$_[2]+0} = $_[2]; |
|
|
581 | } |
|
|
582 | |
|
|
583 | sub _unmonitor { |
|
|
584 | delete $LMON{$_[1]}{$_[2]+0}; |
|
|
585 | } |
|
|
586 | |
577 | our %node_req = ( |
587 | our %node_req = ( |
578 | # internal services |
588 | # internal services |
579 | |
589 | |
580 | # monitoring |
590 | # monitoring |
581 | mon0 => sub { # stop monitoring a port for another node |
591 | mon0 => sub { # stop monitoring a port for another node |
582 | my $portid = shift; |
592 | my $portid = shift; |
583 | my $node = $SRCNODE; |
|
|
584 | $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid}); |
593 | _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid}; |
585 | }, |
594 | }, |
586 | mon1 => sub { # start monitoring a port for another node |
595 | mon1 => sub { # start monitoring a port for another node |
587 | my $portid = shift; |
596 | my $portid = shift; |
588 | my $node = $SRCNODE; |
|
|
589 | Scalar::Util::weaken $node; |
597 | Scalar::Util::weaken (my $node = $SRCNODE); |
590 | $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub { |
598 | _monitor undef, $portid, $node->{rmon}{$portid} = sub { |
591 | delete $node->{rmon}{$portid}; |
599 | delete $node->{rmon}{$portid}; |
592 | $node->send (["", kil0 => $portid, @_]) |
600 | $node->send (["", kil0 => $portid, @_]) |
593 | if $node && $node->{transport}; |
601 | if $node && $node->{transport}; |
594 | }); |
602 | }; |
595 | }, |
603 | }, |
596 | # another node has killed a monitored port |
604 | # another node has killed a monitored port |
597 | kil0 => sub { |
605 | kil0 => sub { |
598 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
606 | my $cbs = delete $SRCNODE->{lmon}{+shift} |
599 | or return; |
607 | or return; |