ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
(Generate patch)

Comparing AnyEvent-MP/MP/DataConn.pm (file contents):
Revision 1.3 by root, Sun Nov 8 01:15:25 2009 UTC vs.
Revision 1.4 by root, Sun Nov 8 23:49:40 2009 UTC

44our $ID = "a"; 44our $ID = "a";
45our %STATE; 45our %STATE;
46 46
47# another node tells us to await a connection 47# another node tells us to await a connection
48sub _expect { 48sub _expect {
49 my ($id, $timeout, $initspec) = @_; 49 my ($id, $port, $timeout, $initspec) = @_;
50 50
51 $STATE{$id} = { 51 $STATE{$id} = {
52 id => $id, 52 id => $id,
53 to => (AE::timer $timeout, 0, sub { 53 to => (AE::timer $timeout, 0, sub {
54 $STATE{$id}{done}->(undef); 54 $STATE{$id}{done}(undef);
55 }), 55 }),
56 done => sub { 56 done => sub {
57 my ($hdl, $error) = @_; 57 my ($hdl, $error) = @_;
58 58
59 %{delete $STATE{$id}} = (); 59 %{delete $STATE{$id}} = ();
60 60
61 if (defined $hdl) { 61 if (defined $hdl) {
62 my ($func, @args) = @$initspec; 62 my ($func, @args) = @$initspec;
63 (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl); 63 (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl);
64 } else {
65 kil $port, AnyEvent::MP::DataConn:: => $error;
64 } 66 }
65 }, 67 },
66 }; 68 };
67} 69}
68 70
74 my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id} 76 my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id}
75 or return; 77 or return;
76 78
77 $conn->destroy; 79 $conn->destroy;
78 80
79 ($STATE{$id} or return)->{done}->($hdl, $error); 81 ($STATE{$id} or return)->{done}($hdl, $error);
80} 82}
81 83
82# actively connect to some other node 84# actively connect to some other node
83sub _connect { 85sub _connect {
84 my ($id, $node) = @_; 86 my ($id, $node) = @_;
87 or return; 89 or return;
88 90
89 my $addr = $AnyEvent::MP::Global::addr{$node}; 91 my $addr = $AnyEvent::MP::Global::addr{$node};
90 92
91 @$addr 93 @$addr
92 or return $state->{fail}("$node: no listeners found"); 94 or return $state->{done}(undef, "$node: no listeners found");
93 95
94 # I love hardcoded constants ! 96 # I love hardcoded constants !
95 $state->{next} = AE::timer 0, 2, sub { 97 $state->{next} = AE::timer 0, 2, sub {
96 my $endpoint = shift @$addr 98 my $endpoint = shift @$addr
97 or return delete $state->{next}; 99 or return delete $state->{next};
109} 111}
110 112
111=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) 113=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
112 114
113Creates a socket connection between the local node and the node C<$node> 115Creates a socket connection between the local node and the node C<$node>
114(which can also be specified as a port). 116(which can also be specified as a port). One of the nodes must have
117listening ports ("binds").
115 118
116When the connection could be successfully created, the C<$initfunc> 119When the connection could be successfully created, the C<$initfunc>
117will be called with the given C<@initdata> on the remote node (similar 120will be called with the given C<@initdata> on the remote node (similar
118to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object 121to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
119representing the remote connection end as additional argument. 122representing the remote connection end as additional argument.
128 131
129In case of any error (timeout etc.), nothing will be called on 132In case of any error (timeout etc.), nothing will be called on
130the remote side, and the local port will be C<kil>'ed with an C<< 133the remote side, and the local port will be C<kil>'ed with an C<<
131AnyEvent::MP::DataConn => "error message" >> kill reason. 134AnyEvent::MP::DataConn => "error message" >> kill reason.
132 135
136The timeout should be large enough to cover at least four network
137round-trips and one message round-trip.
138
139Example: on node1, establish a connection to node2 and send a line of text,
140one node2, provide a receiver function.
141
142 # node1, code executes in some port context
143 AnyEvent::MP::DataConn::connect_to "node2", 5, ["pkg::receiver", 1], sub {
144 my ($hdl) = @_;
145 warn "connection established, sending line.\n"
146 $hdl->push_write ("blabla\n")
147 };
148
149 # node2
150 sub pkg::receiver {
151 my ($one, $hdl) = @_;
152 warn "connection established, wait for a line...\n"
153
154 $hdl->push_read (line => sub {
155 warn "received a line: $_[1]\n";
156 undef $hdl;
157 });
158 }
159
133=cut 160=cut
134 161
135sub connect_to($$$;@) { 162sub connect_to($$$;@) {
136 my ($node, $timeout, $initspec, @localmsg) = @_; 163 my ($node, $timeout, $initspec, $cb) = @_;
137 my $port = $SELF; 164 my $port = $SELF;
138 $node = node_of $node; 165 $node = node_of $node;
139 166
140 my $id = (++$ID) . "\@$NODE"; 167 my $id = (++$ID) . "\@$NODE";
141 168
142 # damn, why do my simple state hashes resemble objects so quickly 169 # damn, why do my simple state hashes resemble objects so quickly
143 my $state = $STATE{$id} = { 170 my $state = $STATE{$id} = {
144 id => (++$ID) . "\@$NODE", 171 id => (++$ID) . "\@$NODE",
145 to => (AE::timer $timeout, 0, sub { 172 to => (AE::timer $timeout, 0, sub {
146 $STATE{$id}{done}->(undef, "$node: unable to establish connection within $timeout seconds"); 173 $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
147 }), 174 }),
148 done => sub { 175 done => sub {
149 my ($hdl, $error) = @_; 176 my ($hdl, $error) = @_;
150 177
151 warn "done<@_>\n";#d#
152
153 delete $AnyEvent::MP::Global::ON_SETUP{$id}; 178 delete $AnyEvent::MP::Global::ON_SETUP{$id};
154 %{delete $STATE{$id}} = (); 179 %{delete $STATE{$id}} = ();
155 180
156 if (defined $hdl) { 181 if (defined $hdl) {
157 snd @localmsg, $hdl; 182 $cb->($hdl);
158 } else { 183 } else {
159 kil $port, AnyEvent::MP::DataConn:: => $error; 184 kil $port, AnyEvent::MP::DataConn:: => $error;
160 } 185 }
161 }, 186 },
162 }; 187 };
164 if (AnyEvent::MP::Kernel::port_is_local $node) { 189 if (AnyEvent::MP::Kernel::port_is_local $node) {
165 return kil $port, AnyEvent::MP::DataConn:: => 190 return kil $port, AnyEvent::MP::DataConn:: =>
166 "connect_to does not yet support local/local connections, please bug me about it"; 191 "connect_to does not yet support local/local connections, please bug me about it";
167 192
168 } else { 193 } else {
169 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $timeout, $initspec; 194 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initspec;
170 195
171 $state->{wait} = sub { 196 $state->{wait} = sub {
172 if (my $addr = $AnyEvent::MP::Global::addr{$node}) { 197 if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
173 delete $AnyEvent::MP::Global::ON_SETUP{$id}; 198 delete $AnyEvent::MP::Global::ON_SETUP{$id};
174 199
176 if (@$addr) { 201 if (@$addr) {
177 # node has listeners, so connect 202 # node has listeners, so connect
178 _connect $id, $node; 203 _connect $id, $node;
179 } else { 204 } else {
180 # no listeners, ask it to connect to us 205 # no listeners, ask it to connect to us
181 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $node; 206 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
182 } 207 }
183 } else { 208 } else {
184 # wait for the next global setup handshake 209 # wait for the next global setup handshake
210 # due to the round-trip at the beginning, this should never be necessary
185 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; 211 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
186 }; 212 };
187 }; 213 };
188 214
189 $state->{wait}->(); 215 # we actually have to make sure that the connection arrives after the expect message, and
216 # the easiest way to do this is to use an rpc call.
217 AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
190 } 218 }
191} 219}
192 220
193=back 221=back
194 222

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines