ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.22 by root, Tue Aug 16 22:22:18 2005 UTC vs.
Revision 1.26 by root, Wed Aug 17 03:52:20 2005 UTC

31 REQ_QUIT, 31 REQ_QUIT,
32 REQ_OPEN, REQ_CLOSE, 32 REQ_OPEN, REQ_CLOSE,
33 REQ_READ, REQ_WRITE, REQ_READAHEAD, 33 REQ_READ, REQ_WRITE, REQ_READAHEAD,
34 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 34 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
35 REQ_FSYNC, REQ_FDATASYNC, 35 REQ_FSYNC, REQ_FDATASYNC,
36 REQ_UNLINK, REQ_RMDIR, REQ_SYMLINK, 36 REQ_UNLINK, REQ_RMDIR,
37 REQ_SYMLINK, 37 REQ_SYMLINK,
38}; 38};
39 39
40typedef struct aio_cb { 40typedef struct aio_cb {
41 struct aio_cb *volatile next; 41 struct aio_cb *volatile next;
68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69 69
70static volatile aio_req reqs, reqe; /* queue start, queue end */ 70static volatile aio_req reqs, reqe; /* queue start, queue end */
71static volatile aio_req ress, rese; /* queue start, queue end */ 71static volatile aio_req ress, rese; /* queue start, queue end */
72 72
73static void free_req (aio_req req)
74{
75 if (req->data)
76 SvREFCNT_dec (req->data);
77
78 if (req->fh)
79 SvREFCNT_dec (req->fh);
80
81 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
82 Safefree (req->statdata);
83
84 if (req->callback)
85 SvREFCNT_dec (req->callback);
86
87 Safefree (req);
88}
89
73static void 90static void
74poll_wait () 91poll_wait ()
75{ 92{
76 if (nreqs && !ress) 93 if (nreqs && !ress)
77 { 94 {
86static int 103static int
87poll_cb () 104poll_cb ()
88{ 105{
89 dSP; 106 dSP;
90 int count = 0; 107 int count = 0;
108 int do_croak = 0;
91 aio_req req, prv; 109 aio_req req, prv;
92 110
93 pthread_mutex_lock (&reslock); 111 pthread_mutex_lock (&reslock);
94 112
95 { 113 {
117 135
118 if (req->type == REQ_READ) 136 if (req->type == REQ_READ)
119 SvCUR_set (req->data, req->dataoffset 137 SvCUR_set (req->data, req->dataoffset
120 + req->result > 0 ? req->result : 0); 138 + req->result > 0 ? req->result : 0);
121 139
122 if (req->data)
123 SvREFCNT_dec (req->data);
124
125 if (req->fh)
126 SvREFCNT_dec (req->fh);
127
128 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT) 140 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
129 { 141 {
130 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 142 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
131 PL_laststatval = req->result; 143 PL_laststatval = req->result;
132 PL_statcache = *(req->statdata); 144 PL_statcache = *(req->statdata);
133
134 Safefree (req->statdata);
135 } 145 }
136 146
137 ENTER; 147 ENTER;
138 PUSHMARK (SP); 148 PUSHMARK (SP);
139 XPUSHs (sv_2mortal (newSViv (req->result))); 149 XPUSHs (sv_2mortal (newSViv (req->result)));
159 call_sv (req->callback, G_VOID | G_EVAL); 169 call_sv (req->callback, G_VOID | G_EVAL);
160 SPAGAIN; 170 SPAGAIN;
161 } 171 }
162 172
163 LEAVE; 173 LEAVE;
174
175 do_croak = SvTRUE (ERRSV);
164 176
165 if (req->callback)
166 SvREFCNT_dec (req->callback);
167
168 errno = errorno; 177 errno = errorno;
169 count++; 178 count++;
170 } 179 }
171 180
172 prv = req; 181 prv = req;
173 req = req->next; 182 req = req->next;
174 Safefree (prv); 183 free_req (prv);
175 184
176 /* TODO: croak on errors? */ 185 if (do_croak)
186 croak (0);
177 } 187 }
178 188
179 return count; 189 return count;
180} 190}
181 191
230 240
231static void 241static void
232end_thread (void) 242end_thread (void)
233{ 243{
234 aio_req req; 244 aio_req req;
235 New (0, req, 1, aio_cb); 245 Newz (0, req, 1, aio_cb);
236 req->type = REQ_QUIT; 246 req->type = REQ_QUIT;
237 247
238 send_req (req); 248 send_req (req);
239} 249}
240 250
259 poll_wait (); 269 poll_wait ();
260 poll_cb (); 270 poll_cb ();
261 } 271 }
262} 272}
263 273
264static int fork_started; 274static void create_pipe ()
275{
276 if (pipe (respipe))
277 croak ("unable to initialize result pipe");
278
279 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
280 croak ("cannot set result pipe to nonblocking mode");
281
282 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
283 croak ("cannot set result pipe to nonblocking mode");
284}
265 285
266static void atfork_prepare (void) 286static void atfork_prepare (void)
267{ 287{
268 pthread_mutex_lock (&frklock); 288 pthread_mutex_lock (&frklock);
269
270 fork_started = started;
271
272 for (;;) {
273 while (nreqs)
274 {
275 poll_wait ();
276 poll_cb ();
277 }
278
279 max_parallel (0);
280
281 pthread_mutex_lock (&reqlock); 289 pthread_mutex_lock (&reqlock);
282
283 if (!nreqs && !started)
284 break;
285
286 pthread_mutex_unlock (&reqlock);
287
288 min_parallel (fork_started);
289 }
290
291 pthread_mutex_lock (&reslock); 290 pthread_mutex_lock (&reslock);
292
293 assert (!started);
294 assert (!nreqs);
295 assert (!reqs && !reqe);
296 assert (!ress && !rese);
297} 291}
298 292
299static void atfork_parent (void) 293static void atfork_parent (void)
300{ 294{
301 pthread_mutex_unlock (&reslock); 295 pthread_mutex_unlock (&reslock);
302 min_parallel (fork_started);
303 pthread_mutex_unlock (&reqlock); 296 pthread_mutex_unlock (&reqlock);
304 pthread_mutex_unlock (&frklock); 297 pthread_mutex_unlock (&frklock);
305} 298}
306 299
307static void atfork_child (void) 300static void atfork_child (void)
308{ 301{
302 int restart = started;
303 started = 0;
304
305 while (reqs)
306 {
307 free_req (reqs);
308 reqs = reqs->next;
309 }
310
309 reqs = reqe = 0; 311 reqs = reqe = 0;
310 312
313 while (ress)
314 {
315 free_req (ress);
316 ress = ress->next;
317 }
318
319 ress = rese = 0;
320
321 close (respipe [0]);
322 close (respipe [1]);
323
324 create_pipe ();
325
311 atfork_parent (); 326 atfork_parent ();
327
328 min_parallel (restart);
312} 329}
313 330
314/*****************************************************************************/ 331/*****************************************************************************/
315/* work around various missing functions */ 332/* work around various missing functions */
316 333
487 504
488PROTOTYPES: ENABLE 505PROTOTYPES: ENABLE
489 506
490BOOT: 507BOOT:
491{ 508{
492 if (pipe (respipe)) 509 create_pipe ();
493 croak ("unable to initialize result pipe");
494
495 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
496 croak ("cannot set result pipe to nonblocking mode");
497
498 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
499 croak ("cannot set result pipe to nonblocking mode");
500
501 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 510 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
502} 511}
503 512
504void 513void
505min_parallel(nthreads) 514min_parallel(nthreads)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines