… | |
… | |
42 | use AnyEvent::MP::Global (); |
42 | use AnyEvent::MP::Global (); |
43 | |
43 | |
44 | our $ID = "a"; |
44 | our $ID = "a"; |
45 | our %STATE; |
45 | our %STATE; |
46 | |
46 | |
47 | sub _accept { |
47 | # another node tells us to await a connection |
|
|
48 | sub _expect { |
48 | my ($id, $timeout, $initspec) = @_; |
49 | my ($id, $timeout, $initspec) = @_; |
49 | |
50 | |
50 | my $cleanup = sub { |
|
|
51 | %{delete $STATE{$id}} = (); |
|
|
52 | }; |
|
|
53 | |
|
|
54 | $STATE{$id} = { |
51 | $STATE{$id} = { |
55 | id => $id, |
52 | id => $id, |
56 | to => (AE::timer $timeout, 0, $cleanup), |
53 | to => (AE::timer $timeout, 0, sub { |
|
|
54 | $STATE{$id}{done}->(undef); |
|
|
55 | }), |
57 | success => sub { |
56 | done => sub { |
|
|
57 | my ($hdl, $error) = @_; |
|
|
58 | |
|
|
59 | %{delete $STATE{$id}} = (); |
|
|
60 | |
|
|
61 | if (defined $hdl) { |
58 | my ($func, @args) = @$initspec; |
62 | my ($func, @args) = @$initspec; |
59 | $cleanup->(); |
|
|
60 | (AnyEvent::MP::Kernel::load_func $func)->(@args, @_); |
63 | (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl); |
61 | }, |
64 | } |
62 | fail => sub { |
|
|
63 | $cleanup->(); |
|
|
64 | # nop? |
|
|
65 | }, |
65 | }, |
66 | }; |
66 | }; |
67 | } |
67 | } |
68 | |
68 | |
|
|
69 | # AEMP::Transport call for dataconn-connections |
69 | sub _inject { |
70 | sub _inject { |
70 | my ($conn, $error) = @_; |
71 | my ($conn, $error) = @_; |
71 | |
72 | |
72 | my $hdl = delete $conn->{hdl}; |
73 | my $hdl = defined $error ? undef : delete $conn->{hdl}; |
73 | die "inject <@_>\n";#d# |
74 | my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id} |
74 | } |
75 | or return; |
75 | |
76 | |
|
|
77 | $conn->destroy; |
|
|
78 | |
|
|
79 | ($STATE{$id} or return)->{done}->($hdl, $error); |
|
|
80 | } |
|
|
81 | |
|
|
82 | # actively connect to some other node |
76 | sub _connect { |
83 | sub _connect { |
77 | my ($id) = @_; |
84 | my ($id, $node) = @_; |
78 | |
85 | |
79 | my $state = $STATE{$id} |
86 | my $state = $STATE{$id} |
80 | or return; |
87 | or return; |
81 | |
88 | |
82 | my $addr = $AnyEvent::MP::Global::addr{$state->{node}}; |
89 | my $addr = $AnyEvent::MP::Global::addr{$node}; |
83 | |
90 | |
84 | @$addr |
91 | @$addr |
85 | or return $state->{fail}("$state->{node}: no listeners found"); |
92 | or return $state->{fail}("$node: no listeners found"); |
86 | |
93 | |
87 | my %transport; |
|
|
88 | |
|
|
89 | # I love hardcoded constants |
94 | # I love hardcoded constants ! |
90 | $state->{next} = AE::timer 0, 2, sub { |
95 | $state->{next} = AE::timer 0, 2, sub { |
91 | my $endpoint = shift @$addr |
96 | my $endpoint = shift @$addr |
92 | or return delete $state->{next}; |
97 | or return delete $state->{next}; |
93 | |
98 | |
94 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
99 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
95 | or return; |
100 | or return; |
96 | |
101 | |
97 | my $transport; $transport = AnyEvent::MP::Transport::mp_connect |
102 | my $transport; $transport = AnyEvent::MP::Transport::mp_connect |
98 | $host, $port, |
103 | $host, $port, |
99 | protocol => "aemp-dataconn", |
104 | protocol => "aemp-dataconn", |
100 | sub { $transport->destroy }, |
105 | local_greeting => { dataconn_id => $id }, |
|
|
106 | sub { $transport->destroy }, #TODO: destroys handshaked conenctions too early |
101 | ; |
107 | ; |
102 | }; |
108 | }; |
103 | } |
109 | } |
104 | |
110 | |
105 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
111 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
… | |
… | |
131 | my $port = $SELF; |
137 | my $port = $SELF; |
132 | $node = node_of $node; |
138 | $node = node_of $node; |
133 | |
139 | |
134 | my $id = (++$ID) . "\@$NODE"; |
140 | my $id = (++$ID) . "\@$NODE"; |
135 | |
141 | |
136 | my $cleanup = sub { |
|
|
137 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
|
|
138 | %{delete $STATE{$id}} = (); |
|
|
139 | }; |
|
|
140 | |
|
|
141 | # damn, why do my simple state hashes resemble objects so quickly |
142 | # damn, why do my simple state hashes resemble objects so quickly |
142 | my $state = $STATE{$id} = { |
143 | my $state = $STATE{$id} = { |
143 | id => (++$ID) . "\@$NODE", |
144 | id => (++$ID) . "\@$NODE", |
144 | node => $node, |
|
|
145 | port => $port, |
|
|
146 | to => (AE::timer $timeout, 0, sub { |
145 | to => (AE::timer $timeout, 0, sub { |
147 | $cleanup->(); |
146 | $STATE{$id}{done}->(undef, "$node: unable to establish connection within $timeout seconds"); |
148 | kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds"; |
|
|
149 | }), |
147 | }), |
150 | success => sub { |
148 | done => sub { |
151 | $cleanup->(); |
149 | my ($hdl, $error) = @_; |
|
|
150 | |
|
|
151 | warn "done<@_>\n";#d# |
|
|
152 | |
|
|
153 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
|
|
154 | %{delete $STATE{$id}} = (); |
|
|
155 | |
|
|
156 | if (defined $hdl) { |
152 | snd @localmsg, @_; |
157 | snd @localmsg, $hdl; |
153 | }, |
158 | } else { |
154 | fail => sub { |
|
|
155 | $cleanup->(); |
|
|
156 | kil $port, AnyEvent::MP::DataConn:: => $_[0]; |
159 | kil $port, AnyEvent::MP::DataConn:: => $error; |
|
|
160 | } |
157 | }, |
161 | }, |
158 | }; |
162 | }; |
159 | |
163 | |
160 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
164 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
161 | return kil $port, AnyEvent::MP::DataConn:: => |
165 | return kil $port, AnyEvent::MP::DataConn:: => |
162 | "connect_to does not yet support local/local connections, please bug me about it"; |
166 | "connect_to does not yet support local/local connections, please bug me about it"; |
163 | |
167 | |
164 | } else { |
168 | } else { |
165 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec; |
169 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $timeout, $initspec; |
166 | |
170 | |
167 | $state->{wait} = sub { |
171 | $state->{wait} = sub { |
168 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
172 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
169 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
173 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
170 | |
174 | |
171 | warn "<@_ @$addr $addr>\n";#d# |
|
|
172 | # continue connect |
175 | # continue connect |
173 | if (@$addr) { |
176 | if (@$addr) { |
174 | # node has listeners, so connect |
177 | # node has listeners, so connect |
175 | _connect $id; |
178 | _connect $id, $node; |
176 | } else { |
179 | } else { |
177 | # no listeners, ask it to connect to us |
180 | # no listeners, ask it to connect to us |
178 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id; |
181 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $node; |
179 | } |
182 | } |
180 | } else { |
183 | } else { |
181 | # wait for the next global setup handshake |
184 | # wait for the next global setup handshake |
182 | $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
185 | $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
183 | }; |
186 | }; |