ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.46
Committed: Thu Mar 1 18:11:56 2012 UTC (12 years, 5 months ago) by root
Branch: MAIN
Changes since 1.45: +150 -21 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
# functions exported by default into the importing package
our @EXPORT = qw(grp_reg grp_get grp_mon);

# announce that this module is being loaded (the first argument, 7, is
# presumably the log level - confirm against AnyEvent::MP::Kernel)
$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
48    
49 root 1.8 #############################################################################
50 root 1.44 # node protocol parts for global nodes
51 root 1.8
52 root 1.44 {
package AnyEvent::MP::Kernel;

# TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#

# package variables of AnyEvent::MP::Kernel, declared here so the code
# below can refer to them by their short names
our (%NODE, $NODE, $LISTENER);

our ($GLOBAL, $MASTER);

our %GLOBAL_SLAVE; # keyed by node id, only key existence is tested
our $GLOBAL_MON;   # holds the value returned by mon_nodes

our %GLOBAL_DB; # global db
our %LOCAL_DBS; # local databases of other global nodes
our %LOCAL_DB;  # this node database

our ($SRCNODE, %node_req);
73    
# broadcasts a message to all other global nodes
#
# All arguments are passed through unchanged as the message; the recipients
# are whatever other_globals returns.
sub g_broadcast {
   for my $global (other_globals ()) {
      snd ($global, @_);
   }
}
79    
# debugging aid (#d#): warns with the call arguments and dumps %GLOBAL_DB
sub g_mon_check {
   use Data::Dump; # takes effect at compile time, regardless of position

   warn "g_mon_check<@_>\n";#d#
   ddx \%GLOBAL_DB;#d#
}
84    
# add/replace a key in the database
#
# Arguments: $node, $family, $key, $value.
# Stores the value both in the merged global database and in $node's local
# database, and relays the change to the other global nodes when $node is
# registered in %GLOBAL_SLAVE.
sub g_add($$$$) {
   my ($node, $family, $key, $value) = @_;

   $GLOBAL_DB{$family}{$key}        = $value;
   $LOCAL_DBS{$node}{$family}{$key} = $value;

   if (exists $GLOBAL_SLAVE{$node}) {
      g_broadcast (g_add => $family, $key, $value);
   }

   warn "g_add<@_>\n";#d#
   &g_mon_check; # called with our own (unmodified) @_
}
96    
# delete a key from the database
#
# Arguments: $node, $family, $key.
# Removes the key from $node's local database and from the merged global
# database. If any remaining local database still carries the key, that
# value is put back into the global database. The deletion is relayed to
# the other global nodes when $node is registered in %GLOBAL_SLAVE.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   # every nested lookup below is guarded level by level: delete/exists on
   # a multi-level hash autovivifies the intermediate levels, which would
   # leave empty family hashes behind in databases that never had the key
   delete $LOCAL_DBS{$node}{$family}{$key}
      if exists $LOCAL_DBS{$node}
         && exists $LOCAL_DBS{$node}{$family};

   g_broadcast (g_del => $family, $key)
      if exists $GLOBAL_SLAVE{$node};

   delete $GLOBAL_DB{$family}{$key}
      if exists $GLOBAL_DB{$family};

   # check if other node maybe still has the key, then we don't delete, but add
   for my $db (values %LOCAL_DBS) {
      next
         unless exists $db->{$family}
                && exists $db->{$family}{$key};

      $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};
      last;
   }

   warn "g_del<@_>\n";#d#
   &g_mon_check; # called with our own (unmodified) @_
}
117    
# delete all keys from a database
#
# Runs g_del for every family/key pair in $node's local database and then
# drops the database itself. A node without a database is a no-op.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node} || {};

   for my $family (keys %$db) {
      my $entries = $db->{$family};

      # keys returns a snapshot, so g_del may delete from $entries safely
      for my $key (keys %$entries) {
         g_del ($node, $family, $key);
      }
   }

   delete $LOCAL_DBS{$node};
}
130    
# set the whole (node-local) database - previous value must be empty
#
# Arguments: $node, $db (hash reference: family => { key => value }).
# Every entry is handed to g_add. Note that the entries are *moved*: each
# key is deleted from the inner hashes of $db as it is added, so the
# caller's structure ends up with empty family hashes.
sub g_set($$) {
   my ($node, $db) = @_;

   for my $family (keys %$db) {
      my $entries = $db->{$family};

      for my $key (keys %$entries) {
         g_add ($node, $family, $key, delete $entries->{$key});
      }
   }
}
140    
# gather node databases from slaves
#
# Each handler below takes the id of the requesting node from
# $SRCNODE->{id} and the request arguments from @_.

# other node wants to make us the master
$node_req{g_slave} = sub {
   my ($db) = @_;
   my $slave = $SRCNODE->{id};

   $GLOBAL_SLAVE{$slave} = undef; # only the existence of the key is tested
   g_set ($slave, $db);
};

# requesting node adds/replaces a key in its database
$node_req{g_add} = sub {
   my $src = $SRCNODE->{id};

   # the & form bypasses the prototype, so @_ is flattened into the call
   &g_add ($src, @_);
};

