ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Channel.pm
Revision: 1.94
Committed: Tue Dec 1 00:02:08 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.93: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Channel - message queues
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Channel;
8    
9     $q1 = new Coro::Channel <maxsize>;
10    
11 root 1.46 $q1->put ("xxx");
12 root 1.1 print $q1->get;
13    
14     die unless $q1->size;
15    
16     =head1 DESCRIPTION
17    
18 root 1.75 A Coro::Channel is the equivalent of a unix pipe (and similar to amiga
19     message ports): you can put things into it on one end and read things out
20     of it from the other end. If the capacity of the Channel is maxed out
21     writers will block. Both ends of a Channel can be read/written from by as
22     many coroutines as you want concurrently.
23 root 1.8
24 root 1.1 =over 4
25    
26     =cut
27    
28     package Coro::Channel;
29    
30 root 1.92 use common::sense;
31 root 1.45
32 root 1.3 use Coro ();
33 root 1.71 use Coro::Semaphore ();
34 root 1.1
35 root 1.93 our $VERSION = 5.2;
36 root 1.1
37 root 1.70 sub DATA (){ 0 }
38 root 1.71 sub SGET (){ 1 }
39     sub SPUT (){ 2 }
40 root 1.77 sub CEOS (){ 3 }
41 root 1.70
42 root 1.1 =item $q = new Coro:Channel $maxsize
43    
44 root 1.75 Create a new channel with the given maximum size (practically unlimited
45     if C<maxsize> is omitted). Giving a size of one gives you a traditional
46     channel, i.e. a queue that can store only a single element (which means
47     there will be no buffering, and C<put> will wait until there is a
48     corresponding C<get> call). To buffer one element you have to specify
49     C<2>, and so on.
50 root 1.1
51     =cut
52    
53     sub new {
54 root 1.84 # we cheat and set infinity == 2*10**9
55 root 1.71 bless [
56     [],
57 root 1.73 (Coro::Semaphore::_alloc 0),
58 root 1.77 (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1),
59 root 1.71 ]
60 root 1.1 }
61    
62 root 1.44 =item $q->put ($scalar)
63 root 1.1
64     Put the given scalar into the queue.
65    
66     =cut
67    
68     sub put {
69 root 1.70 push @{$_[0][DATA]}, $_[1];
70 root 1.71 Coro::Semaphore::up $_[0][SGET];
71     Coro::Semaphore::down $_[0][SPUT];
72 root 1.1 }
73    
74     =item $q->get
75    
76     Return the next element from the queue, waiting if necessary.
77    
78     =cut
79    
80     sub get {
81 root 1.71 Coro::Semaphore::down $_[0][SGET];
82     Coro::Semaphore::up $_[0][SPUT];
83 root 1.70 shift @{$_[0][DATA]}
84 root 1.47 }
85    
86 root 1.77 =item $q->shutdown
87    
88     Shuts down the Channel by pushing a virtual end marker onto it: This
89     changes the behaviour of the Channel when it becomes or is empty to return
90     C<undef>, almost as if infinitely many C<undef> elements have been put
91     into the queue.
92    
93     Specifically, this function wakes up any pending C<get> calls and lets
94     them return C<undef>, the same on future C<get> calls. C<size> will return
95     the real number of stored elements, though.
96    
97     Another way to describe the behaviour is that C<get> calls will not block
98     when the queue becomes empty but immediately return C<undef>. This means
99     that calls to C<put> will work normally and the data will be returned on
100     subsequent C<get> calls.
101    
102     This method is useful to signal the end of data to any consumers, quite
103     similar to an end of stream on e.g. a tcp socket: You have one or more
104     producers that C<put> data into the Channel and one or more consumers who
105     C<get> them. When all producers have finished producing data, a call to
106     C<shutdown> signals this fact to any consumers.
107    
108     =cut
109    
110     sub shutdown {
111 root 1.79 Coro::Semaphore::adjust $_[0][SGET], 2_000_000_000;
112 root 1.77 }
113    
114 root 1.1 =item $q->size
115    
116     Return the number of elements waiting to be consumed. Please note that:
117    
118     if ($q->size) {
119     my $data = $q->get;
120 root 1.77 ...
121 root 1.1 }
122    
123 root 1.83 is I<not> a race condition but instead works just fine. Note that the
124     number of elements that wait can be larger than C<$maxsize>, as it
125     includes any coroutines waiting to put data into the channel (but not any
126     shutdown condition).
127    
128     This means that the number returned is I<precisely> the number of calls to
129 root 1.94 C<get> that will succeed instantly and return some data.
130 root 1.1
131     =cut
132    
133     sub size {
134 root 1.70 scalar @{$_[0][DATA]}
135 root 1.1 }
136    
137     1;
138    
139     =back
140    
141     =head1 AUTHOR
142    
143 root 1.35 Marc Lehmann <schmorp@schmorp.de>
144 root 1.33 http://home.schmorp.de/
145 root 1.1
146     =cut
147