ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.26 by root, Wed Aug 17 03:52:20 2005 UTC vs.
Revision 1.31 by root, Thu Aug 18 16:34:53 2005 UTC

55 Stat_t *statdata; 55 Stat_t *statdata;
56} aio_cb; 56} aio_cb;
57 57
58typedef aio_cb *aio_req; 58typedef aio_cb *aio_req;
59 59
60static int started; 60static int started, wanted;
61static volatile int nreqs; 61static volatile int nreqs;
62static int max_outstanding = 1<<30; 62static int max_outstanding = 1<<30;
63static int respipe [2]; 63static int respipe [2];
64 64
65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER; 66static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67static pthread_mutex_t frklock = PTHREAD_MUTEX_INITIALIZER;
68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 67static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69 68
70static volatile aio_req reqs, reqe; /* queue start, queue end */ 69static volatile aio_req reqs, reqe; /* queue start, queue end */
71static volatile aio_req ress, rese; /* queue start, queue end */ 70static volatile aio_req ress, rese; /* queue start, queue end */
72 71
76 SvREFCNT_dec (req->data); 75 SvREFCNT_dec (req->data);
77 76
78 if (req->fh) 77 if (req->fh)
79 SvREFCNT_dec (req->fh); 78 SvREFCNT_dec (req->fh);
80 79
81 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT) 80 if (req->statdata)
82 Safefree (req->statdata); 81 Safefree (req->statdata);
83 82
84 if (req->callback) 83 if (req->callback)
85 SvREFCNT_dec (req->callback); 84 SvREFCNT_dec (req->callback);
86 85
104poll_cb () 103poll_cb ()
105{ 104{
106 dSP; 105 dSP;
107 int count = 0; 106 int count = 0;
108 int do_croak = 0; 107 int do_croak = 0;
109 aio_req req, prv; 108 aio_req req;
110 109
110 for (;;)
111 {
111 pthread_mutex_lock (&reslock); 112 pthread_mutex_lock (&reslock);
113 req = ress;
112 114
113 { 115 if (req)
116 {
117 ress = req->next;
118
119 if (!ress)
120 {
114 /* read any signals sent by the worker threads */ 121 /* read any signals sent by the worker threads */
115 char buf [32]; 122 char buf [32];
116 while (read (respipe [0], buf, 32) == 32) 123 while (read (respipe [0], buf, 32) == 32)
124 ;
125
126 rese = 0;
127 }
117 ; 128 }
118 }
119 129
120 req = ress;
121 ress = rese = 0;
122
123 pthread_mutex_unlock (&reslock); 130 pthread_mutex_unlock (&reslock);
124 131
125 while (req) 132 if (!req)
126 { 133 break;
134
127 nreqs--; 135 nreqs--;
128 136
129 if (req->type == REQ_QUIT) 137 if (req->type == REQ_QUIT)
130 started--; 138 started--;
131 else 139 else
132 { 140 {
133 int errorno = errno; 141 int errorno = errno;
134 errno = req->errorno; 142 errno = req->errorno;
135 143
136 if (req->type == REQ_READ) 144 if (req->type == REQ_READ)
137 SvCUR_set (req->data, req->dataoffset 145 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
138 + req->result > 0 ? req->result : 0);
139 146
140 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT) 147 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
148 SvREADONLY_off (req->data);
149
150 if (req->statdata)
141 { 151 {
142 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 152 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
143 PL_laststatval = req->result; 153 PL_laststatval = req->result;
144 PL_statcache = *(req->statdata); 154 PL_statcache = *(req->statdata);
145 } 155 }
166 if (SvOK (req->callback)) 176 if (SvOK (req->callback))
167 { 177 {
168 PUTBACK; 178 PUTBACK;
169 call_sv (req->callback, G_VOID | G_EVAL); 179 call_sv (req->callback, G_VOID | G_EVAL);
170 SPAGAIN; 180 SPAGAIN;
181
182 if (SvTRUE (ERRSV))
183 {
184 free_req (req);
185 croak (0);
186 }
171 } 187 }
172 188
173 LEAVE; 189 LEAVE;
174 190
175 do_croak = SvTRUE (ERRSV);
176
177 errno = errorno; 191 errno = errorno;
178 count++; 192 count++;
179 } 193 }
180 194
181 prv = req;
182 req = req->next;
183 free_req (prv); 195 free_req (req);
184
185 if (do_croak)
186 croak (0);
187 } 196 }
188 197
189 return count; 198 return count;
190} 199}
191 200
212} 221}
213 222
214static void 223static void
215send_req (aio_req req) 224send_req (aio_req req)
216{ 225{
226 while (started < wanted && nreqs >= started)
227 start_thread ();
228
217 nreqs++; 229 nreqs++;
218 230
219 pthread_mutex_lock (&reqlock); 231 pthread_mutex_lock (&reqlock);
220 232
221 req->next = 0; 233 req->next = 0;
229 reqe = reqs = req; 241 reqe = reqs = req;
230 242
231 pthread_cond_signal (&reqwait); 243 pthread_cond_signal (&reqwait);
232 pthread_mutex_unlock (&reqlock); 244 pthread_mutex_unlock (&reqlock);
233 245
234 while (nreqs > max_outstanding) 246 if (nreqs > max_outstanding)
247 for (;;)
235 { 248 {
249 poll_cb ();
250
251 if (nreqs <= max_outstanding)
252 break;
253
236 poll_wait (); 254 poll_wait ();
237 poll_cb ();
238 } 255 }
239} 256}
240 257
241static void 258static void
242end_thread (void) 259end_thread (void)
243{ 260{
246 req->type = REQ_QUIT; 263 req->type = REQ_QUIT;
247 264
248 send_req (req); 265 send_req (req);
249} 266}
250 267
251
252static void min_parallel (int nthreads) 268static void min_parallel (int nthreads)
253{ 269{
254 while (nthreads > started) 270 if (wanted < nthreads)
255 start_thread (); 271 wanted = nthreads;
256} 272}
257 273
258static void max_parallel (int nthreads) 274static void max_parallel (int nthreads)
259{ 275{
260 int cur = started; 276 int cur = started;
277
278 if (wanted > nthreads)
279 wanted = nthreads;
280
261 while (cur > nthreads) 281 while (cur > wanted)
262 { 282 {
263 end_thread (); 283 end_thread ();
264 cur--; 284 cur--;
265 } 285 }
266 286
267 while (started > nthreads) 287 while (started > wanted)
268 { 288 {
269 poll_wait (); 289 poll_wait ();
270 poll_cb (); 290 poll_cb ();
271 } 291 }
272} 292}
283 croak ("cannot set result pipe to nonblocking mode"); 303 croak ("cannot set result pipe to nonblocking mode");
284} 304}
285 305
286static void atfork_prepare (void) 306static void atfork_prepare (void)
287{ 307{
288 pthread_mutex_lock (&frklock);
289 pthread_mutex_lock (&reqlock); 308 pthread_mutex_lock (&reqlock);
290 pthread_mutex_lock (&reslock); 309 pthread_mutex_lock (&reslock);
291} 310}
292 311
293static void atfork_parent (void) 312static void atfork_parent (void)
294{ 313{
295 pthread_mutex_unlock (&reslock); 314 pthread_mutex_unlock (&reslock);
296 pthread_mutex_unlock (&reqlock); 315 pthread_mutex_unlock (&reqlock);
297 pthread_mutex_unlock (&frklock);
298} 316}
299 317
300static void atfork_child (void) 318static void atfork_child (void)
301{ 319{
302 int restart = started; 320 aio_req prv;
321
303 started = 0; 322 started = 0;
304 323
305 while (reqs) 324 while (reqs)
306 { 325 {
307 free_req (reqs); 326 prv = reqs;
308 reqs = reqs->next; 327 reqs = prv->next;
328 free_req (prv);
309 } 329 }
310 330
311 reqs = reqe = 0; 331 reqs = reqe = 0;
312 332
313 while (ress) 333 while (ress)
314 { 334 {
315 free_req (ress); 335 prv = ress;
316 ress = ress->next; 336 ress = prv->next;
337 free_req (prv);
317 } 338 }
318 339
319 ress = rese = 0; 340 ress = rese = 0;
320 341
321 close (respipe [0]); 342 close (respipe [0]);
322 close (respipe [1]); 343 close (respipe [1]);
323
324 create_pipe (); 344 create_pipe ();
325 345
326 atfork_parent (); 346 atfork_parent ();
327
328 min_parallel (restart);
329} 347}
330 348
331/*****************************************************************************/ 349/*****************************************************************************/
332/* work around various missing functions */ 350/* work around various missing functions */
333 351
496 \ 514 \
497 Newz (0, req, 1, aio_cb); \ 515 Newz (0, req, 1, aio_cb); \
498 if (!req) \ 516 if (!req) \
499 croak ("out of memory during aio_req allocation"); \ 517 croak ("out of memory during aio_req allocation"); \
500 \ 518 \
501 req->callback = SvREFCNT_inc (callback); 519 req->callback = newSVsv (callback);
502 520
503MODULE = IO::AIO PACKAGE = IO::AIO 521MODULE = IO::AIO PACKAGE = IO::AIO
504 522
505PROTOTYPES: ENABLE 523PROTOTYPES: ENABLE
506 524
619 : IoOFP (sv_2io (fh))); 637 : IoOFP (sv_2io (fh)));
620 req->offset = offset; 638 req->offset = offset;
621 req->length = length; 639 req->length = length;
622 req->data = SvREFCNT_inc (data); 640 req->data = SvREFCNT_inc (data);
623 req->dataptr = (char *)svptr + dataoffset; 641 req->dataptr = (char *)svptr + dataoffset;
624 req->callback = SvREFCNT_inc (callback); 642
643 if (!SvREADONLY (data))
644 {
645 SvREADONLY_on (data);
646 req->data2ptr = (void *)data;
647 }
625 648
626 send_req (req); 649 send_req (req);
627 } 650 }
628} 651}
629 652
658{ 681{
659 dREQ; 682 dREQ;
660 683
661 New (0, req->statdata, 1, Stat_t); 684 New (0, req->statdata, 1, Stat_t);
662 if (!req->statdata) 685 if (!req->statdata)
686 {
687 free_req (req);
663 croak ("out of memory during aio_req->statdata allocation (sorry, i just leaked memory, too)"); 688 croak ("out of memory during aio_req->statdata allocation");
689 }
664 690
665 if (SvPOK (fh_or_path)) 691 if (SvPOK (fh_or_path))
666 { 692 {
667 req->type = ix; 693 req->type = ix;
668 req->data = newSVsv (fh_or_path); 694 req->data = newSVsv (fh_or_path);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines