ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.2
Committed: Sat Aug 1 07:11:45 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.1: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Node - a single processing node (CPU/process...)
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Node;
8
9 =head1 DESCRIPTION
10
11 =cut
12
13 package AnyEvent::MP::Node;
14
15 use common::sense;
16
17 use AE ();
18 use AnyEvent::Socket ();
19
20 use AnyEvent::MP::Transport ();
21
22 use base Exporter::;
23
24 our $VERSION = '0.0';
25
# Constructor.
#
#   $node = $class->new ($noderef)
#
# Returns a hash-based object blessed into $class whose only initial
# field is {noderef}, set to the value passed in.
sub new {
    my $class   = shift;
    my $noderef = shift;

    my %node = (noderef => $noderef);

    return bless \%node, $class;
}
31
32 package AnyEvent::MP::Node::Direct;
33
34 use base "AnyEvent::MP::Node";
35
# Send (or queue) a message for this node.
#
#   $node->send ($msg)
#
# - If a transport is attached, the message is handed to it right away.
# - Otherwise, if a queue already exists, the message is appended to it.
# - Otherwise a new queue holding the message is created and ->connect
#   is called to start establishing a connection.
sub send {
    my ($self, $msg) = @_;

    warn "send $self $self->{noderef}\n";#d#

    # fast path: connection is up
    if (my $transport = $self->{transport}) {
        return $transport->send ($msg);
    }

    # a queue already exists, so ->connect has been kicked off before
    if (my $queue = $self->{queue}) {
        return push @$queue, $msg;
    }

    # first message while disconnected: start queueing and connecting
    $self->{queue} = [$msg];
    return $self->connect;
}
50
# Attach an established transport to this node and flush queued messages.
#
#   $node->set_transport ($transport)
#
# $transport must provide a ->send method and a {remote_uniq} field.
#
# Steps:
#  1. forget the {trial} entries (per-endpoint connection attempts set up
#     by ->connect) and the {next_connect} retry timer;
#  2. if a {remote_uniq} was recorded earlier and the new transport reports
#     a different one, discard the pending queue instead of delivering it;
#  3. record the transport and its remote_uniq;
#  4. send every message still queued, in order, and remove the queue.
sub set_transport {
    my ($self, $transport) = @_;

    delete $self->{trial};
    delete $self->{next_connect};

    if (
        exists $self->{remote_uniq}
        && $self->{remote_uniq} ne $transport->{remote_uniq}
    ) {
        # uniq changed, drop queue
        delete $self->{queue};
        #TODO: "DOWN"
    }

    $self->{remote_uniq} = $transport->{remote_uniq};
    $self->{transport}   = $transport;

    # delete first, so the queue is gone even if a send below dies
    $transport->send ($_)
        for @{ delete $self->{queue} || [] };
}
74
# Detach the current transport from this node.
#
#   $node->clr_transport
#
# Removes the {transport} field and emits a "clr_transport" warning;
# nothing else on the node is touched.
sub clr_transport {
    my $self = shift;

    delete $self->{transport};

    warn "clr_transport\n";
}
81
# Start, or continue, trying to connect to this node.
#
#   $node->connect
#
# First call (no {n_noderef} yet): the node's {noderef} is passed to
# AnyEvent::MP::normalise_noderef; when its result arrives it is stored in
# {n_noderef}, the node is registered in %AnyEvent::MP::NODE under each
# comma-separated endpoint, and ->connect is called again.
#
# Every later call takes the next endpoint off the {retry} list (refilled
# from {n_noderef} whenever it is missing) and, unless an attempt for that
# endpoint is already recorded in {trial}, starts one via
# AnyEvent::MP::Transport::mp_connect together with a timer of
# $AnyEvent::MP::CONNECT_TIMEOUT seconds that drops the attempt again.
# Finally a {next_connect} timer re-runs ->connect after
# $AnyEvent::MP::CONNECT_INTERVAL seconds.
#
# An endpoint that AnyEvent::Socket::parse_hostport cannot parse is skipped;
# the retry timer is still armed so the remaining endpoints get their turn.
# (Previously such an endpoint made this method return before arming the
# timer, which stopped all further connection attempts.)
sub connect {
    my ($self) = @_;

    # The callbacks below close over $self and are stored inside the node;
    # keep only a weak reference so they do not form a reference cycle.
    Scalar::Util::weaken($self);

    unless (exists $self->{n_noderef}) {
        (AnyEvent::MP::normalise_noderef($self->{noderef}))->cb(sub {
            $self or return; # node is gone, nothing left to do
            my $noderef = shift->recv;

            $self->{n_noderef} = $noderef;

            $AnyEvent::MP::NODE{$_} = $self
                for split /,/, $noderef;

            $self->connect;
        });
        return;
    }

    $self->{retry} ||= [split /,/, $self->{n_noderef}];

    my $endpoint = shift @{ $self->{retry} };

    if (!defined $endpoint) {
        # list exhausted: drop it so the next round starts from the top
        delete $self->{retry};
    } elsif (!$self->{trial}{$endpoint}) {
        # list assignment in boolean context is true iff parsing succeeded
        if (my ($host, $port) = AnyEvent::Socket::parse_hostport($endpoint)) {
            my ($w, $g);

            # give up on this attempt after the connect timeout
            $w = AE::timer($AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
                delete $self->{trial}{$endpoint};
            });

            $g = AnyEvent::MP::Transport::mp_connect(
                $host, $port,
                sub {
                    # called without arguments: drop the attempt
                    # (presumably this signals failure - confirm against
                    # mp_connect's documentation)
                    delete $self->{trial}{$endpoint}
                        unless @_;
                    # keep whatever was passed (if anything) alive via
                    # the \$g stored in {trial}
                    $g = shift;
                },
            );

            # recorded only after both watchers exist, as before
            $self->{trial}{$endpoint} = [$w, \$g];
        }
        # else: unparseable endpoint, skip it and fall through to the timer
    }

    $self->{next_connect} = AE::timer($AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
        $self->connect;
    });
}
135
136 package AnyEvent::MP::Node::Self;
137
138 use base "AnyEvent::MP::Node";
139
# Always dies with a fixed message, whatever the arguments: this class
# implements ->send without any transport (see below), so attaching one
# is treated as a fatal error.
sub set_transport {
    die "FATAL error, set_transport was called";
}
143
# Send a message to this node by passing it straight to
# AnyEvent::MP::_inject; no transport or queue is involved.
#
#   $node->send ($msg)
#
# Returns whatever AnyEvent::MP::_inject returns.
sub send {
    # $_[0] is the node object (unused); $_[1] is the message, passed on
    # by alias rather than copied.
    return AnyEvent::MP::_inject($_[1]);
}
147
148 =head1 SEE ALSO
149
150 L<AnyEvent>.
151
152 =head1 AUTHOR
153
154 Marc Lehmann <schmorp@schmorp.de>
155 http://home.schmorp.de/
156
157 =cut
158
159 1
160