ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.1
Committed: Fri Jul 31 20:55:46 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Node - a single processing node (CPU/process...)
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Node;
8
9 =head1 DESCRIPTION
10
11 =cut
12
13 package AnyEvent::MP::Node;
14
15 use common::sense;
16
17 use AE ();
18 use AnyEvent::Socket ();
19
20 use AnyEvent::MP::Transport ();
21
22 use base Exporter::;
23
24 our $VERSION = '0.0';
25
# Constructor.
#
#   $class   - package to bless the new object into
#   $noderef - node reference, stored unmodified under the "noderef" key
#
# Returns the new blessed hash reference.
sub new {
   my $class   = shift;
   my $noderef = shift;

   my %self = (noderef => $noderef);

   bless \%self, $class
}
31
32 package AnyEvent::MP::Node::Direct;
33
34 use base "AnyEvent::MP::Node";
35
# Deliver $msg to this node.
#
# - With a transport set, the message is passed straight to its send method.
# - With a queue already present (and no transport), the message is appended
#   to the queue.
# - Otherwise a new queue holding just this message is created and
#   $self->connect is called.
sub send {
   my ($self, $msg) = @_;

   if (my $transport = $self->{transport}) {
      return $transport->send ($msg);
   }

   if (my $queue = $self->{queue}) {
      return push @$queue, $msg;
   }

   # first message without transport or queue: start queueing and connect
   $self->{queue} = [$msg];
   $self->connect
}
48
# Attach $transport to this node.
#
# Clears the "trial" and "next_connect" entries, records the transport and
# its remote_uniq value on the node, and then sends every queued message
# through the new transport (removing the queue in the process).
#
# If the node already had a remote_uniq and the new transport's value
# differs, the queue is dropped first, so nothing queued gets sent.
sub set_transport {
   my ($self, $transport) = @_;

   delete @$self{qw(trial next_connect)};

   my $uniq = $transport->{remote_uniq};

   # uniq changed, drop queue
   delete $self->{queue}
      if exists $self->{remote_uniq}
         && $self->{remote_uniq} ne $uniq;
   #TODO: "DOWN"

   @$self{qw(remote_uniq transport)} = ($uniq, $transport);

   my $pending = delete $self->{queue};

   $transport->send ($_)
      for @{ $pending || [] };
}
70
# Detach the current transport from this node by removing the "transport"
# entry, and emit a diagnostic warning. The queue and connection state are
# not touched here.
sub clr_transport {
   my $self = shift;

   delete $self->{transport};
   warn "clr_transport\n";
}
77
# Start, or continue, trying to establish a transport to this node.
#
# First call: the node reference is normalised asynchronously via
# AnyEvent::MP::normalise_noderef. When the result arrives it is stored as
# n_noderef, the node is registered in %AnyEvent::MP::NODE under each
# comma-separated endpoint, and connect is called again.
#
# Later calls: one endpoint is taken from the "retry" list and, unless an
# attempt for it is still pending in "trial", a connection attempt is
# started with a timer ($AnyEvent::MP::CONNECT_TIMEOUT) that removes the
# trial entry again. When the list is exhausted it is dropped, so the
# following call starts over with the full endpoint list. In every case a
# "next_connect" timer ($AnyEvent::MP::CONNECT_INTERVAL) schedules the next
# call.
#
# An endpoint that AnyEvent::Socket::parse_hostport cannot parse is skipped;
# the retry cycle continues with the remaining endpoints instead of
# stopping.
sub connect {
   my ($self) = @_;

   # Loaded explicitly: nothing else in this file imports Scalar::Util.
   require Scalar::Util;

   # All callbacks below close over $self; weakening this copy keeps them
   # from holding a strong reference to the node.
   Scalar::Util::weaken ($self);

   unless (exists $self->{n_noderef}) {
      (AnyEvent::MP::normalise_noderef ($self->{noderef}))->cb (sub {
         $self or return; # node went away while normalising
         my $noderef = shift->recv;

         $self->{n_noderef} = $noderef;

         $AnyEvent::MP::NODE{$_} = $self
            for split /,/, $noderef;

         $self->connect;
      });
      return;
   }

   $self->{retry} ||= [split /,/, $self->{n_noderef}];

   my $endpoint = shift @{ $self->{retry} };

   if (!defined $endpoint) {
      # all endpoints tried once; the next call rebuilds the list
      delete $self->{retry};
   } elsif (!$self->{trial}{$endpoint}) {
      # no attempt pending for this endpoint; unparseable endpoints are
      # skipped without aborting, so the timer below is still armed
      if (my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)) {
         my ($w, $g);

         # give up on this attempt after the connect timeout
         $w = AE::timer ($AnyEvent::MP::CONNECT_TIMEOUT, 0, sub {
            delete $self->{trial}{$endpoint};
         });

         # NOTE(review): $g presumably holds a guard for the pending
         # connect - confirm against AnyEvent::MP::Transport::mp_connect
         $g = AnyEvent::MP::Transport::mp_connect (
            $host, $port,
            sub {
               # called without arguments: forget this trial
               delete $self->{trial}{$endpoint}
                  unless @_;
               $g = shift;
            }
         );

         $self->{trial}{$endpoint} = [$w, \$g];
      }
   }

   $self->{next_connect} = AE::timer ($AnyEvent::MP::CONNECT_INTERVAL, 0, sub {
      $self->connect;
   });
}
131
132 package AnyEvent::MP::Node::Self;
133
134 use base "AnyEvent::MP::Node";
135
# AnyEvent::MP::Node::Self does not accept a transport: calling this method
# always dies with a fatal error, regardless of its arguments.
sub set_transport {
   die "FATAL error, set_transport was called";
}
139
# Sending a message to the local node is not implemented yet: this method
# always dies, with a message that carries no file/line information.
sub send {
   die "self-send not implemented yet\n"; #d# placeholder
}
143
144 =head1 SEE ALSO
145
146 L<AnyEvent>.
147
148 =head1 AUTHOR
149
150 Marc Lehmann <schmorp@schmorp.de>
151 http://home.schmorp.de/
152
153 =cut
154
155 1
156