ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.3
Committed: Sat Aug 1 07:36:30 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.2: +0 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18     use AnyEvent::Socket ();
19    
20     use AnyEvent::MP::Transport ();
21    
22     use base Exporter::;
23    
24     our $VERSION = '0.0';
25    
26     sub new {
27     my ($class, $noderef) = @_;
28    
29     bless { noderef => $noderef }, $class
30     }
31    
32     package AnyEvent::MP::Node::Direct;
33    
34     use base "AnyEvent::MP::Node";
35    
36     sub send {
37     my ($self, $msg) = @_;
38    
39     if ($self->{transport}) {
40     $self->{transport}->send ($msg);
41     } elsif ($self->{queue}) {
42     push @{ $self->{queue} }, $msg;
43     } else {
44     $self->{queue} = [$msg];
45     $self->connect;
46     }
47     }
48    
49     sub set_transport {
50     my ($self, $transport) = @_;
51    
52     delete $self->{trial};
53     delete $self->{next_connect};
54    
55     if (
56     exists $self->{remote_uniq}
57     && $self->{remote_uniq} ne $transport->{remote_uniq}
58     ) {
59     # uniq changed, drop queue
60     delete $self->{queue};
61     #TODO: "DOWN"
62     }
63    
64     $self->{remote_uniq} = $transport->{remote_uniq};
65     $self->{transport} = $transport;
66    
67     $transport->send ($_)
68     for @{ delete $self->{queue} || [] };
69     }
70    
71     sub clr_transport {
72     my ($self) = @_;
73    
74     delete $self->{transport};
75     warn "clr_transport\n";
76     }
77    
78     sub connect {
79     my ($self) = @_;
80    
81     Scalar::Util::weaken $self;
82    
83     unless (exists $self->{n_noderef}) {
84     (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub {
85     $self or return;
86     my $noderef = shift->recv;
87    
88     $self->{n_noderef} = $noderef;
89    
90     $AnyEvent::MP::NODE{$_} = $self
91     for split /,/, $noderef;
92    
93     $self->connect;
94     });
95     return;
96     }
97    
98     $self->{retry} ||= [split /,/, $self->{n_noderef}];
99    
100     my $endpoint = shift @{ $self->{retry} };
101    
102     if (defined $endpoint) {
103     $self->{trial}{$endpoint} ||= do {
104     my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
105     or return;
106    
107     my ($w, $g);
108    
109     $w = AE::timer $AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
110     delete $self->{trial}{$endpoint};
111     };
112     $g = AnyEvent::MP::Transport::mp_connect
113     $host, $port,
114     sub {
115     delete $self->{trial}{$endpoint}
116     unless @_;
117     $g = shift;
118     };
119     ;
120    
121     [$w, \$g]
122     };
123     } else {
124     delete $self->{retry};
125     }
126    
127     $self->{next_connect} = AE::timer $AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
128     $self->connect;
129     };
130     }
131    
132     package AnyEvent::MP::Node::Self;
133    
134     use base "AnyEvent::MP::Node";
135    
136     sub set_transport {
137     die "FATAL error, set_transport was called";
138     }
139    
140     sub send {
141 root 1.2 AnyEvent::MP::_inject ($_[1]);
142 root 1.1 }
143    
144     =head1 SEE ALSO
145    
146     L<AnyEvent>.
147    
148     =head1 AUTHOR
149    
150     Marc Lehmann <schmorp@schmorp.de>
151     http://home.schmorp.de/
152    
153     =cut
154    
155     1
156