ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.46 by root, Thu Mar 1 18:11:56 2012 UTC vs.
Revision 1.47 by root, Fri Mar 2 19:19:21 2012 UTC

58 our $NODE; 58 our $NODE;
59 our $LISTENER; 59 our $LISTENER;
60 60
61 our $GLOBAL; 61 our $GLOBAL;
62 our $MASTER; 62 our $MASTER;
63 our $MASTER_MON;
64 our $MASTER_TIMER;
63 65
64 our %GLOBAL_SLAVE; 66 our %GLOBAL_SLAVE;
65 our $GLOBAL_MON;
66 67
67 our %GLOBAL_DB; # global db 68 our %GLOBAL_DB; # global db
68 our %LOCAL_DBS; # local databases of other global nodes 69 our %LOCAL_DBS; # local databases of other global nodes
69 our %LOCAL_DB; # this node database 70 our %LOCAL_DB; # this node database
70 71
71 our $SRCNODE; 72 our $SRCNODE;
72 our %node_req; 73 our %node_req;
74
75 # only in global code
76 our %GLOBAL_MON; # monitors {family}{"" or key}
77
78 sub other_globals() {
79 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
80 }
73 81
74 # broadcasts a message to all other global nodes 82 # broadcasts a message to all other global nodes
75 sub g_broadcast { 83 sub g_broadcast {
76 snd $_, @_ 84 snd $_, @_
77 for other_globals; 85 for other_globals;
78 } 86 }
79 87
80 sub g_mon_check { 88 sub g_mon_check {
81 warn "g_mon_check<@_>\n";#d# 89 warn "g_mon_check<@_>\n";#d#
82 use Data::Dump; ddx \%GLOBAL_DB;#d# 90
91 my %node = (
92 %{ $GLOBAL_MON{$_[1]}{$_[2]} },
93 %{ $GLOBAL_MON{$_[1]}{"" } },
94 %{ $GLOBAL_MON{"" }{"" } },
95 );
96
97 snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : ()
98 for keys %node;
83 } 99 }
84 100
85 # add/replace a key in the database 101 # add/replace a key in the database
86 sub g_add($$$$) { 102 sub g_add($$$$) {
87 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = 103 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
165 my ($node) = @_; 181 my ($node) = @_;
166 182
167 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; 183 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
168 }; 184 };
169 185
186 # monitoring
187
188 sub g_slave_disconnect($) {
189 my ($node) = @_;
190
191 g_clr $node;
192
193 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
194 while (my ($f, $fv) = each %$mon) {
195 delete $GLOBAL_MON{$f}{$_}
196 for keys %$fv;
197 }
198 }
199 }
200
201 # g_mon0 family key - stop monitoring
202 $node_req{g_mon0} = sub {
203 delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
204 delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
205 };
206
207 # g_mon1 family key - start monitoring
208 $node_req{g_mon1} = sub {
209 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
210 undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
211 #d# generate lots of initial change requests, or one big?
212
213 snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1]
214 };
215
170 ############################################################################# 216 #############################################################################
171 # switch to global mode 217 # switch to global mode
172 218
173 $GLOBAL = 1; 219 $GLOBAL = 1;
174 $MASTER = $NODE; 220 $MASTER = $NODE;
175 undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) 221 undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
176 undef $GLOBAL_MON; 222 $LOCAL_DBS{$NODE} = { %LOCAL_DB };
177 223
224 # regularly try to connect to global nodes - maybe use seeding code?
225 $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
226 (add_node $_)->connect
227 for keys %{ $GLOBAL_DB{"'g"} };
228 };
229
230 # instantly connect to other global nodes when we learn of them
231 # so we don't have to wait for the timer.
232 #TODO
233# $GLOBAL_MON{"'g"}{""}{""} = sub {
234# (add_node $_[1])->connect;
235# };
236
178 # delete slaves on nodw-down 237 # delete slaves on node-down
179 # clear slave db on node-down 238 # clear slave db on node-down
180 $GLOBAL_MON = mon_nodes sub { 239 $MASTER_MON = mon_nodes sub {
181 return if $_[1]; 240 g_slave_disconnect $_[0] unless $_[1];
182
183 delete $GLOBAL_SLAVE{$_[0]};
184 g_clr $_[0];
185 ldb_set "'l" => $_[0];
186 # clear listener and global database entries on node-down
187 #ldb_set "'g" => $_[0]; # really?
188 }; 241 };
189 242
190 # tell everybody who connects that we are a global node 243 # tell everybody who connects that we are a global node
191 push @AnyEvent::MP::Transport::HOOK_GREET, sub { 244 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
192 $_[0]{local_greeting}{global} = 1; 245 $_[0]{local_greeting}{global} = 1;
193 }; 246 };
194 247
248 # connect from a global node
249 sub g_global_connect {
250 my ($node) = @_;
251
252 # we need to set this currently, as to avoid race conditions
253 # because it takes a while until the other global node tells us it is global.
254
255 undef $GLOBAL_DB{"'g"}{$node};
256 undef $LOCAL_DBS{$node}{"'g"}{$node};
257
258 # global nodes send all local databases, merged,
259 # as their local database to global nodes
260 my %db;
261
262 for (values %LOCAL_DBS) {
263 while (my ($f, $fv) = each %$_) {
264 while (my ($k, $kv) = each %$fv) {
265 $db{$f}{$k} = $kv;
266 }
267 }
268 }
269
270 snd $node => g_set => \%db;
271 }
272
195 # send our database to every global node that connects 273 # send our database to every global node that connects
196 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { 274 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
197 return unless $_[0]{remote_greeting}{global}; 275 return unless $_[0]{remote_greeting}{global};
198 276
199 # global nodes send all local databases, merged, 277 g_global_connect $_[0]{remote_node};
200 # as their local database to global nodes
201 my %db;
202
203 for (values %LOCAL_DBS) {
204 while (my ($f, $k) = each %$_) {
205 while (my ($kk, $kv) = each %$k) {
206 $db{$f}{$kk} = $kv;
207 }
208 }
209 }
210
211 snd $_[0]{remote_node} => g_set => \%db;
212 }; 278 };
213 279
214# # tell our master else that we are global now 280 # tell our master that we are global now
215# for (values %NODE) { 281 for (values %NODE) {
216# if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { 282 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
217# snd $_->{id} => "g_global"; 283 snd $_->{id} => "g_global";
218# snd $_->{id} => g_set => \%LOCAL_DB; 284 g_global_connect $_->{id};
219# } 285 }
220# } 286 }
221# 287
288 $node_req{g_global} = sub {
289 g_slave_disconnect $SRCNODE->{id};
290 $SRCNODE->{transport}{remote_greeting}{global} = 1;
291 g_global_connect $SRCNODE->{id};
292 };
293
222 #d#d disconnect everybody to bootstrap development grr 294 #d#d disconnect everybody to bootstrap development grr
223 295
224 $_->transport_error # remove Self::transport_error 296# $_->transport_error # remove Self::transport_error
225 for values %NODE; 297# for values %NODE;
226 298
227 # now add us to the set of global nodes 299 # now add us to the set of global nodes
228 ldb_set "'g" => $NODE => undef; 300 ldb_set "'g" => $NODE => undef;
229 g_set $NODE => \%LOCAL_DB;
230} 301}
231 302
232=item $guard = grp_reg $group, $port 303=item $guard = grp_reg $group, $port
233 304
234Register the given (local!) port in the named global group C<$group>. 305Register the given (local!) port in the named global group C<$group>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines