ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.67 by root, Sat Mar 24 01:00:28 2012 UTC vs.
Revision 1.68 by root, Sat Mar 24 13:05:40 2012 UTC

50our $MASTER_TIMER; 50our $MASTER_TIMER;
51 51
52our %GLOBAL_SLAVE; 52our %GLOBAL_SLAVE;
53 53
54our %GLOBAL_DB; # global db 54our %GLOBAL_DB; # global db
55our %LOCAL_DBS; # local databases of other global nodes 55our %LOCAL_DBS; # local databases of other nodes (global and slave)
56our %LOCAL_DB; # this node database 56our %LOCAL_DB; # this node database
57 57
58our $SRCNODE; # the origin node id 58our $SRCNODE; # the origin node id
59our %NODE_REQ; 59our %NODE_REQ;
60 60
84 my @del; # actual deletes 84 my @del; # actual deletes
85 85
86 # take care of deletes 86 # take care of deletes
87 keydel: 87 keydel:
88 for my $k (@$del) { 88 for my $k (@$del) {
89 next unless exists $ldb->{$k};
90
89 delete $ldb->{$k}; 91 delete $ldb->{$k};
90 delete $gdb->{$k}; 92 delete $gdb->{$k};
91 93
92 # check if some other node still has the key, then we don't delete, but change 94 # check if some other node still has the key, then we don't delete, but change
93 for (values %LOCAL_DBS) { 95 for (values %LOCAL_DBS) {
103 105
104 # family could be empty now 106 # family could be empty now
105 delete $GLOBAL_DB{$family} unless %$gdb; 107 delete $GLOBAL_DB{$family} unless %$gdb;
106 delete $LOCAL_DBS{$node}{$family} unless %$ldb; 108 delete $LOCAL_DBS{$node}{$family} unless %$ldb;
107 109
110 return unless %$set || @del;
111
108 g_broadcast g_upd => $family, $set, \@del 112 g_broadcast g_upd => $family, $set, \@del
109 if exists $GLOBAL_SLAVE{$node}; 113 if exists $GLOBAL_SLAVE{$node};
110 114
111 # tell subscribers we have changed the family 115 # tell subscribers we have changed the family
112 snd $_ => g_chg2 => $family, $set, \@del 116 snd $_ => g_chg2 => $family, $set, \@del
182# monitoring 186# monitoring
183 187
184sub g_disconnect($) { 188sub g_disconnect($) {
185 my ($node) = @_; 189 my ($node) = @_;
186 190
191 db_del "'g" => $node;
187 g_clr $node; 192 g_clr $node;
188 193
189 if (my $mon = delete $GLOBAL_SLAVE{$node}) { 194 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
190 while (my ($f, $fv) = each %$mon) { 195 while (my ($f, $fv) = each %$mon) {
191 delete $GLOBAL_MON{$f}{$_} 196 delete $GLOBAL_MON{$f}{$_}
214}; 219};
215 220
216############################################################################# 221#############################################################################
217# switch to global mode 222# switch to global mode
218 223
219# maintain conenctions to all global nodes that we know of
220db_mon "'g" => sub {
221 keepalive_add $_ for @{ $_[1] };
222 keepalive_del $_ for @{ $_[3] };
223};
224
225# delete data from other nodes on node-down 224# delete data from other nodes on node-down
226mon_nodes sub { 225mon_nodes sub {
227 g_disconnect $_[0] unless $_[1]; 226 g_disconnect $_[0] unless $_[1];
228}; 227};
229 228
234 233
235# connect from a global node 234# connect from a global node
236sub g_global_connect { 235sub g_global_connect {
237 my ($node) = @_; 236 my ($node) = @_;
238 237
239 # we need to set this currently, as to avoid race conditions 238 # each node puts the set of connected global nodes into
240 # because it takes a while until the other global node tells us it is global, 239 # 'g - this causes a big duplication and mergefest, but
241 # but we need to send updates even before that. 240 # is the easiest way to ensure global nodes have a list
241 # of all other global nodes.
242 242
243 undef $GLOBAL_DB {"'g"}{$node}; 243 db_set "'g" => $node;
244 undef $LOCAL_DBS{$node}{"'g"}{$node};
245 244
246 # global nodes send all local databases, merged, 245 # global nodes send all local databases of their slaves, merged,
247 # as their local database to other global nodes 246 # as their local database to other global nodes
248 my %db; 247 my %db;
249 248
250 for (values %LOCAL_DBS) { 249 while (my ($k, $v) = each %LOCAL_DBS) {
250 next unless exists $GLOBAL_SLAVE{$k};
251
251 while (my ($f, $fv) = each %$_) { 252 while (my ($f, $fv) = each %$_) {
252 while (my ($k, $kv) = each %$fv) { 253 while (my ($k, $kv) = each %$fv) {
253 $db{$f}{$k} = $kv; 254 $db{$f}{$k} = $kv;
254 } 255 }
255 } 256 }
289 290
290# global nodes are their own masters - this 291# global nodes are their own masters - this
291# resends global requests and sets the local database. 292# resends global requests and sets the local database.
292master_set $NODE; 293master_set $NODE;
293 294
295# from here on we should be able to act normally
296
294# now add us to the set of global nodes 297# now add us to the set of global nodes
295db_set "'g" => $NODE => undef; 298db_set "'g" => $NODE;
299
300# maintain connections to all global nodes that we know of
301db_mon "'g" => sub {
302 keepalive_add $_ for @{ $_[1] };
303 keepalive_del $_ for @{ $_[3] };
304};
296 305
297############################################################################# 306#############################################################################
298# compatibility functions for aemp 1.0 307# compatibility functions for aemp 1.0
299 308
300package AnyEvent::MP::Global; 309package AnyEvent::MP::Global;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines