… | |
… | |
44 | our $ID = "a"; |
44 | our $ID = "a"; |
45 | our %STATE; |
45 | our %STATE; |
46 | |
46 | |
47 | # another node tells us to await a connection |
47 | # another node tells us to await a connection |
48 | sub _expect { |
48 | sub _expect { |
49 | my ($id, $port, $timeout, $initspec) = @_; |
49 | my ($id, $port, $timeout, $initfunc, @initdata) = @_; |
50 | |
50 | |
51 | $STATE{$id} = { |
51 | $STATE{$id} = { |
52 | id => $id, |
52 | id => $id, |
53 | to => (AE::timer $timeout, 0, sub { |
53 | to => (AE::timer $timeout, 0, sub { |
54 | $STATE{$id}{done}(undef); |
54 | $STATE{$id}{done}(undef); |
… | |
… | |
57 | my ($hdl, $error) = @_; |
57 | my ($hdl, $error) = @_; |
58 | |
58 | |
59 | %{delete $STATE{$id}} = (); |
59 | %{delete $STATE{$id}} = (); |
60 | |
60 | |
61 | if (defined $hdl) { |
61 | if (defined $hdl) { |
62 | my ($func, @args) = @$initspec; |
|
|
63 | (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl); |
62 | (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl); |
64 | } else { |
63 | } else { |
65 | kil $port, AnyEvent::MP::DataConn:: => $error; |
64 | kil $port, AnyEvent::MP::DataConn:: => $error; |
66 | } |
65 | } |
67 | }, |
66 | }, |
68 | }; |
67 | }; |
… | |
… | |
108 | sub { $transport->destroy }, #TODO: destroys handshaked conenctions too early |
107 | sub { $transport->destroy }, #TODO: destroys handshaked conenctions too early |
109 | ; |
108 | ; |
110 | }; |
109 | }; |
111 | } |
110 | } |
112 | |
111 | |
113 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
112 | =item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle) |
114 | |
113 | |
115 | Creates a socket connection between the local node and the node C<$node> |
114 | Creates a socket connection between the local node and the node C<$node> |
116 | (which can also be specified as a port). One of the nodes must have |
115 | (which can also be specified as a port). One of the nodes must have |
117 | listening ports ("binds"). |
116 | listening ports ("binds"). |
118 | |
117 | |
… | |
… | |
138 | |
137 | |
139 | Example: on node1, establish a connection to node2 and send a line of text, |
138 | Example: on node1, establish a connection to node2 and send a line of text, |
140 | one node2, provide a receiver function. |
139 | one node2, provide a receiver function. |
141 | |
140 | |
142 | # node1, code executes in some port context |
141 | # node1, code executes in some port context |
143 | AnyEvent::MP::DataConn::connect_to "node2", 5, ["pkg::receiver", 1], sub { |
142 | AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub { |
144 | my ($hdl) = @_; |
143 | my ($hdl) = @_; |
145 | warn "connection established, sending line.\n" |
144 | warn "connection established, sending line.\n" |
146 | $hdl->push_write ("blabla\n") |
145 | $hdl->push_write ("blabla\n") |
147 | }; |
146 | }; |
148 | |
147 | |
… | |
… | |
157 | }); |
156 | }); |
158 | } |
157 | } |
159 | |
158 | |
160 | =cut |
159 | =cut |
161 | |
160 | |
162 | sub connect_to($$$;@) { |
161 | sub connect_to($$$$@) { |
|
|
162 | my $cb = pop; |
163 | my ($node, $timeout, $initspec, $cb) = @_; |
163 | my ($node, $timeout, $initfunc, @initdata) = @_; |
164 | my $port = $SELF; |
164 | my $port = $SELF; |
165 | $node = node_of $node; |
165 | $node = node_of $node; |
166 | |
166 | |
167 | my $id = (++$ID) . "\@$NODE"; |
167 | my $id = (++$ID) . "\@$NODE"; |
168 | |
168 | |
… | |
… | |
189 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
189 | if (AnyEvent::MP::Kernel::port_is_local $node) { |
190 | return kil $port, AnyEvent::MP::DataConn:: => |
190 | return kil $port, AnyEvent::MP::DataConn:: => |
191 | "connect_to does not yet support local/local connections, please bug me about it"; |
191 | "connect_to does not yet support local/local connections, please bug me about it"; |
192 | |
192 | |
193 | } else { |
193 | } else { |
194 | AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initspec; |
194 | AnyEvent::MP::Kernel::snd_to_func $node, |
|
|
195 | AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata; |
195 | |
196 | |
196 | $state->{wait} = sub { |
197 | $state->{wait} = sub { |
197 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
198 | if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
198 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
199 | delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
199 | |
200 | |