ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Base.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Base.pm (file contents):
Revision 1.4 by root, Mon Aug 3 15:40:53 2009 UTC vs.
Revision 1.5 by root, Mon Aug 3 21:35:03 2009 UTC

30use AE (); 30use AE ();
31 31
32use base "Exporter"; 32use base "Exporter";
33 33
34our $VERSION = '0.01'; 34our $VERSION = '0.01';
35our @EXPORT = qw(NODE $NODE snd _any_ become_slave become_public); 35our @EXPORT = qw(
36 NODE $NODE snd del _any_
37 become_slave become_public
38);
36 39
37our $DEFAULT_SECRET; 40our $DEFAULT_SECRET;
38 41
39our $CONNECT_INTERVAL = 5; # new connect every 5s, at least 42our $CONNECT_INTERVAL = 5; # new connect every 5s, at least
40our $CONNECT_TIMEOUT = 30; # includes handshake 43our $CONNECT_TIMEOUT = 30; # includes handshake
88our $NODE = $$; 91our $NODE = $$;
89our $PORT; 92our $PORT;
90 93
91our %NODE; # node id to transport mapping, or "undef", for local node 94our %NODE; # node id to transport mapping, or "undef", for local node
92our %PORT; # local ports 95our %PORT; # local ports
96
97our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
98our %LMON; # monitored _local_ ports
99
93our %WKP; 100our %WKP;
94our %LISTENER; # local transports 101our %LISTENER; # local transports
95 102
103our $SRCNODE; # holds the sending node during _inject
104
96sub NODE() { $NODE } 105sub NODE() { $NODE }
97 106
98sub _ANY_() { 1 } 107sub _ANY_() { 1 }
99sub _any_() { \&_ANY_ } 108sub _any_() { \&_ANY_ }
100 109
101sub _inject { 110sub _inject {
102 ($PORT{$_[0][0]} or return)->(@{$_[0][1]}); 111 &{ $PORT{+shift} or return };
103} 112}
104 113
105sub add_node { 114sub add_node {
106 my ($noderef) = @_; 115 my ($noderef) = @_;
107 116
123} 132}
124 133
125sub snd(@) { 134sub snd(@) {
126 my ($noderef, $port) = split /#/, shift, 2; 135 my ($noderef, $port) = split /#/, shift, 2;
127 136
128 add_node $noderef 137 ($NODE{$noderef} || add_node $noderef)
129 unless exists $NODE{$noderef}; 138 ->send ([$port, @_]);
139}
130 140
131 $NODE{$noderef}->send (["$port", [@_]]); 141sub del($) {
142 my ($noderef, $port) = split /#/, shift, 2;
143
144 delete $PORT{$port};
145
146 my $mon = delete $LMON{$port}
147 or return;
148
149 $_->() for values %$mon;
132} 150}
133 151
134sub become_public { 152sub become_public {
135 return if $PUBLIC; 153 return if $PUBLIC;
136 154
144 162
145 my ($host, $port) = AnyEvent::Socket::parse_hostport $t; 163 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
146 164
147 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, 165 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
148 @args, 166 @args,
149 on_error => sub {
150 die "on_error<@_>\n";#d#
151 },
152 on_connect => sub {
153 my ($tp) = @_;
154
155 $NODE{$tp->{remote_id}} = $_[0];
156 },
157 sub { 167 sub {
158 my ($tp) = @_; 168 my ($tp) = @_;
159 169
160 $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; 170 # TODO: urgs
171 my $node = add_node $tp->{remote_node};
172 $node->{trial}{accept} = $tp;
161 }, 173 },
162 ; 174 ;
163 } 175 }
164 176
165 $PUBLIC = 1; 177 $PUBLIC = 1;
166} 178}
167 179
168############################################################################# 180#############################################################################
169# self node code 181# self node code
170 182
183our %node_req = (
184 # monitoring
185 mon0 => sub { # disable monitoring
186 my $portid = shift;
187 my $node = $SRCNODE;
188 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
189 },
190 mon1 => sub { # enable monitoring
191 my $portid = shift;
192 my $node = $SRCNODE;
193 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
194 $node->send (["", exit => $portid]);
195 });
196 },
197 exit => sub {
198 my $cbs = delete $SRCNODE->{lmon}{$_[0]}
199 or return;
200
201 $_->() for @$cbs;
202 },
203
204 # well-known-port lookup
205 wkp => sub {
206 my $wkname = shift;
207 snd @_, $WKP{$wkname};
208 },
209
210 # relay message to another node / generic echo
211 relay => sub {
212 &snd;
213 },
214
215 # random garbage
216 eval => sub {
217 my @res = eval shift;
218 snd @_, "$@", @res if @_;
219 },
220 time => sub {
221 snd @_, AE::time;
222 },
223 devnull => sub {
224 #
225 },
226);
227
171$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; 228$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE;
172$PORT{""} = sub { 229$PORT{""} = sub { &{ $node_req{+shift} or return } };
173 given (shift) {
174 when ("wkp") {
175 my $wkname = shift;
176 snd @_, $WKP{$wkname};
177 }
178 when ("relay") {
179 &snd;
180 }
181 when ("eval") {
182 my @res = eval shift;
183 snd @_, "$@", @res if @_;
184 }
185 when ("time") {
186 snd @_, AE::time;
187 }
188 when ("devnull") {
189 #
190 }
191 }
192};
193 230
194=back 231=back
195 232
196=head1 SEE ALSO 233=head1 SEE ALSO
197 234

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines