ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Global.pm (file contents):
Revision 1.3 by root, Sat Aug 15 04:34:34 2009 UTC vs.
Revision 1.4 by root, Sun Aug 16 02:55:17 2009 UTC

28 28
29use common::sense; 29use common::sense;
30use Carp (); 30use Carp ();
31use MIME::Base64 (); 31use MIME::Base64 ();
32 32
33use AnyEvent::Util ();
34
33use AnyEvent::MP; 35use AnyEvent::MP;
34use AnyEvent::MP::Kernel; 36use AnyEvent::MP::Kernel;
35 37
36our $VERSION = $AnyEvent::MP::VERSION; 38our $VERSION = $AnyEvent::MP::VERSION;
37 39
38our $port = port; 40our %port; # our rendezvous port on the other side
39our %other; # our rendevouz port on the other side 41our %lreg; # local registry, name => [pid...]
42our %lmon; # local rgeistry monitoring name,pid => mon
43our %greg; # global regstry, name => [pid...]
44
45sub unreg_groups($) {
46 my ($noderef) = @_;
47
48 my $qr = qr/^\Q$noderef\E(?:#|$)/;
49
50 for my $group (values %greg) {
51 @$group = grep $_ !~ $qr, @$group;
52 }
53}
54
55sub set_groups($$) {
56 my ($noderef, $lreg) = @_;
57}
58
59=item $guard = register $port, $group
60
61Register the given (local!) port in the named global group C<$group>.
62
63The port will be unregistered automatically when the port is destroyed.
64
65When not called in void context, then a guard object will be returned that
66will also cause the name to be unregistered when destroyed.
67
68=cut
69
70# register port from any node
71sub _register {
72 my ($port, $group) = @_;
73
74 push @{ $greg{$group} }, $port;
75}
76
77# unregister local port
78sub unregister {
79 my ($port, $group) = @_;
80
81 delete $lmon{"$group\x00$port"};
82 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
83
84 _unregister $port, $group;
85
86 snd $_, reg0 => $port, $group
87 for values %port;
88}
89
90# register local port
91sub register($$) {
92 my ($port, $group) = @_;
93
94 port_is_local $port
95 or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";
96
97 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
98 push @{ $lreg{$group} }, $port;
99
100 snd $_, reg1 => $port, $group
101 for values %port;
102
103 _register $port, $group;
104
105 wantarray && AnyEvent::Util::guard { unregister $port, $group }
106}
107
108sub start_node {
109 my ($noderef) = @_;
110
111 return if exists $port{$noderef};
112
113 # establish connection
114 my $port = $port{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", $NODE;
115 # request any other nodes possibly known to us
116 mon $port, sub {
117 unreg_groups $noderef;
118 delete $port{$noderef};
119 };
120 snd $port, connect_nodes => up_nodes;
121 snd $port, set => \%greg;
122}
40 123
41sub connect { 124sub connect {
42 my ($noderef) = @_; 125 my ($noderef) = @_;
43 126
44 # monitor them, silently die 127 # monitor them, silently die
45 mon $noderef, psub { kil $SELF }; 128 mon $noderef, psub { kil $SELF };
46 129
130 warn "$SELF,$NODE\n";#d#
131 rcv $SELF,
47 rcv $SELF, connect_nodes => sub { 132 connect_nodes => sub {
133 for (@_) {
48 connect_node $_ for @_; 134 connect_node $_;
135 start_node $_;
136 }
137 },
138 set => sub {
139 unreg_groups $noderef;
140 set_groups $noderef, shift;
141 },
142 reg1 => \&_register,
143 reg0 => \&_unregister,
49 }; 144 ;
50} 145}
51 146
52sub mon_node { 147sub mon_node {
53 my ($noderef, $is_up) = @_; 148 my ($noderef, $is_up) = @_;
54 149
55 if ($is_up) { 150 if ($is_up) {
56 # establish connection 151 start_node $noderef;
57 my $other = $other{$noderef} = spawn $noderef, "AnyEvent::MP::Global::connect", $NODE;
58 # request any other nodes possibly known to us
59 snd $other, connect_nodes => up_nodes;
60 } else { 152 } else {
61 kil delete $other{$noderef}; 153 unreg_groups $noderef;
62 } 154 }
63 #warn "node<$noderef,$is_up>\n";#d# 155 #warn "node<$noderef,$is_up>\n";#d#
64} 156}
65 157
66mon_node $_, 1 158mon_node $_, 1

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines