ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.49 by root, Sun Oct 22 13:33:28 2006 UTC vs.
Revision 1.51 by root, Sun Oct 22 22:14:33 2006 UTC

101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
102 102
103static volatile aio_req reqs, reqe; /* queue start, queue end */ 103static volatile aio_req reqs, reqe; /* queue start, queue end */
104static volatile aio_req ress, rese; /* queue start, queue end */ 104static volatile aio_req ress, rese; /* queue start, queue end */
105 105
106static void req_invoke (aio_req req);
106static void req_free (aio_req req); 107static void req_free (aio_req req);
107 108
108/* must be called at most once */ 109/* must be called at most once */
109static SV *req_sv (aio_req req, const char *klass) 110static SV *req_sv (aio_req req, const char *klass)
110{ 111{
156 break; 157 break;
157 } 158 }
158 } 159 }
159} 160}
160 161
162static void aio_grp_dec (aio_req grp)
163{
164 --grp->length;
165
166 /* call feeder, if applicable */
167 aio_grp_feed (grp);
168
169 /* finish, if done */
170 if (!grp->length && grp->fd)
171 {
172 req_invoke (grp);
173 req_free (grp);
174 }
175}
176
161static void poll_wait () 177static void poll_wait ()
162{ 178{
163 if (nreqs && !ress) 179 if (nreqs && !ress)
164 { 180 {
165 fd_set rfd; 181 fd_set rfd;
254 270
255 PUTBACK; 271 PUTBACK;
256 call_sv (req->callback, G_VOID | G_EVAL); 272 call_sv (req->callback, G_VOID | G_EVAL);
257 SPAGAIN; 273 SPAGAIN;
258 274
275 FREETMPS;
276 LEAVE;
277
278 errno = errorno;
279
259 if (SvTRUE (ERRSV)) 280 if (SvTRUE (ERRSV))
260 { 281 {
261 req_free (req); 282 req_free (req);
262 croak (0); 283 croak (0);
263 } 284 }
264
265 FREETMPS;
266 LEAVE;
267
268 errno = errorno;
269} 285}
270 286
271static void req_free (aio_req req) 287static void req_free (aio_req req)
272{ 288{
273 if (req->grp) 289 if (req->grp)
279 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next; 295 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
280 296
281 if (grp->grp_first == req) 297 if (grp->grp_first == req)
282 grp->grp_first = req->grp_next; 298 grp->grp_first = req->grp_next;
283 299
284 --grp->length;
285
286 /* call feeder, if applicable */
287 aio_grp_feed (grp); 300 aio_grp_dec (grp);
288
289 /* finish, if done */
290 if (!grp->length && grp->fd)
291 {
292 req_invoke (grp);
293 req_free (grp);
294 }
295 } 301 }
296 302
297 if (req->self) 303 if (req->self)
298 { 304 {
299 sv_unmagic (req->self, PERL_MAGIC_ext); 305 sv_unmagic (req->self, PERL_MAGIC_ext);
355 pthread_mutex_unlock (&reslock); 361 pthread_mutex_unlock (&reslock);
356 362
357 if (!req) 363 if (!req)
358 break; 364 break;
359 365
360 nreqs--; 366 --nreqs;
361 367
362 if (req->type == REQ_QUIT) 368 if (req->type == REQ_QUIT)
363 started--; 369 started--;
364 else if (req->type == REQ_GROUP && req->length) 370 else if (req->type == REQ_GROUP && req->length)
365 { 371 {
416static void req_send (aio_req req) 422static void req_send (aio_req req)
417{ 423{
418 while (started < wanted && nreqs >= started) 424 while (started < wanted && nreqs >= started)
419 start_thread (); 425 start_thread ();
420 426
421 nreqs++; 427 ++nreqs;
422 428
423 pthread_mutex_lock (&reqlock); 429 pthread_mutex_lock (&reqlock);
424 430
425 req->next = 0; 431 req->next = 0;
426 432
778 pthread_mutex_unlock (&reqlock); 784 pthread_mutex_unlock (&reqlock);
779 785
780 errno = 0; /* strictly unnecessary */ 786 errno = 0; /* strictly unnecessary */
781 787
782 if (!req->cancelled) 788 if (!req->cancelled)
783 switch (req->type) 789 switch (type = req->type) /* remember type for QUIT check */
784 { 790 {
785 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 791 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
786 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 792 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
787 793
788 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 794 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
826 832
827 pthread_mutex_lock (&reslock); 833 pthread_mutex_lock (&reslock);
828 834
829 req->next = 0; 835 req->next = 0;
830 836
837 printf ("queue rese %p\n", rese);//D
831 if (rese) 838 if (rese)
832 { 839 {
833 rese->next = req; 840 rese->next = req;
834 rese = req; 841 rese = req;
835 } 842 }
1300 } 1307 }
1301} 1308}
1302 1309
1303void 1310void
1304result (aio_req grp, ...) 1311result (aio_req grp, ...)
1305 CODE: 1312 CODE:
1306{ 1313{
1307 int i; 1314 int i;
1308 AV *av = newAV (); 1315 AV *av = newAV ();
1309 1316
1310 for (i = 1; i < items; ++i ) 1317 for (i = 1; i < items; ++i )
1311 av_push (av, newSVsv (ST (i))); 1318 av_push (av, newSVsv (ST (i)));
1312 1319
1313 SvREFCNT_dec (grp->data); 1320 SvREFCNT_dec (grp->data);
1314 grp->data = (SV *)av; 1321 grp->data = (SV *)av;
1315} 1322}
1323
1324void
1325lock (aio_req grp)
1326 CODE:
1327 ++grp->length;
1328
1329void
1330unlock (aio_req grp)
1331 CODE:
1332 aio_grp_dec (grp);
1316 1333
1317void 1334void
1318feeder_limit (aio_req grp, int limit) 1335feeder_limit (aio_req grp, int limit)
1319 CODE: 1336 CODE:
1320 grp->fd2 = limit; 1337 grp->fd2 = limit;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines