/cvs/Coro/Coro/Channel.pm
Revision: 1.151
Committed: Mon Mar 16 11:12:52 2020 UTC (4 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-6_57, HEAD
Changes since 1.150: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

Coro::Channel - message queues

=head1 SYNOPSIS

   use Coro;

   $q1 = new Coro::Channel <maxsize>;

   $q1->put ("xxx");
   print $q1->get;

   die unless $q1->size;

=head1 DESCRIPTION

A Coro::Channel is the equivalent of a unix pipe (and similar to amiga
message ports): you can put things into it on one end and read things out
of it from the other end. If the capacity of the Channel is maxed out,
writers will block. Any number of coroutines can read from and write to
a Channel concurrently.

You don't have to load C<Coro::Channel> manually; it will be loaded
automatically when you C<use Coro> and call the C<new> constructor.

=over 4

=cut
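
package Coro::Channel;

use common::sense;

use Coro ();
use Coro::Semaphore ();

our $VERSION = 6.57;

# indices into the array that represents a channel
sub DATA (){ 0 } # array of queued elements
sub SGET (){ 1 } # semaphore counting elements available to get
sub SPUT (){ 2 } # semaphore counting remaining space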

=item $q = new Coro::Channel $maxsize

Create a new channel with the given maximum size (practically unlimited
if C<$maxsize> is omitted or zero). Giving a size of one gives you a
traditional channel, i.e. a queue that can store only a single element
(which means there will be no buffering, and C<put> will wait until there
is a corresponding C<get> call). To buffer one element you have to specify
C<2>, and so on.
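
As a rough illustration of this sizing rule, the following sketch uses a
C<$maxsize> of C<2>, i.e. a buffer of one element. The values C<"a"> and
C<"b"> are made up for this example, and the comments describe what the
implementation in this file is expected to do:

   use Coro;

   my $q = new Coro::Channel 2;

   async {
      $q->put ("a"); # returns at once, "a" is buffered
      $q->put ("b"); # blocks until "a" has been fetched
   };

   cede;                # let the producer run until it blocks
   print $q->get, "\n"; # a
   cede;                # give the producer a chance to finish
   print $q->get, "\n"; # b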

=cut

sub new {
   # we cheat and set infinity == 2*10**9
   bless [
      [], # initially empty
      (Coro::Semaphore::_alloc 0), # counts data
      (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
   ]
}

=item $q->put ($scalar)

Put the given scalar into the queue. The element is stored immediately;
the call then blocks if the channel has reached its capacity, until a
C<get> makes room.

=cut

sub put {
   push @{$_[0][DATA]}, $_[1];        # store the element first
   Coro::Semaphore::up $_[0][SGET];   # announce it to readers
   Coro::Semaphore::down $_[0][SPUT]; # then wait for remaining space
}

=item $q->get

Return the next element from the queue, waiting if necessary.

=cut

sub get {
   Coro::Semaphore::down $_[0][SGET]; # wait for an element (or a shutdown)
   Coro::Semaphore::up $_[0][SPUT];   # free one slot for writers
   shift @{$_[0][DATA]}
}

=item $q->shutdown

Shuts down the Channel by pushing a virtual end marker onto it: this
changes the behaviour of the Channel when it becomes or is empty to return
C<undef>, almost as if infinitely many C<undef> elements had been put
into the queue.

Specifically, this function wakes up any pending C<get> calls and lets
them return C<undef>; future C<get> calls on an empty Channel return
C<undef> as well. C<size> will still return the real number of stored
elements, though.

Another way to describe the behaviour is that C<get> calls will not block
when the queue becomes empty but immediately return C<undef>. Calls to
C<put> continue to work normally after a shutdown, and the data will be
returned by subsequent C<get> calls.

This method is useful to signal the end of data to any consumers, quite
similar to an end of stream on e.g. a TCP socket: you have one or more
producers that C<put> data into the Channel and one or more consumers that
C<get> it. When all producers have finished producing data, a call to
C<shutdown> signals this fact to any consumers.

A common implementation uses one or more threads that C<get> from
a channel until it returns C<undef>. To clean everything up, first
C<shutdown> the channel, then C<join> the threads.
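
A minimal sketch of this pattern might look as follows. The number of
consumers, the channel size and the job values are arbitrary choices made
for illustration. Note that the consumers treat C<undef> as the end marker,
so this only works if no C<undef> values are C<put> as regular data:

   use Coro;

   my $q = new Coro::Channel 16;

   # two consumers that read until the channel reports end of data
   my @consumers = map {
      my $id = $_;
      async {
         while (defined (my $job = $q->get)) {
            print "consumer $id got $job\n";
         }
      }
   } 1 .. 2;

   $q->put ($_) for 1 .. 5; # produce some data
   $q->shutdown;            # signal the end of data
   $_->join for @consumers; # wait for the consumers to finish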

=cut

sub shutdown {
   # make (practically) every future get succeed without blocking
   Coro::Semaphore::adjust $_[0][SGET], 1_000_000_000;
}

=item $q->size

Return the number of elements waiting to be consumed. Please note that:

   if ($q->size) {
      my $data = $q->get;
      ...
   }

is I<not> a race condition but instead works just fine, because coroutines
are scheduled cooperatively and no other coroutine can run between the two
calls. Note that the number of elements that wait can be larger than
C<$maxsize>, as it includes the elements of any coroutines waiting to put
data into the channel (but not any shutdown condition).

This means that the number returned is I<precisely> the number of calls
to C<get> that will succeed instantly and return some data. Calling
C<shutdown> has no effect on this number.
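
To illustrate the point about C<$maxsize>, here is a small sketch (the
producer count of three is an arbitrary assumption) in which every producer
blocks inside C<put>, yet all their elements are counted by C<size>:

   use Coro;

   my $q = new Coro::Channel 1;

   # each producer stores its element and then blocks in put,
   # as nobody is reading from the channel yet
   async { $q->put ($_[0]) } $_ for 1 .. 3;
   cede; # let the producers run until they block

   print $q->size, "\n"; # 3, although $maxsize is 1

   # all three elements can now be fetched without blocking
   print $q->get, "\n" while $q->size;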

=cut

sub size {
   scalar @{$_[0][DATA]}
}

# this is not undocumented by accident - if it breaks, you
# get to keep the pieces
sub adjust {
   Coro::Semaphore::adjust $_[0][SPUT], $_[1];
}

1;

=back

=head1 AUTHOR/SUPPORT/CONTACT

   Marc A. Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/Coro.html

=cut