ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.65 by root, Fri Mar 23 21:30:32 2012 UTC vs.
Revision 1.66 by root, Sat Mar 24 00:48:53 2012 UTC

45our %NODE; 45our %NODE;
46our $NODE; 46our $NODE;
47 47
48our $GLOBAL; 48our $GLOBAL;
49our $MASTER; 49our $MASTER;
50our $MASTER_MON;
51our $MASTER_TIMER; 50our $MASTER_TIMER;
52 51
53our %GLOBAL_SLAVE; 52our %GLOBAL_SLAVE;
54 53
55our %GLOBAL_DB; # global db 54our %GLOBAL_DB; # global db
67 snd $_, @_ 66 snd $_, @_
68 for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } 67 for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69} 68}
70 69
71# add/replace/del inside a family in the database 70# add/replace/del inside a family in the database
72# @$dle must not contain any key in %$set 71# @$del must not contain any key in %$set
73sub g_upd { 72sub g_upd {
74 my ($node, $family, $set, $del) = @_; 73 my ($node, $family, $set, $del) = @_;
75 74
76 my $ldb = $LOCAL_DBS{$node}{$family} ||= {}; 75 my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
77 my $gdb = $GLOBAL_DB {$family} ||= {}; 76 my $gdb = $GLOBAL_DB {$family} ||= {};
146 my $node = $SRCNODE; 145 my $node = $SRCNODE;
147 undef $GLOBAL_SLAVE{$node}; 146 undef $GLOBAL_SLAVE{$node};
148 g_set $node, $db; 147 g_set $node, $db;
149}; 148};
150 149
150# other node (global and slave) sends us their database
151$NODE_REQ{g_set} = sub { 151$NODE_REQ{g_set} = sub {
152 &g_set ($SRCNODE, @_); 152 &g_set ($SRCNODE, @_);
153}; 153};
154 154
155# other node (global and slave) sends us a family update
155$NODE_REQ{g_upd} = sub { 156$NODE_REQ{g_upd} = sub {
156 &g_upd ($SRCNODE, @_); 157 &g_upd ($SRCNODE, @_);
157}; 158};
158 159
160# slave node wants to know the listeners of a node
159$NODE_REQ{g_find} = sub { 161$NODE_REQ{g_find} = sub {
160 my ($node) = @_; 162 my ($node) = @_;
161 163
162 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node}; 164 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
163}; 165};
212}; 214};
213 215
214############################################################################# 216#############################################################################
215# switch to global mode 217# switch to global mode
216 218
217# regularly try to connect to global nodes - maybe use seeding code? 219# maintain conenctions to all global nodes that we know of
218$MASTER_TIMER = AE::timer $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
219 (add_node $_)->connect
220 for keys %{ $GLOBAL_DB{"'g"} }
221};
222
223# instantly connect to other global nodes when we learn of them
224# so we don't have to wait for the timer.
225db_mon "'g" => sub { 220db_mon "'g" => sub {
226 (add_node $_)->connect 221 keepalive_add $_ for @{ $_[1] };
227 for @{ $_[1] }; 222 keepalive_del $_ for @{ $_[3] };
228}; 223};
229 224
230# delete slaves on node-down 225# delete slaves on node-down
231# clear slave db on node-down 226# clear slave db on node-down
232$MASTER_MON = mon_nodes sub { 227mon_nodes sub {
233 g_slave_disconnect $_[0] unless $_[1]; 228 g_slave_disconnect $_[0] unless $_[1];
234}; 229};
235 230
236# tell everybody who connects that we are a global node 231# tell everybody who connects that we are a global node
237push @AnyEvent::MP::Transport::HOOK_GREET, sub { 232push @AnyEvent::MP::Transport::HOOK_GREET, sub {
241# connect from a global node 236# connect from a global node
242sub g_global_connect { 237sub g_global_connect {
243 my ($node) = @_; 238 my ($node) = @_;
244 239
245 # we need to set this currently, as to avoid race conditions 240 # we need to set this currently, as to avoid race conditions
246 # because it takes a while until the other global node tells us it is global. 241 # because it takes a while until the other global node tells us it is global,
242 # but we need to send updates even before that.
247 243
248 undef $GLOBAL_DB{"'g"}{$node}; 244 undef $GLOBAL_DB {"'g"}{$node};
249 undef $LOCAL_DBS{$node}{"'g"}{$node}; 245 undef $LOCAL_DBS{$node}{"'g"}{$node};
250 246
251 # global nodes send all local databases, merged, 247 # global nodes send all local databases, merged,
252 # as their local database to global nodes 248 # as their local database to other global nodes
253 my %db; 249 my %db;
254 250
255 for (values %LOCAL_DBS) { 251 for (values %LOCAL_DBS) {
256 while (my ($f, $fv) = each %$_) { 252 while (my ($f, $fv) = each %$_) {
257 while (my ($k, $kv) = each %$fv) { 253 while (my ($k, $kv) = each %$fv) {
268 return unless $_[0]{remote_greeting}{global}; 264 return unless $_[0]{remote_greeting}{global};
269 265
270 g_global_connect $_[0]{remote_node}; 266 g_global_connect $_[0]{remote_node};
271}; 267};
272 268
273# tell our master that we are global now 269# tell other global nodes that we are global now
270# TODO: there is probably a race when two conencted nodes beocme global at the same time
271# very ugly.
274for (values %NODE) { 272for (values %NODE) {
275 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { 273 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
276 snd $_->{id} => "g_global"; 274 snd $_->{id} => "g_global";
277 g_global_connect $_->{id}; 275 g_global_connect $_->{id};
278 } 276 }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines