ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.4
Committed: Sun Aug 16 02:55:17 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.3: +102 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8     # -OR-
9     aemp addservice AnyEvent::MP::Global::
10    
11     =head1 DESCRIPTION
12    
13     This module provides an assortment of network-global functions: group name
14     registration and non-local locks.
15    
16     It will also try to build and maintain a full mesh of all network nodes.
17    
18 root 1.3 While it isn't mandatory to run the global services, running it on one
19     node will automatically run it on all nodes.
20    
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37    
38     our $VERSION = $AnyEvent::MP::VERSION;
39    
40 root 1.4 our %port; # our rendezvous port on the other side
41     our %lreg; # local registry, name => [pid...]
42     our %lmon; # local registry monitoring name,pid => mon
43     our %greg; # global registry, name => [pid...]
44    
# Remove every port owned by node $noderef from all groups in the global
# registry (%greg).  A port counts as owned by the node when its id is the
# noderef itself or starts with the noderef followed by "#".
sub unreg_groups($) {
   my ($noderef) = @_;

   my $owned_by_node = qr/^\Q$noderef\E(?:#|$)/;

   # filter each member list in place, so the array refs stored in %greg
   # keep their identity
   foreach my $members (values %greg) {
      @$members = grep { $_ !~ $owned_by_node } @$members;
   }
}
54    
# Merge the group registry announced by node $noderef into the global
# registry (%greg).
#
#   $noderef - the node the announcement came from
#   $lreg    - hash ref, group name => array ref of port ids
#
# The "set" message handler removes all ports of $noderef (unreg_groups)
# right before calling this, so this sub has to add them back.  Only ports
# that belong to $noderef (the noderef itself, or the noderef followed by
# "#") are accepted, so a peer announcing more than its own ports cannot
# create duplicate or foreign entries.  Anything that is not a hash of
# arrays is ignored.
sub set_groups($$) {
   my ($noderef, $lreg) = @_;

   return unless ref $lreg eq "HASH";

   my $qr = qr/^\Q$noderef\E(?:#|$)/;

   for my $group (keys %$lreg) {
      my $ports = $lreg->{$group};

      next unless ref $ports eq "ARRAY";

      my @own = grep { $_ =~ $qr } @$ports;

      # do not create empty groups
      push @{ $greg{$group} }, @own
         if @own;
   }
}
58    
59     =item $guard = register $port, $group
60    
61     Register the given (local!) port in the named global group C<$group>.
62    
63     The port will be unregistered automatically when the port is destroyed.
64    
65     When not called in void context, then a guard object will be returned that
66     will also cause the name to be unregistered when destroyed.
67    
68     =cut
69    
# Add a port (local or remote) to the member list of $group in the global
# registry (%greg), creating the list on first use.  Returns the new
# number of members, as push does.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];

   push @$members, $port;
}
76    
# Remove a port (local or remote) from the member list of $group in the
# global registry (%greg).  Unknown groups are left alone, so no empty
# group is created as a side effect.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group}
      or return;

   @$members = grep { $_ ne $port } @$members;
}

# Unregister a local port from $group:
#  - drop the monitor stored in %lmon for this group/port pair
#  - remove the port from the local registry (%lreg) and the global one
#  - send a "reg0" message to every rendezvous port in %port
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};
   @{ $lreg{$group} } = grep { $_ ne $port } @{ $lreg{$group} };

   _unregister($port, $group);

   snd($_, reg0 => $port, $group)
      for values %port;
}
89    
# Register a local port in the global group $group.
#
#   $port  - a local port id; croaks if the port is not local
#   $group - the group name
#
# The port is monitored, and unregistered again when the monitor fires.
# The registration is recorded in the local (%lreg) and global (%greg)
# registries and announced with a "reg1" message to every rendezvous port
# in %port.
#
# In any non-void context a guard object is returned that unregisters the
# port when destroyed.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local($port)
      or Carp::croak("AnyEvent::MP::Global::register can only be called for local ports, caught");

   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   snd($_, reg1 => $port, $group)
      for values %port;

   _register($port, $group);

   # "defined wantarray" rather than plain "wantarray": the latter is false
   # in scalar context, so "my $guard = register ..." would get no guard
   defined wantarray
      && AnyEvent::Util::guard { unregister($port, $group) }
}
107    
# Set up the rendezvous port on node $noderef, unless one already exists.
#
# Spawns AnyEvent::MP::Global::connect on the remote node (passing our own
# $NODE), remembers the resulting port in %port, and then sends it
#  - "connect_nodes" with the nodes returned by up_nodes, and
#  - "set" with our local registry.
sub start_node {
   my ($noderef) = @_;

   return if exists $port{$noderef};

   # establish connection
   my $port = $port{$noderef}
      = spawn($noderef, "AnyEvent::MP::Global::connect", $NODE);

   # when the rendezvous port goes away, forget it and everything the
   # node had registered
   mon($port, sub {
      unreg_groups($noderef);
      delete $port{$noderef};
   });

   # request any other nodes possibly known to us
   snd($port, connect_nodes => up_nodes());

   # announce only the ports registered on this node: the receiving side
   # handles "set" by dropping this node's ports and re-adding the payload,
   # so the payload must be %lreg and not the whole global %greg
   snd($port, set => \%lreg);
}
123 root 1.3
# Body of the rendezvous port; start_node spawns it by name on the peer
# and passes the node name of the spawning side as $noderef.
#
# Installs the handlers for the messages sent to this port:
#   connect_nodes => node, ...   connect to and start each listed node
#   set           => \%groups    replace the groups known for $noderef
#   reg1 / reg0   => port, group add / remove a single port registration
sub connect {
   my ($noderef) = @_;

   # monitor them, silently die
   mon $noderef, psub { kil $SELF };

   # debugging aid, disabled so the module does not print a line on every
   # incoming connection
   #warn "$SELF,$NODE\n";#d#

   rcv $SELF,
      connect_nodes => sub {
         for my $node (@_) {
            connect_node($node);
            start_node($node);
         }
      },
      set => sub {
         my ($groups) = @_;

         # drop everything the node had registered, then take its new list
         unreg_groups($noderef);
         set_groups($noderef, $groups);
      },
      reg1 => \&_register,
      reg0 => \&_unregister,
   ;
}
146 root 1.1
# Node state callback, called with the node and a flag saying whether it
# is up: an up node gets a rendezvous port via start_node, a down node has
# all its ports removed from the global registry.
sub mon_node {
   my ($noderef, $is_up) = @_;

   return unreg_groups($noderef)
      unless $is_up;

   start_node($noderef);
   #warn "node<$noderef,$is_up>\n";#d#
}
157    
# treat every node returned by up_nodes as having just come up ...
foreach my $noderef (up_nodes()) {
   mon_node($noderef, 1);
}

# ... then hand mon_node to mon_nodes (presumably so it is called on later
# node state changes - confirm against AnyEvent::MP::Kernel)
mon_nodes(\&mon_node);
162    
163     =back
164    
165     =head1 SEE ALSO
166    
167     L<AnyEvent::MP>.
168    
169     =head1 AUTHOR
170    
171     Marc Lehmann <schmorp@schmorp.de>
172     http://home.schmorp.de/
173    
174     =cut
175    
176     1
177