ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
(Generate patch)

Comparing AnyEvent-MP/MP/DataConn.pm (file contents):
Revision 1.2 by root, Sat Nov 7 02:36:31 2009 UTC vs.
Revision 1.3 by root, Sun Nov 8 01:15:25 2009 UTC

42use AnyEvent::MP::Global (); 42use AnyEvent::MP::Global ();
43 43
44our $ID = "a"; 44our $ID = "a";
45our %STATE; 45our %STATE;
46 46
47sub _accept { 47# another node tells us to await a connection
48sub _expect {
48 my ($id, $timeout, $initspec) = @_; 49 my ($id, $timeout, $initspec) = @_;
49 50
50 my $cleanup = sub {
51 %{delete $STATE{$id}} = ();
52 };
53
54 $STATE{$id} = { 51 $STATE{$id} = {
55 id => $id, 52 id => $id,
56 to => (AE::timer $timeout, 0, $cleanup), 53 to => (AE::timer $timeout, 0, sub {
54 $STATE{$id}{done}->(undef);
55 }),
57 success => sub { 56 done => sub {
57 my ($hdl, $error) = @_;
58
59 %{delete $STATE{$id}} = ();
60
61 if (defined $hdl) {
58 my ($func, @args) = @$initspec; 62 my ($func, @args) = @$initspec;
59 $cleanup->();
60 (AnyEvent::MP::Kernel::load_func $func)->(@args, @_); 63 (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl);
61 }, 64 }
62 fail => sub {
63 $cleanup->();
64 # nop?
65 }, 65 },
66 }; 66 };
67} 67}
68 68
69# AEMP::Transport call for dataconn-connections
69sub _inject { 70sub _inject {
70 my ($conn, $error) = @_; 71 my ($conn, $error) = @_;
71 72
72 my $hdl = delete $conn->{hdl}; 73 my $hdl = defined $error ? undef : delete $conn->{hdl};
73 die "inject <@_>\n";#d# 74 my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id}
74} 75 or return;
75 76
77 $conn->destroy;
78
79 ($STATE{$id} or return)->{done}->($hdl, $error);
80}
81
82# actively connect to some other node
76sub _connect { 83sub _connect {
77 my ($id) = @_; 84 my ($id, $node) = @_;
78 85
79 my $state = $STATE{$id} 86 my $state = $STATE{$id}
80 or return; 87 or return;
81 88
82 my $addr = $AnyEvent::MP::Global::addr{$state->{node}}; 89 my $addr = $AnyEvent::MP::Global::addr{$node};
83 90
84 @$addr 91 @$addr
85 or return $state->{fail}("$state->{node}: no listeners found"); 92 or return $state->{fail}("$node: no listeners found");
86 93
87 my %transport;
88
89 # I love hardcoded constants 94 # I love hardcoded constants !
90 $state->{next} = AE::timer 0, 2, sub { 95 $state->{next} = AE::timer 0, 2, sub {
91 my $endpoint = shift @$addr 96 my $endpoint = shift @$addr
92 or return delete $state->{next}; 97 or return delete $state->{next};
93 98
94 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 99 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
95 or return; 100 or return;
96 101
97 my $transport; $transport = AnyEvent::MP::Transport::mp_connect 102 my $transport; $transport = AnyEvent::MP::Transport::mp_connect
98 $host, $port, 103 $host, $port,
99 protocol => "aemp-dataconn", 104 protocol => "aemp-dataconn",
100 sub { $transport->destroy }, 105 local_greeting => { dataconn_id => $id },
106 sub { $transport->destroy }, #TODO: destroys handshaked conenctions too early
101 ; 107 ;
102 }; 108 };
103} 109}
104 110
105=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) 111=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
131 my $port = $SELF; 137 my $port = $SELF;
132 $node = node_of $node; 138 $node = node_of $node;
133 139
134 my $id = (++$ID) . "\@$NODE"; 140 my $id = (++$ID) . "\@$NODE";
135 141
136 my $cleanup = sub {
137 delete $AnyEvent::MP::Global::ON_SETUP{$id};
138 %{delete $STATE{$id}} = ();
139 };
140
141 # damn, why do my simple state hashes resemble objects so quickly 142 # damn, why do my simple state hashes resemble objects so quickly
142 my $state = $STATE{$id} = { 143 my $state = $STATE{$id} = {
143 id => (++$ID) . "\@$NODE", 144 id => (++$ID) . "\@$NODE",
144 node => $node,
145 port => $port,
146 to => (AE::timer $timeout, 0, sub { 145 to => (AE::timer $timeout, 0, sub {
147 $cleanup->(); 146 $STATE{$id}{done}->(undef, "$node: unable to establish connection within $timeout seconds");
148 kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds";
149 }), 147 }),
150 success => sub { 148 done => sub {
151 $cleanup->(); 149 my ($hdl, $error) = @_;
150
151 warn "done<@_>\n";#d#
152
153 delete $AnyEvent::MP::Global::ON_SETUP{$id};
154 %{delete $STATE{$id}} = ();
155
156 if (defined $hdl) {
152 snd @localmsg, @_; 157 snd @localmsg, $hdl;
153 }, 158 } else {
154 fail => sub {
155 $cleanup->();
156 kil $port, AnyEvent::MP::DataConn:: => $_[0]; 159 kil $port, AnyEvent::MP::DataConn:: => $error;
160 }
157 }, 161 },
158 }; 162 };
159 163
160 if (AnyEvent::MP::Kernel::port_is_local $node) { 164 if (AnyEvent::MP::Kernel::port_is_local $node) {
161 return kil $port, AnyEvent::MP::DataConn:: => 165 return kil $port, AnyEvent::MP::DataConn:: =>
162 "connect_to does not yet support local/local connections, please bug me about it"; 166 "connect_to does not yet support local/local connections, please bug me about it";
163 167
164 } else { 168 } else {
165 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec; 169 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $timeout, $initspec;
166 170
167 $state->{wait} = sub { 171 $state->{wait} = sub {
168 if (my $addr = $AnyEvent::MP::Global::addr{$node}) { 172 if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
169 delete $AnyEvent::MP::Global::ON_SETUP{$id}; 173 delete $AnyEvent::MP::Global::ON_SETUP{$id};
170 174
171 warn "<@_ @$addr $addr>\n";#d#
172 # continue connect 175 # continue connect
173 if (@$addr) { 176 if (@$addr) {
174 # node has listeners, so connect 177 # node has listeners, so connect
175 _connect $id; 178 _connect $id, $node;
176 } else { 179 } else {
177 # no listeners, ask it to connect to us 180 # no listeners, ask it to connect to us
178 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id; 181 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $node;
179 } 182 }
180 } else { 183 } else {
181 # wait for the next global setup handshake 184 # wait for the next global setup handshake
182 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; 185 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
183 }; 186 };

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines