… | |
… | |
28 | |
28 | |
29 | use common::sense; |
29 | use common::sense; |
30 | use Carp (); |
30 | use Carp (); |
31 | use MIME::Base64 (); |
31 | use MIME::Base64 (); |
32 | |
32 | |
|
|
33 | use AnyEvent (); |
33 | use AnyEvent::Util (); |
34 | use AnyEvent::Util (); |
34 | |
35 | |
35 | use AnyEvent::MP; |
36 | use AnyEvent::MP; |
36 | use AnyEvent::MP::Kernel; |
37 | use AnyEvent::MP::Kernel; |
37 | |
38 | |
38 | our $VERSION = $AnyEvent::MP::VERSION; |
39 | our $VERSION = $AnyEvent::MP::VERSION; |
|
|
40 | |
|
|
41 | our %addr; # port ID => [address...] mapping |
39 | |
42 | |
40 | our %port; # our rendezvous port on the other side |
43 | our %port; # our rendezvous port on the other side |
41 | our %lreg; # local registry, name => [pid...] |
44 | our %lreg; # local registry, name => [pid...] |
42 | our %lmon; # local rgeistry monitoring name,pid => mon |
45 | our %lmon; # local rgeistry monitoring name,pid => mon |
43 | our %greg; # global regstry, name => [pid...] |
46 | our %greg; # global regstry, name => [pid...] |
44 | |
47 | |
|
|
48 | our $nodecnt; |
|
|
49 | |
45 | $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); |
50 | $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); |
46 | |
51 | |
|
|
52 | ############################################################################# |
|
|
53 | # seednodes |
|
|
54 | |
|
|
55 | our @SEEDS; |
|
|
56 | our $SEED_WATCHER; |
|
|
57 | |
|
|
58 | sub seed_connect { |
|
|
59 | my ($seed) = @_; |
|
|
60 | |
|
|
61 | my ($host, $port) = AnyEvent::Socket::parse_hostport $seed |
|
|
62 | or Carp::croak "$seed: unparsable seed address"; |
|
|
63 | |
|
|
64 | # ughhh |
|
|
65 | my $tp; $tp = AnyEvent::MP::Transport::mp_connect $host, $port, |
|
|
66 | release => \&more_seeding, |
|
|
67 | sub { |
|
|
68 | $tp = shift; |
|
|
69 | $tp->{keepalive} = $tp; |
|
|
70 | }, |
|
|
71 | ; |
|
|
72 | } |
|
|
73 | |
|
|
74 | sub more_seeding { |
|
|
75 | return if $nodecnt; |
|
|
76 | return unless @SEEDS; |
|
|
77 | |
|
|
78 | $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding."); |
|
|
79 | |
|
|
80 | seed_connect $SEEDS[rand @SEEDS]; |
|
|
81 | } |
|
|
82 | |
|
|
83 | sub set_seeds(@) { |
|
|
84 | @SEEDS = @_; |
|
|
85 | |
|
|
86 | $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding; |
|
|
87 | |
|
|
88 | seed_connect $_ |
|
|
89 | for @SEEDS; |
|
|
90 | } |
|
|
91 | |
|
|
92 | ############################################################################# |
|
|
93 | |
47 | sub unreg_groups($) { |
94 | sub unreg_groups($) { |
48 | my ($noderef) = @_; |
95 | my ($node) = @_; |
49 | |
96 | |
50 | my $qr = qr/^\Q$noderef\E(?:#|$)/; |
97 | my $qr = qr/^\Q$node\E(?:#|$)/; |
51 | |
98 | |
52 | for my $group (values %greg) { |
99 | for my $group (values %greg) { |
53 | @$group = grep $_ !~ $qr, @$group; |
100 | @$group = grep $_ !~ $qr, @$group; |
54 | } |
101 | } |
55 | } |
102 | } |
56 | |
103 | |
57 | sub set_groups($$) { |
104 | sub set_groups($$) { |
58 | my ($noderef, $lreg) = @_; |
105 | my ($node, $lreg) = @_; |
59 | |
106 | |
60 | while (my ($k, $v) = each %$lreg) { |
107 | while (my ($k, $v) = each %$lreg) { |
61 | push @{ $greg{$k} }, @$v; |
108 | push @{ $greg{$k} }, @$v; |
62 | } |
109 | } |
63 | } |
110 | } |
… | |
… | |
131 | |
178 | |
132 | wantarray && AnyEvent::Util::guard { unregister $port, $group } |
179 | wantarray && AnyEvent::Util::guard { unregister $port, $group } |
133 | } |
180 | } |
134 | |
181 | |
135 | sub start_node { |
182 | sub start_node { |
136 | my ($noderef) = @_; |
183 | my ($node) = @_; |
137 | |
184 | |
138 | return if exists $port{$noderef}; |
185 | return if exists $port{$node}; |
139 | return if $noderef eq $NODE; # do not connect to ourselves |
186 | return if $node eq $NODE; # do not connect to ourselves |
140 | |
187 | |
141 | # establish connection |
188 | # establish connection |
142 | my $port = $port{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", 0, $NODE; |
189 | my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE; |
143 | # request any other nodes possibly known to us |
190 | |
144 | mon $port, sub { |
191 | mon $port, sub { |
145 | unreg_groups $noderef; |
192 | unreg_groups $node; |
146 | delete $port{$noderef}; |
193 | delete $port{$node}; |
147 | }; |
194 | }; |
|
|
195 | |
|
|
196 | snd $port, addr => $AnyEvent::MP::Kernel::LISTENER if @$AnyEvent::MP::Kernel::LISTENER; |
148 | snd $port, connect_nodes => up_nodes; |
197 | snd $port, connect_nodes => \%addr if %addr; |
149 | snd $port, set => \%lreg; |
198 | snd $port, set => \%lreg if %lreg; |
150 | } |
199 | } |
151 | |
200 | |
152 | # other nodes connect via this |
201 | # other nodes connect via this |
153 | sub connect { |
202 | sub connect { |
154 | my ($version, $noderef) = @_; |
203 | my ($version, $node) = @_; |
155 | |
204 | |
156 | # monitor them, silently die |
205 | # monitor them, silently die |
157 | mon $noderef, psub { kil $SELF }; |
206 | mon $node, psub { kil $SELF }; |
158 | |
207 | |
159 | rcv $SELF, |
208 | rcv $SELF, |
|
|
209 | addr => sub { |
|
|
210 | my $addresses = shift; |
|
|
211 | $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses)."); |
|
|
212 | $addr{$node} = $addresses; |
|
|
213 | }, |
160 | connect_nodes => sub { |
214 | connect_nodes => sub { |
161 | for (@_) { |
215 | my ($kv) = @_; |
162 | connect_node $_; |
216 | |
|
|
217 | use JSON::XS;#d# |
|
|
218 | my $kv_txt = JSON::XS->new->encode ($kv);#d# |
|
|
219 | $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d# |
|
|
220 | |
|
|
221 | while (my ($id, $addresses) = each %$kv) { |
|
|
222 | my $node = AnyEvent::MP::Kernel::add_node $id; |
|
|
223 | $node->connect (@$addresses); |
163 | start_node $_; |
224 | start_node $id; |
164 | } |
225 | } |
165 | }, |
226 | }, |
|
|
227 | find_node => sub { |
|
|
228 | my ($othernode) = @_; |
|
|
229 | |
|
|
230 | $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode."); |
|
|
231 | snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} } |
|
|
232 | if $addr{$othernode}; |
|
|
233 | }, |
166 | set => sub { |
234 | set => sub { |
167 | set_groups $noderef, shift; |
235 | set_groups $node, shift; |
168 | }, |
236 | }, |
169 | reg1 => \&_register, |
237 | reg1 => \&_register, |
170 | reg0 => \&_unregister, |
238 | reg0 => \&_unregister, |
171 | ; |
239 | ; |
172 | } |
240 | } |
173 | |
241 | |
174 | sub mon_node { |
242 | sub mon_node { |
175 | my ($noderef, $is_up) = @_; |
243 | my ($node, $is_up) = @_; |
176 | |
244 | |
177 | if ($is_up) { |
245 | if ($is_up) { |
|
|
246 | ++$nodecnt; |
178 | start_node $noderef; |
247 | start_node $node; |
179 | } else { |
248 | } else { |
|
|
249 | --$nodecnt; |
|
|
250 | more_seeding unless $nodecnt; |
180 | unreg_groups $noderef; |
251 | unreg_groups $node; |
|
|
252 | |
|
|
253 | # forget about the node |
|
|
254 | delete $addr{$node}; |
|
|
255 | # ask other nodes if they know the node |
|
|
256 | snd $_, find_node => $node |
|
|
257 | for values %port; |
181 | } |
258 | } |
182 | #warn "node<$noderef,$is_up>\n";#d# |
259 | #warn "node<$node,$is_up>\n";#d# |
183 | } |
260 | } |
184 | |
261 | |
185 | mon_node $_, 1 |
262 | mon_node $_, 1 |
186 | for up_nodes; |
263 | for up_nodes; |
187 | |
264 | |