ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.4
Committed: Sat Aug 1 15:04:30 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.3: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Node - a single processing node (CPU/process...)
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Node;
8
9 =head1 DESCRIPTION
10
11 =cut
12
13 package AnyEvent::MP::Node;
14
15 use common::sense;
16
17 use AE ();
18 use AnyEvent::Socket ();
19
20 use AnyEvent::MP::Transport ();
21
22 use base Exporter::;
23
24 our $VERSION = '0.0';
25
# Constructor: returns a hash-based object blessed into $class whose only
# initial member is the node reference it was given (stored as "noderef").
sub new {
   my $class   = shift;
   my $noderef = shift;

   my $self = { noderef => $noderef };

   bless $self, $class
}
31
32 package AnyEvent::MP::Node::Direct;
33
34 use base "AnyEvent::MP::Node";
35
# Deliver $msg to the node.
#
# With a transport in place the message is handed straight to it.  Without
# one the message is appended to the pending queue; creating that queue (i.e.
# the first message sent while there is no transport) also triggers connect.
sub send {
   my ($self, $msg) = @_;

   my $transport = $self->{transport};
   return $transport->send ($msg)
      if $transport;

   # a queue already exists, so a connect has been requested before
   my $pending = $self->{queue};
   return push @$pending, $msg
      if $pending;

   $self->{queue} = [$msg];
   $self->connect
}
48
# Install $transport as the transport of this node.
#
# Any pending connection trials and the reconnect timer are dropped.  If the
# node already had a remote_uniq and the new transport reports a different
# one, queued messages are discarded; otherwise they are sent over the new
# transport in queue order.
sub set_transport {
   my ($self, $transport) = @_;

   delete @$self{qw(trial next_connect)};

   my $uniq = $transport->{remote_uniq};

   if (exists $self->{remote_uniq} and $self->{remote_uniq} ne $uniq) {
      # uniq changed, drop queue
      delete $self->{queue};
      #TODO: "DOWN"
   }

   $self->{remote_uniq} = $uniq;
   $self->{transport}   = $transport;

   # detach the queue first, then flush whatever was in it
   my $backlog = delete $self->{queue} || [];

   for my $msg (@$backlog) {
      $transport->send ($msg);
   }
}
70
# Forget the current transport and immediately start connecting again.
sub clr_transport {
   my $self = shift;

   delete $self->{transport};

   $self->connect
}
78
# Start (or continue) establishing a transport to this node.
#
# On first use the node reference is normalised asynchronously through
# AnyEvent::MP::normalise_noderef; the result is cached in n_noderef, the node
# is registered in %AnyEvent::MP::NODE under every comma-separated endpoint of
# it, and connect is entered again.
#
# Every later call takes one endpoint off the retry list (which is rebuilt
# from n_noderef once it has been used up), starts a connection attempt to
# that endpoint unless one is already pending, and arms next_connect so that
# connect runs again after $AnyEvent::MP::CONNECT_INTERVAL seconds.
#
# An endpoint that parse_hostport cannot parse is skipped.  It must not end
# the whole reconnect cycle, otherwise the remaining endpoints would never be
# tried and queued messages would stay queued.
sub connect {
   my ($self) = @_;

   # The timer callbacks below are stored inside $self, so they must not
   # keep a strong reference to it.  Scalar::Util is loaded explicitly
   # because this file does not otherwise pull it in.
   require Scalar::Util;
   Scalar::Util::weaken ($self);

   unless (exists $self->{n_noderef}) {
      # NOTE(review): the returned object is presumably an AnyEvent condvar
      # (it offers ->cb and the callback argument offers ->recv) - confirm.
      (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub {
         $self or return; # node went away while resolving
         my $noderef = shift->recv;

         $self->{n_noderef} = $noderef;

         $AnyEvent::MP::NODE{$_} = $self
            for split /,/, $noderef;

         $self->connect;
      });
      return;
   }

   $self->{retry} ||= [split /,/, $self->{n_noderef}];

   my $endpoint = shift @{ $self->{retry} };

   if (defined $endpoint) {
      # at most one attempt per endpoint at any time
      unless ($self->{trial}{$endpoint}) {
         # the list assignment is false when parse_hostport returns nothing
         if (my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)) {
            my ($w, $g);

            # forget the attempt once CONNECT_TIMEOUT seconds have passed
            $w = AE::timer ($AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
               delete $self->{trial}{$endpoint};
            });
            $g = AnyEvent::MP::Transport::mp_connect ($host, $port, sub {
               # called without arguments: the attempt is forgotten
               delete $self->{trial}{$endpoint}
                  unless @_;
               $g = shift;
            });

            $self->{trial}{$endpoint} = [$w, \$g];
         }
      }
   } else {
      # list used up, the next call starts over with all endpoints
      delete $self->{retry};
   }

   $self->{next_connect} = AE::timer ($AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
      $self->connect;
   });
}
132
133 package AnyEvent::MP::Node::Self;
134
135 use base "AnyEvent::MP::Node";
136
# This node class never takes a transport: any call is treated as a fatal
# error and raises an exception.
sub set_transport {
   die "FATAL error, set_transport was called"
}
140
# Hand the message straight to AnyEvent::MP::_inject.
#
# $_[0] is the node object (unused) and $_[1] the message, which is passed
# on directly from @_ without being copied first.
sub send {
   return AnyEvent::MP::_inject ($_[1]);
}
144
145 =head1 SEE ALSO
146
147 L<AnyEvent>.
148
149 =head1 AUTHOR
150
151 Marc Lehmann <schmorp@schmorp.de>
152 http://home.schmorp.de/
153
154 =cut
155
156 1
157