ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.2
Committed: Sat Aug 1 07:11:45 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.1: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18     use AnyEvent::Socket ();
19    
20     use AnyEvent::MP::Transport ();
21    
22     use base Exporter::;
23    
24     our $VERSION = '0.0';
25    
26     sub new {
27     my ($class, $noderef) = @_;
28    
29     bless { noderef => $noderef }, $class
30     }
31    
32     package AnyEvent::MP::Node::Direct;
33    
34     use base "AnyEvent::MP::Node";
35    
36     sub send {
37     my ($self, $msg) = @_;
38    
39 root 1.2 warn "send $self $self->{noderef}\n";#d#
40    
41 root 1.1 if ($self->{transport}) {
42     $self->{transport}->send ($msg);
43     } elsif ($self->{queue}) {
44     push @{ $self->{queue} }, $msg;
45     } else {
46     $self->{queue} = [$msg];
47     $self->connect;
48     }
49     }
50    
51     sub set_transport {
52     my ($self, $transport) = @_;
53    
54     delete $self->{trial};
55     delete $self->{next_connect};
56    
57 root 1.2 use Data::Dumper; warn "set_transport $self->{noderef} $self $AnyEvent::MP::NODE{'10.0.0.1:4040'}\n";#d#
58    
59 root 1.1 if (
60     exists $self->{remote_uniq}
61     && $self->{remote_uniq} ne $transport->{remote_uniq}
62     ) {
63     # uniq changed, drop queue
64     delete $self->{queue};
65     #TODO: "DOWN"
66     }
67    
68     $self->{remote_uniq} = $transport->{remote_uniq};
69     $self->{transport} = $transport;
70    
71     $transport->send ($_)
72     for @{ delete $self->{queue} || [] };
73     }
74    
75     sub clr_transport {
76     my ($self) = @_;
77    
78     delete $self->{transport};
79     warn "clr_transport\n";
80     }
81    
82     sub connect {
83     my ($self) = @_;
84    
85     Scalar::Util::weaken $self;
86    
87     unless (exists $self->{n_noderef}) {
88     (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub {
89     $self or return;
90     my $noderef = shift->recv;
91    
92     $self->{n_noderef} = $noderef;
93    
94     $AnyEvent::MP::NODE{$_} = $self
95     for split /,/, $noderef;
96    
97     $self->connect;
98     });
99     return;
100     }
101    
102     $self->{retry} ||= [split /,/, $self->{n_noderef}];
103    
104     my $endpoint = shift @{ $self->{retry} };
105    
106     if (defined $endpoint) {
107     $self->{trial}{$endpoint} ||= do {
108     my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
109     or return;
110    
111     my ($w, $g);
112    
113     $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
114     delete $self->{trial}{$endpoint};
115     };
116     $g = AnyEvent::MP::Transport::mp_connect
117     $host, $port,
118     sub {
119     delete $self->{trial}{$endpoint}
120     unless @_;
121     $g = shift;
122     };
123     ;
124    
125     [$w, \$g]
126     };
127     } else {
128     delete $self->{retry};
129     }
130    
131     $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
132     $self->connect;
133     };
134     }
135    
136     package AnyEvent::MP::Node::Self;
137    
138     use base "AnyEvent::MP::Node";
139    
140     sub set_transport {
141     die "FATAL error, set_transport was called";
142     }
143    
144     sub send {
145 root 1.2 AnyEvent::MP::_inject ($_[1]);
146 root 1.1 }
147    
148     =head1 SEE ALSO
149    
150     L<AnyEvent>.
151    
152     =head1 AUTHOR
153    
154     Marc Lehmann <schmorp@schmorp.de>
155     http://home.schmorp.de/
156    
157     =cut
158    
159     1
160