ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.45 by root, Wed Feb 29 18:58:23 2012 UTC vs.
Revision 1.46 by root, Thu Mar 1 18:11:56 2012 UTC

52{ 52{
53 package AnyEvent::MP::Kernel; 53 package AnyEvent::MP::Kernel;
54 54
55 # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d# 55 # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#
56 56
57 our %NODE;
58 our $NODE;
59 our $LISTENER;
60
57 our $GLOBAL; 61 our $GLOBAL;
58 our $MASTER; 62 our $MASTER;
59 our $NODE_ADDR; 63
60 our $GLOBAL_ADDR;
61 our $NODE;
62 our %GLOBAL_SLAVE; 64 our %GLOBAL_SLAVE;
63 our $GLOBAL_MON; 65 our $GLOBAL_MON;
64 our $LISTENER;
65 66
67 our %GLOBAL_DB; # global db
68 our %LOCAL_DBS; # local databases of other global nodes
69 our %LOCAL_DB; # this node database
70
71 our $SRCNODE;
72 our %node_req;
73
74 # broadcasts a message to all other global nodes
75 sub g_broadcast {
76 snd $_, @_
77 for other_globals;
78 }
79
80 sub g_mon_check {
81 warn "g_mon_check<@_>\n";#d#
82 use Data::Dump; ddx \%GLOBAL_DB;#d#
83 }
84
85 # add/replace a key in the database
86 sub g_add($$$$) {
87 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
88 $GLOBAL_DB {$_[1]}{$_[2]} = $_[3];
89
90 g_broadcast g_add => $_[1] => $_[2] => $_[3]
91 if exists $GLOBAL_SLAVE{$_[0]};
92
93 warn "g_add<@_>\n";#d#
94 &g_mon_check;
95 }
96
97 # delete a key from the database
98 sub g_del($$$) {
99 delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};
100
101 g_broadcast g_del => $_[1] => $_[2]
102 if exists $GLOBAL_SLAVE{$_[0]};
103
104 delete $GLOBAL_DB{$_[1]}{$_[2]};
105
106 # check if other node maybe still has the key, then we don't delete, but add
107 for (values %LOCAL_DBS) {
108 if (exists $_->{$_[1]}{$_[2]}) {
109 $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
110 last;
111 }
112 }
113
114 warn "g_del<@_>\n";#d#
115 &g_mon_check;
116 }
117
118 # delete all keys from a database
119 sub g_clr($) {
120 my ($node) = @_;
121
122 my $db = $LOCAL_DBS{$node};
123 while (my ($f, $k) = each %$db) {
124 g_del $node, $f => $_
125 for keys %$k;
126 }
127
128 delete $LOCAL_DBS{$node};
129 }
130
131 # set the whole (node-local) database - previous value must be empty
132 sub g_set($$) {
133 my ($node, $db) = @_;
134
135 while (my ($f, $k) = each %$db) {
136 g_add $node, $f => $_ => delete $k->{$_}
137 for keys %$k;
138 }
139 }
140
141 # gather node databases from slaves
142
143 # other node wants to make us the master
144 $node_req{g_slave} = sub {
145 my ($db) = @_;
146
147 my $node = $SRCNODE->{id};
148 undef $GLOBAL_SLAVE{$node};
149 g_set $node, $db;
150 };
151
152 $node_req{g_add} = sub {
153 &g_add ($SRCNODE->{id}, @_);
154 };
155
156 $node_req{g_del} = sub {
157 &g_del ($SRCNODE->{id}, @_);
158 };
159
160 $node_req{g_set} = sub {
161 g_set $SRCNODE->{id}, $_[0];
162 };
163
164 $node_req{g_find} = sub {
165 my ($node) = @_;
166
167 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
168 };
169
170 #############################################################################
66 # switch to global mode 171 # switch to global mode
172
67 $GLOBAL = 1; 173 $GLOBAL = 1;
68 $MASTER = $NODE; 174 $MASTER = $NODE;
175 undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
69 undef $GLOBAL_MON; 176 undef $GLOBAL_MON;
70 177
71 $GLOBAL_ADDR->{$NODE} = $LISTENER; 178 # delete slaves on nodw-down
72 179 # clear slave db on node-down
73 $GLOBAL_MON = mon_nodes sub { 180 $GLOBAL_MON = mon_nodes sub {
74 return if $_[1]; 181 return if $_[1];
75 182
76 delete $NODE_ADDR->{$_[0]};
77
78 if (delete $GLOBAL_ADDR->{$_[0]}) { 183 delete $GLOBAL_SLAVE{$_[0]};
79 # if the node is global, tell our slaves 184 g_clr $_[0];
80 185 ldb_set "'l" => $_[0];
81 our %GLOBAL_SLAVE; # ugh, will be in AnyEvent::MP::Global 186 # clear listener and global database entries on node-down
82 snd $_, g_del => $_[0] 187 #ldb_set "'g" => $_[0]; # really?
83 for keys %GLOBAL_SLAVE;
84 }
85 }; 188 };
86 189
87 # tell everybody who connects that we are a global node 190 # tell everybody who connects that we are a global node
88 push @AnyEvent::MP::Transport::HOOK_GREET, sub { 191 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
89 $_[0]{local_greeting}{global} = 1; 192 $_[0]{local_greeting}{global} = 1;
90 }; 193 };
91 194
92 # tell every global node that connects that we are global too 195 # send our database to every global node that connects
93 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { 196 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
94 snd $_[0], g_add => $NODE, $LISTENER
95 if $_[0]{remote_greeting}{global}; 197 return unless $_[0]{remote_greeting}{global};
198
199 # global nodes send all local databases, merged,
200 # as their local database to global nodes
201 my %db;
202
203 for (values %LOCAL_DBS) {
204 while (my ($f, $k) = each %$_) {
205 while (my ($kk, $kv) = each %$k) {
206 $db{$f}{$kk} = $kv;
207 }
208 }
209 }
210
211 snd $_[0]{remote_node} => g_set => \%db;
212 };
213
214# # tell our master else that we are global now
215# for (values %NODE) {
216# if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
217# snd $_->{id} => "g_global";
218# snd $_->{id} => g_set => \%LOCAL_DB;
219# }
96 }; 220# }
221#
222 #d#d disconnect everybody to bootstrap development grr
97 223
98 # tell everybody else that we are global now 224 $_->transport_error # remove Self::transport_error
99 snd $_ => g_add => $NODE, $LISTENER 225 for values %NODE;
100 for up_nodes; 226
227 # now add us to the set of global nodes
228 ldb_set "'g" => $NODE => undef;
229 g_set $NODE => \%LOCAL_DB;
101} 230}
102 231
103=item $guard = grp_reg $group, $port 232=item $guard = grp_reg $group, $port
104 233
105Register the given (local!) port in the named global group C<$group>. 234Register the given (local!) port in the named global group C<$group>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines