ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.6
Committed: Mon Aug 17 03:33:18 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.5: +11 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8 # -OR-
9 aemp addservice AnyEvent::MP::Global::
10
11 =head1 DESCRIPTION
12
13 This module provides an assortment of network-global functions: group name
14 registration and non-local locks.
15
16 It will also try to build and maintain a full mesh of all network nodes.
17
18 While it isn't mandatory to run the global services, running it on one
19 node will automatically run it on all nodes.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37
38 our $VERSION = $AnyEvent::MP::VERSION;
39
40 our %port; # our rendezvous port on the other side
41 our %lreg; # local registry, name => [pid...]
42 our %lmon; # local registry monitoring name,pid => mon
43 our %greg; # global registry, name => [pid...]
44
45 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
46
# unreg_groups $noderef
#
# Purge a node from the global registry: every group list in %greg is
# rewritten in place, dropping each entry that is either exactly $noderef
# or starts with "$noderef#". Groups that become empty are kept as empty
# lists.
sub unreg_groups($) {
   my $node = shift;

   # \Q..\E keeps metacharacters in the node id from being read as regex syntax
   my $from_node = qr/^\Q$node\E(?:#|$)/;

   for my $name (keys %greg) {
      my $members = $greg{$name};
      @$members = grep { $_ !~ $from_node } @$members;
   }
}
56
# set_groups $noderef, \%registry
#
# Merge a registry (group name => [port, ...]) into %greg by appending each
# group's ports to the list of the same name, creating the list if needed.
# Existing entries are not removed or de-duplicated. The $noderef argument
# is accepted but not used by this sub.
sub set_groups($$) {
   my (undef, $incoming) = @_;

   for my $name (keys %$incoming) {
      push @{ $greg{$name} }, @{ $incoming->{$name} };
   }
}
64
65 =item $guard = register $port, $group
66
67 Register the given (local!) port in the named global group C<$group>.
68
69 The port will be unregistered automatically when the port is destroyed.
70
71 When not called in void context, then a guard object will be returned that
72 will also cause the name to be unregistered when destroyed.
73
74 =cut
75
# _register $port, $group
#
# Append $port to the list for $group in the global registry %greg,
# creating the list on first use. Works for ports of any node; also
# installed as the handler for "reg1" messages in connect.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
82
# _unregister $port, $group
#
# Remove every occurrence of $port from the list for $group in the global
# registry %greg. The list itself is kept (and created if it did not
# exist), even when it ends up empty. Works for ports of any node; also
# installed as the handler for "reg0" messages in connect.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
89
# unregister $port, $group
#
# Undo a local registration made with register:
#   - forget the monitor stored for this (group, port) pair in %lmon,
#   - remove the port from the local registry %lreg,
#   - remove the port from our copy of the global registry %greg,
#   - send a "reg0" message to every rendezvous port in %port so the
#     other side can drop the port as well.
sub unregister {
   my ($port, $group) = @_;

   # same "$group\0$port" key that register used to store the monitor
   delete $lmon{"$group\x00$port"};

   my $locals = $lreg{$group} ||= [];
   @$locals = grep { $_ ne $port } @$locals;

   _unregister($port, $group);

   for my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
102
# register $port, $group
#
# Register a local port in the named global group.
#
# Parameters:
#   $port  - the port to register; must be local (checked with port_is_local)
#   $group - the group name
#
# Returns, in scalar or list context, an AnyEvent::Util guard that calls
# unregister for this port/group when destroyed. In void context no guard
# is created.
#
# Croaks when $port is not a local port.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local($port)
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   # monitor the port so it gets unregistered through the monitor callback;
   # the monitor is stored so that unregister can drop it again
   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   # announce the new member to every rendezvous port we know
   snd($_, reg1 => $port, $group)
      for values %port;

   _register($port, $group);

   # wantarray is false (but defined) in scalar context, so testing it
   # directly would hand "" instead of a guard to "my $guard = register ...";
   # only void context (undef) should skip guard creation.
   defined wantarray && AnyEvent::Util::guard { unregister($port, $group) }
}
120
# start_node $noderef
#
# Set up the rendezvous with another node, unless we already have one or
# the node is ourselves ($NODE):
#   - spawn AnyEvent::MP::Global::connect on the remote node, passing
#     version 0 and our own node id, and remember the resulting port in
#     %port,
#   - monitor that port; when the monitor fires, the node's entries are
#     purged from %greg and the port is forgotten,
#   - send the remote side the list of nodes we see as up, and our local
#     registry %lreg.
sub start_node {
   my ($noderef) = @_;

   # do not connect to ourselves, and do not connect twice
   return if $noderef eq $NODE || exists $port{$noderef};

   # establish connection
   my $remote = spawn($noderef, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$noderef} = $remote;

   # clean up after the node once its rendezvous port goes away
   mon($remote, sub {
      unreg_groups($noderef);
      delete $port{$noderef};
   });

   # tell the other side about the nodes we know, then about our groups
   snd($remote, connect_nodes => up_nodes());
   snd($remote, set => \%lreg);
}
137
# connect $version, $noderef
#
# Entry point that other nodes spawn here (see start_node, which passes
# version 0 and the spawning node's id). $version is currently not
# inspected.
#
# Monitors the spawning node and kills this port ($SELF) when that monitor
# fires, then installs the message handlers on $SELF:
#   connect_nodes => connect to, and start a rendezvous with, each listed node
#   set           => merge the sender's registry into %greg
#   reg1 / reg0   => add / remove a single port in %greg
sub connect {
   my ($version, $noderef) = @_;

   # monitor them, silently die
   mon($noderef, psub { kil($SELF) });

   my $on_connect_nodes = sub {
      for my $node (@_) {
         connect_node($node);
         start_node($node);
      }
   };

   my $on_set = sub {
      my ($registry) = @_;
      set_groups($noderef, $registry);
   };

   rcv($SELF,
      connect_nodes => $on_connect_nodes,
      set           => $on_set,
      reg1          => \&_register,
      reg0          => \&_unregister,
   );
}
159
# mon_node $noderef, $is_up
#
# Node state callback: when $is_up is true, start the rendezvous with the
# node (start_node); otherwise purge the node's entries from the global
# registry (unreg_groups).
sub mon_node {
   my ($noderef, $is_up) = @_;

   #warn "node<$noderef,$is_up>\n";#d#

   return start_node($noderef)
      if $is_up;

   unreg_groups($noderef);
}
170
# Module start-up: treat every node reported by up_nodes as having just
# come up, then hand mon_node to mon_nodes (presumably so it is invoked
# with ($noderef, $is_up) on later node state changes -- confirm against
# AnyEvent::MP::Kernel).
for my $node (up_nodes()) {
   mon_node($node, 1);
}

mon_nodes(\&mon_node);
175
176 =back
177
178 =head1 SEE ALSO
179
180 L<AnyEvent::MP>.
181
182 =head1 AUTHOR
183
184 Marc Lehmann <schmorp@schmorp.de>
185 http://home.schmorp.de/
186
187 =cut
188
189 1
190