ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.27 by root, Wed Aug 17 04:47:02 2005 UTC vs.
Revision 1.31 by root, Thu Aug 18 16:34:53 2005 UTC

55 Stat_t *statdata; 55 Stat_t *statdata;
56} aio_cb; 56} aio_cb;
57 57
58typedef aio_cb *aio_req; 58typedef aio_cb *aio_req;
59 59
60static int started; 60static int started, wanted;
61static volatile int nreqs; 61static volatile int nreqs;
62static int max_outstanding = 1<<30; 62static int max_outstanding = 1<<30;
63static int respipe [2]; 63static int respipe [2];
64 64
65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
116 { 116 {
117 ress = req->next; 117 ress = req->next;
118 118
119 if (!ress) 119 if (!ress)
120 { 120 {
121 rese = 0;
122
123 /* read any signals sent by the worker threads */ 121 /* read any signals sent by the worker threads */
124 char buf [32]; 122 char buf [32];
125 while (read (respipe [0], buf, 32) == 32) 123 while (read (respipe [0], buf, 32) == 32)
126 ; 124 ;
125
126 rese = 0;
127 } 127 }
128 } 128 }
129 129
130 pthread_mutex_unlock (&reslock); 130 pthread_mutex_unlock (&reslock);
131 131
142 errno = req->errorno; 142 errno = req->errorno;
143 143
144 if (req->type == REQ_READ) 144 if (req->type == REQ_READ)
145 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0)); 145 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
146 146
147 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
148 SvREADONLY_off (req->data);
149
147 if (req->statdata) 150 if (req->statdata)
148 { 151 {
149 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 152 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
150 PL_laststatval = req->result; 153 PL_laststatval = req->result;
151 PL_statcache = *(req->statdata); 154 PL_statcache = *(req->statdata);
218} 221}
219 222
220static void 223static void
221send_req (aio_req req) 224send_req (aio_req req)
222{ 225{
226 while (started < wanted && nreqs >= started)
227 start_thread ();
228
223 nreqs++; 229 nreqs++;
224 230
225 pthread_mutex_lock (&reqlock); 231 pthread_mutex_lock (&reqlock);
226 232
227 req->next = 0; 233 req->next = 0;
259 send_req (req); 265 send_req (req);
260} 266}
261 267
262static void min_parallel (int nthreads) 268static void min_parallel (int nthreads)
263{ 269{
264 while (nthreads > started) 270 if (wanted < nthreads)
265 start_thread (); 271 wanted = nthreads;
266} 272}
267 273
268static void max_parallel (int nthreads) 274static void max_parallel (int nthreads)
269{ 275{
270 int cur = started; 276 int cur = started;
271 277
278 if (wanted > nthreads)
279 wanted = nthreads;
280
272 while (cur > nthreads) 281 while (cur > wanted)
273 { 282 {
274 end_thread (); 283 end_thread ();
275 cur--; 284 cur--;
276 } 285 }
277 286
278 while (started > nthreads) 287 while (started > wanted)
279 { 288 {
280 poll_wait (); 289 poll_wait ();
281 poll_cb (); 290 poll_cb ();
282 } 291 }
283} 292}
294 croak ("cannot set result pipe to nonblocking mode"); 303 croak ("cannot set result pipe to nonblocking mode");
295} 304}
296 305
297static void atfork_prepare (void) 306static void atfork_prepare (void)
298{ 307{
299 for (;;) {
300
301 for (;;)
302 {
303 poll_cb ();
304
305 if (!nreqs)
306 break;
307
308 poll_wait ();
309 }
310
311 pthread_mutex_lock (&reqlock); 308 pthread_mutex_lock (&reqlock);
312
313 if (!nreqs)
314 break;
315
316 pthread_mutex_unlock (&reqlock);
317 }
318
319 pthread_mutex_lock (&reslock); 309 pthread_mutex_lock (&reslock);
320
321 assert (!nreqs && !reqs && !ress);
322} 310}
323 311
324static void atfork_parent (void) 312static void atfork_parent (void)
325{ 313{
326 pthread_mutex_unlock (&reslock); 314 pthread_mutex_unlock (&reslock);
327 pthread_mutex_unlock (&reqlock); 315 pthread_mutex_unlock (&reqlock);
328} 316}
329 317
330static void atfork_child (void) 318static void atfork_child (void)
331{ 319{
332 int restart = started; 320 aio_req prv;
321
333 started = 0; 322 started = 0;
334 323
324 while (reqs)
325 {
326 prv = reqs;
327 reqs = prv->next;
328 free_req (prv);
329 }
330
331 reqs = reqe = 0;
332
333 while (ress)
334 {
335 prv = ress;
336 ress = prv->next;
337 free_req (prv);
338 }
339
340 ress = rese = 0;
341
342 close (respipe [0]);
343 close (respipe [1]);
344 create_pipe ();
345
335 atfork_parent (); 346 atfork_parent ();
336
337 min_parallel (restart);
338} 347}
339 348
340/*****************************************************************************/ 349/*****************************************************************************/
341/* work around various missing functions */ 350/* work around various missing functions */
342 351
629 req->offset = offset; 638 req->offset = offset;
630 req->length = length; 639 req->length = length;
631 req->data = SvREFCNT_inc (data); 640 req->data = SvREFCNT_inc (data);
632 req->dataptr = (char *)svptr + dataoffset; 641 req->dataptr = (char *)svptr + dataoffset;
633 642
643 if (!SvREADONLY (data))
644 {
645 SvREADONLY_on (data);
646 req->data2ptr = (void *)data;
647 }
648
634 send_req (req); 649 send_req (req);
635 } 650 }
636} 651}
637 652
638void 653void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines