ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.3 by root, Sun Jul 10 20:07:11 2005 UTC vs.
Revision 1.4 by root, Sun Jul 10 20:57:00 2005 UTC

17 17
18typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */ 18typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
19typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */ 19typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
20typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */ 20typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
21 21
22#if __i386 || __amd64
23# define STACKSIZE ( 256 * sizeof (long))
24#elif __ia64 22#if __ia64
25# define STACKSIZE (8192 * sizeof (long)) 23# define STACKSIZE 65536
26#else 24#else
27# define STACKSIZE ( 512 * sizeof (long)) 25# define STACKSIZE 4096
28#endif 26#endif
29 27
30enum { 28enum {
31 REQ_QUIT, 29 REQ_QUIT,
32 REQ_OPEN, REQ_CLOSE, 30 REQ_OPEN, REQ_CLOSE,
55 53
56typedef aio_cb *aio_req; 54typedef aio_cb *aio_req;
57 55
58static int started; 56static int started;
59static int nreqs; 57static int nreqs;
58static int max_outstanding = 1<<30;
60static int respipe [2]; 59static int respipe [2];
61 60
62static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 61static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
63static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER; 62static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
64static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 63static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
65 64
66static volatile aio_req reqs, reqe; /* queue start, queue end */ 65static volatile aio_req reqs, reqe; /* queue start, queue end */
67static volatile aio_req ress, rese; /* queue start, queue end */ 66static volatile aio_req ress, rese; /* queue start, queue end */
67
68static void
69poll_wait ()
70{
71 if (!nreqs)
72 return;
73
74 fd_set rfd;
75 FD_ZERO(&rfd);
76 FD_SET(respipe [0], &rfd);
77
78 select (respipe [0] + 1, &rfd, 0, 0, 0);
79}
80
81static int
82poll_cb (pTHX)
83{
84 dSP;
85 int count = 0;
86 aio_req req;
87
88 {
89 /* read and signals sent by the worker threads */
90 char buf [32];
91 while (read (respipe [0], buf, 32) > 0)
92 ;
93 }
94
95 for (;;)
96 {
97 pthread_mutex_lock (&reslock);
98
99 req = ress;
100
101 if (ress)
102 {
103 ress = ress->next;
104 if (!ress) rese = 0;
105 }
106
107 pthread_mutex_unlock (&reslock);
108
109 if (!req)
110 break;
111
112 nreqs--;
113
114 if (req->type == REQ_QUIT)
115 started--;
116 else
117 {
118 int errorno = errno;
119 errno = req->errorno;
120
121 if (req->type == REQ_READ)
122 SvCUR_set (req->data, req->dataoffset
123 + req->result > 0 ? req->result : 0);
124
125 if (req->data)
126 SvREFCNT_dec (req->data);
127
128 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
129 {
130 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
131 PL_laststatval = req->result;
132 PL_statcache = *(req->statdata);
133
134 Safefree (req->statdata);
135 }
136
137 PUSHMARK (SP);
138 XPUSHs (sv_2mortal (newSViv (req->result)));
139
140 if (req->type == REQ_OPEN)
141 {
142 /* convert fd to fh */
143 SV *fh;
144
145 PUTBACK;
146 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
147 SPAGAIN;
148
149 fh = POPs;
150
151 PUSHMARK (SP);
152 XPUSHs (fh);
153 }
154
155 PUTBACK;
156 call_sv (req->callback, G_VOID | G_EVAL);
157 SPAGAIN;
158
159 if (req->callback)
160 SvREFCNT_dec (req->callback);
161
162 errno = errorno;
163 count++;
164 }
165
166 Safefree (req);
167 }
168
169 return count;
170}
68 171
69static void *aio_proc(void *arg); 172static void *aio_proc(void *arg);
70 173
71static void 174static void
72start_thread (void) 175start_thread (void)
105 else 208 else
106 reqe = reqs = req; 209 reqe = reqs = req;
107 210
108 pthread_cond_signal (&reqwait); 211 pthread_cond_signal (&reqwait);
109 pthread_mutex_unlock (&reqlock); 212 pthread_mutex_unlock (&reqlock);
213
214 while (nreqs > max_outstanding)
215 {
216 poll_wait ();
217 poll_cb ();
218 }
110} 219}
111 220
112static void 221static void
113end_thread (void) 222end_thread (void)
114{ 223{
166 req->callback = SvREFCNT_inc (callback); 275 req->callback = SvREFCNT_inc (callback);
167 276
168 send_req (req); 277 send_req (req);
169} 278}
170 279
171static void
172poll_wait ()
173{
174 if (!nreqs)
175 return;
176
177 fd_set rfd;
178 FD_ZERO(&rfd);
179 FD_SET(respipe [0], &rfd);
180
181 select (respipe [0] + 1, &rfd, 0, 0, 0);
182}
183
184static int
185poll_cb (pTHX)
186{
187 dSP;
188 int count = 0;
189 aio_req req;
190
191 {
192 /* read and signals sent by the worker threads */
193 char buf [32];
194 while (read (respipe [0], buf, 32) > 0)
195 ;
196 }
197
198 for (;;)
199 {
200 pthread_mutex_lock (&reslock);
201
202 req = ress;
203
204 if (ress)
205 {
206 ress = ress->next;
207 if (!ress) rese = 0;
208 }
209
210 pthread_mutex_unlock (&reslock);
211
212 if (!req)
213 break;
214
215 nreqs--;
216
217 if (req->type == REQ_QUIT)
218 started--;
219 else
220 {
221 int errorno = errno;
222 errno = req->errorno;
223
224 if (req->type == REQ_READ)
225 SvCUR_set (req->data, req->dataoffset
226 + req->result > 0 ? req->result : 0);
227
228 if (req->data)
229 SvREFCNT_dec (req->data);
230
231 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
232 {
233 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
234 PL_laststatval = req->result;
235 PL_statcache = *(req->statdata);
236
237 Safefree (req->statdata);
238 }
239
240 PUSHMARK (SP);
241 XPUSHs (sv_2mortal (newSViv (req->result)));
242
243 if (req->type == REQ_OPEN)
244 {
245 /* convert fd to fh */
246 SV *fh;
247
248 PUTBACK;
249 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
250 SPAGAIN;
251
252 fh = POPs;
253
254 PUSHMARK (SP);
255 XPUSHs (fh);
256 }
257
258 PUTBACK;
259 call_sv (req->callback, G_VOID | G_EVAL);
260 SPAGAIN;
261
262 if (req->callback)
263 SvREFCNT_dec (req->callback);
264
265 errno = errorno;
266 count++;
267 }
268
269 Safefree (req);
270 }
271
272 return count;
273}
274
275static void * 280static void *
276aio_proc (void *thr_arg) 281aio_proc (void *thr_arg)
277{ 282{
278 aio_req req; 283 aio_req req;
279 int type; 284 int type;
398 { 403 {
399 poll_wait (); 404 poll_wait ();
400 poll_cb (aTHX); 405 poll_cb (aTHX);
401 } 406 }
402} 407}
408
409int
410max_outstanding(nreqs)
411 int nreqs
412 PROTOTYPE: $
413 CODE:
414 RETVAL = max_outstanding;
415 max_outstanding = nreqs;
403 416
404void 417void
405aio_open(pathname,flags,mode,callback) 418aio_open(pathname,flags,mode,callback)
406 SV * pathname 419 SV * pathname
407 int flags 420 int flags

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines