/cvs/Coro/Coro/Channel.pm
Revision: 1.136
Committed: Sat Jul 11 01:59:05 2015 UTC (8 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-6_47
Changes since 1.135: +1 -1 lines
Log Message:
6.47

File Contents

=head1 NAME

Coro::Channel - message queues

=head1 SYNOPSIS

 use Coro;

 $q1 = new Coro::Channel <maxsize>;

 $q1->put ("xxx");
 print $q1->get;

 die unless $q1->size;

=head1 DESCRIPTION

A Coro::Channel is the equivalent of a Unix pipe (and similar to Amiga
message ports): you can put things into it on one end and read things out
of it from the other end. If the capacity of the Channel is maxed out,
writers will block. Any number of coroutines can read from and write to
a Channel concurrently.

You don't have to load C<Coro::Channel> manually; it will be loaded
automatically when you C<use Coro> and call the C<new> constructor.
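
As an illustration of the pipe analogy, here is a minimal sketch of one
producer and one consumer sharing a Channel. The variable names and the
capacity of C<3> are arbitrary choices for this example, and it assumes
that no C<undef> values are put into the Channel, since C<undef> is used
here to detect the end of data after C<shutdown>.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new (3);   # capacity chosen arbitrarily
my @seen;

# producer: blocks in put whenever the Channel is full
my $producer = async {
   $q->put ($_) for 1 .. 5;
   $q->shutdown;                  # signal end of data
};

# consumer: reads until get returns undef (after shutdown)
my $consumer = async {
   while (defined (my $item = $q->get)) {
      push @seen, $item;
   }
};

$consumer->join;
print "@seen\n";
```

With a single producer the elements should arrive in the order they were
put.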

=over 4

=cut

package Coro::Channel;

use common::sense;

use Coro ();
use Coro::Semaphore ();

our $VERSION = 6.47;

# indices into the array that represents a channel
sub DATA (){ 0 } # the queued elements
sub SGET (){ 1 } # semaphore counting available data
sub SPUT (){ 2 } # semaphore counting remaining space

=item $q = new Coro::Channel $maxsize

Create a new channel with the given maximum size (practically unlimited
if C<$maxsize> is omitted or zero). Giving a size of one gives you a
traditional channel, i.e. a queue that can store only a single element
(which means there will be no buffering, and C<put> will wait until there
is a corresponding C<get> call). To buffer one element you have to specify
C<2>, and so on.
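
The following sketch illustrates the sizing rule for a C<$maxsize> of
C<2>: the first C<put> returns immediately, while the second one waits
for a C<get>. The C<@log> array and the use of C<cede> to let the writer
run first are assumptions made for this example, and the ordering relies
on all threads having the default priority.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new (2);   # buffers exactly one element
my @log;

my $writer = async {
   $q->put ("a"); push @log, "put a returned";   # does not block
   $q->put ("b"); push @log, "put b returned";   # blocks until a get
};

cede;                             # run the writer until it blocks
push @log, "main: size=" . $q->size;
push @log, "got " . $q->get;      # frees a slot, writer becomes ready
$writer->join;                    # writer finishes its second put
push @log, "got " . $q->get;

print "$_\n" for @log;
```

Note that C<size> already counts the element of the blocked C<put>.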

=cut

sub new {
   # we cheat and set infinity == 2*10**9
   bless [
      [], # initially empty
      (Coro::Semaphore::_alloc 0), # counts data
      (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
   ]
}

=item $q->put ($scalar)

Put the given scalar into the queue, waiting if the queue is full.

=cut

sub put {
   # store the element, announce it to readers, then wait for space
   push @{$_[0][DATA]}, $_[1];
   Coro::Semaphore::up   $_[0][SGET];
   Coro::Semaphore::down $_[0][SPUT];
}

=item $q->get

Return the next element from the queue, waiting if necessary.

=cut

sub get {
   # wait for an element, free up one slot, then remove the element
   Coro::Semaphore::down $_[0][SGET];
   Coro::Semaphore::up   $_[0][SPUT];
   shift @{$_[0][DATA]}
}

=item $q->shutdown

Shuts down the Channel by pushing a virtual end marker onto it: this
changes the behaviour of the Channel when it becomes or is empty to return
C<undef>, almost as if infinitely many C<undef> elements had been put
into the queue.

Specifically, this function wakes up any pending C<get> calls and lets
them return C<undef>; future C<get> calls on an empty Channel do the
same. C<size> will still return the real number of stored elements,
though.

Another way to describe the behaviour is that C<get> calls will not block
when the queue becomes empty but immediately return C<undef>. This means
that calls to C<put> will work normally and the data will be returned on
subsequent C<get> calls.

This method is useful to signal the end of data to any consumers, quite
similar to an end of stream on e.g. a TCP socket: you have one or more
producers that C<put> data into the Channel and one or more consumers who
C<get> them. When all producers have finished producing data, a call to
C<shutdown> signals this fact to any consumers.

A common implementation uses one or more threads that C<get> from
a channel until it returns C<undef>. To clean everything up, first
C<shutdown> the channel, then C<join> the threads.
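
The pattern described above might look like the following sketch. The
number of workers, the C<@done> array and the "double the job" workload
are made up for illustration; the sketch also assumes that jobs are
always defined values, so that C<undef> can only mean "shut down".

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $jobs = Coro::Channel->new;    # practically unlimited size
my @done;

# three worker threads, each reading until get returns undef
my @workers;
push @workers, async {
   while (defined (my $job = $jobs->get)) {
      push @done, $job * 2;       # placeholder for real work
   }
} for 1 .. 3;

$jobs->put ($_) for 1 .. 6;       # does not block on an unlimited channel

$jobs->shutdown;                  # first signal the end of data ...
$_->join for @workers;            # ... then wait for the workers

print join (",", sort { $a <=> $b } @done), "\n";
```

Which worker handles which job depends on scheduling, so the results are
sorted before printing.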

=cut

sub shutdown {
   # make SGET practically infinite so that get never blocks again
   Coro::Semaphore::adjust $_[0][SGET], 1_000_000_000;
}

=item $q->size

Return the number of elements waiting to be consumed. Please note that:

   if ($q->size) {
      my $data = $q->get;
      ...
   }

is I<not> a race condition but instead works just fine. Note that the
number of elements that wait can be larger than C<$maxsize>, as it
includes any coroutines waiting to put data into the channel (but not any
shutdown condition).

This means that the number returned is I<precisely> the number of calls
to C<get> that will succeed instantly and return some data. Calling
C<shutdown> has no effect on this number.
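
To illustrate how C<size> can exceed C<$maxsize>, this sketch blocks two
writers on a Channel of size one and then inspects C<size>. The names
C<@writers>, C<$size_while_blocked> and C<@got> are hypothetical, and
C<cede> is used only to let both writers run until they block.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new (1);   # no buffering: put waits for a get

my @writers;
for my $value (qw(x y)) {
   push @writers, async { $q->put ($value) };
}

cede;                             # both writers now wait inside put

# counts the elements of both blocked writers
my $size_while_blocked = $q->size;

# exactly that many get calls succeed without waiting
my @got = map { $q->get } 1 .. $size_while_blocked;

$_->join for @writers;
print "size=$size_while_blocked got=@got\n";
```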

=cut

sub size {
   scalar @{$_[0][DATA]}
}

# this is not undocumented by accident - if it breaks, you
# get to keep the pieces
sub adjust {
   Coro::Semaphore::adjust $_[0][SPUT], $_[1];
}

1;

=back

=head1 AUTHOR/SUPPORT/CONTACT

 Marc A. Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/Coro.html

=cut