… | |
… | |
44 | our $ID = "a"; |
44 | our $ID = "a"; |
45 | our %STATE; |
45 | our %STATE; |
46 | |
46 | |
47 | # another node tells us to await a connection |
47 | # another node tells us to await a connection |
48 | sub _expect { |
48 | sub _expect { |
49 | my ($id, $timeout, $initspec) = @_; |
49 | my ($id, $port, $timeout, $initspec) = @_; |
50 | |
50 | |
51 | $STATE{$id} = { |
51 | $STATE{$id} = { |
52 | id => $id, |
52 | id => $id, |
53 | to => (AE::timer $timeout, 0, sub { |
53 | to => (AE::timer $timeout, 0, sub { |
54 | $STATE{$id}{done}->(undef); |
54 | $STATE{$id}{done}(undef); |
55 | }), |
55 | }), |
56 | done => sub { |
56 | done => sub { |
57 | my ($hdl, $error) = @_; |
57 | my ($hdl, $error) = @_; |
58 | |
58 | |
59 | %{delete $STATE{$id}} = (); |
59 | %{delete $STATE{$id}} = (); |
60 | |
60 | |
61 | if (defined $hdl) { |
61 | if (defined $hdl) { |
62 | my ($func, @args) = @$initspec; |
62 | my ($func, @args) = @$initspec; |
63 | (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl); |
63 | (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl); |
|
|
64 | } else { |
|
|
65 | kil $port, AnyEvent::MP::DataConn:: => $error; |
64 | } |
66 | } |
65 | }, |
67 | }, |
66 | }; |
68 | }; |
67 | } |
69 | } |
68 | |
70 | |
… | |
… | |
74 | my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id} |
76 | my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id} |
75 | or return; |
77 | or return; |
76 | |
78 | |
77 | $conn->destroy; |
79 | $conn->destroy; |
78 | |
80 | |
79 | ($STATE{$id} or return)->{done}->($hdl, $error); |
81 | ($STATE{$id} or return)->{done}($hdl, $error); |
80 | } |
82 | } |
81 | |
83 | |
82 | # actively connect to some other node |
84 | # actively connect to some other node |
83 | sub _connect { |
85 | sub _connect { |
84 | my ($id, $node) = @_; |
86 | my ($id, $node) = @_; |
… | |
… | |
87 | or return; |
89 | or return; |
88 | |
90 | |
89 | my $addr = $AnyEvent::MP::Global::addr{$node}; |
91 | my $addr = $AnyEvent::MP::Global::addr{$node}; |
90 | |
92 | |
91 | @$addr |
93 | @$addr |
92 | or return $state->{fail}("$node: no listeners found"); |
94 | or return $state->{done}(undef, "$node: no listeners found"); |
93 | |
95 | |
94 | # I love hardcoded constants ! |
96 | # I love hardcoded constants ! |
95 | $state->{next} = AE::timer 0, 2, sub { |
97 | $state->{next} = AE::timer 0, 2, sub { |
96 | my $endpoint = shift @$addr |
98 | my $endpoint = shift @$addr |
97 | or return delete $state->{next}; |
99 | or return delete $state->{next}; |
… | |
… | |
109 | } |
111 | } |
110 | |
112 | |
111 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
113 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
112 | |
114 | |
113 | Creates a socket connection between the local node and the node C<$node> |
115 | Creates a socket connection between the local node and the node C<$node> |
114 | (which can also be specified as a port). |
116 | (which can also be specified as a port). One of the nodes must have |
|
|
117 | listening ports ("binds"). |
115 | |
118 | |
116 | When the connection could be successfully created, the C<$initfunc> |
119 | When the connection could be successfully created, the C<$initfunc> |
117 | will be called with the given C<@initdata> on the remote node (similar |
120 | will be called with the given C<@initdata> on the remote node (similar |
118 | to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object |
121 | to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object |
119 | representing the remote connection end as additional argument. |
122 | representing the remote connection end as additional argument. |
… | |
… | |
128 | |
131 | |
129 | In case of any error (timeout etc.), nothing will be called on |
132 | In case of any error (timeout etc.), nothing will be called on |
130 | the remote side, and the local port will be C<kil>'ed with an C<< |
133 | the remote side, and the local port will be C<kil>'ed with an C<< |
131 | AnyEvent::MP::DataConn => "error message" >> kill reason. |
134 | AnyEvent::MP::DataConn => "error message" >> kill reason. |
132 | |
135 | |
|
|
136 | The timeout should be large enough to cover at least four network |
|
|
137 | round-trips and one message round-trip. |
|
|
138 | |
|
|
139 | Example: on node1, establish a connection to node2 and send a line of text, |
|
|
140 | one node2, provide a receiver function. |
|
|
141 | |
|
|
142 | # node1, code executes in some port context |
|
|
143 | AnyEvent::MP::DataConn::connect_to "node2", 5, ["pkg::receiver", 1], sub { |
|
|
144 | my ($hdl) = @_; |
|
|
145 | warn "connection established, sending line.\n" |
|
|
146 | $hdl->push_write ("blabla\n") |
|
|
147 | }; |
|
|
148 | |
|
|
149 | # node2 |
|
|
150 | sub pkg::receiver { |
|
|
151 | my ($one, $hdl) = @_; |
|
|
152 | warn "connection established, wait for a line...\n" |
|
|
153 | |
|
|
154 | $hdl->push_read (line => sub { |
|
|
155 | warn "received a line: $_[1]\n"; |
|
|
156 | undef $hdl; |
|
|
157 | }); |
|
|
158 | } |
|
|
159 | |
133 | =cut |
160 | =cut |
134 | |
161 | |
135 | sub connect_to($$$;@) { |
162 | sub connect_to($$$;@) { |
136 | my ($node, $timeout, $initspec, @localmsg) = @_; |
163 | my ($node, $timeout, $initspec, $cb) = @_; |
137 | my $port = $SELF; |
164 | my $port = $SELF; |
138 | $node = node_of $node; |
165 | $node = node_of $node; |
139 | |
166 | |
140 | my $id = (++$ID) . "\@$NODE"; |
167 | my $id = (++$ID) . "\@$NODE"; |
141 | |
168 | |
142 | # damn, why do my simple state hashes resemble objects so quickly |
169 | # damn, why do my simple state hashes resemble objects so quickly |
143 | my $state = $STATE{$id} = { |
170 | my $state = $STATE{$id} = { |
144 | id => (++$ID) . "\@$NODE", |
171 | id => (++$ID) . "\@$NODE", |
145 | to => (AE::timer $timeout, 0, sub { |
172 | to => (AE::timer $timeout, 0, sub { |
146 | $STATE{$id}{done}->(undef, "$node: unable to establish connection within $timeout seconds"); |
173 | $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds"); |
147 | }), |
174 | }), |
148 | done => sub { |
175 | done => sub { |
149 | my ($hdl, $error) = @_; |
176 | my ($hdl, $error) = @_; |
150 | |
177 | |
151 | warn "done<@_>\n";#d# |
|
|
152 | |
|
|
153 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
178 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
154 | %{delete $STATE{$id}} = (); |
179 | %{delete $STATE{$id}} = (); |
155 | |
180 | |
156 | if (defined $hdl) { |
181 | if (defined $hdl) { |
157 | snd @localmsg, $hdl; |
182 | $cb->($hdl); |
158 | } else { |
183 | } else { |
159 | kil $port, AnyEvent::MP::DataConn:: => $error; |
184 | kil $port, AnyEvent::MP::DataConn:: => $error; |
160 | } |
185 | } |
161 | }, |
186 | }, |
162 | }; |
187 | }; |
… | |
… | |
164 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
189 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
165 | return kil $port, AnyEvent::MP::DataConn:: => |
190 | return kil $port, AnyEvent::MP::DataConn:: => |
166 | "connect_to does not yet support local/local connections, please bug me about it"; |
191 | "connect_to does not yet support local/local connections, please bug me about it"; |
167 | |
192 | |
168 | } else { |
193 | } else { |
169 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $timeout, $initspec; |
194 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initspec; |
170 | |
195 | |
171 | $state->{wait} = sub { |
196 | $state->{wait} = sub { |
172 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
197 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
173 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
198 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
174 | |
199 | |
… | |
… | |
176 | if (@$addr) { |
201 | if (@$addr) { |
177 | # node has listeners, so connect |
202 | # node has listeners, so connect |
178 | _connect $id, $node; |
203 | _connect $id, $node; |
179 | } else { |
204 | } else { |
180 | # no listeners, ask it to connect to us |
205 | # no listeners, ask it to connect to us |
181 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $node; |
206 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE; |
182 | } |
207 | } |
183 | } else { |
208 | } else { |
184 | # wait for the next global setup handshake |
209 | # wait for the next global setup handshake |
|
|
210 | # due to the round-trip at the beginning, this should never be necessary |
185 | $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
211 | $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
186 | }; |
212 | }; |
187 | }; |
213 | }; |
188 | |
214 | |
189 | $state->{wait}->(); |
215 | # we actually have to make sure that the connection arrives after the expect message, and |
|
|
216 | # the easiest way to do this is to use an rpc call. |
|
|
217 | AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() }; |
190 | } |
218 | } |
191 | } |
219 | } |
192 | |
220 | |
193 | =back |
221 | =back |
194 | |
222 | |