/cvs/Coro/Coro/Channel.pm
Revision: 1.99
Committed: Sat Oct 23 09:28:49 2010 UTC (13 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-5_24
Changes since 1.98: +1 -1 lines
Log Message:
5_24

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Channel - message queues
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Channel;
8    
9     $q1 = new Coro::Channel <maxsize>;
10    
11 root 1.46 $q1->put ("xxx");
12 root 1.1 print $q1->get;
13    
14     die unless $q1->size;
15    
16     =head1 DESCRIPTION
17    
18 root 1.75 A Coro::Channel is the equivalent of a Unix pipe (and similar to Amiga
19     message ports): you put things into it on one end and read them out of
20     it from the other end. Once the Channel reaches its capacity, writers
21     block until a reader makes room. Any number of coroutines can read from
22     or write to a Channel concurrently.
23 root 1.8
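
The pipe analogy can be sketched with one producer and one consumer. This is a minimal illustration, assuming the Coro distribution is installed; the variable names and the payload strings are made up for the example.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

# Channel created with maxsize 2 (see the notes on new for what that buffers).
my $q = Coro::Channel->new(2);
my @received;

# Producer: put waits whenever the channel is at capacity.
my $producer = async {
    for my $n (1 .. 3) {
        $q->put("item $n");
    }
};

# Consumer: get waits whenever the channel is empty.
my $consumer = async {
    for my $n (1 .. 3) {
        push @received, $q->get;
    }
};

# Wait for both coroutines to finish before inspecting the result.
$_->join for $producer, $consumer;

print "$_\n" for @received;
```

Elements come out in the order they were put in, so the consumer sees the three items in sequence even though the producer has to wait part-way through.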
24 root 1.1 =over 4
25    
26     =cut
27    
28     package Coro::Channel;
29    
30 root 1.92 use common::sense;
31 root 1.45
32 root 1.3 use Coro ();
33 root 1.71 use Coro::Semaphore ();
34 root 1.1
35 root 1.99 our $VERSION = 5.24;
36 root 1.1
37 root 1.70 sub DATA (){ 0 }
38 root 1.71 sub SGET (){ 1 }
39     sub SPUT (){ 2 }
40 root 1.70
41 root 1.1 =item $q = new Coro::Channel $maxsize
42    
43 root 1.75 Create a new channel with the given maximum size (practically unlimited
44     if C<$maxsize> is omitted). A size of one gives you a traditional
45     channel, i.e. a queue that can store only a single element (which means
46     there will be no buffering, and C<put> will wait until there is a
47     corresponding C<get> call). To buffer one element you have to specify
48     C<2>, and so on.
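
The off-by-one between C<$maxsize> and the number of buffered elements follows from how the constructor seeds the semaphore that counts remaining space. The sketch below reproduces only that arithmetic in plain Perl; C<initial_put_count> is a hypothetical helper written for this illustration and is not part of the module.

```perl
use strict;
use warnings;

# Hypothetical helper: mirrors the expression the constructor uses to seed
# the "remaining space" semaphore. The result is the number of put calls
# that can return before any get has happened.
sub initial_put_count {
    my ($maxsize) = @_;
    return ($maxsize || 2_000_000_000) - 1;
}

for my $maxsize (1, 2, 10, undef) {
    printf "maxsize %-7s -> %d put(s) return without a matching get\n",
        defined $maxsize ? $maxsize : 'omitted',
        initial_put_count($maxsize);
}
```

Because the expression uses C<||>, a size of C<0> is treated the same as an omitted size.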
49 root 1.1
50     =cut
51    
52     sub new {
53 root 1.84 # we cheat and set infinity == 2*10**9
54 root 1.71 bless [
55 root 1.96 [], # initially empty
56     (Coro::Semaphore::_alloc 0), # counts data
57     (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
58 root 1.71 ]
59 root 1.1 }
60    
61 root 1.44 =item $q->put ($scalar)
62 root 1.1
63     Put the given scalar into the queue.
64    
65     =cut
66    
67     sub put {
68 root 1.70 push @{$_[0][DATA]}, $_[1];
69 root 1.71 Coro::Semaphore::up $_[0][SGET];
70     Coro::Semaphore::down $_[0][SPUT];
71 root 1.1 }
72    
73     =item $q->get
74    
75     Return the next element from the queue, waiting if necessary.
76    
77     =cut
78    
79     sub get {
80 root 1.71 Coro::Semaphore::down $_[0][SGET];
81     Coro::Semaphore::up $_[0][SPUT];
82 root 1.70 shift @{$_[0][DATA]}
83 root 1.47 }
84    
85 root 1.77 =item $q->shutdown
86    
87     Shuts down the Channel by pushing a virtual end marker onto it: This
88     changes the behaviour of the Channel when it becomes or is empty to return
89     C<undef>, almost as if infinitely many C<undef> elements have been put
90     into the queue.
91    
92     Specifically, this function wakes up any pending C<get> calls and makes
93     them return C<undef>; future C<get> calls on an empty queue do the same.
94     C<size> still returns the real number of stored elements, though.
95    
96     Another way to describe the behaviour is that C<get> calls will not block
97     when the queue becomes empty but immediately return C<undef>. This means
98     that calls to C<put> will work normally and the data will be returned on
99     subsequent C<get> calls.
100    
101     This method is useful to signal the end of data to any consumers, much
102     like end-of-stream on, for example, a TCP socket: you have one or more
103     producers that C<put> data into the Channel and one or more consumers that
104     C<get> it. When all producers have finished producing data, a call to
105     C<shutdown> signals this fact to all consumers.
106    
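
A possible shape for the end-of-data pattern, assuming the Coro distribution is installed: one producer, two consumers, and every consumer loop ends when C<get> returns C<undef>. The job values and variable names are illustrative only.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new;    # no maxsize: practically unlimited
my @seen;

# Two consumers, each looping until get returns undef.
my @consumers;
for my $id (1 .. 2) {
    push @consumers, async {
        while (defined(my $job = $q->get)) {
            push @seen, $job;
        }
    };
}

# The producer signals end of data once everything has been put.
my $producer = async {
    $q->put($_) for 1 .. 4;
    $q->shutdown;
};

$_->join for $producer, @consumers;

print join(" ", sort { $a <=> $b } @seen), "\n";
```

Note that a loop written this way cannot tell a C<shutdown> apart from an C<undef> that was put as ordinary data, so it only suits channels that never carry C<undef> payloads.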
107     =cut
108    
109     sub shutdown {
110 root 1.79 Coro::Semaphore::adjust $_[0][SGET], 2_000_000_000;
111 root 1.77 }
112    
113 root 1.1 =item $q->size
114    
115     Return the number of elements waiting to be consumed. Please note that:
116    
117     if ($q->size) {
118     my $data = $q->get;
119 root 1.77 ...
120 root 1.1 }
121    
122 root 1.83 is I<not> a race condition but instead works just fine. Note that the
123     number of elements that wait can be larger than C<$maxsize>, as it
124     includes any coroutines waiting to put data into the channel (but not any
125     shutdown condition).
126    
127 root 1.96 This means that the number returned is I<precisely> the number of calls
128     to C<get> that will succeed instantly and return some data. Calling
129     C<shutdown> has no effect on this number.
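
The following sketch exercises the points above, assuming the Coro distribution is installed. It checks C<size> before and after a C<get>, after a C<shutdown>, and on a channel of size one that has two producers waiting inside C<put>. All names and payloads are made up for the example.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new;    # unbounded, so put does not wait here
$q->put($_) for qw(a b c);
my $before = $q->size;

# The size check followed by get is safe: get will not wait here.
my $first     = $q->size ? $q->get : undef;
my $after_get = $q->size;

# shutdown does not change the number of stored elements.
$q->shutdown;
my $after_shutdown = $q->size;

# A channel of size 1 with two producers, both waiting inside put.
my $small = Coro::Channel->new(1);
for my $n (1 .. 2) {
    async { $small->put("job $n") };
}
cede;                          # let both producers run until they wait
my $with_waiters = $small->size;

# Drain the small channel so the producers are released.
$small->get for 1 .. 2;

print "before=$before after_get=$after_get ",
      "after_shutdown=$after_shutdown with_waiters=$with_waiters\n";
```

The last value is larger than the C<$maxsize> of one because elements belonging to producers that are still waiting in C<put> are counted as well.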
130 root 1.1
131     =cut
132    
133     sub size {
134 root 1.70 scalar @{$_[0][DATA]}
135 root 1.1 }
136    
137 root 1.96 # this is not undocumented by accident - if it breaks, you
138     # get to keep the pieces
139     sub adjust {
140     Coro::Semaphore::adjust $_[0][SPUT], $_[1];
141     }
142    
143 root 1.1 1;
144    
145     =back
146    
147     =head1 AUTHOR
148    
149 root 1.35 Marc Lehmann <schmorp@schmorp.de>
150 root 1.33 http://home.schmorp.de/
151 root 1.1
152     =cut
153