ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Channel.pm
Revision: 1.112
Committed: Thu Aug 4 19:37:58 2011 UTC (12 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-6_05
Changes since 1.111: +1 -1 lines
Log Message:
6.05

File Contents

# Content
1 =head1 NAME
2
3 Coro::Channel - message queues
4
5 =head1 SYNOPSIS
6
7 use Coro;
8
9 $q1 = new Coro::Channel <maxsize>;
10
11 $q1->put ("xxx");
12 print $q1->get;
13
14 die unless $q1->size;
15
16 =head1 DESCRIPTION
17
18 A Coro::Channel is the equivalent of a unix pipe (and similar to amiga
19 message ports): you can put things into it on one end and read things out
20 of it from the other end. If the capacity of the Channel is maxed out
21 writers will block. Both ends of a Channel can be read/written from by as
22 many coroutines as you want concurrently.
23
24 You don't have to load C<Coro::Channel> manually, it will be loaded
25 automatically when you C<use Coro> and call the C<new> constructor.
26
27 =over 4
28
29 =cut
30
31 package Coro::Channel;
32
33 use common::sense;
34
35 use Coro ();
36 use Coro::Semaphore ();
37
38 our $VERSION = 6.05;
39
40 sub DATA (){ 0 }
41 sub SGET (){ 1 }
42 sub SPUT (){ 2 }
43
44 =item $q = new Coro:Channel $maxsize
45
46 Create a new channel with the given maximum size (practically unlimited
47 if C<maxsize> is omitted). Giving a size of one gives you a traditional
48 channel, i.e. a queue that can store only a single element (which means
49 there will be no buffering, and C<put> will wait until there is a
50 corresponding C<get> call). To buffer one element you have to specify
51 C<2>, and so on.
52
53 =cut
54
55 sub new {
56 # we cheat and set infinity == 2*10**9
57 bless [
58 [], # initially empty
59 (Coro::Semaphore::_alloc 0), # counts data
60 (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # counts remaining space
61 ]
62 }
63
64 =item $q->put ($scalar)
65
66 Put the given scalar into the queue.
67
68 =cut
69
70 sub put {
71 push @{$_[0][DATA]}, $_[1];
72 Coro::Semaphore::up $_[0][SGET];
73 Coro::Semaphore::down $_[0][SPUT];
74 }
75
76 =item $q->get
77
78 Return the next element from the queue, waiting if necessary.
79
80 =cut
81
82 sub get {
83 Coro::Semaphore::down $_[0][SGET];
84 Coro::Semaphore::up $_[0][SPUT];
85 shift @{$_[0][DATA]}
86 }
87
88 =item $q->shutdown
89
90 Shuts down the Channel by pushing a virtual end marker onto it: This
91 changes the behaviour of the Channel when it becomes or is empty to return
92 C<undef>, almost as if infinitely many C<undef> elements have been put
93 into the queue.
94
95 Specifically, this function wakes up any pending C<get> calls and lets
96 them return C<undef>, the same on future C<get> calls. C<size> will return
97 the real number of stored elements, though.
98
99 Another way to describe the behaviour is that C<get> calls will not block
100 when the queue becomes empty but immediately return C<undef>. This means
101 that calls to C<put> will work normally and the data will be returned on
102 subsequent C<get> calls.
103
104 This method is useful to signal the end of data to any consumers, quite
105 similar to an end of stream on e.g. a tcp socket: You have one or more
106 producers that C<put> data into the Channel and one or more consumers who
107 C<get> them. When all producers have finished producing data, a call to
108 C<shutdown> signals this fact to any consumers.
109
110 =cut
111
112 sub shutdown {
113 Coro::Semaphore::adjust $_[0][SGET], 1_000_000_000;
114 }
115
116 =item $q->size
117
118 Return the number of elements waiting to be consumed. Please note that:
119
120 if ($q->size) {
121 my $data = $q->get;
122 ...
123 }
124
125 is I<not> a race condition but instead works just fine. Note that the
126 number of elements that wait can be larger than C<$maxsize>, as it
127 includes any coroutines waiting to put data into the channel (but not any
128 shutdown condition).
129
130 This means that the number returned is I<precisely> the number of calls
131 to C<get> that will succeed instantly and return some data. Calling
132 C<shutdown> has no effect on this number.
133
134 =cut
135
136 sub size {
137 scalar @{$_[0][DATA]}
138 }
139
140 # this is not undocumented by accident - if it breaks, you
141 # get to keep the pieces
142 sub adjust {
143 Coro::Semaphore::adjust $_[0][SPUT], $_[1];
144 }
145
146 1;
147
148 =back
149
150 =head1 AUTHOR
151
152 Marc Lehmann <schmorp@schmorp.de>
153 http://home.schmorp.de/
154
155 =cut
156