… | |
… | |
28 | |
28 | |
29 | use common::sense; |
29 | use common::sense; |
30 | use Carp (); |
30 | use Carp (); |
31 | use MIME::Base64 (); |
31 | use MIME::Base64 (); |
32 | |
32 | |
|
|
33 | use AnyEvent::Util (); |
|
|
34 | |
33 | use AnyEvent::MP; |
35 | use AnyEvent::MP; |
34 | use AnyEvent::MP::Kernel; |
36 | use AnyEvent::MP::Kernel; |
35 | |
37 | |
36 | our $VERSION = $AnyEvent::MP::VERSION; |
38 | our $VERSION = $AnyEvent::MP::VERSION; |
37 | |
39 | |
38 | our $port = port; |
40 | our %port; # our rendezvous port on the other side |
39 | our %other; # our rendevouz port on the other side |
41 | our %lreg; # local registry, name => [pid...] |
|
|
42 | our %lmon; # local rgeistry monitoring name,pid => mon |
|
|
43 | our %greg; # global regstry, name => [pid...] |
|
|
44 | |
|
|
45 | sub unreg_groups($) { |
|
|
46 | my ($noderef) = @_; |
|
|
47 | |
|
|
48 | my $qr = qr/^\Q$noderef\E(?:#|$)/; |
|
|
49 | |
|
|
50 | for my $group (values %greg) { |
|
|
51 | @$group = grep $_ !~ $qr, @$group; |
|
|
52 | } |
|
|
53 | } |
|
|
54 | |
|
|
55 | sub set_groups($$) { |
|
|
56 | my ($noderef, $lreg) = @_; |
|
|
57 | } |
|
|
58 | |
|
|
59 | =item $guard = register $port, $group |
|
|
60 | |
|
|
61 | Register the given (local!) port in the named global group C<$group>. |
|
|
62 | |
|
|
63 | The port will be unregistered automatically when the port is destroyed. |
|
|
64 | |
|
|
65 | When not called in void context, then a guard object will be returned that |
|
|
66 | will also cause the name to be unregistered when destroyed. |
|
|
67 | |
|
|
68 | =cut |
|
|
69 | |
|
|
70 | # register port from any node |
|
|
71 | sub _register { |
|
|
72 | my ($port, $group) = @_; |
|
|
73 | |
|
|
74 | push @{ $greg{$group} }, $port; |
|
|
75 | } |
|
|
76 | |
|
|
77 | # unregister local port |
|
|
78 | sub unregister { |
|
|
79 | my ($port, $group) = @_; |
|
|
80 | |
|
|
81 | delete $lmon{"$group\x00$port"}; |
|
|
82 | @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} }; |
|
|
83 | |
|
|
84 | _unregister $port, $group; |
|
|
85 | |
|
|
86 | snd $_, reg0 => $port, $group |
|
|
87 | for values %port; |
|
|
88 | } |
|
|
89 | |
|
|
90 | # register local port |
|
|
91 | sub register($$) { |
|
|
92 | my ($port, $group) = @_; |
|
|
93 | |
|
|
94 | port_is_local $port |
|
|
95 | or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught"; |
|
|
96 | |
|
|
97 | $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group }; |
|
|
98 | push @{ $lreg{$group} }, $port; |
|
|
99 | |
|
|
100 | snd $_, reg1 => $port, $group |
|
|
101 | for values %port; |
|
|
102 | |
|
|
103 | _register $port, $group; |
|
|
104 | |
|
|
105 | wantarray && AnyEvent::Util::guard { unregister $port, $group } |
|
|
106 | } |
|
|
107 | |
|
|
108 | sub start_node { |
|
|
109 | my ($noderef) = @_; |
|
|
110 | |
|
|
111 | return if exists $port{$noderef}; |
|
|
112 | |
|
|
113 | # establish connection |
|
|
114 | my $port = $port{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", $NODE; |
|
|
115 | # request any other nodes possibly known to us |
|
|
116 | mon $port, sub { |
|
|
117 | unreg_groups $noderef; |
|
|
118 | delete $port{$noderef}; |
|
|
119 | }; |
|
|
120 | snd $port, connect_nodes => up_nodes; |
|
|
121 | snd $port, set => \%greg; |
|
|
122 | } |
40 | |
123 | |
41 | sub connect { |
124 | sub connect { |
42 | my ($noderef) = @_; |
125 | my ($noderef) = @_; |
43 | |
126 | |
44 | # monitor them, silently die |
127 | # monitor them, silently die |
45 | mon $noderef, psub { kil $SELF }; |
128 | mon $noderef, psub { kil $SELF }; |
46 | |
129 | |
|
|
130 | warn "$SELF,$NODE\n";#d# |
|
|
131 | rcv $SELF, |
47 | rcv $SELF, connect_nodes => sub { |
132 | connect_nodes => sub { |
|
|
133 | for (@_) { |
48 | connect_node $_ for @_; |
134 | connect_node $_; |
|
|
135 | start_node $_; |
|
|
136 | } |
|
|
137 | }, |
|
|
138 | set => sub { |
|
|
139 | unreg_groups $noderef; |
|
|
140 | set_groups $noderef, shift; |
|
|
141 | }, |
|
|
142 | reg1 => \&_register, |
|
|
143 | reg0 => \&_unregister, |
49 | }; |
144 | ; |
50 | } |
145 | } |
51 | |
146 | |
52 | sub mon_node { |
147 | sub mon_node { |
53 | my ($noderef, $is_up) = @_; |
148 | my ($noderef, $is_up) = @_; |
54 | |
149 | |
55 | if ($is_up) { |
150 | if ($is_up) { |
56 | # establish connection |
151 | start_node $noderef; |
57 | my $other = $other{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", $NODE; |
|
|
58 | # request any other nodes possibly known to us |
|
|
59 | snd $other, connect_nodes => up_nodes; |
|
|
60 | } else { |
152 | } else { |
61 | kil delete $other{$noderef}; |
153 | unreg_groups $noderef; |
62 | } |
154 | } |
63 | #warn "node<$noderef,$is_up>\n";#d# |
155 | #warn "node<$noderef,$is_up>\n";#d# |
64 | } |
156 | } |
65 | |
157 | |
66 | mon_node $_, 1 |
158 | mon_node $_, 1 |