# requesting node deletes a key from its database
$node_req{g_del} = sub {
   my $src = $SRCNODE->{id};

   # the & form bypasses the prototype, so @_ is flattened into the call
   &g_del ($src, @_);
};

# requesting node sends its whole database
$node_req{g_set} = sub {
   my ($db) = @_;

   g_set ($SRCNODE->{id}, $db);
};

# requesting node asks for the "'l" entry of $node; answered with g_found
$node_req{g_find} = sub {
   my ($node) = @_;
   my $requester = $SRCNODE->{id};

   snd ($requester, g_found => $node, $GLOBAL_DB{"'l"}{$node});
};
169    
170     #############################################################################
171 root 1.44 # switch to global mode
172 root 1.46
$GLOBAL = 1;
$MASTER = $NODE;
$GLOBAL_SLAVE{$NODE} = undef; # we are our own master (and slave)
undef $GLOBAL_MON;

# on node-down: delete the slave entry and clear the slave db
$GLOBAL_MON = mon_nodes (sub {
   my ($node, $is_up) = @_;

   # a true second argument means there is nothing to do here
   # (presumably "node came up" - confirm against mon_nodes)
   return if $is_up;

   delete $GLOBAL_SLAVE{$node};
   g_clr ($node);
   ldb_set ("'l" => $node);
   # clear listener and global database entries on node-down
   #ldb_set "'g" => $_[0]; # really?
});
189 root 1.20
# tell everybody who connects that we are a global node
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};

# send our database to every global node that connects
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   # only peers that announced themselves as global get the database
   $transport->{remote_greeting}{global}
      or return;

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $local_db (values %LOCAL_DBS) {
      for my $family (keys %$local_db) {
         my $entries = $local_db->{$family};

         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd ($transport->{remote_node} => g_set => \%merged);
};
213 root 1.4
214 root 1.46 # # tell our master else that we are global now
215     # for (values %NODE) {
216     # if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
217     # snd $_->{id} => "g_global";
218     # snd $_->{id} => g_set => \%LOCAL_DB;
219     # }
220     # }
221     #
#d#d disconnect everybody to bootstrap development grr

for my $node (values %NODE) {
   $node->transport_error; # remove Self::transport_error
}

# now add us to the set of global nodes
ldb_set ("'g" => $NODE => undef);

# NOTE(review): g_set deletes every key it adds from the inner hashes of the
# database it is given, so %LOCAL_DB is left with empty families afterwards -
# confirm that this is intended
g_set ($NODE => \%LOCAL_DB);
230 root 1.4 }
231    
232 root 1.20 =item $guard = grp_reg $group, $port
233 root 1.4
234     Register the given (local!) port in the named global group C<$group>.
235    
236     The port will be unregistered automatically when the port is destroyed.
237    
238     When not called in void context, then a guard object will be returned that
239     will also cause the name to be unregistered when destroyed.
240    
241     =cut
242    
# register local port
#
# Croaks unless $port is local. When called in non-void context, returns a
# guard whose destruction calls unregister ($port, $group).
#
# NOTE(review): no registration call is made in this body, and "unregister"
# is not defined in the part of the file under review - confirm where both
# are supposed to come from.
sub grp_reg($$) {
   my ($group, $port) = @_;

   Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught"
      unless port_is_local ($port);

   return defined (wantarray)
      && AnyEvent::Util::guard (sub { unregister ($port, $group) });
}
252    
253 root 1.20 =item $ports = grp_get $group
254 root 1.15
255     Returns all the ports currently registered to the given group (as a
256 root 1.20 read-only(!) array reference). When the group has no registered members,
257 root 1.15 returns C<undef>.
258    
259     =cut
260    
# NOTE: placeholder - the body does nothing with its argument, so the result
# is always undef in scalar context and the empty list in list context.
sub grp_get($) {
   return;
}
263    
264     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
265    
266     Installs a monitor on the given group. Each time there is a change it
267     will be called with the current group members as an arrayref as the
268     first argument. The second argument only contains ports added, the third
269     argument only ports removed.
270    
271     Unlike C<grp_get>, all three arguments will always be array-refs, even if
272     the array is empty. None of the arrays must be modified in any way.
273    
274     The first invocation will be with the first two arguments set to the
275     current members, as if all of them were just added, but only when the
276 root 1.39 group is actually non-empty.
277 root 1.20
278     Optionally returns a guard object that uninstalls the watcher when it is
279     destroyed.
280    
281     =cut
282    
# NOTE: placeholder - the arguments are unpacked but no monitor is installed
# and no guard is created. The list assignment is the last statement, so its
# value (the argument count in scalar context) is what the caller gets back.
sub grp_mon($$) {
   my ($group, $callback) = @_;
}
286 root 1.3
287 root 1.1 =back
288    
289     =head1 SEE ALSO
290    
291     L<AnyEvent::MP>.
292    
293     =head1 AUTHOR
294    
295     Marc Lehmann <schmorp@schmorp.de>
296     http://home.schmorp.de/
297    
298     =cut
299    
300     1
301