ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.7
Committed: Mon Aug 17 03:50:28 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-0_8
Changes since 1.6: +14 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8 # -OR-
9 aemp addservice AnyEvent::MP::Global::
10
11 =head1 DESCRIPTION
12
13 This module provides an assortment of network-global functions: group name
14 registration and non-local locks.
15
16 It will also try to build and maintain a full mesh of all network nodes.
17
18 While it isn't mandatory to run the global services, running them on one
19 node will automatically run them on all nodes.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37
our $VERSION = $AnyEvent::MP::VERSION;

# Package-wide state shared by the subs in this module.
our (
   %port, # noderef => our rendezvous port on the other side
   %lreg, # local registry, group name => [port ids registered on this node]
   %lmon, # local registry monitoring, "group\0port" => value returned by mon
   %greg  # global registry, group name => [port ids from any node]
);

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
46
# Remove every port belonging to node $noderef from all groups of the
# global registry. A port id counts as belonging to the node when it is the
# noderef itself or the noderef followed by "#". Groups that end up without
# members stay in %greg as empty lists.
sub unreg_groups($) {
   my ($noderef) = @_;

   my $on_node = qr/^\Q$noderef\E(?:#|$)/;

   foreach my $members (values %greg) {
      @$members = grep { $_ !~ $on_node } @$members;
   }
}
56
# Merge a registry received from another node into the global registry.
# $lreg maps group names to array references of port ids; every list is
# appended to the group of the same name in %greg. $noderef is accepted for
# the caller's convenience but is not used here.
sub set_groups($$) {
   my ($noderef, $lreg) = @_;

   while (my ($name, $ports) = each %$lreg) {
      my $members = $greg{$name} ||= [];
      push @$members, @$ports;
   }
}
64
65 =item $ports = find $group
66
67 Returns all the ports currently registered to the given group (as a
68 read-only array reference). When the group has no registered members,
69 returns C<undef>.
70
71 =cut
72
# Look up the member list of the group named by the first argument.
# Returns the array reference stored in %greg (not a copy) when the group
# has at least one member, undef otherwise.
sub find($) {
   my $members = $greg{$_[0]};

   # an unknown group and a group whose list is empty are treated alike
   $members && @$members
      ? $members
      : undef
}
78
79 =item $guard = register $port, $group
80
81 Register the given (local!) port in the named global group C<$group>.
82
83 The port will be unregistered automatically when the port is destroyed.
84
85 When not called in void context, a guard object will be returned that
86 will also cause the port to be unregistered from the group when destroyed.
87
88 =cut
89
# register port from any node
#
# Appends $port to the member list of $group in the global registry,
# creating the list when the group is new. Returns what push returns, i.e.
# the new number of members.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port
}
96
# unregister port from any node
#
# Removes every occurrence of $port from the member list of $group in the
# global registry. An unknown group ends up with an empty member list.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members
}
103
# unregister local port
#
# Undoes a register call for ($port, $group): drops the stored monitor,
# removes the port from the local and the global registry and sends a reg0
# message for it to every rendezvous port in %port.
sub unregister {
   my ($port, $group) = @_;

   # the monitor stored by register is no longer needed
   delete $lmon{"$group\x00$port"};

   my $local = $lreg{$group} ||= [];
   @$local = grep { $_ ne $port } @$local;

   _unregister $port, $group;

   foreach my $peer (values %port) {
      snd $peer, reg0 => $port, $group;
   }
}
116
# register local port
#
# Adds the local port $port to the global group $group: the port is
# monitored so that it gets unregistered again when the monitor fires, it
# is recorded in the local and global registries, and a reg1 message is
# sent to every rendezvous port in %port.
#
# Croaks when $port is not a local port.
#
# Returns a guard object in any non-void context (scalar or list);
# destroying the guard calls unregister for the same port and group. In
# void context no guard is created.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   # keep the value returned by mon so unregister can drop it again
   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is false-but-defined in scalar context and undef only in
   # void context, so definedness is what must be tested here: a plain
   # "wantarray &&" would hand "my $guard = register ..." an empty string
   # instead of a guard.
   defined(wantarray) && AnyEvent::Util::guard { unregister $port, $group }
}
134
# Set up our side of the global-service link to node $noderef. Does
# nothing when a rendezvous port for that node is already recorded or when
# the node is ourselves.
sub start_node {
   my ($noderef) = @_;

   return if exists $port{$noderef};
   return if $noderef eq $NODE; # do not connect to ourselves

   # establish connection: spawn the rendezvous port on the other node,
   # initialised via connect (0, $NODE)
   my $remote = spawn $noderef, "AnyEvent::MP::Global::connect", 0, $NODE;
   $port{$noderef} = $remote;

   # once the monitor on that port fires, forget the node's registrations
   # and the rendezvous port itself
   mon $remote, sub {
      unreg_groups $noderef;
      delete $port{$noderef};
   };

   # pass on the result of up_nodes, followed by our local registry
   snd $remote, connect_nodes => up_nodes;
   snd $remote, set => \%lreg;
}
151
# other nodes connect via this
#
# Named as the init function in the spawn call made by start_node, with
# the arguments (0, $NODE) of the spawning node. $version is accepted but
# not used. Installs the handlers for the messages start_node, register
# and unregister send: connect_nodes, set, reg1 and reg0.
sub connect {
   my ($version, $noderef) = @_;

   # monitor them, silently die
   mon $noderef, psub { kil $SELF };

   rcv $SELF,
      # list of nodes known to the peer: connect to each and start a link
      connect_nodes => sub {
         foreach my $node (@_) {
            connect_node $node;
            start_node $node;
         }
      },
      # the peer's complete local registry
      set  => sub { set_groups $noderef, $_[0] },
      # single registration changes, called with ($port, $group)
      reg1 => \&_register,
      reg0 => \&_unregister,
   ;
}
173
# Node status callback, called with ($noderef, $is_up): start the link to a
# node that is up, drop all group registrations of a node that is not.
sub mon_node {
   my ($noderef, $is_up) = @_;

   $is_up
      ? start_node($noderef)
      : unreg_groups($noderef)
}
184
# treat every node returned by up_nodes as having just come up
foreach my $node (up_nodes) {
   mon_node $node, 1;
}

# hand mon_node to mon_nodes (presumably so it is called on later node
# status changes -- see AnyEvent::MP::Kernel)
mon_nodes \&mon_node;
189
190 =back
191
192 =head1 SEE ALSO
193
194 L<AnyEvent::MP>.
195
196 =head1 AUTHOR
197
198 Marc Lehmann <schmorp@schmorp.de>
199 http://home.schmorp.de/
200
201 =cut
202
203 1
204