… | |
… | |
36 | |
36 | |
37 | use AnyEvent (); |
37 | use AnyEvent (); |
38 | use AnyEvent::Util (); |
38 | use AnyEvent::Util (); |
39 | |
39 | |
40 | use AnyEvent::MP; |
40 | use AnyEvent::MP; |
41 | use AnyEvent::MP::Kernel; |
41 | use AnyEvent::MP::Kernel (); |
|
|
42 | use AnyEvent::MP::Global (); |
|
|
43 | |
|
|
44 | our $ID = "a"; |
|
|
45 | our %STATE; |
|
|
46 | |
|
|
47 | sub _accept { |
|
|
48 | my ($id, $timeout, $initspec) = @_; |
|
|
49 | |
|
|
50 | my $cleanup = sub { |
|
|
51 | %{delete $STATE{$id}} = (); |
|
|
52 | }; |
|
|
53 | |
|
|
54 | $STATE{$id} = { |
|
|
55 | id => $id, |
|
|
56 | to => (AE::timer $timeout, 0, $cleanup), |
|
|
57 | success => sub { |
|
|
58 | my ($func, @args) = @$initspec; |
|
|
59 | $cleanup->(); |
|
|
60 | (AnyEvent::MP::Kernel::load_func $func)->(@args, @_); |
|
|
61 | }, |
|
|
62 | fail => sub { |
|
|
63 | $cleanup->(); |
|
|
64 | # nop? |
|
|
65 | }, |
|
|
66 | }; |
|
|
67 | } |
42 | |
68 | |
43 | sub _inject { |
69 | sub _inject { |
44 | my ($conn, $error) = @_; |
70 | my ($conn, $error) = @_; |
45 | } |
|
|
46 | |
71 | |
|
|
72 | my $hdl = delete $conn->{hdl}; |
|
|
73 | die "inject <@_>\n";#d# |
|
|
74 | } |
|
|
75 | |
|
|
76 | sub _connect { |
|
|
77 | my ($id) = @_; |
|
|
78 | |
|
|
79 | my $state = $STATE{$id} |
|
|
80 | or return; |
|
|
81 | |
|
|
82 | my $addr = $AnyEvent::MP::Global::addr{$state->{node}}; |
|
|
83 | |
|
|
84 | @$addr |
|
|
85 | or return $state->{fail}("$state->{node}: no listeners found"); |
|
|
86 | |
|
|
87 | my %transport; |
|
|
88 | |
|
|
89 | # I love hardcoded constants |
|
|
90 | $state->{next} = AE::timer 0, 2, sub { |
|
|
91 | my $endpoint = shift @$addr |
|
|
92 | or return delete $state->{next}; |
|
|
93 | |
|
|
94 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
|
|
95 | or return; |
|
|
96 | |
|
|
97 | my $transport; $transport = AnyEvent::MP::Transport::mp_connect |
|
|
98 | $host, $port, |
|
|
99 | protocol => "aemp-dataconn", |
|
|
100 | sub { $transport->destroy }, |
|
|
101 | ; |
|
|
102 | }; |
|
|
103 | } |
|
|
104 | |
47 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], @localmsg |
105 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
48 | |
106 | |
49 | Creates a socket connection between the local node and the node C<$node> |
107 | Creates a socket connection between the local node and the node C<$node> |
50 | (which can also be specified as a port). |
108 | (which can also be specified as a port). |
51 | |
109 | |
52 | When the connection could be successfully created, the C<$initfunc> |
110 | When the connection could be successfully created, the C<$initfunc> |
53 | will be called with the given C<@initdata> on the remote node (similar |
111 | will be called with the given C<@initdata> on the remote node (similar |
54 | to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object |
112 | to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object |
55 | representing the connection as additional argument. |
113 | representing the remote connection end as additional argument. |
56 | |
114 | |
57 | Also, the local port in which context C<connect_to> was called (C<$SELF>) |
115 | Also, the callback given as last argument will be called with the |
58 | will receive a message with contents C<@localmsg>, again with the |
|
|
59 | AnyEvent::Handle object tacked to the end. |
116 | AnyEvent::Handle object for the local side. |
60 | |
117 | |
61 | The AnyEvent::Handle object will be in a "quiescent" state - you could rip |
118 | The AnyEvent::Handle objects will be in a "quiescent" state - you could rip |
62 | out the file handle and forget about it, but it is recommended to use it, |
119 | out the file handle and forget about it, but it is recommended to use it, |
63 | as the security settings might have called for a TLS connection. IF you |
120 | as the security settings might have called for a TLS connection. If you |
64 | opt to use it, you at least have to set an C<on_error> callback. |
121 | opt to use it, you at least have to set an C<on_error> callback. |
65 | |
122 | |
66 | In case of any error (or timeout), nothing will be called on the |
123 | In case of any error (timeout etc.), nothing will be called on |
67 | remote side, and the local port will be C<kil>'ed with an C<< |
124 | the remote side, and the local port will be C<kil>'ed with an C<< |
68 | AnyEvent::MP::DataConn => "error message" >> kill reason. |
125 | AnyEvent::MP::DataConn => "error message" >> kill reason. |
69 | |
126 | |
70 | =cut |
127 | =cut |
71 | |
128 | |
72 | sub connect_to($$$;@) { |
129 | sub connect_to($$$;@) { |
73 | my ($node, $timeout, $initspec, @localmsg) = @_; |
130 | my ($node, $timeout, $initspec, @localmsg) = @_; |
74 | my $port = $SELF; |
131 | my $port = $SELF; |
75 | $node = node_of $node; |
132 | $node = node_of $node; |
76 | |
133 | |
77 | my %state; |
134 | my $id = (++$ID) . "\@$NODE"; |
78 | |
135 | |
|
|
136 | my $cleanup = sub { |
|
|
137 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
|
|
138 | %{delete $STATE{$id}} = (); |
|
|
139 | }; |
|
|
140 | |
|
|
141 | # damn, why do my simple state hashes resemble objects so quickly |
|
|
142 | my $state = $STATE{$id} = { |
|
|
143 | id => (++$ID) . "\@$NODE", |
|
|
144 | node => $node, |
|
|
145 | port => $port, |
79 | $state{to} = AE::timer $timeout, 0, sub { |
146 | to => (AE::timer $timeout, 0, sub { |
80 | %state = (); |
147 | $cleanup->(); |
81 | kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds"; |
148 | kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds"; |
|
|
149 | }), |
|
|
150 | success => sub { |
|
|
151 | $cleanup->(); |
|
|
152 | snd @localmsg, @_; |
|
|
153 | }, |
|
|
154 | fail => sub { |
|
|
155 | $cleanup->(); |
|
|
156 | kil $port, AnyEvent::MP::DataConn:: => $_[0]; |
|
|
157 | }, |
82 | }; |
158 | }; |
83 | |
159 | |
84 | my $continue_connect = sub { |
160 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
85 | use Data::Dumper; |
|
|
86 | warn Dumper $NODE{$node};#d# |
|
|
87 | }; |
|
|
88 | |
|
|
89 | if (port_is_local $node) { |
|
|
90 | return kil $port, AnyEvent::MP::DataConn:: => |
161 | return kil $port, AnyEvent::MP::DataConn:: => |
91 | "connect_to does not yet support local/local connections, please bug me about it"; |
162 | "connect_to does not yet support local/local connections, please bug me about it"; |
92 | } elsif (node_is_up $node) { |
163 | |
93 | $continue_connect->(); |
|
|
94 | } else { |
164 | } else { |
95 | $state{mon_nodes} = mon_nodes sub { |
165 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec; |
96 | return unless $_[0] eq $node && $_[1]; |
166 | |
97 | delete $state{mon_nodes}; |
167 | $state->{wait} = sub { |
|
|
168 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
|
|
169 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
|
|
170 | |
|
|
171 | warn "<@_ @$addr $addr>\n";#d# |
98 | $continue_connect->() |
172 | # continue connect |
|
|
173 | if (@$addr) { |
|
|
174 | # node has listeners, so connect |
|
|
175 | _connect $id; |
|
|
176 | } else { |
|
|
177 | # no listeners, ask it to connect to us |
|
|
178 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id; |
|
|
179 | } |
|
|
180 | } else { |
|
|
181 | # wait for the next global setup handshake |
|
|
182 | $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
|
|
183 | }; |
99 | }; |
184 | }; |
|
|
185 | |
|
|
186 | $state->{wait}->(); |
100 | } |
187 | } |
101 | } |
188 | } |
102 | |
189 | |
103 | =back |
190 | =back |
104 | |
191 | |