ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
(Generate patch)

Comparing AnyEvent-MP/MP/DataConn.pm (file contents):
Revision 1.1 by root, Thu Nov 5 22:44:56 2009 UTC vs.
Revision 1.2 by root, Sat Nov 7 02:36:31 2009 UTC

36 36
37use AnyEvent (); 37use AnyEvent ();
38use AnyEvent::Util (); 38use AnyEvent::Util ();
39 39
40use AnyEvent::MP; 40use AnyEvent::MP;
41use AnyEvent::MP::Kernel; 41use AnyEvent::MP::Kernel ();
42use AnyEvent::MP::Global ();
43
44our $ID = "a";
45our %STATE;
46
47sub _accept {
48 my ($id, $timeout, $initspec) = @_;
49
50 my $cleanup = sub {
51 %{delete $STATE{$id}} = ();
52 };
53
54 $STATE{$id} = {
55 id => $id,
56 to => (AE::timer $timeout, 0, $cleanup),
57 success => sub {
58 my ($func, @args) = @$initspec;
59 $cleanup->();
60 (AnyEvent::MP::Kernel::load_func $func)->(@args, @_);
61 },
62 fail => sub {
63 $cleanup->();
64 # nop?
65 },
66 };
67}
42 68
43sub _inject { 69sub _inject {
44 my ($conn, $error) = @_; 70 my ($conn, $error) = @_;
45}
46 71
72 my $hdl = delete $conn->{hdl};
73 die "inject <@_>\n";#d#
74}
75
76sub _connect {
77 my ($id) = @_;
78
79 my $state = $STATE{$id}
80 or return;
81
82 my $addr = $AnyEvent::MP::Global::addr{$state->{node}};
83
84 @$addr
85 or return $state->{fail}("$state->{node}: no listeners found");
86
87 my %transport;
88
89 # I love hardcoded constants
90 $state->{next} = AE::timer 0, 2, sub {
91 my $endpoint = shift @$addr
92 or return delete $state->{next};
93
94 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
95 or return;
96
97 my $transport; $transport = AnyEvent::MP::Transport::mp_connect
98 $host, $port,
99 protocol => "aemp-dataconn",
100 sub { $transport->destroy },
101 ;
102 };
103}
104
47=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], @localmsg 105=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
48 106
49Creates a socket connection between the local node and the node C<$node> 107Creates a socket connection between the local node and the node C<$node>
50(which can also be specified as a port). 108(which can also be specified as a port).
51 109
52When the connection could be successfully created, the C<$initfunc> 110When the connection could be successfully created, the C<$initfunc>
53will be called with the given C<@initdata> on the remote node (similar 111will be called with the given C<@initdata> on the remote node (similar
54to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object 112to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
55representing the connection as additional argument. 113representing the remote connection end as additional argument.
56 114
57Also, the local port in which context C<connect_to> was called (C<$SELF>) 115Also, the callback given as last argument will be called with the
58will receive a message with contents C<@localmsg>, again with the
59AnyEvent::Handle object tacked to the end. 116AnyEvent::Handle object for the local side.
60 117
61The AnyEvent::Handle object will be in a "quiescent" state - you could rip 118The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
62out the file handle and forget about it, but it is recommended to use it, 119out the file handle and forget about it, but it is recommended to use it,
63as the security settings might have called for a TLS connection. IF you 120as the security settings might have called for a TLS connection. If you
64opt to use it, you at least have to set an C<on_error> callback. 121opt to use it, you at least have to set an C<on_error> callback.
65 122
66In case of any error (or timeout), nothing will be called on the 123In case of any error (timeout etc.), nothing will be called on
67remote side, and the local port will be C<kil>'ed with an C<< 124the remote side, and the local port will be C<kil>'ed with an C<<
68AnyEvent::MP::DataConn => "error message" >> kill reason. 125AnyEvent::MP::DataConn => "error message" >> kill reason.
69 126
70=cut 127=cut
71 128
72sub connect_to($$$;@) { 129sub connect_to($$$;@) {
73 my ($node, $timeout, $initspec, @localmsg) = @_; 130 my ($node, $timeout, $initspec, @localmsg) = @_;
74 my $port = $SELF; 131 my $port = $SELF;
75 $node = node_of $node; 132 $node = node_of $node;
76 133
77 my %state; 134 my $id = (++$ID) . "\@$NODE";
78 135
136 my $cleanup = sub {
137 delete $AnyEvent::MP::Global::ON_SETUP{$id};
138 %{delete $STATE{$id}} = ();
139 };
140
141 # damn, why do my simple state hashes resemble objects so quickly
142 my $state = $STATE{$id} = {
143 id => (++$ID) . "\@$NODE",
144 node => $node,
145 port => $port,
79 $state{to} = AE::timer $timeout, 0, sub { 146 to => (AE::timer $timeout, 0, sub {
80 %state = (); 147 $cleanup->();
81 kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds"; 148 kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds";
149 }),
150 success => sub {
151 $cleanup->();
152 snd @localmsg, @_;
153 },
154 fail => sub {
155 $cleanup->();
156 kil $port, AnyEvent::MP::DataConn:: => $_[0];
157 },
82 }; 158 };
83 159
84 my $continue_connect = sub { 160 if (AnyEvent::MP::Kernel::port_is_local $node) {
85 use Data::Dumper;
86 warn Dumper $NODE{$node};#d#
87 };
88
89 if (port_is_local $node) {
90 return kil $port, AnyEvent::MP::DataConn:: => 161 return kil $port, AnyEvent::MP::DataConn:: =>
91 "connect_to does not yet support local/local connections, please bug me about it"; 162 "connect_to does not yet support local/local connections, please bug me about it";
92 } elsif (node_is_up $node) { 163
93 $continue_connect->();
94 } else { 164 } else {
95 $state{mon_nodes} = mon_nodes sub { 165 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec;
96 return unless $_[0] eq $node && $_[1]; 166
97 delete $state{mon_nodes}; 167 $state->{wait} = sub {
168 if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
169 delete $AnyEvent::MP::Global::ON_SETUP{$id};
170
171 warn "<@_ @$addr $addr>\n";#d#
98 $continue_connect->() 172 # continue connect
173 if (@$addr) {
174 # node has listeners, so connect
175 _connect $id;
176 } else {
177 # no listeners, ask it to connect to us
178 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id;
179 }
180 } else {
181 # wait for the next global setup handshake
182 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
183 };
99 }; 184 };
185
186 $state->{wait}->();
100 } 187 }
101} 188}
102 189
103=back 190=back
104 191

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines