ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.18 by root, Wed Aug 12 21:39:59 2009 UTC vs.
Revision 1.21 by root, Thu Aug 13 00:49:23 2009 UTC

29 29
30 $self 30 $self
31} 31}
32 32
33sub send { 33sub send {
34 &{$_[0]->{send}} 34 &{ shift->{send} }
35} 35}
36 36
37package AnyEvent::MP::Node::External; 37package AnyEvent::MP::Node::External;
38 38
39use base "AnyEvent::MP::Node"; 39use base "AnyEvent::MP::Node";
42sub transport_reset { 42sub transport_reset {
43 my ($self) = @_; 43 my ($self) = @_;
44 44
45 delete $self->{transport}; 45 delete $self->{transport};
46 46
47 Scalar::Util::weaken $self;
48
47 $self->{send} = sub { 49 $self->{send} = sub {
48 push @{$_[0]{queue}}, $_[1]; 50 push @{$self->{queue}}, shift;
49 $_[0]->connect; 51 $self->connect;
50 }; 52 };
51} 53}
52 54
53# called only after successful handshake 55# called only after successful handshake
54sub transport_error { 56sub transport_error {
55 my ($self, @reason) = @_; 57 my ($self, @reason) = @_;
56 58
59 my $no_transport = !$self->{transport};
60
61 delete $self->{connect_w};
62 delete $self->{connect_to};
63
57 delete $self->{queue}; 64 delete $self->{queue};
58 $self->transport_reset; 65 $self->transport_reset;
66
67 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
68 unless $no_transport;
59 69
60 if (my $mon = delete $self->{lmon}) { 70 if (my $mon = delete $self->{lmon}) {
61 $_->(@reason) for map @$_, values %$mon; 71 $_->(@reason) for map @$_, values %$mon;
62 } 72 }
63} 73}
64 74
65# called after handshake was successful 75# called after handshake was successful
66sub transport_connect { 76sub transport_connect {
67 my ($self, $transport) = @_; 77 my ($self, $transport) = @_;
68 78
69 $self->transport_reset 79 $self->transport_error (transport_error => "switched connections")
70 if $self->{transport}; 80 if $self->{transport};
71 81
72 delete $self->{connect_w}; 82 delete $self->{connect_w};
73 delete $self->{connect_to}; 83 delete $self->{connect_to};
74 84
75 $self->{transport} = $transport; 85 $self->{transport} = $transport;
76 86
87 my $transport_send = $transport->can ("send");
88
77 $self->{send} = sub { 89 $self->{send} = sub {
78 $transport->send ($_[1]); 90 $transport_send->($transport, $_[0]);
79 }; 91 };
92
93 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
80 94
81 $transport->send ($_) 95 $transport->send ($_)
82 for @{ delete $self->{queue} || [] }; 96 for @{ delete $self->{queue} || [] };
83} 97}
84 98
168 182
169package AnyEvent::MP::Node::Self; 183package AnyEvent::MP::Node::Self;
170 184
171use base "AnyEvent::MP::Node"; 185use base "AnyEvent::MP::Node";
172 186
187sub connect {
188 # we are trivially connected
189}
190
173sub transport_reset { 191sub transport_reset {
174 my ($self) = @_; 192 my ($self) = @_;
175 193
194 Scalar::Util::weaken $self;
195
176 $self->{send} = sub { 196 $self->{send} = sub {
177 local $AnyEvent::MP::Kernel::SRCNODE = $_[0]; 197 local $AnyEvent::MP::Kernel::SRCNODE = $self;
178 AnyEvent::MP::Kernel::_inject (@{ $_[1] }); 198 AnyEvent::MP::Kernel::_inject (@{ $_[0] });
179 }; 199 };
180} 200}
181 201
182sub kill { 202sub kill {
183 my ($self, $port, @reason) = @_; 203 my ($self, $port, @reason) = @_;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines