ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.4 by root, Sat Aug 1 15:04:30 2009 UTC vs.
Revision 1.30 by root, Fri Aug 28 20:57:42 2009 UTC

13package AnyEvent::MP::Node; 13package AnyEvent::MP::Node;
14 14
15use common::sense; 15use common::sense;
16 16
17use AE (); 17use AE ();
18use AnyEvent::Util ();
18use AnyEvent::Socket (); 19use AnyEvent::Socket ();
19 20
20use AnyEvent::MP::Transport (); 21use AnyEvent::MP::Transport ();
21 22
22use base Exporter::;
23
24our $VERSION = '0.0';
25
26sub new { 23sub new {
27 my ($class, $noderef) = @_; 24 my ($self, $id) = @_;
28 25
29 bless { noderef => $noderef }, $class 26 $self = bless { id => $id }, $self;
30}
31 27
28 $self->init;
29 $self->transport_reset;
30
31 $self
32}
33
34sub init {
35 #
36}
37
38sub send {
39 &{ shift->{send} }
40}
41
42# nodes reachable via the network
32package AnyEvent::MP::Node::Direct; 43package AnyEvent::MP::Node::External;
33 44
34use base "AnyEvent::MP::Node"; 45use base "AnyEvent::MP::Node";
35 46
36sub send { 47# called at init time, mostly sets {send}
48sub transport_reset {
37 my ($self, $msg) = @_; 49 my ($self) = @_;
38 50
39 if ($self->{transport}) { 51 delete $self->{transport};
40 $self->{transport}->send ($msg); 52
41 } elsif ($self->{queue}) { 53 Scalar::Util::weaken $self;
54
55 $self->{send} = sub {
42 push @{ $self->{queue} }, $msg; 56 push @{$self->{queue}}, shift;
43 } else {
44 $self->{queue} = [$msg];
45 $self->connect; 57 $self->connect;
58 };
59}
60
61# called only after successful handshake
62sub transport_error {
63 my ($self, @reason) = @_;
64
65 my $no_transport = !$self->{transport};
66
67 delete $self->{connect_w};
68 delete $self->{connect_to};
69
70 delete $self->{queue};
71 $self->transport_reset;
72
73 if (my $mon = delete $self->{lmon}) {
74 $_->(@reason) for map @$_, values %$mon;
46 } 75 }
47}
48 76
49sub set_transport { 77 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
78 unless $no_transport;
79}
80
81# called after handshake was successful
82sub transport_connect {
50 my ($self, $transport) = @_; 83 my ($self, $transport) = @_;
51 84
52 delete $self->{trial}; 85 delete $self->{trial};
86
87 $self->transport_error (transport_error => "switched connections")
88 if $self->{transport};
89
53 delete $self->{next_connect}; 90 delete $self->{connect_w};
91 delete $self->{connect_to};
54 92
55 if (
56 exists $self->{remote_uniq}
57 && $self->{remote_uniq} ne $transport->{remote_uniq}
58 ) {
59 # uniq changed, drop queue
60 delete $self->{queue};
61 #TODO: "DOWN"
62 }
63
64 $self->{remote_uniq} = $transport->{remote_uniq};
65 $self->{transport} = $transport; 93 $self->{transport} = $transport;
94
95 my $transport_send = $transport->can ("send");
96
97 $self->{send} = sub {
98 $transport_send->($transport, $_[0]);
99 };
100
101 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
66 102
67 $transport->send ($_) 103 $transport->send ($_)
68 for @{ delete $self->{queue} || [] }; 104 for @{ delete $self->{queue} || [] };
69} 105}
70 106
107sub connect {
108 my ($self, @addresses) = @_;
109
110 return if $self->{transport};
111
112 Scalar::Util::weaken $self;
113
114 $self->{connect_to} ||= AE::timer
115 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
116 0,
117 sub {
118 $self->transport_error (transport_error => $self->{id}, "unable to connect");
119 };
120
121 return unless @addresses;
122
123 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
124
125 unless ($self->{connect_w}) {
126 my @endpoints;
127
128 $self->{connect_w} = AE::timer
129 rand,
130 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
131 sub {
132 @endpoints = @addresses
133 unless @endpoints;
134
135 my $endpoint = shift @endpoints;
136
137 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
138
139 $self->{trial}{$endpoint} ||= do {
140 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
141 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
142
143 AnyEvent::MP::Transport::mp_connect
144 $host, $port,
145 sub { delete $self->{trial}{$endpoint} },
146 };
147 }
148 ;
149 }
150}
151
152sub kill {
153 my ($self, $port, @reason) = @_;
154
155 $self->send (["", kil => $port, @reason]);
156}
157
158sub monitor {
159 my ($self, $portid, $cb) = @_;
160
161 my $list = $self->{lmon}{$portid} ||= [];
162
163 $self->send (["", mon1 => $portid])
164 unless @$list || !length $portid;
165
166 push @$list, $cb;
167}
168
169sub unmonitor {
170 my ($self, $portid, $cb) = @_;
171
172 my $list = $self->{lmon}{$portid}
173 or return;
174
175 @$list = grep $_ != $cb, @$list;
176
177 unless (@$list) {
178 $self->send (["", mon0 => $portid]);
179 delete $self->{monitor}{$portid};
180 }
181}
182
183# used for direct slave connections as well
184package AnyEvent::MP::Node::Direct;
185
186use base "AnyEvent::MP::Node::External";
187
188package AnyEvent::MP::Node::Self;
189
190use base "AnyEvent::MP::Node";
191
192sub connect {
193 # we are trivially connected
194}
195
71sub clr_transport { 196sub transport_reset {
72 my ($self) = @_; 197 my ($self) = @_;
73 198
74 delete $self->{transport};
75
76 $self->connect;
77}
78
79sub connect {
80 my ($self) = @_;
81
82 Scalar::Util::weaken $self; 199 Scalar::Util::weaken $self;
83 200
84 unless (exists $self->{n_noderef}) { 201 $self->{send} = sub {
85 (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { 202 local $AnyEvent::MP::Kernel::SRCNODE = $self;
86 $self or return; 203 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
87 my $noderef = shift->recv;
88
89 $self->{n_noderef} = $noderef;
90
91 $AnyEvent::MP::NODE{$_} = $self
92 for split /,/, $noderef;
93
94 $self->connect;
95 });
96 return;
97 }
98
99 $self->{retry} ||= [split /,/, $self->{n_noderef}];
100
101 my $endpoint = shift @{ $self->{retry} };
102
103 if (defined $endpoint) {
104 $self->{trial}{$endpoint} ||= do {
105 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
106 or return;
107
108 my ($w, $g);
109
110 $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
111 delete $self->{trial}{$endpoint};
112 };
113 $g = AnyEvent::MP::Transport::mp_connect
114 $host, $port,
115 sub {
116 delete $self->{trial}{$endpoint}
117 unless @_;
118 $g = shift;
119 };
120 ;
121
122 [$w, \$g]
123 };
124 } else {
125 delete $self->{retry};
126 }
127
128 $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
129 $self->connect;
130 }; 204 };
131} 205}
132 206
133package AnyEvent::MP::Node::Self; 207sub transport_connect {
208 my ($self, $tp) = @_;
134 209
135use base "AnyEvent::MP::Node"; 210 $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})");
136
137sub set_transport {
138 die "FATAL error, set_transport was called";
139} 211}
140 212
141sub send { 213sub kill {
142 AnyEvent::MP::_inject ($_[1]); 214 my ($self, $port, @reason) = @_;
215
216 delete $AnyEvent::MP::Kernel::PORT{$port};
217 delete $AnyEvent::MP::Kernel::PORT_DATA{$port};
218
219 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
220 or !@reason
221 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
222
223 $_->(@reason) for values %$mon;
224}
225
226sub monitor {
227 my ($self, $portid, $cb) = @_;
228
229 return $cb->(no_such_port => "cannot monitor nonexistent port")
230 unless exists $AnyEvent::MP::Kernel::PORT{$portid};
231
232 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
233}
234
235sub unmonitor {
236 my ($self, $portid, $cb) = @_;
237
238 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0};
143} 239}
144 240
145=head1 SEE ALSO 241=head1 SEE ALSO
146 242
147L<AnyEvent>. 243L<AnyEvent::MP>.
148 244
149=head1 AUTHOR 245=head1 AUTHOR
150 246
151 Marc Lehmann <schmorp@schmorp.de> 247 Marc Lehmann <schmorp@schmorp.de>
152 http://home.schmorp.de/ 248 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines