ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.24 by root, Wed Aug 17 03:01:56 2005 UTC vs.
Revision 1.26 by root, Wed Aug 17 03:52:20 2005 UTC

68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69 69
70static volatile aio_req reqs, reqe; /* queue start, queue end */ 70static volatile aio_req reqs, reqe; /* queue start, queue end */
71static volatile aio_req ress, rese; /* queue start, queue end */ 71static volatile aio_req ress, rese; /* queue start, queue end */
72 72
73static void free_req (aio_req req)
74{
75 if (req->data)
76 SvREFCNT_dec (req->data);
77
78 if (req->fh)
79 SvREFCNT_dec (req->fh);
80
81 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
82 Safefree (req->statdata);
83
84 if (req->callback)
85 SvREFCNT_dec (req->callback);
86
87 Safefree (req);
88}
89
73static void 90static void
74poll_wait () 91poll_wait ()
75{ 92{
76 if (nreqs && !ress) 93 if (nreqs && !ress)
77 { 94 {
118 135
119 if (req->type == REQ_READ) 136 if (req->type == REQ_READ)
120 SvCUR_set (req->data, req->dataoffset 137 SvCUR_set (req->data, req->dataoffset
121 + req->result > 0 ? req->result : 0); 138 + req->result > 0 ? req->result : 0);
122 139
123 if (req->data)
124 SvREFCNT_dec (req->data);
125
126 if (req->fh)
127 SvREFCNT_dec (req->fh);
128
129 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT) 140 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
130 { 141 {
131 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 142 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
132 PL_laststatval = req->result; 143 PL_laststatval = req->result;
133 PL_statcache = *(req->statdata); 144 PL_statcache = *(req->statdata);
134
135 Safefree (req->statdata);
136 } 145 }
137 146
138 ENTER; 147 ENTER;
139 PUSHMARK (SP); 148 PUSHMARK (SP);
140 XPUSHs (sv_2mortal (newSViv (req->result))); 149 XPUSHs (sv_2mortal (newSViv (req->result)));
159 PUTBACK; 168 PUTBACK;
160 call_sv (req->callback, G_VOID | G_EVAL); 169 call_sv (req->callback, G_VOID | G_EVAL);
161 SPAGAIN; 170 SPAGAIN;
162 } 171 }
163 172
173 LEAVE;
174
164 do_croak = SvTRUE (ERRSV); 175 do_croak = SvTRUE (ERRSV);
165
166 LEAVE;
167 176
168 if (req->callback)
169 SvREFCNT_dec (req->callback);
170
171 errno = errorno; 177 errno = errorno;
172 count++; 178 count++;
173 } 179 }
174 180
175 prv = req; 181 prv = req;
176 req = req->next; 182 req = req->next;
177 Safefree (prv); 183 free_req (prv);
178 184
179 if (do_croak) 185 if (do_croak)
180 croak (0); 186 croak (0);
181 } 187 }
182 188
234 240
235static void 241static void
236end_thread (void) 242end_thread (void)
237{ 243{
238 aio_req req; 244 aio_req req;
239 New (0, req, 1, aio_cb); 245 Newz (0, req, 1, aio_cb);
240 req->type = REQ_QUIT; 246 req->type = REQ_QUIT;
241 247
242 send_req (req); 248 send_req (req);
243} 249}
244 250
263 poll_wait (); 269 poll_wait ();
264 poll_cb (); 270 poll_cb ();
265 } 271 }
266} 272}
267 273
268static int fork_started; 274static void create_pipe ()
275{
276 if (pipe (respipe))
277 croak ("unable to initialize result pipe");
278
279 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
280 croak ("cannot set result pipe to nonblocking mode");
281
282 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
283 croak ("cannot set result pipe to nonblocking mode");
284}
269 285
270static void atfork_prepare (void) 286static void atfork_prepare (void)
271{ 287{
272 pthread_mutex_lock (&frklock); 288 pthread_mutex_lock (&frklock);
273
274 fork_started = started;
275
276 for (;;) {
277 while (nreqs)
278 {
279 poll_wait ();
280 poll_cb ();
281 }
282
283 max_parallel (0);
284
285 pthread_mutex_lock (&reqlock); 289 pthread_mutex_lock (&reqlock);
286
287 if (!nreqs && !started)
288 break;
289
290 pthread_mutex_unlock (&reqlock);
291
292 min_parallel (fork_started);
293 }
294
295 pthread_mutex_lock (&reslock); 290 pthread_mutex_lock (&reslock);
296
297 assert (!started);
298 assert (!nreqs);
299 assert (!reqs && !reqe);
300 assert (!ress && !rese);
301} 291}
302 292
303static void atfork_parent (void) 293static void atfork_parent (void)
304{ 294{
305 pthread_mutex_unlock (&reslock); 295 pthread_mutex_unlock (&reslock);
306 min_parallel (fork_started);
307 pthread_mutex_unlock (&reqlock); 296 pthread_mutex_unlock (&reqlock);
308 pthread_mutex_unlock (&frklock); 297 pthread_mutex_unlock (&frklock);
309} 298}
310 299
311static void atfork_child (void) 300static void atfork_child (void)
312{ 301{
302 int restart = started;
303 started = 0;
304
305 while (reqs)
306 {
307 free_req (reqs);
308 reqs = reqs->next;
309 }
310
313 reqs = reqe = 0; 311 reqs = reqe = 0;
314 312
313 while (ress)
314 {
315 free_req (ress);
316 ress = ress->next;
317 }
318
319 ress = rese = 0;
320
321 close (respipe [0]);
322 close (respipe [1]);
323
324 create_pipe ();
325
315 atfork_parent (); 326 atfork_parent ();
327
328 min_parallel (restart);
316} 329}
317 330
318/*****************************************************************************/ 331/*****************************************************************************/
319/* work around various missing functions */ 332/* work around various missing functions */
320 333
491 504
492PROTOTYPES: ENABLE 505PROTOTYPES: ENABLE
493 506
494BOOT: 507BOOT:
495{ 508{
496 if (pipe (respipe)) 509 create_pipe ();
497 croak ("unable to initialize result pipe");
498
499 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
500 croak ("cannot set result pipe to nonblocking mode");
501
502 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
503 croak ("cannot set result pipe to nonblocking mode");
504
505 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 510 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
506} 511}
507 512
508void 513void
509min_parallel(nthreads) 514min_parallel(nthreads)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines