… | |
… | |
30 | use AE (); |
30 | use AE (); |
31 | |
31 | |
32 | use base "Exporter"; |
32 | use base "Exporter"; |
33 | |
33 | |
34 | our $VERSION = '0.01'; |
34 | our $VERSION = '0.01'; |
35 | our @EXPORT = qw(NODE $NODE snd _any_ become_slave become_public); |
35 | our @EXPORT = qw( |
|
|
36 | NODE $NODE snd del _any_ |
|
|
37 | become_slave become_public |
|
|
38 | ); |
36 | |
39 | |
37 | our $DEFAULT_SECRET; |
40 | our $DEFAULT_SECRET; |
38 | |
41 | |
39 | our $CONNECT_INTERVAL = 5; # new connect every 5s, at least |
42 | our $CONNECT_INTERVAL = 5; # new connect every 5s, at least |
40 | our $CONNECT_TIMEOUT = 30; # includes handshake |
43 | our $CONNECT_TIMEOUT = 30; # includes handshake |
… | |
… | |
88 | our $NODE = $$; |
91 | our $NODE = $$; |
89 | our $PORT; |
92 | our $PORT; |
90 | |
93 | |
91 | our %NODE; # node id to transport mapping, or "undef", for local node |
94 | our %NODE; # node id to transport mapping, or "undef", for local node |
92 | our %PORT; # local ports |
95 | our %PORT; # local ports |
|
|
96 | |
|
|
97 | our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb) |
|
|
98 | our %LMON; # monitored _local_ ports |
|
|
99 | |
93 | our %WKP; |
100 | our %WKP; |
94 | our %LISTENER; # local transports |
101 | our %LISTENER; # local transports |
95 | |
102 | |
|
|
103 | our $SRCNODE; # holds the sending node during _inject |
|
|
104 | |
96 | sub NODE() { $NODE } |
105 | sub NODE() { $NODE } |
97 | |
106 | |
98 | sub _ANY_() { 1 } |
107 | sub _ANY_() { 1 } |
99 | sub _any_() { \&_ANY_ } |
108 | sub _any_() { \&_ANY_ } |
100 | |
109 | |
101 | sub _inject { |
110 | sub _inject { |
102 | ($PORT{$_[0][0]} or return)->(@{$_[0][1]}); |
111 | &{ $PORT{+shift} or return }; |
103 | } |
112 | } |
104 | |
113 | |
105 | sub add_node { |
114 | sub add_node { |
106 | my ($noderef) = @_; |
115 | my ($noderef) = @_; |
107 | |
116 | |
… | |
… | |
123 | } |
132 | } |
124 | |
133 | |
125 | sub snd(@) { |
134 | sub snd(@) { |
126 | my ($noderef, $port) = split /#/, shift, 2; |
135 | my ($noderef, $port) = split /#/, shift, 2; |
127 | |
136 | |
128 | add_node $noderef |
137 | ($NODE{$noderef} || add_node $noderef) |
129 | unless exists $NODE{$noderef}; |
138 | ->send ([$port, @_]); |
|
|
139 | } |
130 | |
140 | |
131 | $NODE{$noderef}->send (["$port", [@_]]); |
141 | sub del($) { |
|
|
142 | my ($noderef, $port) = split /#/, shift, 2; |
|
|
143 | |
|
|
144 | delete $PORT{$port}; |
|
|
145 | |
|
|
146 | my $mon = delete $LMON{$port} |
|
|
147 | or return; |
|
|
148 | |
|
|
149 | $_->() for values %$mon; |
132 | } |
150 | } |
133 | |
151 | |
134 | sub become_public { |
152 | sub become_public { |
135 | return if $PUBLIC; |
153 | return if $PUBLIC; |
136 | |
154 | |
… | |
… | |
144 | |
162 | |
145 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
163 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t; |
146 | |
164 | |
147 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
165 | $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, |
148 | @args, |
166 | @args, |
149 | on_error => sub { |
|
|
150 | die "on_error<@_>\n";#d# |
|
|
151 | }, |
|
|
152 | on_connect => sub { |
|
|
153 | my ($tp) = @_; |
|
|
154 | |
|
|
155 | $NODE{$tp->{remote_id}} = $_[0]; |
|
|
156 | }, |
|
|
157 | sub { |
167 | sub { |
158 | my ($tp) = @_; |
168 | my ($tp) = @_; |
159 | |
169 | |
160 | $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; |
170 | # TODO: urgs |
|
|
171 | my $node = add_node $tp->{remote_node}; |
|
|
172 | $node->{trial}{accept} = $tp; |
161 | }, |
173 | }, |
162 | ; |
174 | ; |
163 | } |
175 | } |
164 | |
176 | |
165 | $PUBLIC = 1; |
177 | $PUBLIC = 1; |
166 | } |
178 | } |
167 | |
179 | |
168 | ############################################################################# |
180 | ############################################################################# |
169 | # self node code |
181 | # self node code |
170 | |
182 | |
|
|
183 | our %node_req = ( |
|
|
184 | # monitoring |
|
|
185 | mon0 => sub { # disable monitoring |
|
|
186 | my $portid = shift; |
|
|
187 | my $node = $SRCNODE; |
|
|
188 | $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid}); |
|
|
189 | }, |
|
|
190 | mon1 => sub { # enable monitoring |
|
|
191 | my $portid = shift; |
|
|
192 | my $node = $SRCNODE; |
|
|
193 | $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub { |
|
|
194 | $node->send (["", exit => $portid]); |
|
|
195 | }); |
|
|
196 | }, |
|
|
197 | exit => sub { |
|
|
198 | my $cbs = delete $SRCNODE->{lmon}{$_[0]} |
|
|
199 | or return; |
|
|
200 | |
|
|
201 | $_->() for @$cbs; |
|
|
202 | }, |
|
|
203 | |
|
|
204 | # well-known-port lookup |
|
|
205 | wkp => sub { |
|
|
206 | my $wkname = shift; |
|
|
207 | snd @_, $WKP{$wkname}; |
|
|
208 | }, |
|
|
209 | |
|
|
210 | # relay message to another node / generic echo |
|
|
211 | relay => sub { |
|
|
212 | &snd; |
|
|
213 | }, |
|
|
214 | |
|
|
215 | # random garbage |
|
|
216 | eval => sub { |
|
|
217 | my @res = eval shift; |
|
|
218 | snd @_, "$@", @res if @_; |
|
|
219 | }, |
|
|
220 | time => sub { |
|
|
221 | snd @_, AE::time; |
|
|
222 | }, |
|
|
223 | devnull => sub { |
|
|
224 | # |
|
|
225 | }, |
|
|
226 | ); |
|
|
227 | |
171 | $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; |
228 | $NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; |
172 | $PORT{""} = sub { |
229 | $PORT{""} = sub { &{ $node_req{+shift} or return } }; |
173 | given (shift) { |
|
|
174 | when ("wkp") { |
|
|
175 | my $wkname = shift; |
|
|
176 | snd @_, $WKP{$wkname}; |
|
|
177 | } |
|
|
178 | when ("relay") { |
|
|
179 | &snd; |
|
|
180 | } |
|
|
181 | when ("eval") { |
|
|
182 | my @res = eval shift; |
|
|
183 | snd @_, "$@", @res if @_; |
|
|
184 | } |
|
|
185 | when ("time") { |
|
|
186 | snd @_, AE::time; |
|
|
187 | } |
|
|
188 | when ("devnull") { |
|
|
189 | # |
|
|
190 | } |
|
|
191 | } |
|
|
192 | }; |
|
|
193 | |
230 | |
194 | =back |
231 | =back |
195 | |
232 | |
196 | =head1 SEE ALSO |
233 | =head1 SEE ALSO |
197 | |
234 | |