… | |
… | |
19 | use AnyEvent::Socket (); |
19 | use AnyEvent::Socket (); |
20 | |
20 | |
21 | use AnyEvent::MP::Transport (); |
21 | use AnyEvent::MP::Transport (); |
22 | |
22 | |
23 | sub new { |
23 | sub new { |
24 | my ($self, $noderef) = @_; |
24 | my ($self, $id) = @_; |
25 | |
25 | |
26 | $self = bless { noderef => $noderef }, $self; |
26 | $self = bless { id => $id }, $self; |
27 | |
27 | |
28 | $self->init; |
28 | $self->init; |
29 | $self->transport_reset; |
29 | $self->transport_reset; |
30 | |
30 | |
31 | $self |
31 | $self |
… | |
… | |
83 | |
83 | |
84 | # called after handshake was successful |
84 | # called after handshake was successful |
85 | sub transport_connect { |
85 | sub transport_connect { |
86 | my ($self, $transport) = @_; |
86 | my ($self, $transport) = @_; |
87 | |
87 | |
|
|
88 | delete $self->{trial}; |
|
|
89 | |
88 | $self->transport_error (transport_error => "switched connections") |
90 | $self->transport_error (transport_error => "switched connections") |
89 | if $self->{transport}; |
91 | if $self->{transport}; |
90 | |
92 | |
91 | delete $self->{connect_w}; |
93 | delete $self->{connect_w}; |
92 | delete $self->{connect_to}; |
94 | delete $self->{connect_to}; |
… | |
… | |
104 | $transport->send ($_) |
106 | $transport->send ($_) |
105 | for @{ delete $self->{queue} || [] }; |
107 | for @{ delete $self->{queue} || [] }; |
106 | } |
108 | } |
107 | |
109 | |
108 | sub connect { |
110 | sub connect { |
109 | my ($self) = @_; |
111 | my ($self, @addresses) = @_; |
110 | |
112 | |
111 | return if $self->{transport}; |
113 | return if $self->{transport}; |
112 | |
|
|
113 | # just ignore connect requests for slave nodes - let's hope it connects to us instead |
|
|
114 | return if $self->{noderef} =~ /^slave\//; |
|
|
115 | |
114 | |
116 | Scalar::Util::weaken $self; |
115 | Scalar::Util::weaken $self; |
117 | |
116 | |
118 | $self->{connect_to} ||= AE::timer |
117 | $self->{connect_to} ||= AE::timer |
119 | $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, |
118 | $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, |
120 | 0, |
119 | 0, |
121 | sub { |
120 | sub { |
122 | $self->transport_error (transport_error => $self->{noderef}, "unable to connect"); |
121 | $self->transport_error (transport_error => $self->{id}, "unable to connect"); |
123 | }; |
122 | }; |
|
|
123 | |
|
|
124 | return unless @addresses; |
|
|
125 | |
|
|
126 | $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); |
124 | |
127 | |
125 | unless ($self->{connect_w}) { |
128 | unless ($self->{connect_w}) { |
126 | my @endpoints; |
129 | my @endpoints; |
127 | my %trial; |
|
|
128 | |
130 | |
129 | $self->{connect_w} = AE::timer |
131 | $self->{connect_w} = AE::timer |
130 | rand, |
132 | rand, |
131 | $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL, |
133 | $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL, |
132 | sub { |
134 | sub { |
133 | @endpoints = split /,/, $self->{noderef} |
135 | @endpoints = @addresses |
134 | unless @endpoints; |
136 | unless @endpoints; |
135 | |
137 | |
136 | my $endpoint = shift @endpoints; |
138 | my $endpoint = shift @endpoints; |
137 | |
139 | |
|
|
140 | $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint"); |
|
|
141 | |
138 | $trial{$endpoint} ||= do { |
142 | $self->{trial}{$endpoint} ||= do { |
139 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
143 | my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
140 | or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{noderef}: not a resolved node reference."); |
144 | or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference."); |
141 | |
145 | |
142 | AnyEvent::MP::Transport::mp_connect |
146 | AnyEvent::MP::Transport::mp_connect |
143 | $host, $port, |
147 | $host, $port, |
144 | sub { delete $trial{$endpoint} } |
148 | sub { delete $self->{trial}{$endpoint} }, |
145 | ; |
|
|
146 | }; |
149 | }; |
147 | } |
150 | } |
148 | ; |
151 | ; |
149 | } |
152 | } |
150 | } |
153 | } |
… | |
… | |
202 | local $AnyEvent::MP::Kernel::SRCNODE = $self; |
205 | local $AnyEvent::MP::Kernel::SRCNODE = $self; |
203 | AnyEvent::MP::Kernel::_inject (@{ $_[0] }); |
206 | AnyEvent::MP::Kernel::_inject (@{ $_[0] }); |
204 | }; |
207 | }; |
205 | } |
208 | } |
206 | |
209 | |
|
|
210 | sub transport_connect { |
|
|
211 | my ($self, $tp) = @_; |
|
|
212 | |
|
|
213 | $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"); |
|
|
214 | } |
|
|
215 | |
207 | sub kill { |
216 | sub kill { |
208 | my ($self, $port, @reason) = @_; |
217 | my ($self, $port, @reason) = @_; |
209 | |
218 | |
210 | delete $AnyEvent::MP::Kernel::PORT{$port}; |
219 | delete $AnyEvent::MP::Kernel::PORT{$port}; |
211 | delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; |
220 | delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; |