ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.18 by root, Wed Aug 12 21:39:59 2009 UTC vs.
Revision 1.26 by root, Sat Aug 15 15:13:04 2009 UTC

29 29
30 $self 30 $self
31} 31}
32 32
33sub send { 33sub send {
34 &{$_[0]->{send}} 34 &{ shift->{send} }
35} 35}
36 36
37package AnyEvent::MP::Node::External; 37package AnyEvent::MP::Node::External;
38 38
39use base "AnyEvent::MP::Node"; 39use base "AnyEvent::MP::Node";
42sub transport_reset { 42sub transport_reset {
43 my ($self) = @_; 43 my ($self) = @_;
44 44
45 delete $self->{transport}; 45 delete $self->{transport};
46 46
47 Scalar::Util::weaken $self;
48
47 $self->{send} = sub { 49 $self->{send} = sub {
48 push @{$_[0]{queue}}, $_[1]; 50 push @{$self->{queue}}, shift;
49 $_[0]->connect; 51 $self->connect;
50 }; 52 };
53
54 $self->connect
55 if $self->{autoconnect};
51} 56}
52 57
53# called only after successful handshake 58# called only after successful handshake
54sub transport_error { 59sub transport_error {
55 my ($self, @reason) = @_; 60 my ($self, @reason) = @_;
56 61
62 my $no_transport = !$self->{transport};
63
64 delete $self->{connect_w};
65 delete $self->{connect_to};
66
57 delete $self->{queue}; 67 delete $self->{queue};
58 $self->transport_reset; 68 $self->transport_reset;
69
70 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
71 unless $no_transport;
59 72
60 if (my $mon = delete $self->{lmon}) { 73 if (my $mon = delete $self->{lmon}) {
61 $_->(@reason) for map @$_, values %$mon; 74 $_->(@reason) for map @$_, values %$mon;
62 } 75 }
63} 76}
64 77
65# called after handshake was successful 78# called after handshake was successful
66sub transport_connect { 79sub transport_connect {
67 my ($self, $transport) = @_; 80 my ($self, $transport) = @_;
68 81
69 $self->transport_reset 82 # first connect with a master node
83 $AnyEvent::MP::Kernel::SLAVE->($self)
84 if ref $AnyEvent::MP::Kernel::SLAVE;
85
86 $self->transport_error (transport_error => "switched connections")
70 if $self->{transport}; 87 if $self->{transport};
71 88
72 delete $self->{connect_w}; 89 delete $self->{connect_w};
73 delete $self->{connect_to}; 90 delete $self->{connect_to};
74 91
75 $self->{transport} = $transport; 92 $self->{transport} = $transport;
76 93
94 my $transport_send = $transport->can ("send");
95
77 $self->{send} = sub { 96 $self->{send} = sub {
78 $transport->send ($_[1]); 97 $transport_send->($transport, $_[0]);
79 }; 98 };
99
100 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
80 101
81 $transport->send ($_) 102 $transport->send ($_)
82 for @{ delete $self->{queue} || [] }; 103 for @{ delete $self->{queue} || [] };
83} 104}
84 105
85sub connect { 106sub connect {
86 my ($self) = @_; 107 my ($self) = @_;
108
109 return if $self->{transport};
87 110
88 Scalar::Util::weaken $self; 111 Scalar::Util::weaken $self;
89 112
90 $self->{connect_to} ||= AE::timer 113 $self->{connect_to} ||= AE::timer
91 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, 114 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
97 unless ($self->{connect_w}) { 120 unless ($self->{connect_w}) {
98 my @endpoints; 121 my @endpoints;
99 my %trial; 122 my %trial;
100 123
101 $self->{connect_w} = AE::timer 124 $self->{connect_w} = AE::timer
102 0, 125 rand,
103 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL, 126 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
104 sub { 127 sub {
105 @endpoints = split /,/, $self->{noderef} 128 @endpoints = split /,/, $self->{noderef}
106 unless @endpoints; 129 unless @endpoints;
107 130
108 my $endpoint = shift @endpoints; 131 my $endpoint = shift @endpoints;
109 132
110 $trial{$endpoint} ||= do { 133 $trial{$endpoint} ||= do {
111 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 134 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
112 or return $AnyEvent::MP::Kernel::WARN->("$self->{noderef}: not a resolved node reference."); 135 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{noderef}: not a resolved node reference.");
113 136
114 AnyEvent::MP::Transport::mp_connect 137 AnyEvent::MP::Transport::mp_connect
115 $host, $port, 138 $host, $port,
116 sub { delete $trial{$endpoint} } 139 sub { delete $trial{$endpoint} }
117 ; 140 ;
131 my ($self, $portid, $cb) = @_; 154 my ($self, $portid, $cb) = @_;
132 155
133 my $list = $self->{lmon}{$portid} ||= []; 156 my $list = $self->{lmon}{$portid} ||= [];
134 157
135 $self->send (["", mon1 => $portid]) 158 $self->send (["", mon1 => $portid])
136 unless @$list; 159 unless @$list || !length $portid;
137 160
138 push @$list, $cb; 161 push @$list, $cb;
139} 162}
140 163
141sub unmonitor { 164sub unmonitor {
158 181
159package AnyEvent::MP::Node::Indirect; 182package AnyEvent::MP::Node::Indirect;
160 183
161use base "AnyEvent::MP::Node::Direct"; 184use base "AnyEvent::MP::Node::Direct";
162 185
186sub master {
187 my ($self) = @_;
188
189 my (undef, $master) = split /\@/, $self->{noderef}, 2;
190 $master =~ s/!/,/g;
191 $master
192}
193
194sub transport_reset {
195 my ($self) = @_;
196
197 if ($self->{transport}) {
198 # as an optimisation, immediately nuke slave nodes
199 delete $AnyEvent::MP::Kernel::NODE{$self->{noderef}};
200 } else {
201 $self->SUPER::transport_reset;
202 return;#d##TODO#
203
204 my $noderef = $self->{noderef};
205 my $master = $self->master;
206
207 # slave nodes are so cool - we can always send to them :)
208
209 $self->{send} = sub {
210 $self->connect;
211 snd $master, snd => $noderef, @_;
212 };
213 }
214}
215
163sub connect { 216sub connect {
164 my ($self) = @_; 217 my ($self) = @_;
165 218
166 $self->transport_error (transport_error => $self->{noderef}, "unable to connect to indirect node"); 219 #TODO#
220# # ask for a connection, #TODO# rate-limit this somehow
221# snd $self->master, relay => $self->{noderef}, connect_node => $AnyEvent::MP::Kernel::NODE;
167} 222}
168 223
169package AnyEvent::MP::Node::Self; 224package AnyEvent::MP::Node::Self;
170 225
171use base "AnyEvent::MP::Node"; 226use base "AnyEvent::MP::Node";
172 227
228sub connect {
229 # we are trivially connected
230}
231
173sub transport_reset { 232sub transport_reset {
174 my ($self) = @_; 233 my ($self) = @_;
175 234
235 Scalar::Util::weaken $self;
236
176 $self->{send} = sub { 237 $self->{send} = sub {
177 local $AnyEvent::MP::Kernel::SRCNODE = $_[0]; 238 local $AnyEvent::MP::Kernel::SRCNODE = $self;
178 AnyEvent::MP::Kernel::_inject (@{ $_[1] }); 239 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
179 }; 240 };
180} 241}
181 242
182sub kill { 243sub kill {
183 my ($self, $port, @reason) = @_; 244 my ($self, $port, @reason) = @_;
185 delete $AnyEvent::MP::Kernel::PORT{$port}; 246 delete $AnyEvent::MP::Kernel::PORT{$port};
186 delete $AnyEvent::MP::Kernel::PORT_DATA{$port}; 247 delete $AnyEvent::MP::Kernel::PORT_DATA{$port};
187 248
188 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port} 249 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
189 or !@reason 250 or !@reason
190 or $AnyEvent::MP::Kernel::WARN->("unmonitored local port $port died with reason: @reason"); 251 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
191 252
192 $_->(@reason) for values %$mon; 253 $_->(@reason) for values %$mon;
193} 254}
194 255
195sub monitor { 256sub monitor {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines