ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Channel.pm
Revision: 1.47
Committed: Wed Feb 13 15:40:33 2008 UTC (16 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-4_4
Changes since 1.46: +21 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Channel - message queues
4
5 =head1 SYNOPSIS
6
7 use Coro::Channel;
8
9 $q1 = new Coro::Channel <maxsize>;
10
11 $q1->put ("xxx");
12 print $q1->get;
13
14 die unless $q1->size;
15
16 =head1 DESCRIPTION
17
18 A Coro::Channel is the equivalent of a pipe: you can put things into it on
19 one end and read things out of it from the other end. If the capacity of
20 the Channel is maxed out writers will block. Both ends of a Channel can be
21 read/written from as many coroutines as you want.
22
23 =over 4
24
25 =cut
26
27 package Coro::Channel;
28
29 no warnings;
30
31 use Coro ();
32
33 $VERSION = 1.9;
34
35 =item $q = new Coro::Channel $maxsize
36
37 Create a new channel with the given maximum size (unlimited if C<maxsize>
38 is omitted). Giving a size of one gives you a traditional channel, i.e. a
39 queue that can store only a single element.
40
41 =cut
42
# Constructor. Called as a class method with an optional maximum size.
#
# The object is a blessed array with four slots:
#   [0] queued elements
#   [1] coroutines blocked waiting to read
#   [2] maximum size (1e30 stands in for "unlimited" when none/false is given)
#   [3] coroutines blocked waiting to write
sub new {
    my ($class, $maxsize) = @_;

    my $self = [
        [],
        [],
        $maxsize || (1e30),
        [],
    ];

    return bless $self, $class;
}
47
48 =item $q->put ($scalar)
49
50 Put the given scalar into the queue.
51
52 =cut
53
# Append one element to the channel.
#
# After queueing the element, the most recently queued blocked reader (if
# any) is marked ready. The caller is then suspended for as long as the
# number of queued elements is at or above the channel's maximum size.
sub put {
    my ($self, $item) = @_;

    push @{ $self->[0] }, $item;

    # Wake one reader that is blocked in get/timed_get.
    if (@{ $self->[1] }) {
        my $reader = pop @{ $self->[1] };
        $reader->ready;
    }

    # The capacity check runs after the push, so the writer blocks until
    # readers have drained the queue below the limit.
    while (@{ $self->[0] } >= $self->[2]) {
        push @{ $self->[3] }, $Coro::current;
        &Coro::schedule;
    }
}
64
65 =item $q->get
66
67 Return the next element from the queue, waiting if necessary.
68
69 =item $q->timed_get ($timeout)
70
71 Return the next element from the queue, waiting up to C<$timeout> seconds
72 if necessary. If no element arrives within the given time an empty list
73 will be returned.
74
75 =cut
76
# Remove and return the oldest queued element, suspending the calling
# coroutine until one is available.
sub get {
    my ($self) = @_;

    # Taking an element frees capacity, so wake one blocked writer first.
    if (@{ $self->[3] }) {
        my $writer = pop @{ $self->[3] };
        $writer->ready;
    }

    # Register as a waiting reader and yield until the queue is non-empty.
    while (!@{ $self->[0] }) {
        push @{ $self->[1] }, $Coro::current;
        &Coro::schedule;
    }

    return shift @{ $self->[0] };
}
87
# Like get, but gives up after $seconds: returns the next element, or an
# empty list if none arrived in time.
#
# Fixes relative to the previous revision:
#  * the timeout was created from $_[0] (the channel object) instead of the
#    caller-supplied number of seconds in $_[1];
#  * a coroutine that timed out stayed registered in the reader wait list,
#    so a later put would spend its wakeup on a coroutine that was no
#    longer waiting;
#  * an element that was already queued when the timer fired is now
#    returned instead of being left behind.
sub timed_get {
    my ($self, $seconds) = @_;

    require Coro::Timer;
    # $timeout is tested for truth below to detect expiry; see the
    # Coro::Timer documentation for the exact contract.
    my $timeout = Coro::Timer::timeout ($seconds);

    # Taking an element frees capacity, so wake one blocked writer first.
    (pop @{ $self->[3] })->ready if @{ $self->[3] };

    while (!@{ $self->[0] }) {
        push @{ $self->[1] }, $Coro::current;
        &Coro::schedule;

        if ($timeout) {
            # Woken by the timer (possibly in addition to a put): drop any
            # stale registration of this coroutine from the wait list.
            @{ $self->[1] } = grep { $_ != $Coro::current } @{ $self->[1] };

            # Give up only if there really is nothing to hand out.
            return unless @{ $self->[0] };
        }
    }

    shift @{ $self->[0] }
}
102
103 =item $q->size
104
105 Return the number of elements waiting to be consumed. Please note that:
106
107 if ($q->size) {
108 my $data = $q->get;
109 }
110
111 is NOT a race condition but works fine.
112
113 =cut
114
# Number of elements currently queued and not yet consumed.
sub size {
    my ($self) = @_;

    my $pending = @{ $self->[0] };
    return $pending;
}
118
119 1;
120
121 =back
122
123 =head1 AUTHOR
124
125 Marc Lehmann <schmorp@schmorp.de>
126 http://home.schmorp.de/
127
128 =cut
129