ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Channel.pm
Revision: 1.53
Committed: Fri May 30 21:34:52 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_741
Changes since 1.52: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Channel - message queues
4
5 =head1 SYNOPSIS
6
7 use Coro::Channel;
8
9 $q1 = new Coro::Channel <maxsize>;
10
11 $q1->put ("xxx");
12 print $q1->get;
13
14 die unless $q1->size;
15
16 =head1 DESCRIPTION
17
18 A Coro::Channel is the equivalent of a pipe: you can put things into it on
19 one end and read things out of it from the other end. If the capacity of
20 the Channel is maxed out writers will block. Both ends of a Channel can be
21 read/written from as many coroutines as you want.
22
23 =over 4
24
25 =cut
26
27 package Coro::Channel;
28
29 no warnings;
30
31 use Coro ();
32
33 $VERSION = 4.741;
34
35 =item $q = new Coro::Channel $maxsize
36
37 Create a new channel with the given maximum size (unlimited if C<maxsize>
38 is omitted). Giving a size of one gives you a traditional channel, i.e. a
39 queue that can store only a single element (which means there will be no
40 buffering). To buffer one element you have to specify C<2>, and so on.
41
42 =cut
43
sub new {
    my ($class, $maxsize) = @_;

    # A channel is a blessed array with four slots:
    #   [0] \@contents - the queued elements, oldest first
    #   [1] \@getwait  - coroutines currently blocked waiting to read
    #   [2] $maxsize   - capacity; any false value (omitted, 0, "") becomes 1e30
    #   [3] \@putwait  - coroutines currently blocked waiting to write
    my @slots = ([], [], $maxsize || 1e30, []);

    return bless \@slots, $class;
}
48
49 =item $q->put ($scalar)
50
51 Put the given scalar into the queue.
52
53 =cut
54
sub put {
    my $self = $_[0];
    my ($queue, $getters, $putters) = @$self[0, 1, 3];

    # Enqueue the element first, then wake one blocked reader (the one that
    # was added to the wait list last), if there is any.
    push @$queue, $_[1];
    if (@$getters) {
        (pop @$getters)->ready;
    }

    # As long as the channel holds at least $maxsize elements, register as
    # a waiting writer and give up the CPU. The condition is re-checked after
    # every wake-up, so a spurious wake-up just goes back to sleep.
    while (@$queue >= $self->[2]) {
        push @$putters, $Coro::current;
        &Coro::schedule;
    }
}
64
65 =item $q->get
66
67 Return the next element from the queue, waiting if necessary.
68
69 =item $q->timed_get ($timeout)
70
71 Return the next element from the queue, waiting up to C<$timeout> seconds
72 if necessary. If no element arrives within the given time an empty list
73 will be returned.
74
75 =cut
76
sub get {
    my ($queue, $getters, $putters) = @{ $_[0] }[0, 1, 3];

    # Nothing queued: register as a waiting reader and give up the CPU.
    # The queue is re-checked after every wake-up.
    until (@$queue) {
        push @$getters, $Coro::current;
        &Coro::schedule;
    }

    # An element is about to be removed, so wake one blocked writer (the one
    # that was added to the wait list last), if there is any.
    if (@$putters) {
        (pop @$putters)->ready;
    }

    shift @$queue
}
86
sub timed_get {
    my ($self, $seconds) = @_;
    my ($queue, $getters, $putters) = @$self[0, 1, 3];

    require Coro::Timer;

    # The timeout must be built from the caller's argument, not from the
    # channel object itself. The returned flag is tested for truth below
    # after each wake-up.
    my $timeout = Coro::Timer::timeout ($seconds);

    while (!@$queue) {
        push @$getters, $Coro::current;
        &Coro::schedule;

        if ($timeout) {
            # The timer fired while our entry may still be in the reader
            # wait list. Remove it, otherwise a later put would spend its
            # wake-up on a coroutine that is no longer waiting here.
            @$getters = grep { $_ != $Coro::current } @$getters;

            # Timed out with nothing to read: return the empty list. If an
            # element is available anyway, fall through and deliver it.
            return unless @$queue;
        }
    }

    # Same order as in get: wake one blocked writer only once an element is
    # actually being taken, so the writer that delivered it is released.
    (pop @$putters)->ready if @$putters;

    shift @$queue
}
101
102 =item $q->size
103
104 Return the number of elements waiting to be consumed. Please note that:
105
106 if ($q->size) {
107 my $data = $q->get;
108 }
109
110 is NOT a race condition but works fine.
111
112 =cut
113
sub size {
    # Slot 0 of the channel object is the array of queued elements; its
    # element count is the number of items waiting to be consumed.
    my $queue = $_[0][0];

    return scalar @$queue;
}
117
118 1;
119
120 =back
121
122 =head1 AUTHOR
123
124 Marc Lehmann <schmorp@schmorp.de>
125 http://home.schmorp.de/
126
127 =cut
128