ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.25 by root, Wed Aug 17 03:16:56 2005 UTC vs.
Revision 1.26 by root, Wed Aug 17 03:52:20 2005 UTC

68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69 69
70static volatile aio_req reqs, reqe; /* queue start, queue end */ 70static volatile aio_req reqs, reqe; /* queue start, queue end */
71static volatile aio_req ress, rese; /* queue start, queue end */ 71static volatile aio_req ress, rese; /* queue start, queue end */
72 72
73static void free_req (aio_req req)
74{
75 if (req->data)
76 SvREFCNT_dec (req->data);
77
78 if (req->fh)
79 SvREFCNT_dec (req->fh);
80
81 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
82 Safefree (req->statdata);
83
84 if (req->callback)
85 SvREFCNT_dec (req->callback);
86
87 Safefree (req);
88}
89
73static void 90static void
74poll_wait () 91poll_wait ()
75{ 92{
76 if (nreqs && !ress) 93 if (nreqs && !ress)
77 { 94 {
118 135
119 if (req->type == REQ_READ) 136 if (req->type == REQ_READ)
120 SvCUR_set (req->data, req->dataoffset 137 SvCUR_set (req->data, req->dataoffset
121 + req->result > 0 ? req->result : 0); 138 + req->result > 0 ? req->result : 0);
122 139
123 if (req->data)
124 SvREFCNT_dec (req->data);
125
126 if (req->fh)
127 SvREFCNT_dec (req->fh);
128
129 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT) 140 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
130 { 141 {
131 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 142 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
132 PL_laststatval = req->result; 143 PL_laststatval = req->result;
133 PL_statcache = *(req->statdata); 144 PL_statcache = *(req->statdata);
134
135 Safefree (req->statdata);
136 } 145 }
137 146
138 ENTER; 147 ENTER;
139 PUSHMARK (SP); 148 PUSHMARK (SP);
140 XPUSHs (sv_2mortal (newSViv (req->result))); 149 XPUSHs (sv_2mortal (newSViv (req->result)));
159 PUTBACK; 168 PUTBACK;
160 call_sv (req->callback, G_VOID | G_EVAL); 169 call_sv (req->callback, G_VOID | G_EVAL);
161 SPAGAIN; 170 SPAGAIN;
162 } 171 }
163 172
173 LEAVE;
174
164 do_croak = SvTRUE (ERRSV); 175 do_croak = SvTRUE (ERRSV);
165
166 LEAVE;
167 176
168 if (req->callback)
169 SvREFCNT_dec (req->callback);
170
171 errno = errorno; 177 errno = errorno;
172 count++; 178 count++;
173 } 179 }
174 180
175 prv = req; 181 prv = req;
176 req = req->next; 182 req = req->next;
177 Safefree (prv); 183 free_req (prv);
178 184
179 if (do_croak) 185 if (do_croak)
180 croak (0); 186 croak (0);
181 } 187 }
182 188
234 240
235static void 241static void
236end_thread (void) 242end_thread (void)
237{ 243{
238 aio_req req; 244 aio_req req;
239 New (0, req, 1, aio_cb); 245 Newz (0, req, 1, aio_cb);
240 req->type = REQ_QUIT; 246 req->type = REQ_QUIT;
241 247
242 send_req (req); 248 send_req (req);
243} 249}
244 250
263 poll_wait (); 269 poll_wait ();
264 poll_cb (); 270 poll_cb ();
265 } 271 }
266} 272}
267 273
268static int fork_started; 274static void create_pipe ()
275{
276 if (pipe (respipe))
277 croak ("unable to initialize result pipe");
278
279 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
280 croak ("cannot set result pipe to nonblocking mode");
281
282 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
283 croak ("cannot set result pipe to nonblocking mode");
284}
269 285
270static void atfork_prepare (void) 286static void atfork_prepare (void)
271{ 287{
272 int nstarted;
273
274 for (;;) {
275 while (nreqs)
276 {
277 poll_wait ();
278 poll_cb ();
279 }
280
281 nstarted = started;
282 max_parallel (0);
283
284 pthread_mutex_lock (&reqlock);
285
286 if (!nreqs && !started)
287 break;
288
289 pthread_mutex_unlock (&reqlock);
290
291 min_parallel (fork_started);
292 }
293
294 pthread_mutex_lock (&frklock); 288 pthread_mutex_lock (&frklock);
295 fork_started = nstarted; 289 pthread_mutex_lock (&reqlock);
296 pthread_mutex_lock (&reslock); 290 pthread_mutex_lock (&reslock);
297
298 assert (!started);
299 assert (!nreqs);
300 assert (!reqs && !reqe);
301 assert (!ress && !rese);
302} 291}
303 292
304static void atfork_parent (void) 293static void atfork_parent (void)
305{ 294{
306 pthread_mutex_unlock (&reslock); 295 pthread_mutex_unlock (&reslock);
296 pthread_mutex_unlock (&reqlock);
307 pthread_mutex_unlock (&frklock); 297 pthread_mutex_unlock (&frklock);
308 pthread_mutex_unlock (&reqlock); 298}
309 299
300static void atfork_child (void)
301{
302 int restart = started;
303 started = 0;
304
305 while (reqs)
306 {
307 free_req (reqs);
308 reqs = reqs->next;
309 }
310
311 reqs = reqe = 0;
312
313 while (ress)
314 {
315 free_req (ress);
316 ress = ress->next;
317 }
318
319 ress = rese = 0;
320
321 close (respipe [0]);
322 close (respipe [1]);
323
324 create_pipe ();
325
326 atfork_parent ();
327
310 min_parallel (fork_started); 328 min_parallel (restart);
311} 329}
312
313#define atfork_child atfork_parent
314 330
315/*****************************************************************************/ 331/*****************************************************************************/
316/* work around various missing functions */ 332/* work around various missing functions */
317 333
318#if !HAVE_PREADWRITE 334#if !HAVE_PREADWRITE
488 504
489PROTOTYPES: ENABLE 505PROTOTYPES: ENABLE
490 506
491BOOT: 507BOOT:
492{ 508{
493 if (pipe (respipe)) 509 create_pipe ();
494 croak ("unable to initialize result pipe");
495
496 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
497 croak ("cannot set result pipe to nonblocking mode");
498
499 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
500 croak ("cannot set result pipe to nonblocking mode");
501
502 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 510 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
503} 511}
504 512
505void 513void
506min_parallel(nthreads) 514min_parallel(nthreads)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines