/cvs/Coro/Coro/Channel.pm
Revision: 1.108
Committed: Sun Jul 3 10:51:40 2011 UTC (12 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-6_01
Changes since 1.107: +1 -1 lines
Log Message:
6.01

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Channel - message queues
4    
5     =head1 SYNOPSIS
6    
7 root 1.101 use Coro;
8 root 1.1
9     $q1 = new Coro::Channel <maxsize>;
10    
11 root 1.46 $q1->put ("xxx");
12 root 1.1 print $q1->get;
13    
14     die unless $q1->size;
15    
16     =head1 DESCRIPTION
17    
18 root 1.75 A Coro::Channel is the equivalent of a Unix pipe (and similar to Amiga
19     message ports): you can put things into it on one end and read things out
20     of it from the other end. If the capacity of the Channel is maxed out,
21     writers will block. Both ends of a Channel can be read from and written to
22     by as many coroutines as you want concurrently.
23 root 1.8
24 root 1.101 You don't have to load C<Coro::Channel> manually; it will be loaded
25     automatically when you C<use Coro> and call the C<new> constructor.
26    
27 root 1.1 =over 4
28    
29     =cut
30    
31     package Coro::Channel;
32    
33 root 1.92 use common::sense;
34 root 1.45
35 root 1.3 use Coro ();
36 root 1.71 use Coro::Semaphore ();
37 root 1.1
38 root 1.108 our $VERSION = 6.01;
39 root 1.1
40 root 1.70 sub DATA (){ 0 }
41 root 1.71 sub SGET (){ 1 }
42     sub SPUT (){ 2 }
43 root 1.70
44 root 1.1 =item $q = new Coro::Channel $maxsize
45    
46 root 1.75 Create a new channel with the given maximum size (practically unlimited
47     if C<$maxsize> is omitted). Giving a size of one gives you a traditional
48     channel, i.e. a queue that can store only a single element (which means
49     there will be no buffering, and C<put> will wait until there is a
50     corresponding C<get> call). To buffer one element you have to specify
51     C<2>, and so on.
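
The following is a minimal sketch of the size-one case described above, assuming the scheduling behaviour of C<async> and C<cede> from C<Coro>. The C<@log> array and the payload C<"a"> are illustrative names, not part of the module.

```perl
use strict;
use warnings;
use Coro;

my @log;
my $q = Coro::Channel->new(1);   # size one: no buffering

my $writer = async {
    push @log, "put start";
    $q->put("a");                # blocks until a matching get has happened
    push @log, "put done";
};

cede;                            # let the writer run until it blocks in put
push @log, "size=" . $q->size;   # the pending element is already counted
push @log, "got " . $q->get;     # taking the element releases the writer
$writer->join;                   # wait for the writer to finish

print join(", ", @log), "\n";
```

With a C<$maxsize> of C<2>, the same C<put> would be expected to return without waiting for the C<get>.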
52 root 1.1
53     =cut
54    
55     sub new {
56 root 1.84 # we cheat and set infinity == 2*10**9
57 root 1.71 bless [
58 root 1.96 [], # initially empty
59     (Coro::Semaphore::_alloc 0), # counts data
60     (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
61 root 1.71 ]
62 root 1.1 }
63    
64 root 1.44 =item $q->put ($scalar)
65 root 1.1
66     Put the given scalar into the queue.
67    
68     =cut
69    
70     sub put {
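71 root 1.70 push @{$_[0][DATA]}, $_[1]; # store the element first
72 root 1.71 Coro::Semaphore::up $_[0][SGET]; # signal that one more element is available
73     Coro::Semaphore::down $_[0][SPUT]; # block while the channel is at capacity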
74 root 1.1 }
75    
76     =item $q->get
77    
78     Return the next element from the queue, waiting if necessary.
79    
80     =cut
81    
82     sub get {
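83 root 1.71 Coro::Semaphore::down $_[0][SGET]; # wait until an element is available
84     Coro::Semaphore::up $_[0][SPUT]; # free one slot, possibly waking a blocked writer
85 root 1.70 shift @{$_[0][DATA]} # undef if empty, which only happens after shutdown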
86 root 1.47 }
87    
88 root 1.77 =item $q->shutdown
89    
90     Shuts down the Channel by pushing a virtual end marker onto it. This
91     changes the behaviour of the Channel, once it is or becomes empty, so that
92     C<get> returns C<undef>, almost as if infinitely many C<undef> elements
93     had been put into the queue.
94    
95     Specifically, this function wakes up any pending C<get> calls and lets
96     them return C<undef>; future C<get> calls on an empty Channel do the same.
97     C<size> will still return the real number of stored elements, though.
98    
99     Another way to describe the behaviour is that C<get> calls will not block
100     when the queue becomes empty but immediately return C<undef>. This means
101     that calls to C<put> will work normally and the data will be returned on
102     subsequent C<get> calls.
103    
104     This method is useful to signal the end of data to any consumers, quite
105     similar to an end of stream on e.g. a TCP socket: You have one or more
106     producers that C<put> data into the Channel and one or more consumers who
107     C<get> them. When all producers have finished producing data, a call to
108     C<shutdown> signals this fact to any consumers.
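
The producer/consumer pattern described above could look roughly like this. It is a sketch only: the capacity of 4, the payloads 1 to 10 and the C<@seen> array are assumptions made for illustration. Because the consumer treats C<undef> as the end marker, it only works when producers never C<put> an undefined value.

```perl
use strict;
use warnings;
use Coro;

my $q = Coro::Channel->new(4);   # hypothetical capacity
my @seen;

my $consumer = async {
    # get returns undef only once the channel is shut down and empty
    while (defined(my $item = $q->get)) {
        push @seen, $item;
    }
};

my $producer = async {
    $q->put($_) for 1 .. 10;     # blocks whenever the channel is full
    $q->shutdown;                # signal the end of data to the consumer
};

$producer->join;
$consumer->join;

print "consumer received: @seen\n";
```

With several producers, C<shutdown> would have to be called only after the last of them has finished, for example by joining all producer coroutines first.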
109    
110     =cut
111    
112     sub shutdown {
113 root 1.103 Coro::Semaphore::adjust $_[0][SGET], 1_000_000_000;
114 root 1.77 }
115    
116 root 1.1 =item $q->size
117    
118     Return the number of elements waiting to be consumed. Please note that:
119    
120     if ($q->size) {
121     my $data = $q->get;
122 root 1.77 ...
123 root 1.1 }
124    
125 root 1.83 is I<not> a race condition but instead works just fine. Note that the
126     number of elements that wait can be larger than C<$maxsize>, as it
127     includes any coroutines waiting to put data into the channel (but not any
128     shutdown condition).
129    
130 root 1.96 This means that the number returned is I<precisely> the number of calls
131     to C<get> that will succeed instantly and return some data. Calling
132     C<shutdown> has no effect on this number.
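
As a hedged illustration of the point that C<size> can exceed C<$maxsize>, the sketch below starts three writers on a channel created with a C<$maxsize> of C<2>. The payloads and variable names are made up for the example, and it relies on C<cede> letting all three writers run before the main program continues.

```perl
use strict;
use warnings;
use Coro;

my $q = Coro::Channel->new(2);   # a $maxsize of 2 buffers one element

# Three writers; only the first put can return without a matching get.
my @writers;
for my $item (qw(a b c)) {
    push @writers, async { $q->put($item) };
}

cede;                            # let every writer run until it finishes or blocks

my $size = $q->size;             # counts stored data plus blocked writers
my @got;
push @got, $q->get for 1 .. $size;   # none of these calls has to wait

$_->join for @writers;
print "size was $size, got @got\n";
```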
133 root 1.1
134     =cut
135    
136     sub size {
137 root 1.70 scalar @{$_[0][DATA]}
138 root 1.1 }
139    
140 root 1.96 # this is not undocumented by accident - if it breaks, you
141     # get to keep the pieces
142     sub adjust {
143     Coro::Semaphore::adjust $_[0][SPUT], $_[1];
144     }
145    
146 root 1.1 1;
147    
148     =back
149    
150     =head1 AUTHOR
151    
152 root 1.35 Marc Lehmann <schmorp@schmorp.de>
153 root 1.33 http://home.schmorp.de/
154 root 1.1
155     =cut
156