ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.2 by root, Sat Aug 1 07:11:45 2009 UTC vs.
Revision 1.5 by root, Sun Aug 2 14:44:37 2009 UTC

21 21
22use base Exporter::; 22use base Exporter::;
23 23
24our $VERSION = '0.0'; 24our $VERSION = '0.0';
25 25
26our $DEFAULT_PORT = "4040";
27
28sub normalise_noderef($) {
29 my ($noderef) = @_;
30
31 my $cv = AE::cv;
32 my @res;
33
34 $cv->begin (sub {
35 my %seen;
36 my @refs;
37 for (sort { $a->[0] <=> $b->[0] } @res) {
38 push @refs, $_->[1] unless $seen{$_->[1]}++
39 }
40 shift->send (join ",", @refs);
41 });
42
43 $noderef = $DEFAULT_PORT unless length $noderef;
44
45 my $idx;
46 for my $t (split /,/, $noderef) {
47 my $pri = ++$idx;
48
49 #TODO: this should be outside normalise_noderef and in become_public
50 if ($t =~ /^\d*$/) {
51 require POSIX;
52 my $nodename = (POSIX::uname ())[1];
53
54 $cv->begin;
55 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
56 for (@_) {
57 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
58 push @res, [
59 $pri += 1e-5,
60 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
61 ];
62 }
63 $cv->end;
64 };
65
66# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
67#
68# for (@ipv4) {
69# push @res, [
70# $pri,
71# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
72# ];
73# }
74 } else {
75 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
76 or Carp::croak "$t: unparsable transport descriptor";
77
78 $cv->begin;
79 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
80 for (@_) {
81 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
82 push @res, [
83 $pri += 1e-5,
84 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
85 ];
86 }
87 $cv->end;
88 }
89 }
90 }
91
92 $cv->end;
93
94 $cv
95}
96
26sub new { 97sub new {
27 my ($class, $noderef) = @_; 98 my ($class, $noderef) = @_;
28 99
29 bless { noderef => $noderef }, $class 100 bless { noderef => $noderef }, $class
30} 101}
33 104
34use base "AnyEvent::MP::Node"; 105use base "AnyEvent::MP::Node";
35 106
36sub send { 107sub send {
37 my ($self, $msg) = @_; 108 my ($self, $msg) = @_;
38
39 warn "send $self $self->{noderef}\n";#d#
40 109
41 if ($self->{transport}) { 110 if ($self->{transport}) {
42 $self->{transport}->send ($msg); 111 $self->{transport}->send ($msg);
43 } elsif ($self->{queue}) { 112 } elsif ($self->{queue}) {
44 push @{ $self->{queue} }, $msg; 113 push @{ $self->{queue} }, $msg;
52 my ($self, $transport) = @_; 121 my ($self, $transport) = @_;
53 122
54 delete $self->{trial}; 123 delete $self->{trial};
55 delete $self->{next_connect}; 124 delete $self->{next_connect};
56 125
57 use Data::Dumper; warn "set_transport $self->{noderef} $self $AnyEvent::MP::NODE{'10.0.0.1:4040'}\n";#d#
58
59 if ( 126 if (
60 exists $self->{remote_uniq} 127 exists $self->{remote_uniq}
61 && $self->{remote_uniq} ne $transport->{remote_uniq} 128 && $self->{remote_uniq} ne $transport->{remote_uniq}
62 ) { 129 ) {
63 # uniq changed, drop queue 130 # uniq changed, drop queue
74 141
75sub clr_transport { 142sub clr_transport {
76 my ($self) = @_; 143 my ($self) = @_;
77 144
78 delete $self->{transport}; 145 delete $self->{transport};
79 warn "clr_transport\n"; 146
147 $self->connect;
80} 148}
81 149
82sub connect { 150sub connect {
83 my ($self) = @_; 151 my ($self) = @_;
84 152
85 Scalar::Util::weaken $self; 153 Scalar::Util::weaken $self;
86 154
87 unless (exists $self->{n_noderef}) { 155 unless (exists $self->{n_noderef}) {
156 return if $self->{n_noderef_}++;
88 (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { 157 (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub {
89 $self or return; 158 $self or return;
159 delete $self->{n_noderef_};
90 my $noderef = shift->recv; 160 my $noderef = shift->recv;
91 161
92 $self->{n_noderef} = $noderef; 162 $self->{n_noderef} = $noderef;
93 163
94 $AnyEvent::MP::NODE{$_} = $self 164 $AnyEvent::MP::Base::NODE{$_} = $self
95 for split /,/, $noderef; 165 for split /,/, $noderef;
96 166
97 $self->connect; 167 $self->connect;
98 }); 168 });
99 return; 169 return;
108 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 178 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
109 or return; 179 or return;
110 180
111 my ($w, $g); 181 my ($w, $g);
112 182
113 $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { 183 $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
114 delete $self->{trial}{$endpoint}; 184 delete $self->{trial}{$endpoint};
115 }; 185 };
116 $g = AnyEvent::MP::Transport::mp_connect 186 $g = AnyEvent::MP::Transport::mp_connect
117 $host, $port, 187 $host, $port,
118 sub { 188 sub {
126 }; 196 };
127 } else { 197 } else {
128 delete $self->{retry}; 198 delete $self->{retry};
129 } 199 }
130 200
131 $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { 201 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
132 $self->connect; 202 $self->connect;
133 }; 203 };
134} 204}
135 205
136package AnyEvent::MP::Node::Self; 206package AnyEvent::MP::Node::Self;
140sub set_transport { 210sub set_transport {
141 die "FATAL error, set_transport was called"; 211 die "FATAL error, set_transport was called";
142} 212}
143 213
144sub send { 214sub send {
145 AnyEvent::MP::_inject ($_[1]); 215 AnyEvent::MP::Base::_inject ($_[1]);
146} 216}
147 217
148=head1 SEE ALSO 218=head1 SEE ALSO
149 219
150L<AnyEvent>. 220L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines