ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.46
Committed: Thu Mar 1 18:11:56 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.45: +150 -21 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
# Functions exported by default into every package that uses this module.
our @EXPORT = qw(grp_reg grp_get grp_mon);

# Announce the start of the global service through the kernel's WARN hook.
# NOTE(review): the first argument (7) is presumably a verbosity level -
# confirm against AnyEvent::MP::Kernel.
$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
48
49 #############################################################################
50 # node protocol parts for global nodes
51
52 {
53 package AnyEvent::MP::Kernel;
54
55 # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#
56
57 our %NODE;
58 our $NODE;
59 our $LISTENER;
60
61 our $GLOBAL;
62 our $MASTER;
63
64 our %GLOBAL_SLAVE;
65 our $GLOBAL_MON;
66
67 our %GLOBAL_DB; # global db
68 our %LOCAL_DBS; # local databases of other global nodes
69 our %LOCAL_DB; # this node database
70
71 our $SRCNODE;
72 our %node_req;
73
# Broadcast a message to all other global nodes.
#
# All arguments are passed through unchanged to snd, once per node returned
# by other_globals.
sub g_broadcast {
   for my $peer (other_globals) {
      snd $peer, @_;
   }
}
79
# Debugging stub (all statements are marked #d#): reports the arguments it
# was called with and dumps the complete global database.
sub g_mon_check {
   use Data::Dump;#d#

   warn "g_mon_check<@_>\n";#d#
   ddx(\%GLOBAL_DB);#d#
}
84
# Add or replace a key in the database.
#
# Arguments: $node (whose local database the entry belongs to), $family,
# $key and $value. The value is stored both in that node's local database
# and in the merged global database.
sub g_add($$$$) {
   my ($node, $family, $key, $value) = @_;

   $GLOBAL_DB{$family}{$key}        = $value;
   $LOCAL_DBS{$node}{$family}{$key} = $value;

   # changes coming from one of our slaves are forwarded to the other globals
   g_broadcast(g_add => $family, $key, $value)
      if exists $GLOBAL_SLAVE{$node};

   warn "g_add<@_>\n";#d#

   # "&name;" form on purpose: g_mon_check sees our own, unmodified @_
   &g_mon_check;
}
96
# Delete a key from the database.
#
# Arguments: $node (whose local database the entry belongs to), $family and
# $key. The entry is removed from that node's local database and from the
# merged global database; if some other node's local database still has the
# same family/key, its value is put back into the global database.
#
# Every nested lookup is guarded with exists on the intermediate level, so
# that deleting a key of an unknown family (or for an unknown node) does not
# autovivify empty hashes in %LOCAL_DBS or %GLOBAL_DB.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   delete $LOCAL_DBS{$node}{$family}{$key}
      if exists $LOCAL_DBS{$node}
         && exists $LOCAL_DBS{$node}{$family};

   # deletions coming from one of our slaves are forwarded to the other globals
   g_broadcast(g_del => $family, $key)
      if exists $GLOBAL_SLAVE{$node};

   delete $GLOBAL_DB{$family}{$key}
      if exists $GLOBAL_DB{$family};

   # check if other node maybe still has the key, then we don't delete, but add
   for my $db (values %LOCAL_DBS) {
      next
         unless exists $db->{$family}
                && exists $db->{$family}{$key};

      $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};
      last;
   }

   warn "g_del<@_>\n";#d#

   # "&name;" form on purpose: g_mon_check sees our own, unmodified @_
   &g_mon_check;
}
117
# Delete all keys from a node's local database.
#
# Every family/key pair stored for $node is removed through g_del, then the
# node's entry in %LOCAL_DBS itself is dropped.
sub g_clr($) {
   my ($node) = @_;

   my $node_db = $LOCAL_DBS{$node};

   while (my ($family, $entries) = each %$node_db) {
      for my $key (keys %$entries) {
         g_del($node, $family, $key);
      }
   }

   delete $LOCAL_DBS{$node};
}
130
# Set the whole (node-local) database - the previous value must be empty.
#
# $db maps family => { key => value }. Each entry is handed to g_add; note
# that the values are removed from the hashes inside $db while doing so.
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($family, $entries) = each %$db) {
      for my $key (keys %$entries) {
         g_add($node, $family, $key, delete $entries->{$key});
      }
   }
}
140
141 # gather node databases from slaves
142
# Request handlers for the node protocol. In all of them the requesting node
# is identified by $SRCNODE->{id}.

# other node wants to make us the master: remember it as one of our slaves
# and take over the database it sent along
$node_req{g_slave} = sub {
   my ($db) = @_;

   my $slave = $SRCNODE->{id};

   $GLOBAL_SLAVE{$slave} = undef;
   g_set($slave, $db);
};

# add/replace a key on behalf of the requesting node
# (the &-call bypasses the prototype so that @_ is passed on as a list)
$node_req{g_add} = sub {
   &g_add($SRCNODE->{id}, @_);
};

# delete a key on behalf of the requesting node
# (the &-call bypasses the prototype so that @_ is passed on as a list)
$node_req{g_del} = sub {
   &g_del($SRCNODE->{id}, @_);
};

# take over the whole database of the requesting node
$node_req{g_set} = sub {
   my ($db) = @_;

   g_set($SRCNODE->{id}, $db);
};

# look up the "'l" entry of a node and send it back as a g_found message
$node_req{g_find} = sub {
   my ($node) = @_;

   snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
};
169
170 #############################################################################
171 # switch to global mode
172
$GLOBAL = 1;
$MASTER = $NODE;
$GLOBAL_SLAVE{$NODE} = undef; # we are our own master (and slave)

# release whatever was stored before the new node monitor is installed
undef $GLOBAL_MON;

# on node-down: forget the node as a slave and clear its database
$GLOBAL_MON = mon_nodes sub {
   # NOTE(review): the second argument is presumably the "node is up" flag -
   # only the case where it is false is handled here
   my ($node, $is_up) = @_;

   return if $is_up;

   delete $GLOBAL_SLAVE{$node};
   g_clr($node);

   # clear listener and global database entries on node-down
   #ldb_set "'g" => $node; # really?
   ldb_set "'l" => $node;
};

# tell everybody who connects that we are a global node
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};

# send our database to every global node that connects
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   return unless $transport->{remote_greeting}{global};

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $node_db (values %LOCAL_DBS) {
      while (my ($family, $entries) = each %$node_db) {
         while (my ($key, $value) = each %$entries) {
            $merged{$family}{$key} = $value;
         }
      }
   }

   snd $transport->{remote_node} => g_set => \%merged;
};

# # tell our master else that we are global now
# for (values %NODE) {
# if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
# snd $_->{id} => "g_global";
# snd $_->{id} => g_set => \%LOCAL_DB;
# }
# }
#
#d#d disconnect everybody to bootstrap development grr

for my $node (values %NODE) {
   $node->transport_error; # remove Self::transport_error
}

# now add us to the set of global nodes
ldb_set "'g" => $NODE => undef;
g_set($NODE, \%LOCAL_DB);
230 }
231
232 =item $guard = grp_reg $group, $port
233
234 Register the given (local!) port in the named global group C<$group>.
235
236 The port will be unregistered automatically when the port is destroyed.
237
238 When not called in void context, then a guard object will be returned that
239 will also cause the name to be unregistered when destroyed.
240
241 =cut
242
# Register a local port in a named global group.
#
# Croaks when $port is not a local port. In non-void context a guard object
# is returned that calls unregister ($port, $group) when it is destroyed.
#
# NOTE(review): nothing in this sub adds the port to the group, and no
# "unregister" sub is visible in this part of the file - confirm whether the
# registration step is still to be written.
sub grp_reg($$) {
   my ($group, $port) = @_;

   unless (port_is_local $port) {
      Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
   }

   # in void context nobody could hold on to a guard, so none is created
   return unless defined wantarray;

   AnyEvent::Util::guard { unregister ($port, $group) }
}
252
253 =item $ports = grp_get $group
254
255 Returns all the ports currently registered to the given group (as
256 read-only(!) array reference). When the group has no registered members,
257 returns C<undef>.
258
259 =cut
260
# Stub: the group lookup has no implementation here, so nothing is returned
# (undef in scalar context, the empty list in list context).
sub grp_get($) {
   return;
}
263
264 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
265
266 Installs a monitor on the given group. Each time there is a change it
267 will be called with the current group members as an arrayref as the
268 first argument. The second argument only contains ports added, the third
269 argument only ports removed.
270
271 Unlike C<grp_get>, all three arguments will always be array-refs, even if
272 the array is empty. None of the arrays may be modified in any way.
273
274 The first invocation will be with the first two arguments set to the
275 current members, as if all of them were just added, but only when the
276 group is actually non-empty.
277
278 Optionally returns a guard object that uninstalls the watcher when it is
279 destroyed.
280
281 =cut
282
# Stub for installing a monitor on a group: $grp is the group name, $cb the
# callback to be invoked on membership changes.
#
# No monitor is installed yet. The sub returns nothing explicitly (undef in
# scalar context, the empty list in list context); without the explicit
# return, the argument unpacking below would be the last statement and the
# caller would receive the arguments (or their count) in place of a guard.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   return;
}
286
287 =back
288
289 =head1 SEE ALSO
290
291 L<AnyEvent::MP>.
292
293 =head1 AUTHOR
294
295 Marc Lehmann <schmorp@schmorp.de>
296 http://home.schmorp.de/
297
298 =cut
299
300 1
301