ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.19 by root, Wed Aug 12 22:03:15 2009 UTC vs.
Revision 1.24 by root, Sat Aug 15 04:34:34 2009 UTC

48 48
49 $self->{send} = sub { 49 $self->{send} = sub {
50 push @{$self->{queue}}, shift; 50 push @{$self->{queue}}, shift;
51 $self->connect; 51 $self->connect;
52 }; 52 };
53
54 $self->connect
55 if $self->{autoconnect};
53} 56}
54 57
55# called only after successful handshake 58# called only after successful handshake
56sub transport_error { 59sub transport_error {
57 my ($self, @reason) = @_; 60 my ($self, @reason) = @_;
58 61
62 my $no_transport = !$self->{transport};
63
64 delete $self->{connect_w};
65 delete $self->{connect_to};
66
59 delete $self->{queue}; 67 delete $self->{queue};
60 $self->transport_reset; 68 $self->transport_reset;
69
70 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
71 unless $no_transport;
61 72
62 if (my $mon = delete $self->{lmon}) { 73 if (my $mon = delete $self->{lmon}) {
63 $_->(@reason) for map @$_, values %$mon; 74 $_->(@reason) for map @$_, values %$mon;
64 } 75 }
65} 76}
66 77
67# called after handshake was successful 78# called after handshake was successful
68sub transport_connect { 79sub transport_connect {
69 my ($self, $transport) = @_; 80 my ($self, $transport) = @_;
70 81
71 $self->transport_reset 82 # first connect with a master node
83 $AnyEvent::MP::Kernel::SLAVE->($self)
84 if ref $AnyEvent::MP::Kernel::SLAVE;
85
86 $self->transport_error (transport_error => "switched connections")
72 if $self->{transport}; 87 if $self->{transport};
73 88
74 delete $self->{connect_w}; 89 delete $self->{connect_w};
75 delete $self->{connect_to}; 90 delete $self->{connect_to};
76 91
80 95
81 $self->{send} = sub { 96 $self->{send} = sub {
82 $transport_send->($transport, $_[0]); 97 $transport_send->($transport, $_[0]);
83 }; 98 };
84 99
100 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
101
85 $transport->send ($_) 102 $transport->send ($_)
86 for @{ delete $self->{queue} || [] }; 103 for @{ delete $self->{queue} || [] };
87} 104}
88 105
89sub connect { 106sub connect {
90 my ($self) = @_; 107 my ($self) = @_;
108
109 return if $self->{transport};
91 110
92 Scalar::Util::weaken $self; 111 Scalar::Util::weaken $self;
93 112
94 $self->{connect_to} ||= AE::timer 113 $self->{connect_to} ||= AE::timer
95 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, 114 $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
135 my ($self, $portid, $cb) = @_; 154 my ($self, $portid, $cb) = @_;
136 155
137 my $list = $self->{lmon}{$portid} ||= []; 156 my $list = $self->{lmon}{$portid} ||= [];
138 157
139 $self->send (["", mon1 => $portid]) 158 $self->send (["", mon1 => $portid])
140 unless @$list; 159 unless @$list || !length $portid;
141 160
142 push @$list, $cb; 161 push @$list, $cb;
143} 162}
144 163
145sub unmonitor { 164sub unmonitor {
162 181
163package AnyEvent::MP::Node::Indirect; 182package AnyEvent::MP::Node::Indirect;
164 183
165use base "AnyEvent::MP::Node::Direct"; 184use base "AnyEvent::MP::Node::Direct";
166 185
186sub master {
187 my ($self) = @_;
188
189 my (undef, $master) = split /\@/, $self->{noderef}, 2;
190 $master =~ s/!/,/g;
191 $master
192}
193
194sub transport_reset {
195 my ($self) = @_;
196
197 if ($self->{transport}) {
198 # as an optimisation, immediately nuke slave nodes
199 delete $AnyEvent::MP::Kernel::NODE{$self->{noderef}};
200 } else {
201 $self->SUPER::transport_reset;
202 return;#d##TODO#
203
204 my $noderef = $self->{noderef};
205 my $master = $self->master;
206
207 # slave nodes are so cool - we can always send to them :)
208
209 $self->{send} = sub {
210 $self->connect;
211 snd $master, snd => $noderef, @_;
212 };
213 }
214}
215
167sub connect { 216sub connect {
168 my ($self) = @_; 217 my ($self) = @_;
169 218
170 $self->transport_error (transport_error => $self->{noderef}, "unable to connect to indirect node"); 219 #TODO#
220# # ask for a connection, #TODO# rate-limit this somehow
221# snd $self->master, relay => $self->{noderef}, connect_node => $AnyEvent::MP::Kernel::NODE;
171} 222}
172 223
173package AnyEvent::MP::Node::Self; 224package AnyEvent::MP::Node::Self;
174 225
175use base "AnyEvent::MP::Node"; 226use base "AnyEvent::MP::Node";
227
228sub connect {
229 # we are trivially connected
230}
176 231
177sub transport_reset { 232sub transport_reset {
178 my ($self) = @_; 233 my ($self) = @_;
179 234
180 Scalar::Util::weaken $self; 235 Scalar::Util::weaken $self;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines