ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.4 by root, Sat Aug 1 15:04:30 2009 UTC vs.
Revision 1.5 by root, Sun Aug 2 14:44:37 2009 UTC

20use AnyEvent::MP::Transport (); 20use AnyEvent::MP::Transport ();
21 21
22use base Exporter::; 22use base Exporter::;
23 23
24our $VERSION = '0.0'; 24our $VERSION = '0.0';
25
26our $DEFAULT_PORT = "4040";
27
28sub normalise_noderef($) {
29 my ($noderef) = @_;
30
31 my $cv = AE::cv;
32 my @res;
33
34 $cv->begin (sub {
35 my %seen;
36 my @refs;
37 for (sort { $a->[0] <=> $b->[0] } @res) {
38 push @refs, $_->[1] unless $seen{$_->[1]}++
39 }
40 shift->send (join ",", @refs);
41 });
42
43 $noderef = $DEFAULT_PORT unless length $noderef;
44
45 my $idx;
46 for my $t (split /,/, $noderef) {
47 my $pri = ++$idx;
48
49 #TODO: this should be outside normalise_noderef and in become_public
50 if ($t =~ /^\d*$/) {
51 require POSIX;
52 my $nodename = (POSIX::uname ())[1];
53
54 $cv->begin;
55 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
56 for (@_) {
57 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
58 push @res, [
59 $pri += 1e-5,
60 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
61 ];
62 }
63 $cv->end;
64 };
65
66# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
67#
68# for (@ipv4) {
69# push @res, [
70# $pri,
71# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
72# ];
73# }
74 } else {
75 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
76 or Carp::croak "$t: unparsable transport descriptor";
77
78 $cv->begin;
79 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
80 for (@_) {
81 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
82 push @res, [
83 $pri += 1e-5,
84 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
85 ];
86 }
87 $cv->end;
88 }
89 }
90 }
91
92 $cv->end;
93
94 $cv
95}
25 96
26sub new { 97sub new {
27 my ($class, $noderef) = @_; 98 my ($class, $noderef) = @_;
28 99
29 bless { noderef => $noderef }, $class 100 bless { noderef => $noderef }, $class
80 my ($self) = @_; 151 my ($self) = @_;
81 152
82 Scalar::Util::weaken $self; 153 Scalar::Util::weaken $self;
83 154
84 unless (exists $self->{n_noderef}) { 155 unless (exists $self->{n_noderef}) {
156 return if $self->{n_noderef_}++;
85 (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { 157 (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub {
86 $self or return; 158 $self or return;
159 delete $self->{n_noderef_};
87 my $noderef = shift->recv; 160 my $noderef = shift->recv;
88 161
89 $self->{n_noderef} = $noderef; 162 $self->{n_noderef} = $noderef;
90 163
91 $AnyEvent::MP::NODE{$_} = $self 164 $AnyEvent::MP::Base::NODE{$_} = $self
92 for split /,/, $noderef; 165 for split /,/, $noderef;
93 166
94 $self->connect; 167 $self->connect;
95 }); 168 });
96 return; 169 return;
105 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 178 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
106 or return; 179 or return;
107 180
108 my ($w, $g); 181 my ($w, $g);
109 182
110 $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { 183 $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
111 delete $self->{trial}{$endpoint}; 184 delete $self->{trial}{$endpoint};
112 }; 185 };
113 $g = AnyEvent::MP::Transport::mp_connect 186 $g = AnyEvent::MP::Transport::mp_connect
114 $host, $port, 187 $host, $port,
115 sub { 188 sub {
123 }; 196 };
124 } else { 197 } else {
125 delete $self->{retry}; 198 delete $self->{retry};
126 } 199 }
127 200
128 $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { 201 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
129 $self->connect; 202 $self->connect;
130 }; 203 };
131} 204}
132 205
133package AnyEvent::MP::Node::Self; 206package AnyEvent::MP::Node::Self;
137sub set_transport { 210sub set_transport {
138 die "FATAL error, set_transport was called"; 211 die "FATAL error, set_transport was called";
139} 212}
140 213
141sub send { 214sub send {
142 AnyEvent::MP::_inject ($_[1]); 215 AnyEvent::MP::Base::_inject ($_[1]);
143} 216}
144 217
145=head1 SEE ALSO 218=head1 SEE ALSO
146 219
147L<AnyEvent>. 220L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines