ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.4
Committed: Sat Aug 1 15:04:30 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.3: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18     use AnyEvent::Socket ();
19    
20     use AnyEvent::MP::Transport ();
21    
22     use base Exporter::;
23    
24     our $VERSION = '0.0';
25    
26     sub new {
27     my ($class, $noderef) = @_;
28    
29     bless { noderef => $noderef }, $class
30     }
31    
32     package AnyEvent::MP::Node::Direct;
33    
34     use base "AnyEvent::MP::Node";
35    
36     sub send {
37     my ($self, $msg) = @_;
38    
39     if ($self->{transport}) {
40     $self->{transport}->send ($msg);
41     } elsif ($self->{queue}) {
42     push @{ $self->{queue} }, $msg;
43     } else {
44     $self->{queue} = [$msg];
45     $self->connect;
46     }
47     }
48    
49     sub set_transport {
50     my ($self, $transport) = @_;
51    
52     delete $self->{trial};
53     delete $self->{next_connect};
54    
55     if (
56     exists $self->{remote_uniq}
57     && $self->{remote_uniq} ne $transport->{remote_uniq}
58     ) {
59     # uniq changed, drop queue
60     delete $self->{queue};
61     #TODO: "DOWN"
62     }
63    
64     $self->{remote_uniq} = $transport->{remote_uniq};
65     $self->{transport} = $transport;
66    
67     $transport->send ($_)
68     for @{ delete $self->{queue} || [] };
69     }
70    
71     sub clr_transport {
72     my ($self) = @_;
73    
74     delete $self->{transport};
75 root 1.4
76     $self->connect;
77 root 1.1 }
78    
79     sub connect {
80     my ($self) = @_;
81    
82     Scalar::Util::weaken $self;
83    
84     unless (exists $self->{n_noderef}) {
85     (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub {
86     $self or return;
87     my $noderef = shift->recv;
88    
89     $self->{n_noderef} = $noderef;
90    
91     $AnyEvent::MP::NODE{$_} = $self
92     for split /,/, $noderef;
93    
94     $self->connect;
95     });
96     return;
97     }
98    
99     $self->{retry} ||= [split /,/, $self->{n_noderef}];
100    
101     my $endpoint = shift @{ $self->{retry} };
102    
103     if (defined $endpoint) {
104     $self->{trial}{$endpoint} ||= do {
105     my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
106     or return;
107    
108     my ($w, $g);
109    
110     $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
111     delete $self->{trial}{$endpoint};
112     };
113     $g = AnyEvent::MP::Transport::mp_connect
114     $host, $port,
115     sub {
116     delete $self->{trial}{$endpoint}
117     unless @_;
118     $g = shift;
119     };
120     ;
121    
122     [$w, \$g]
123     };
124     } else {
125     delete $self->{retry};
126     }
127    
128     $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
129     $self->connect;
130     };
131     }
132    
133     package AnyEvent::MP::Node::Self;
134    
135     use base "AnyEvent::MP::Node";
136    
137     sub set_transport {
138     die "FATAL error, set_transport was called";
139     }
140    
141     sub send {
142 root 1.2 AnyEvent::MP::_inject ($_[1]);
143 root 1.1 }
144    
145     =head1 SEE ALSO
146    
147     L<AnyEvent>.
148    
149     =head1 AUTHOR
150    
151     Marc Lehmann <schmorp@schmorp.de>
152     http://home.schmorp.de/
153    
154     =cut
155    
156     1
157