ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.57
Committed: Fri Mar 9 17:05:26 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.56: +46 -47 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4
34 root 1.1 use AnyEvent::MP;
35     use AnyEvent::MP::Kernel;
36 root 1.20
37 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
38    
39 root 1.8 #############################################################################
40 root 1.44 # node protocol parts for global nodes
41 root 1.8
42 root 1.49 package AnyEvent::MP::Kernel;
43 root 1.8
44 root 1.49 # TODO: this is ugly (classical use vars vs. our),
45     # maybe this should go into MP::Kernel
46 root 1.26
47 root 1.49 our %NODE;
48     our $NODE;
49 root 1.46
50 root 1.49 our $GLOBAL;
51     our $MASTER;
52     our $MASTER_MON;
53     our $MASTER_TIMER;
54 root 1.46
55 root 1.49 our %GLOBAL_SLAVE;
56 root 1.26
57 root 1.49 our %GLOBAL_DB; # global db
58     our %LOCAL_DBS; # local databases of other global nodes
59     our %LOCAL_DB; # this node database
60 root 1.46
61 root 1.49 our $SRCNODE;
62     our %NODE_REQ;
63 root 1.46
64 root 1.49 # only in global code
65 root 1.51 our %GLOBAL_MON; # monitors {family}
66 root 1.47
67 root 1.49 sub other_globals() {
68     grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69     }
70 root 1.47
71 root 1.49 # broadcasts a message to all other global nodes
72     sub g_broadcast {
73     snd $_, @_
74     for other_globals;
75     }
76 root 1.46
77 root 1.57 # add/replace/del inside a family in the database
78     # @$dle must not contain any key in %$set
79     sub g_upd {
80     my ($node, $family, $set, $del) = @_;
81    
82     my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
83     my $gdb = $GLOBAL_DB {$family} ||= {};
84    
85     # add/replace keys
86     while (my ($k, $v) = each %$set) {
87     $ldb->{$k} =
88     $gdb->{$k} = $v;
89     }
90    
91     my @del; # actual deletes
92    
93     # take care of deletes
94     keydel:
95     for my $k (@$del) {
96     delete $ldb->{$k};
97     delete $gdb->{$k};
98    
99     # check if some other node still has the key, then we don't delete, but change
100     for (values %LOCAL_DBS) {
101     if (exists $_->{$family}{$k}) {
102     $set->{$k} = $gdb->{$k} = $_->{$family}{$k};
103 root 1.46
104 root 1.57 next keydel;
105     }
106     }
107 root 1.46
108 root 1.57 push @del, $k;
109     }
110 root 1.51
111 root 1.57 # family could be empty now
112     delete $GLOBAL_DB{$family} unless %$gdb;
113     delete $LOCAL_DBS{$node}{$family} unless %$ldb;
114 root 1.51
115 root 1.57 g_broadcast g_upd => $family, $set, \@del
116     if exists $GLOBAL_SLAVE{$node};
117 root 1.46
118 root 1.57 # tell subscribers we have changed the family
119     snd $_ => g_chg2 => $family, $set, \@del
120     for keys %{ $GLOBAL_MON{$family} };
121 root 1.51 }
122    
123     # set the whole (node-local) database - previous value must be empty
124     sub g_set($$) {
125     my ($node, $db) = @_;
126    
127     while (my ($f, $k) = each %$db) {
128 root 1.57 g_upd $node, $f, $k;
129 root 1.51 }
130 root 1.49 }
131 root 1.46
132 root 1.49 # delete all keys from a database
133     sub g_clr($) {
134     my ($node) = @_;
135    
136     my $db = $LOCAL_DBS{$node};
137 root 1.57
138 root 1.49 while (my ($f, $k) = each %$db) {
139 root 1.57 g_upd $node, $f, undef, [keys %$k];
140 root 1.46 }
141    
142 root 1.49 delete $LOCAL_DBS{$node};
143     }
144    
145     # gather node databases from slaves
146 root 1.46
147 root 1.49 # other node wants to make us the master
148     $NODE_REQ{g_slave} = sub {
149     my ($db) = @_;
150 root 1.46
151 root 1.49 my $node = $SRCNODE->{id};
152     undef $GLOBAL_SLAVE{$node};
153     g_set $node, $db;
154     };
155 root 1.46
156 root 1.57 $NODE_REQ{g_set} = sub {
157     g_set $SRCNODE->{id}, @_;
158 root 1.49 };
159 root 1.46
160 root 1.57 $NODE_REQ{g_upd} = sub {
161     g_upd $SRCNODE->{id}, @_;
162 root 1.49 };
163 root 1.46
164 root 1.49 $NODE_REQ{g_find} = sub {
165     my ($node) = @_;
166 root 1.46
167 root 1.49 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
168     };
169 root 1.46
170 root 1.55 $NODE_REQ{g_db_family} = sub {
171     my ($family, $id) = @_;
172     snd $SRCNODE->{id}, g_reply => $id, $GLOBAL_DB{$family} || {};
173     };
174    
175     $NODE_REQ{g_db_keys} = sub {
176     my ($family, $id) = @_;
177     snd $SRCNODE->{id}, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
178     };
179    
180     $NODE_REQ{g_db_values} = sub {
181     my ($family, $id) = @_;
182     snd $SRCNODE->{id}, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
183     };
184    
185 root 1.49 # monitoring
186 root 1.47
187 root 1.49 sub g_slave_disconnect($) {
188     my ($node) = @_;
189 root 1.47
190 root 1.49 g_clr $node;
191 root 1.47
192 root 1.49 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
193     while (my ($f, $fv) = each %$mon) {
194     delete $GLOBAL_MON{$f}{$_}
195     for keys %$fv;
196 root 1.51
197     delete $GLOBAL_MON{$f}
198     unless %{ $GLOBAL_MON{$f} };
199 root 1.47 }
200     }
201 root 1.49 }
202    
203 root 1.51 # g_mon0 family - stop monitoring
204 root 1.49 $NODE_REQ{g_mon0} = sub {
205 root 1.51 delete $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
206     delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
207    
208     delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
209 root 1.49 };
210    
211     # g_mon1 family key - start monitoring
212     $NODE_REQ{g_mon1} = sub {
213 root 1.51 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
214     undef $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
215 root 1.49
216 root 1.51 snd $SRCNODE->{id}, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
217 root 1.49 };
218 root 1.47
219 root 1.49 #############################################################################
220     # switch to global mode
221    
222     # regularly try to connect to global nodes - maybe use seeding code?
223     $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
224     (add_node $_)->connect
225     for keys %{ $GLOBAL_DB{"'g"} };
226     };
227    
228     # instantly connect to other global nodes when we learn of them
229     # so we don't have to wait for the timer.
230     #TODO
231 root 1.47 # $GLOBAL_MON{"'g"}{""}{""} = sub {
232     # (add_node $_[1])->connect;
233     # };
234    
235 root 1.49 # delete slaves on node-down
236     # clear slave db on node-down
237     $MASTER_MON = mon_nodes sub {
238     g_slave_disconnect $_[0] unless $_[1];
239     };
240    
241     # tell everybody who connects that we are a global node
242     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
243     $_[0]{local_greeting}{global} = 1;
244     };
245    
246     # connect from a global node
247     sub g_global_connect {
248     my ($node) = @_;
249    
250     # we need to set this currently, as to avoid race conditions
251     # because it takes a while until the other global node tells us it is global.
252    
253     undef $GLOBAL_DB{"'g"}{$node};
254     undef $LOCAL_DBS{$node}{"'g"}{$node};
255    
256     # global nodes send all local databases, merged,
257     # as their local database to global nodes
258     my %db;
259    
260     for (values %LOCAL_DBS) {
261     while (my ($f, $fv) = each %$_) {
262     while (my ($k, $kv) = each %$fv) {
263     $db{$f}{$k} = $kv;
264 root 1.46 }
265     }
266 root 1.47 }
267    
268 root 1.49 snd $node => g_set => \%db;
269 root 1.4 }
270    
271 root 1.49 # send our database to every global node that connects
272     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
273     return unless $_[0]{remote_greeting}{global};
274    
275     g_global_connect $_[0]{remote_node};
276     };
277    
278     # tell our master that we are global now
279     for (values %NODE) {
280     if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
281     snd $_->{id} => "g_global";
282     g_global_connect $_->{id};
283     }
284 root 1.20 }
285    
286 root 1.49 $NODE_REQ{g_global} = sub {
287     g_slave_disconnect $SRCNODE->{id};
288     $SRCNODE->{transport}{remote_greeting}{global} = 1;
289     g_global_connect $SRCNODE->{id};
290     };
291 root 1.20
292 root 1.56 # enable global mode
293     $GLOBAL = 1;
294    
295     # global nodes are their own masters - this
296     # resends global requests and sets the local database.
297     master_set $NODE;
298    
299 root 1.49 # now add us to the set of global nodes
300     db_set "'g" => $NODE => undef;
301 root 1.3
302 root 1.1 =back
303    
304     =head1 SEE ALSO
305    
306     L<AnyEvent::MP>.
307    
308     =head1 AUTHOR
309    
310     Marc Lehmann <schmorp@schmorp.de>
311     http://home.schmorp.de/
312    
313     =cut
314    
315     1
316