ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.22 by root, Tue Aug 16 22:22:18 2005 UTC vs.
Revision 1.31 by root, Thu Aug 18 16:34:53 2005 UTC

31 REQ_QUIT, 31 REQ_QUIT,
32 REQ_OPEN, REQ_CLOSE, 32 REQ_OPEN, REQ_CLOSE,
33 REQ_READ, REQ_WRITE, REQ_READAHEAD, 33 REQ_READ, REQ_WRITE, REQ_READAHEAD,
34 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 34 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
35 REQ_FSYNC, REQ_FDATASYNC, 35 REQ_FSYNC, REQ_FDATASYNC,
36 REQ_UNLINK, REQ_RMDIR, REQ_SYMLINK, 36 REQ_UNLINK, REQ_RMDIR,
37 REQ_SYMLINK, 37 REQ_SYMLINK,
38}; 38};
39 39
40typedef struct aio_cb { 40typedef struct aio_cb {
41 struct aio_cb *volatile next; 41 struct aio_cb *volatile next;
55 Stat_t *statdata; 55 Stat_t *statdata;
56} aio_cb; 56} aio_cb;
57 57
58typedef aio_cb *aio_req; 58typedef aio_cb *aio_req;
59 59
60static int started; 60static int started, wanted;
61static volatile int nreqs; 61static volatile int nreqs;
62static int max_outstanding = 1<<30; 62static int max_outstanding = 1<<30;
63static int respipe [2]; 63static int respipe [2];
64 64
65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER; 66static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67static pthread_mutex_t frklock = PTHREAD_MUTEX_INITIALIZER;
68static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 67static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69 68
70static volatile aio_req reqs, reqe; /* queue start, queue end */ 69static volatile aio_req reqs, reqe; /* queue start, queue end */
71static volatile aio_req ress, rese; /* queue start, queue end */ 70static volatile aio_req ress, rese; /* queue start, queue end */
71
72static void free_req (aio_req req)
73{
74 if (req->data)
75 SvREFCNT_dec (req->data);
76
77 if (req->fh)
78 SvREFCNT_dec (req->fh);
79
80 if (req->statdata)
81 Safefree (req->statdata);
82
83 if (req->callback)
84 SvREFCNT_dec (req->callback);
85
86 Safefree (req);
87}
72 88
73static void 89static void
74poll_wait () 90poll_wait ()
75{ 91{
76 if (nreqs && !ress) 92 if (nreqs && !ress)
86static int 102static int
87poll_cb () 103poll_cb ()
88{ 104{
89 dSP; 105 dSP;
90 int count = 0; 106 int count = 0;
107 int do_croak = 0;
91 aio_req req, prv; 108 aio_req req;
92 109
110 for (;;)
111 {
93 pthread_mutex_lock (&reslock); 112 pthread_mutex_lock (&reslock);
113 req = ress;
94 114
95 { 115 if (req)
116 {
117 ress = req->next;
118
119 if (!ress)
120 {
96 /* read any signals sent by the worker threads */ 121 /* read any signals sent by the worker threads */
97 char buf [32]; 122 char buf [32];
98 while (read (respipe [0], buf, 32) == 32) 123 while (read (respipe [0], buf, 32) == 32)
124 ;
125
126 rese = 0;
127 }
99 ; 128 }
100 }
101 129
102 req = ress;
103 ress = rese = 0;
104
105 pthread_mutex_unlock (&reslock); 130 pthread_mutex_unlock (&reslock);
106 131
107 while (req) 132 if (!req)
108 { 133 break;
134
109 nreqs--; 135 nreqs--;
110 136
111 if (req->type == REQ_QUIT) 137 if (req->type == REQ_QUIT)
112 started--; 138 started--;
113 else 139 else
114 { 140 {
115 int errorno = errno; 141 int errorno = errno;
116 errno = req->errorno; 142 errno = req->errorno;
117 143
118 if (req->type == REQ_READ) 144 if (req->type == REQ_READ)
119 SvCUR_set (req->data, req->dataoffset 145 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
120 + req->result > 0 ? req->result : 0);
121 146
147 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
148 SvREADONLY_off (req->data);
149
122 if (req->data) 150 if (req->statdata)
123 SvREFCNT_dec (req->data);
124
125 if (req->fh)
126 SvREFCNT_dec (req->fh);
127
128 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
129 { 151 {
130 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 152 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
131 PL_laststatval = req->result; 153 PL_laststatval = req->result;
132 PL_statcache = *(req->statdata); 154 PL_statcache = *(req->statdata);
133
134 Safefree (req->statdata);
135 } 155 }
136 156
137 ENTER; 157 ENTER;
138 PUSHMARK (SP); 158 PUSHMARK (SP);
139 XPUSHs (sv_2mortal (newSViv (req->result))); 159 XPUSHs (sv_2mortal (newSViv (req->result)));
156 if (SvOK (req->callback)) 176 if (SvOK (req->callback))
157 { 177 {
158 PUTBACK; 178 PUTBACK;
159 call_sv (req->callback, G_VOID | G_EVAL); 179 call_sv (req->callback, G_VOID | G_EVAL);
160 SPAGAIN; 180 SPAGAIN;
181
182 if (SvTRUE (ERRSV))
183 {
184 free_req (req);
185 croak (0);
186 }
161 } 187 }
162 188
163 LEAVE; 189 LEAVE;
164
165 if (req->callback)
166 SvREFCNT_dec (req->callback);
167 190
168 errno = errorno; 191 errno = errorno;
169 count++; 192 count++;
170 } 193 }
171 194
172 prv = req; 195 free_req (req);
173 req = req->next;
174 Safefree (prv);
175
176 /* TODO: croak on errors? */
177 } 196 }
178 197
179 return count; 198 return count;
180} 199}
181 200
202} 221}
203 222
204static void 223static void
205send_req (aio_req req) 224send_req (aio_req req)
206{ 225{
226 while (started < wanted && nreqs >= started)
227 start_thread ();
228
207 nreqs++; 229 nreqs++;
208 230
209 pthread_mutex_lock (&reqlock); 231 pthread_mutex_lock (&reqlock);
210 232
211 req->next = 0; 233 req->next = 0;
219 reqe = reqs = req; 241 reqe = reqs = req;
220 242
221 pthread_cond_signal (&reqwait); 243 pthread_cond_signal (&reqwait);
222 pthread_mutex_unlock (&reqlock); 244 pthread_mutex_unlock (&reqlock);
223 245
224 while (nreqs > max_outstanding) 246 if (nreqs > max_outstanding)
247 for (;;)
248 {
249 poll_cb ();
250
251 if (nreqs <= max_outstanding)
252 break;
253
254 poll_wait ();
255 }
256}
257
258static void
259end_thread (void)
260{
261 aio_req req;
262 Newz (0, req, 1, aio_cb);
263 req->type = REQ_QUIT;
264
265 send_req (req);
266}
267
268static void min_parallel (int nthreads)
269{
270 if (wanted < nthreads)
271 wanted = nthreads;
272}
273
274static void max_parallel (int nthreads)
275{
276 int cur = started;
277
278 if (wanted > nthreads)
279 wanted = nthreads;
280
281 while (cur > wanted)
282 {
283 end_thread ();
284 cur--;
285 }
286
287 while (started > wanted)
225 { 288 {
226 poll_wait (); 289 poll_wait ();
227 poll_cb (); 290 poll_cb ();
228 } 291 }
229} 292}
230 293
231static void 294static void create_pipe ()
232end_thread (void)
233{ 295{
234 aio_req req; 296 if (pipe (respipe))
235 New (0, req, 1, aio_cb); 297 croak ("unable to initialize result pipe");
236 req->type = REQ_QUIT;
237 298
238 send_req (req); 299 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
239} 300 croak ("cannot set result pipe to nonblocking mode");
240 301
241 302 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
242static void min_parallel (int nthreads) 303 croak ("cannot set result pipe to nonblocking mode");
243{
244 while (nthreads > started)
245 start_thread ();
246} 304}
247
248static void max_parallel (int nthreads)
249{
250 int cur = started;
251 while (cur > nthreads)
252 {
253 end_thread ();
254 cur--;
255 }
256
257 while (started > nthreads)
258 {
259 poll_wait ();
260 poll_cb ();
261 }
262}
263
264static int fork_started;
265 305
266static void atfork_prepare (void) 306static void atfork_prepare (void)
267{ 307{
268 pthread_mutex_lock (&frklock);
269
270 fork_started = started;
271
272 for (;;) {
273 while (nreqs)
274 {
275 poll_wait ();
276 poll_cb ();
277 }
278
279 max_parallel (0);
280
281 pthread_mutex_lock (&reqlock); 308 pthread_mutex_lock (&reqlock);
282
283 if (!nreqs && !started)
284 break;
285
286 pthread_mutex_unlock (&reqlock);
287
288 min_parallel (fork_started);
289 }
290
291 pthread_mutex_lock (&reslock); 309 pthread_mutex_lock (&reslock);
292
293 assert (!started);
294 assert (!nreqs);
295 assert (!reqs && !reqe);
296 assert (!ress && !rese);
297} 310}
298 311
299static void atfork_parent (void) 312static void atfork_parent (void)
300{ 313{
301 pthread_mutex_unlock (&reslock); 314 pthread_mutex_unlock (&reslock);
302 min_parallel (fork_started);
303 pthread_mutex_unlock (&reqlock); 315 pthread_mutex_unlock (&reqlock);
304 pthread_mutex_unlock (&frklock);
305} 316}
306 317
307static void atfork_child (void) 318static void atfork_child (void)
308{ 319{
320 aio_req prv;
321
322 started = 0;
323
324 while (reqs)
325 {
326 prv = reqs;
327 reqs = prv->next;
328 free_req (prv);
329 }
330
309 reqs = reqe = 0; 331 reqs = reqe = 0;
332
333 while (ress)
334 {
335 prv = ress;
336 ress = prv->next;
337 free_req (prv);
338 }
339
340 ress = rese = 0;
341
342 close (respipe [0]);
343 close (respipe [1]);
344 create_pipe ();
310 345
311 atfork_parent (); 346 atfork_parent ();
312} 347}
313 348
314/*****************************************************************************/ 349/*****************************************************************************/
479 \ 514 \
480 Newz (0, req, 1, aio_cb); \ 515 Newz (0, req, 1, aio_cb); \
481 if (!req) \ 516 if (!req) \
482 croak ("out of memory during aio_req allocation"); \ 517 croak ("out of memory during aio_req allocation"); \
483 \ 518 \
484 req->callback = SvREFCNT_inc (callback); 519 req->callback = newSVsv (callback);
485 520
486MODULE = IO::AIO PACKAGE = IO::AIO 521MODULE = IO::AIO PACKAGE = IO::AIO
487 522
488PROTOTYPES: ENABLE 523PROTOTYPES: ENABLE
489 524
490BOOT: 525BOOT:
491{ 526{
492 if (pipe (respipe)) 527 create_pipe ();
493 croak ("unable to initialize result pipe");
494
495 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
496 croak ("cannot set result pipe to nonblocking mode");
497
498 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
499 croak ("cannot set result pipe to nonblocking mode");
500
501 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 528 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
502} 529}
503 530
504void 531void
505min_parallel(nthreads) 532min_parallel(nthreads)
610 : IoOFP (sv_2io (fh))); 637 : IoOFP (sv_2io (fh)));
611 req->offset = offset; 638 req->offset = offset;
612 req->length = length; 639 req->length = length;
613 req->data = SvREFCNT_inc (data); 640 req->data = SvREFCNT_inc (data);
614 req->dataptr = (char *)svptr + dataoffset; 641 req->dataptr = (char *)svptr + dataoffset;
615 req->callback = SvREFCNT_inc (callback); 642
643 if (!SvREADONLY (data))
644 {
645 SvREADONLY_on (data);
646 req->data2ptr = (void *)data;
647 }
616 648
617 send_req (req); 649 send_req (req);
618 } 650 }
619} 651}
620 652
649{ 681{
650 dREQ; 682 dREQ;
651 683
652 New (0, req->statdata, 1, Stat_t); 684 New (0, req->statdata, 1, Stat_t);
653 if (!req->statdata) 685 if (!req->statdata)
686 {
687 free_req (req);
654 croak ("out of memory during aio_req->statdata allocation (sorry, i just leaked memory, too)"); 688 croak ("out of memory during aio_req->statdata allocation");
689 }
655 690
656 if (SvPOK (fh_or_path)) 691 if (SvPOK (fh_or_path))
657 { 692 {
658 req->type = ix; 693 req->type = ix;
659 req->data = newSVsv (fh_or_path); 694 req->data = newSVsv (fh_or_path);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines