/cvs/Coro/Coro/Channel.pm
Revision: 1.87
Committed: Mon Jul 6 03:42:29 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-5_151
Changes since 1.86: +1 -1 lines
Log Message:
5.151

File Contents

=head1 NAME

Coro::Channel - message queues

=head1 SYNOPSIS

 use Coro::Channel;

 $q1 = new Coro::Channel $maxsize;

 $q1->put ("xxx");
 print $q1->get;

 die unless $q1->size;

=head1 DESCRIPTION

A Coro::Channel is the equivalent of a Unix pipe (and similar to Amiga
message ports): you can put things into it on one end and read things out
of it from the other end. If the capacity of the Channel is maxed out,
writers will block. Any number of coroutines can read from and write to
a Channel concurrently.
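
As a rough illustration of the pipe analogy, one coroutine can act as the
writer and another as the reader. This is only a sketch: the variable
names and the capacity of C<3> are made up for the example, and C<async>
and C<join> come from the L<Coro> module, not from this one.

 use Coro;
 use Coro::Channel;

 my $q = new Coro::Channel 3;   # example capacity, chosen arbitrarily

 # writer: blocks in put whenever the channel is full
 my $writer = async {
    $q->put ($_) for 1 .. 5;
 };

 # reader: blocks in get whenever the channel is empty
 my $reader = async {
    print "got ", $q->get, "\n" for 1 .. 5;
 };

 # wait until both coroutines have finished
 $_->join for $writer, $reader;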
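
=over 4

=cut

package Coro::Channel;

use strict qw(vars subs);
no warnings;

use Coro ();
use Coro::Semaphore ();

our $VERSION = 5.151;

# indices into the array that represents a channel
sub DATA (){ 0 } # array of queued elements
sub SGET (){ 1 } # semaphore counting elements available to get
sub SPUT (){ 2 } # semaphore that blocks put once the channel is full
sub CEOS (){ 3 } # not referenced by any code in this file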

=item $q = new Coro::Channel $maxsize

Create a new channel with the given maximum size (practically unlimited
if C<$maxsize> is omitted). Giving a size of one gives you a traditional
channel, i.e. a queue that can store only a single element (which means
there will be no buffering, and C<put> will wait until there is a
corresponding C<get> call). To buffer one element you have to specify
C<2>, and so on.
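
The difference between a size of C<1> and a size of C<2> might be
observed with a sketch like the following. The variable names are
invented for the example, and C<async> and C<cede> are taken from the
L<Coro> module. The comments describe what the semantics above suggest
should happen.

 use Coro;
 use Coro::Channel;

 my $rendezvous = new Coro::Channel 1; # no buffering
 my $buffered   = new Coro::Channel 2; # buffers one element

 async {
    $buffered->put ("a");   # returns at once, "a" is buffered
    print "buffered put returned\n";

    $rendezvous->put ("b"); # waits until the get below is called
    print "rendezvous put returned\n";
 };

 cede;                      # let the coroutine above run until it blocks
 print "main: calling get\n";
 print $rendezvous->get, "\n";
 cede;                      # give the blocked put a chance to return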

=cut

sub new {
   # we cheat and set infinity == 2*10**9
   bless [
      [],
      (Coro::Semaphore::_alloc 0),
      (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1),
   ]
}

=item $q->put ($scalar)

Put the given scalar into the queue. If the channel is full, the call
blocks until a C<get> makes room.

=cut

sub put {
   push @{$_[0][DATA]}, $_[1];
   Coro::Semaphore::up $_[0][SGET];   # one more element available to get
   Coro::Semaphore::down $_[0][SPUT]; # blocks while the channel is full
}

=item $q->get

Return the next element from the queue, waiting if necessary.

=cut

sub get {
   Coro::Semaphore::down $_[0][SGET]; # blocks while there is nothing to get
   Coro::Semaphore::up $_[0][SPUT];   # let one blocked or future put proceed
   shift @{$_[0][DATA]}
}

=item $q->shutdown

Shuts down the Channel by pushing a virtual end marker onto it: This
changes the behaviour of the Channel when it becomes or is empty to return
C<undef>, almost as if infinitely many C<undef> elements had been put
into the queue.

Specifically, this function wakes up any pending C<get> calls and lets
them return C<undef>, and future C<get> calls on an empty Channel return
C<undef> as well. C<size> will still return the real number of stored
elements, though.

Another way to describe the behaviour is that C<get> calls will not block
when the queue becomes empty but immediately return C<undef>. Calls to
C<put> still work normally, and the data will be returned by subsequent
C<get> calls.

This method is useful to signal the end of data to any consumers, quite
similar to an end of stream on e.g. a TCP socket: You have one or more
producers that C<put> data into the Channel and one or more consumers who
C<get> it. When all producers have finished producing data, a call to
C<shutdown> signals this fact to any consumers.
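
A consumer loop that relies on this might look roughly like the sketch
below. The names are invented for the example, C<async> and C<join> come
from the L<Coro> module, and the loop assumes that producers never put
C<undef> themselves, since such an element could not be told apart from
the end marker.

 use Coro;
 use Coro::Channel;

 my $q = new Coro::Channel;

 my $consumer = async {
    # runs until get returns undef, i.e. until the channel is
    # empty and has been shut down
    while (defined (my $item = $q->get)) {
       print "consumed $item\n";
    }
    print "end of data\n";
 };

 $q->put ($_) for qw(a b c);
 $q->shutdown;      # no more data will follow

 $consumer->join;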
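
=cut

sub shutdown {
   # raise the get semaphore by "infinity" so get never blocks again;
   # get then shifts from an empty array, which yields undef
   Coro::Semaphore::adjust $_[0][SGET], 2_000_000_000;
}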

=item $q->size

Return the number of elements waiting to be consumed. Please note that:

   if ($q->size) {
      my $data = $q->get;
      ...
   }

is I<not> a race condition but instead works just fine. Note that the
number of elements that wait can be larger than C<$maxsize>, as it
includes any coroutines waiting to put data into the channel (but not any
shutdown condition).

This means that the number returned is I<precisely> the number of calls to
C<get> that will succeed instantly and return some data.
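
The following sketch tries to show both points: C<size> exceeding the
maximum size because of blocked writers, and draining the channel without
ever blocking. The names are invented for the example, and C<async> and
C<cede> are taken from the L<Coro> module.

 use Coro;
 use Coro::Channel;

 my $q = new Coro::Channel 1;

 # both writers block in put, since nobody has called get yet
 async { $q->put ("x") };
 async { $q->put ("y") };
 cede;                   # let both writers run until they block

 print $q->size, "\n";   # counts both pending puts

 # drain only what size reports, so get never blocks here
 while ($q->size) {
    print $q->get, "\n";
 }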

=cut

sub size {
   scalar @{$_[0][DATA]}
}

1;

=back

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut