/cvs/Coro/Coro/Channel.pm
Revision: 1.80
Committed: Sat Dec 13 22:08:13 2008 UTC (15 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-6_13
Changes since 1.79: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

Coro::Channel - message queues

=head1 SYNOPSIS

 use Coro::Channel;

 $q1 = new Coro::Channel <maxsize>;

 $q1->put ("xxx");
 print $q1->get;

 die unless $q1->size;

=head1 DESCRIPTION

A Coro::Channel is the equivalent of a Unix pipe (and similar to Amiga
message ports): you can put things into it on one end and read things out
of it from the other end. If the capacity of the Channel is maxed out,
writers will block. Any number of coroutines can read from and write to
a Channel concurrently.
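
To illustrate the pipe analogy, the following sketch runs one producer and
one consumer over a small Channel. It assumes the C<async> function and the
C<join> method of the main Coro module; all variable names are made up for
this example.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new(2);    # size 2: one element can be buffered
my @got;

# Consumer: reads until get returns undef (end of data after shutdown).
my $consumer = async {
    while (defined(my $item = $q->get)) {
        push @got, $item;
    }
};

# Producer: writes five values, blocking whenever the Channel is full.
my $producer = async {
    $q->put($_) for 1 .. 5;
    $q->shutdown;                 # tell the consumer that no more data follows
};

$producer->join;
$consumer->join;
print "@got\n";                   # prints "1 2 3 4 5"
```

The elements arrive in the order they were put, and neither coroutine has to
poll: each one simply blocks in C<put> or C<get> until the other side catches up.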

=over 4

=cut

package Coro::Channel;

use strict qw(vars subs);
no warnings;

use Coro ();
use Coro::Semaphore ();

our $VERSION = 5.13;

# indices into the blessed array that represents a Channel
sub DATA (){ 0 } # array of queued elements
sub SGET (){ 1 } # semaphore counting elements available to get
sub SPUT (){ 2 } # semaphore counting remaining space for put
sub CEOS (){ 3 } # not referenced elsewhere in this file

=item $q = new Coro::Channel $maxsize

Create a new channel with the given maximum size (practically unlimited
if C<$maxsize> is omitted). Giving a size of one gives you a traditional
channel, i.e. a queue that can store only a single element (which means
there will be no buffering, and C<put> will wait until there is a
corresponding C<get> call). To buffer one element you have to specify
C<2>, and so on.
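
The effect of a size of one can be sketched as follows. This is an
illustration only: it assumes C<async>, C<cede> and C<join> from the main
Coro module, and the C<@log> array is a made-up helper that records the
order of events.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my @log;
my $q = Coro::Channel->new(1);    # size 1: put waits for a matching get

my $writer = async {
    push @log, "put start";
    $q->put("a");                 # stores the element, then blocks
    push @log, "put done";
};

cede;                             # let the writer run until it blocks in put
push @log, "size " . $q->size;    # the element is already visible
push @log, "got " . $q->get;      # this get releases the writer
$writer->join;

print join(", ", @log), "\n";
# prints "put start, size 1, got a, put done"
```

With C<< Coro::Channel->new(2) >> the same C<put> would return immediately,
because one element may then sit in the queue without a reader.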

=cut

sub new {
   # we cheat and set infinity == 2 * 10**9
   bless [
      [],                                                      # DATA: initially empty
      (Coro::Semaphore::_alloc 0),                             # SGET: nothing to get yet
      (Coro::Semaphore::_alloc +($_[1] || 2_000_000_000) - 1), # SPUT: remaining space
   ]
}

=item $q->put ($scalar)

Put the given scalar into the queue.

=cut

sub put {
   push @{$_[0][DATA]}, $_[1];
   Coro::Semaphore::up   $_[0][SGET]; # one more element for readers
   Coro::Semaphore::down $_[0][SPUT]; # blocks when the Channel is full
}

=item $q->get

Return the next element from the queue, waiting if necessary.

=cut

sub get {
   Coro::Semaphore::down $_[0][SGET]; # blocks while the Channel is empty
   Coro::Semaphore::up   $_[0][SPUT]; # one more free slot for writers
   shift @{$_[0][DATA]}
}

=item $q->shutdown

Shuts down the Channel by pushing a virtual end marker onto it: this
changes the behaviour of the Channel when it becomes or is empty to return
C<undef>, almost as if infinitely many C<undef> elements had been put
into the queue.

Specifically, this function wakes up any pending C<get> calls and lets
them return C<undef>; future C<get> calls on an empty Channel do the same.
C<size> will still return the real number of stored elements, though.

Another way to describe the behaviour is that C<get> calls will not block
when the queue becomes empty but immediately return C<undef>. Calls to
C<put> still work normally, and the data will be returned by subsequent
C<get> calls.

This method is useful to signal the end of data to any consumers, quite
similar to an end of stream on e.g. a TCP socket: you have one or more
producers that C<put> data into the Channel and one or more consumers who
C<get> them. When all producers have finished producing data, a call to
C<shutdown> signals this fact to any consumers.
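
A minimal sketch of this pattern with several consumers might look like the
following. It assumes C<async> and C<join> from the main Coro module; the
worker count, the job names and the C<%seen> hash are made up for the example.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new;       # no size given: practically unlimited
my %seen;

# Three consumers, each looping until get returns undef.
my @workers = map {
    my $id = $_;
    async {
        while (defined(my $job = $q->get)) {
            $seen{$job} = $id;    # remember which worker handled the job
        }
    }
} 1 .. 3;

$q->put($_) for qw(a b c d);      # queue all jobs
$q->shutdown;                     # end of data: idle workers get undef

$_->join for @workers;            # every worker terminates
print join(" ", sort keys %seen), "\n";   # prints "a b c d"
```

Because C<undef> doubles as the end marker, a loop like this cannot tell a
stored C<undef> from the end of data, so the sketch only queues defined values.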

=cut

sub shutdown {
   # make SGET practically infinite so that get never blocks again
   Coro::Semaphore::adjust $_[0][SGET], 2_000_000_000;
}

=item $q->size

Return the number of elements waiting to be consumed. Please note that:

   if ($q->size) {
      my $data = $q->get;
      ...
   }

is I<not> a race condition but instead works just fine: coroutines are
scheduled cooperatively, so no other coroutine can run between the C<size>
check and the C<get> call.
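
As a small sketch of that idiom, the loop below drains whatever is currently
queued without ever blocking. The C<@drained> array is a made-up name, and
the Channel is created without a size so that the C<put> calls do not block.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $q = Coro::Channel->new;       # practically unlimited
$q->put($_) for 1 .. 3;           # plenty of space, so put returns at once

my @drained;
while ($q->size) {                # only call get while data is waiting
    push @drained, $q->get;
}
print "@drained\n";               # prints "1 2 3"
```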

=cut

sub size {
   scalar @{$_[0][DATA]}
}

1;

=back

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut