ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Channel.pm
Revision: 1.69
Committed: Fri Nov 14 02:42:26 2008 UTC (15 years, 6 months ago) by root
Branch: MAIN
Changes since 1.68: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Channel - message queues
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Channel;
8    
9     $q1 = new Coro::Channel <maxsize>;
10    
11 root 1.46 $q1->put ("xxx");
12 root 1.1 print $q1->get;
13    
14     die unless $q1->size;
15    
16     =head1 DESCRIPTION
17    
18 root 1.8 A Coro::Channel is the equivalent of a pipe: you can put things into it on
19     one end end read things out of it from the other hand. If the capacity of
20     the Channel is maxed out writers will block. Both ends of a Channel can be
21     read/written from as many coroutines as you want.
22    
23 root 1.1 =over 4
24    
25     =cut
26    
27     package Coro::Channel;
28    
29 root 1.60 use strict qw(vars subs);
30 root 1.45 no warnings;
31    
32 root 1.3 use Coro ();
33 root 1.1
34 root 1.69 our $VERSION = 5.0;
35 root 1.1
36     =item $q = new Coro:Channel $maxsize
37    
38     Create a new channel with the given maximum size (unlimited if C<maxsize>
39 root 1.8 is omitted). Giving a size of one gives you a traditional channel, i.e. a
40 root 1.48 queue that can store only a single element (which means there will be no
41     buffering). To buffer one element you have to specify C<2>, and so on.
42 root 1.1
43     =cut
44    
45     sub new {
46 root 1.7 # [\@contents, [$getwait], $maxsize, [$putwait]];
47 root 1.8 bless [[], [], $_[1] || (1e30),[]], $_[0];
48 root 1.1 }
49    
50 root 1.44 =item $q->put ($scalar)
51 root 1.1
52     Put the given scalar into the queue.
53    
54     =cut
55    
56     sub put {
57     push @{$_[0][0]}, $_[1];
58     (pop @{$_[0][1]})->ready if @{$_[0][1]};
59 root 1.7
60 root 1.8 while (@{$_[0][0]} >= $_[0][2]) {
61     push @{$_[0][3]}, $Coro::current;
62     &Coro::schedule;
63 root 1.7 }
64 root 1.1 }
65    
66     =item $q->get
67    
68     Return the next element from the queue, waiting if necessary.
69    
70 root 1.47 =item $q->timed_get ($timeout)
71    
72     Return the next element from the queue, waiting up to C<$timeout> seconds
73     if necessary. If no element arrives within the given time an empty list
74     will be returned.
75    
76 root 1.1 =cut
77    
78     sub get {
79     while (!@{$_[0][0]}) {
80 root 1.3 push @{$_[0][1]}, $Coro::current;
81     &Coro::schedule;
82 root 1.1 }
83 root 1.7
84 root 1.48 (pop @{$_[0][3]})->ready if @{$_[0][3]};
85 root 1.46 shift @{$_[0][0]}
86 root 1.1 }
87    
88 root 1.47 sub timed_get {
89     require Coro::Timer;
90     my $timeout = Coro::Timer::timeout ($_[0]);
91    
92     (pop @{$_[0][3]})->ready if @{$_[0][3]};
93    
94     while (!@{$_[0][0]}) {
95     push @{$_[0][1]}, $Coro::current;
96     &Coro::schedule;
97     return if $timeout;
98     }
99    
100     shift @{$_[0][0]}
101     }
102    
103 root 1.1 =item $q->size
104    
105     Return the number of elements waiting to be consumed. Please note that:
106    
107     if ($q->size) {
108     my $data = $q->get;
109     }
110    
111     is NOT a race condition but works fine.
112    
113     =cut
114    
115     sub size {
116 root 1.46 scalar @{$_[0][0]}
117 root 1.1 }
118    
119     1;
120    
121     =back
122    
123     =head1 AUTHOR
124    
125 root 1.35 Marc Lehmann <schmorp@schmorp.de>
126 root 1.33 http://home.schmorp.de/
127 root 1.1
128     =cut
129