ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.24 by root, Wed Aug 17 03:01:56 2005 UTC vs.
Revision 1.27 by root, Wed Aug 17 04:47:02 2005 UTC

62static int max_outstanding = 1<<30; 62static int max_outstanding = 1<<30;
63static int respipe [2]; 63static int respipe [2];
64 64
65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER; 66static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67static pthread_mutex_t frklock = PTHREAD_MUTEX_INITIALIZER;
68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 67static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69 68
70static volatile aio_req reqs, reqe; /* queue start, queue end */ 69static volatile aio_req reqs, reqe; /* queue start, queue end */
71static volatile aio_req ress, rese; /* queue start, queue end */ 70static volatile aio_req ress, rese; /* queue start, queue end */
71
72static void free_req (aio_req req)
73{
74 if (req->data)
75 SvREFCNT_dec (req->data);
76
77 if (req->fh)
78 SvREFCNT_dec (req->fh);
79
80 if (req->statdata)
81 Safefree (req->statdata);
82
83 if (req->callback)
84 SvREFCNT_dec (req->callback);
85
86 Safefree (req);
87}
72 88
73static void 89static void
74poll_wait () 90poll_wait ()
75{ 91{
76 if (nreqs && !ress) 92 if (nreqs && !ress)
87poll_cb () 103poll_cb ()
88{ 104{
89 dSP; 105 dSP;
90 int count = 0; 106 int count = 0;
91 int do_croak = 0; 107 int do_croak = 0;
92 aio_req req, prv; 108 aio_req req;
93 109
94 pthread_mutex_lock (&reslock); 110 for (;;)
95
96 {
97 /* read any signals sent by the worker threads */
98 char buf [32];
99 while (read (respipe [0], buf, 32) == 32)
100 ;
101 }
102
103 req = ress;
104 ress = rese = 0;
105
106 pthread_mutex_unlock (&reslock);
107
108 while (req)
109 { 111 {
112 pthread_mutex_lock (&reslock);
113 req = ress;
114
115 if (req)
116 {
117 ress = req->next;
118
119 if (!ress)
120 {
121 rese = 0;
122
123 /* read any signals sent by the worker threads */
124 char buf [32];
125 while (read (respipe [0], buf, 32) == 32)
126 ;
127 }
128 }
129
130 pthread_mutex_unlock (&reslock);
131
132 if (!req)
133 break;
134
110 nreqs--; 135 nreqs--;
111 136
112 if (req->type == REQ_QUIT) 137 if (req->type == REQ_QUIT)
113 started--; 138 started--;
114 else 139 else
115 { 140 {
116 int errorno = errno; 141 int errorno = errno;
117 errno = req->errorno; 142 errno = req->errorno;
118 143
119 if (req->type == REQ_READ) 144 if (req->type == REQ_READ)
120 SvCUR_set (req->data, req->dataoffset 145 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
121 + req->result > 0 ? req->result : 0);
122 146
123 if (req->data) 147 if (req->statdata)
124 SvREFCNT_dec (req->data);
125
126 if (req->fh)
127 SvREFCNT_dec (req->fh);
128
129 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
130 { 148 {
131 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 149 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
132 PL_laststatval = req->result; 150 PL_laststatval = req->result;
133 PL_statcache = *(req->statdata); 151 PL_statcache = *(req->statdata);
134
135 Safefree (req->statdata);
136 } 152 }
137 153
138 ENTER; 154 ENTER;
139 PUSHMARK (SP); 155 PUSHMARK (SP);
140 XPUSHs (sv_2mortal (newSViv (req->result))); 156 XPUSHs (sv_2mortal (newSViv (req->result)));
157 if (SvOK (req->callback)) 173 if (SvOK (req->callback))
158 { 174 {
159 PUTBACK; 175 PUTBACK;
160 call_sv (req->callback, G_VOID | G_EVAL); 176 call_sv (req->callback, G_VOID | G_EVAL);
161 SPAGAIN; 177 SPAGAIN;
178
179 if (SvTRUE (ERRSV))
180 {
181 free_req (req);
182 croak (0);
183 }
162 } 184 }
163 185
164 do_croak = SvTRUE (ERRSV);
165
166 LEAVE; 186 LEAVE;
167
168 if (req->callback)
169 SvREFCNT_dec (req->callback);
170 187
171 errno = errorno; 188 errno = errorno;
172 count++; 189 count++;
173 } 190 }
174 191
175 prv = req; 192 free_req (req);
176 req = req->next;
177 Safefree (prv);
178
179 if (do_croak)
180 croak (0);
181 } 193 }
182 194
183 return count; 195 return count;
184} 196}
185 197
223 reqe = reqs = req; 235 reqe = reqs = req;
224 236
225 pthread_cond_signal (&reqwait); 237 pthread_cond_signal (&reqwait);
226 pthread_mutex_unlock (&reqlock); 238 pthread_mutex_unlock (&reqlock);
227 239
228 while (nreqs > max_outstanding) 240 if (nreqs > max_outstanding)
241 for (;;)
229 { 242 {
243 poll_cb ();
244
245 if (nreqs <= max_outstanding)
246 break;
247
230 poll_wait (); 248 poll_wait ();
231 poll_cb ();
232 } 249 }
233} 250}
234 251
235static void 252static void
236end_thread (void) 253end_thread (void)
237{ 254{
238 aio_req req; 255 aio_req req;
239 New (0, req, 1, aio_cb); 256 Newz (0, req, 1, aio_cb);
240 req->type = REQ_QUIT; 257 req->type = REQ_QUIT;
241 258
242 send_req (req); 259 send_req (req);
243} 260}
244
245 261
246static void min_parallel (int nthreads) 262static void min_parallel (int nthreads)
247{ 263{
248 while (nthreads > started) 264 while (nthreads > started)
249 start_thread (); 265 start_thread ();
250} 266}
251 267
252static void max_parallel (int nthreads) 268static void max_parallel (int nthreads)
253{ 269{
254 int cur = started; 270 int cur = started;
271
255 while (cur > nthreads) 272 while (cur > nthreads)
256 { 273 {
257 end_thread (); 274 end_thread ();
258 cur--; 275 cur--;
259 } 276 }
263 poll_wait (); 280 poll_wait ();
264 poll_cb (); 281 poll_cb ();
265 } 282 }
266} 283}
267 284
268static int fork_started; 285static void create_pipe ()
286{
287 if (pipe (respipe))
288 croak ("unable to initialize result pipe");
289
290 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
291 croak ("cannot set result pipe to nonblocking mode");
292
293 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
294 croak ("cannot set result pipe to nonblocking mode");
295}
269 296
270static void atfork_prepare (void) 297static void atfork_prepare (void)
271{ 298{
272 pthread_mutex_lock (&frklock);
273
274 fork_started = started;
275
276 for (;;) { 299 for (;;) {
277 while (nreqs) 300
301 for (;;)
278 { 302 {
303 poll_cb ();
304
305 if (!nreqs)
306 break;
307
279 poll_wait (); 308 poll_wait ();
280 poll_cb ();
281 } 309 }
282 310
283 max_parallel (0);
284
285 pthread_mutex_lock (&reqlock); 311 pthread_mutex_lock (&reqlock);
286 312
287 if (!nreqs && !started) 313 if (!nreqs)
288 break; 314 break;
289 315
290 pthread_mutex_unlock (&reqlock); 316 pthread_mutex_unlock (&reqlock);
291
292 min_parallel (fork_started);
293 } 317 }
294 318
295 pthread_mutex_lock (&reslock); 319 pthread_mutex_lock (&reslock);
296 320
297 assert (!started);
298 assert (!nreqs);
299 assert (!reqs && !reqe); 321 assert (!nreqs && !reqs && !ress);
300 assert (!ress && !rese);
301} 322}
302 323
303static void atfork_parent (void) 324static void atfork_parent (void)
304{ 325{
305 pthread_mutex_unlock (&reslock); 326 pthread_mutex_unlock (&reslock);
306 min_parallel (fork_started);
307 pthread_mutex_unlock (&reqlock); 327 pthread_mutex_unlock (&reqlock);
308 pthread_mutex_unlock (&frklock);
309} 328}
310 329
311static void atfork_child (void) 330static void atfork_child (void)
312{ 331{
313 reqs = reqe = 0; 332 int restart = started;
333 started = 0;
314 334
315 atfork_parent (); 335 atfork_parent ();
336
337 min_parallel (restart);
316} 338}
317 339
318/*****************************************************************************/ 340/*****************************************************************************/
319/* work around various missing functions */ 341/* work around various missing functions */
320 342
483 \ 505 \
484 Newz (0, req, 1, aio_cb); \ 506 Newz (0, req, 1, aio_cb); \
485 if (!req) \ 507 if (!req) \
486 croak ("out of memory during aio_req allocation"); \ 508 croak ("out of memory during aio_req allocation"); \
487 \ 509 \
488 req->callback = SvREFCNT_inc (callback); 510 req->callback = newSVsv (callback);
489 511
490MODULE = IO::AIO PACKAGE = IO::AIO 512MODULE = IO::AIO PACKAGE = IO::AIO
491 513
492PROTOTYPES: ENABLE 514PROTOTYPES: ENABLE
493 515
494BOOT: 516BOOT:
495{ 517{
496 if (pipe (respipe)) 518 create_pipe ();
497 croak ("unable to initialize result pipe");
498
499 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
500 croak ("cannot set result pipe to nonblocking mode");
501
502 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
503 croak ("cannot set result pipe to nonblocking mode");
504
505 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 519 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
506} 520}
507 521
508void 522void
509min_parallel(nthreads) 523min_parallel(nthreads)
614 : IoOFP (sv_2io (fh))); 628 : IoOFP (sv_2io (fh)));
615 req->offset = offset; 629 req->offset = offset;
616 req->length = length; 630 req->length = length;
617 req->data = SvREFCNT_inc (data); 631 req->data = SvREFCNT_inc (data);
618 req->dataptr = (char *)svptr + dataoffset; 632 req->dataptr = (char *)svptr + dataoffset;
619 req->callback = SvREFCNT_inc (callback);
620 633
621 send_req (req); 634 send_req (req);
622 } 635 }
623} 636}
624 637
653{ 666{
654 dREQ; 667 dREQ;
655 668
656 New (0, req->statdata, 1, Stat_t); 669 New (0, req->statdata, 1, Stat_t);
657 if (!req->statdata) 670 if (!req->statdata)
671 {
672 free_req (req);
658 croak ("out of memory during aio_req->statdata allocation (sorry, i just leaked memory, too)"); 673 croak ("out of memory during aio_req->statdata allocation");
674 }
659 675
660 if (SvPOK (fh_or_path)) 676 if (SvPOK (fh_or_path))
661 { 677 {
662 req->type = ix; 678 req->type = ix;
663 req->data = newSVsv (fh_or_path); 679 req->data = newSVsv (fh_or_path);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines