ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.1 by root, Fri Jul 31 20:55:46 2009 UTC vs.
Revision 1.5 by root, Sun Aug 2 14:44:37 2009 UTC

20use AnyEvent::MP::Transport (); 20use AnyEvent::MP::Transport ();
21 21
22use base Exporter::; 22use base Exporter::;
23 23
24our $VERSION = '0.0'; 24our $VERSION = '0.0';
25
26our $DEFAULT_PORT = "4040";
27
28sub normalise_noderef($) {
29 my ($noderef) = @_;
30
31 my $cv = AE::cv;
32 my @res;
33
34 $cv->begin (sub {
35 my %seen;
36 my @refs;
37 for (sort { $a->[0] <=> $b->[0] } @res) {
38 push @refs, $_->[1] unless $seen{$_->[1]}++
39 }
40 shift->send (join ",", @refs);
41 });
42
43 $noderef = $DEFAULT_PORT unless length $noderef;
44
45 my $idx;
46 for my $t (split /,/, $noderef) {
47 my $pri = ++$idx;
48
49 #TODO: this should be outside normalise_noderef and in become_public
50 if ($t =~ /^\d*$/) {
51 require POSIX;
52 my $nodename = (POSIX::uname ())[1];
53
54 $cv->begin;
55 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
56 for (@_) {
57 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
58 push @res, [
59 $pri += 1e-5,
60 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
61 ];
62 }
63 $cv->end;
64 };
65
66# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
67#
68# for (@ipv4) {
69# push @res, [
70# $pri,
71# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
72# ];
73# }
74 } else {
75 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
76 or Carp::croak "$t: unparsable transport descriptor";
77
78 $cv->begin;
79 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
80 for (@_) {
81 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
82 push @res, [
83 $pri += 1e-5,
84 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
85 ];
86 }
87 $cv->end;
88 }
89 }
90 }
91
92 $cv->end;
93
94 $cv
95}
25 96
26sub new { 97sub new {
27 my ($class, $noderef) = @_; 98 my ($class, $noderef) = @_;
28 99
29 bless { noderef => $noderef }, $class 100 bless { noderef => $noderef }, $class
70 141
71sub clr_transport { 142sub clr_transport {
72 my ($self) = @_; 143 my ($self) = @_;
73 144
74 delete $self->{transport}; 145 delete $self->{transport};
75 warn "clr_transport\n"; 146
147 $self->connect;
76} 148}
77 149
78sub connect { 150sub connect {
79 my ($self) = @_; 151 my ($self) = @_;
80 152
81 Scalar::Util::weaken $self; 153 Scalar::Util::weaken $self;
82 154
83 unless (exists $self->{n_noderef}) { 155 unless (exists $self->{n_noderef}) {
156 return if $self->{n_noderef_}++;
84 (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub { 157 (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub {
85 $self or return; 158 $self or return;
159 delete $self->{n_noderef_};
86 my $noderef = shift->recv; 160 my $noderef = shift->recv;
87 161
88 $self->{n_noderef} = $noderef; 162 $self->{n_noderef} = $noderef;
89 163
90 $AnyEvent::MP::NODE{$_} = $self 164 $AnyEvent::MP::Base::NODE{$_} = $self
91 for split /,/, $noderef; 165 for split /,/, $noderef;
92 166
93 $self->connect; 167 $self->connect;
94 }); 168 });
95 return; 169 return;
104 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint 178 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
105 or return; 179 or return;
106 180
107 my ($w, $g); 181 my ($w, $g);
108 182
109 $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub { 183 $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
110 delete $self->{trial}{$endpoint}; 184 delete $self->{trial}{$endpoint};
111 }; 185 };
112 $g = AnyEvent::MP::Transport::mp_connect 186 $g = AnyEvent::MP::Transport::mp_connect
113 $host, $port, 187 $host, $port,
114 sub { 188 sub {
122 }; 196 };
123 } else { 197 } else {
124 delete $self->{retry}; 198 delete $self->{retry};
125 } 199 }
126 200
127 $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub { 201 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
128 $self->connect; 202 $self->connect;
129 }; 203 };
130} 204}
131 205
132package AnyEvent::MP::Node::Self; 206package AnyEvent::MP::Node::Self;
136sub set_transport { 210sub set_transport {
137 die "FATAL error, set_transport was called"; 211 die "FATAL error, set_transport was called";
138} 212}
139 213
140sub send { 214sub send {
141 die "self-send not implemented yet\n";#d# 215 AnyEvent::MP::Base::_inject ($_[1]);
142} 216}
143 217
144=head1 SEE ALSO 218=head1 SEE ALSO
145 219
146L<AnyEvent>. 220L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines