ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.4
Committed: Sun Aug 16 02:55:17 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.3: +102 -10 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8 # -OR-
9 aemp addservice AnyEvent::MP::Global::
10
11 =head1 DESCRIPTION
12
13 This module provides an assortment of network-global functions: group name
14 registration and non-local locks.
15
16 It will also try to build and maintain a full mesh of all network nodes.
17
18 While it isn't mandatory to run the global services, running them on one
19 node will automatically start them on all other nodes as well.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37
38 our $VERSION = $AnyEvent::MP::VERSION;
39
# Module-wide state; port ids are handled as plain strings.
our (
   %port, # noderef => our rendezvous port on that node
   %lreg, # local registry: group name => [port id, ...]
   %lmon, # local registry monitoring: "group\x00port" => value returned by mon
   %greg, # global registry: group name => [port id, ...]
);
44
# Remove every port hosted on $noderef from all groups of the global
# registry. A port counts as hosted on the node when its id is the noderef
# itself or starts with "<noderef>#". Each group's array is updated in
# place, so a group may be left behind empty.
sub unreg_groups($) {
   my ($noderef) = @_;

   my $on_node = qr/^\Q$noderef\E(?:#|$)/;

   foreach my $members (values %greg) {
      my @kept = grep { !/$on_node/ } @$members;
      @$members = @kept;
   }
}
54
# Merge the registry announced by node $noderef into the global registry.
#
#   $noderef - the node the announcement came from
#   $lreg    - hash ref, group name => array ref of port ids (may be undef)
#
# The "set" handler calls unreg_groups for the node right before this, so
# this sub only has to add entries. Only ports hosted on $noderef (same id
# test as unreg_groups) are accepted: that keeps the registry consistent
# even when the peer sends more than its own ports, and keeps the
# unreg_groups/set_groups pair symmetric. Groups that contribute no port
# are not created.
sub set_groups($$) {
   my ($noderef, $lreg) = @_;

   my $on_node = qr/^\Q$noderef\E(?:#|$)/;

   for my $group (keys %{ $lreg || {} }) {
      my @ports = grep { /$on_node/ } @{ $lreg->{$group} || [] };

      push @{ $greg{$group} }, @ports
         if @ports;
   }
}
58
59 =item $guard = register $port, $group
60
61 Register the given (local!) port in the named global group C<$group>.
62
63 The port will be unregistered automatically when the port is destroyed.
64
65 When called in non-void context, a guard object is returned; destroying
66 the guard also unregisters the port from the group.
67
68 =cut
69
# Add a port (hosted on any node) to $group in the global registry. The
# group's member list is created on first use. Also installed as the
# handler for incoming "reg1" messages.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
76
# unregister port from any node
#
# Counterpart of _register: removes every occurrence of $port from $group
# in the global registry. It is also the handler referenced for incoming
# "reg0" messages. It was called and referenced but never defined, so any
# unregistration died at runtime. Unknown groups are left alone rather than
# autovivified.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group}
      or return;

   @$members = grep $_ ne $port, @$members;
}

# unregister local port
#
# Drops the monitor stored for the ($group, $port) pair, removes the port
# from the local and the global registry, and announces the removal with a
# "reg0" message to every rendezvous port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};
   @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };

   _unregister $port, $group;

   snd $_, reg0 => $port, $group
      for values %port;
}
89
# register local port
#
# Registers the local port $port in the global group $group:
#  - croaks unless port_is_local accepts the port,
#  - installs a monitor on the port that unregisters it again,
#  - records it in the local and the global registry,
#  - announces it with a "reg1" message to every rendezvous port in %port.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would deny the guard to "my $guard = register ..."; only void
   # context (undef) should skip it.
   defined (wantarray) && AnyEvent::Util::guard { unregister $port, $group }
}
107
# Set up the rendezvous with node $noderef, unless one already exists.
#
# Spawns AnyEvent::MP::Global::connect on the remote node (passing our own
# node), remembers the resulting port in %port, and monitors it so that
# the node's registrations and the %port entry are dropped when it goes
# away. It then tells the peer which nodes we know and which ports we have
# registered.
sub start_node {
   my ($noderef) = @_;

   # never build a rendezvous with ourselves; presumably a peer's
   # connect_nodes list can name us, and the "set" handler would then
   # unregister our own ports
   return if $noderef eq $NODE;
   return if exists $port{$noderef};

   # establish connection
   my $port = $port{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", $NODE;

   mon $port, sub {
      unreg_groups $noderef;
      delete $port{$noderef};
   };

   # request any other nodes possibly known to us
   snd $port, connect_nodes => up_nodes;

   # The receiver replaces everything it has recorded for our node with
   # this payload, so send only our own registrations, not the global
   # registry with other nodes' ports in it.
   snd $port, set => \%lreg;
}
123
# Body of the rendezvous port that start_node spawns on the peer; $noderef
# is the node that initiated the connection.
#
# Messages handled on $SELF:
#   connect_nodes => @noderefs      connect to and start a rendezvous with each
#   set           => \%registry     replace everything recorded for $noderef
#   reg1          => $port, $group  add one registration
#   reg0          => $port, $group  remove one registration
sub connect {
   my ($noderef) = @_;

   # monitor them, silently die
   mon $noderef, psub { kil $SELF };

   # debug output, disabled like the one in mon_node
   #warn "$SELF,$NODE\n";#d#

   rcv $SELF,
      connect_nodes => sub {
         for (@_) {
            connect_node $_;
            start_node $_;
         }
      },
      set => sub {
         unreg_groups $noderef;
         set_groups $noderef, shift;
      },
      reg1 => \&_register,
      reg0 => \&_unregister,
   ;
}
146
# Node state callback: a node that came up gets a rendezvous via
# start_node, a node that went down loses all its global registrations.
sub mon_node {
   my ($noderef, $is_up) = @_;

   #warn "node<$noderef,$is_up>\n";#d#

   return start_node $noderef
      if $is_up;

   unreg_groups $noderef;
}
157
# Treat every node reported by up_nodes as if it had just come up ...
for my $noderef (up_nodes) {
   mon_node $noderef, 1;
}

# ... and hand mon_node to mon_nodes (presumably so it is invoked on later
# node up/down changes - confirm against AnyEvent::MP::Kernel).
mon_nodes \&mon_node;
162
163 =back
164
165 =head1 SEE ALSO
166
167 L<AnyEvent::MP>.
168
169 =head1 AUTHOR
170
171 Marc Lehmann <schmorp@schmorp.de>
172 http://home.schmorp.de/
173
174 =cut
175
176 1
177