ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.49 by root, Sun Oct 22 13:33:28 2006 UTC vs.
Revision 1.53 by root, Mon Oct 23 00:17:07 2006 UTC

101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
102 102
103static volatile aio_req reqs, reqe; /* queue start, queue end */ 103static volatile aio_req reqs, reqe; /* queue start, queue end */
104static volatile aio_req ress, rese; /* queue start, queue end */ 104static volatile aio_req ress, rese; /* queue start, queue end */
105 105
106static void req_invoke (aio_req req);
106static void req_free (aio_req req); 107static void req_free (aio_req req);
107 108
108/* must be called at most once */ 109/* must be called at most once */
109static SV *req_sv (aio_req req, const char *klass) 110static SV *req_sv (aio_req req, const char *klass)
110{ 111{
117 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1))); 118 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
118} 119}
119 120
120static aio_req SvAIO_REQ (SV *sv) 121static aio_req SvAIO_REQ (SV *sv)
121{ 122{
123 MAGIC *mg;
124
122 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv)) 125 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
123 croak ("object of class " AIO_REQ_KLASS " expected"); 126 croak ("object of class " AIO_REQ_KLASS " expected");
124 127
125 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext); 128 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
126 129
127 return mg ? (aio_req)mg->mg_ptr : 0; 130 return mg ? (aio_req)mg->mg_ptr : 0;
128} 131}
129 132
130static void aio_grp_feed (aio_req grp) 133static void aio_grp_feed (aio_req grp)
156 break; 159 break;
157 } 160 }
158 } 161 }
159} 162}
160 163
164static void aio_grp_dec (aio_req grp)
165{
166 --grp->length;
167
168 /* call feeder, if applicable */
169 aio_grp_feed (grp);
170
171 /* finish, if done */
172 if (!grp->length && grp->fd)
173 {
174 req_invoke (grp);
175 req_free (grp);
176 }
177}
178
161static void poll_wait () 179static void poll_wait ()
162{ 180{
163 if (nreqs && !ress) 181 if (nreqs && !ress)
164 { 182 {
165 fd_set rfd; 183 fd_set rfd;
254 272
255 PUTBACK; 273 PUTBACK;
256 call_sv (req->callback, G_VOID | G_EVAL); 274 call_sv (req->callback, G_VOID | G_EVAL);
257 SPAGAIN; 275 SPAGAIN;
258 276
277 FREETMPS;
278 LEAVE;
279
280 errno = errorno;
281
259 if (SvTRUE (ERRSV)) 282 if (SvTRUE (ERRSV))
260 { 283 {
261 req_free (req); 284 req_free (req);
262 croak (0); 285 croak (0);
263 } 286 }
264
265 FREETMPS;
266 LEAVE;
267
268 errno = errorno;
269} 287}
270 288
271static void req_free (aio_req req) 289static void req_free (aio_req req)
272{ 290{
273 if (req->grp) 291 if (req->grp)
279 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next; 297 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
280 298
281 if (grp->grp_first == req) 299 if (grp->grp_first == req)
282 grp->grp_first = req->grp_next; 300 grp->grp_first = req->grp_next;
283 301
284 --grp->length;
285
286 /* call feeder, if applicable */
287 aio_grp_feed (grp); 302 aio_grp_dec (grp);
288
289 /* finish, if done */
290 if (!grp->length && grp->fd)
291 {
292 req_invoke (grp);
293 req_free (grp);
294 }
295 } 303 }
296 304
297 if (req->self) 305 if (req->self)
298 { 306 {
299 sv_unmagic (req->self, PERL_MAGIC_ext); 307 sv_unmagic (req->self, PERL_MAGIC_ext);
355 pthread_mutex_unlock (&reslock); 363 pthread_mutex_unlock (&reslock);
356 364
357 if (!req) 365 if (!req)
358 break; 366 break;
359 367
360 nreqs--; 368 --nreqs;
361 369
362 if (req->type == REQ_QUIT) 370 if (req->type == REQ_QUIT)
363 started--; 371 started--;
364 else if (req->type == REQ_GROUP && req->length) 372 else if (req->type == REQ_GROUP && req->length)
365 { 373 {
416static void req_send (aio_req req) 424static void req_send (aio_req req)
417{ 425{
418 while (started < wanted && nreqs >= started) 426 while (started < wanted && nreqs >= started)
419 start_thread (); 427 start_thread ();
420 428
421 nreqs++; 429 ++nreqs;
422 430
423 pthread_mutex_lock (&reqlock); 431 pthread_mutex_lock (&reqlock);
424 432
425 req->next = 0; 433 req->next = 0;
426 434
778 pthread_mutex_unlock (&reqlock); 786 pthread_mutex_unlock (&reqlock);
779 787
780 errno = 0; /* strictly unnecessary */ 788 errno = 0; /* strictly unnecessary */
781 789
782 if (!req->cancelled) 790 if (!req->cancelled)
783 switch (req->type) 791 switch (type = req->type) /* remember type for QUIT check */
784 { 792 {
785 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 793 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
786 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 794 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
787 795
788 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 796 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
1271void 1279void
1272add (aio_req grp, ...) 1280add (aio_req grp, ...)
1273 PPCODE: 1281 PPCODE:
1274{ 1282{
1275 int i; 1283 int i;
1284 aio_req req;
1276 1285
1277 if (grp->fd == 2) 1286 if (grp->fd == 2)
1278 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1287 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1279 1288
1280 for (i = 1; i < items; ++i ) 1289 for (i = 1; i < items; ++i )
1281 { 1290 {
1282 if (GIMME_V != G_VOID) 1291 if (GIMME_V != G_VOID)
1283 XPUSHs (sv_2mortal (newSVsv (ST (i)))); 1292 XPUSHs (sv_2mortal (newSVsv (ST (i))));
1284 1293
1285 aio_req req = SvAIO_REQ (ST (i)); 1294 req = SvAIO_REQ (ST (i));
1286 1295
1287 if (req) 1296 if (req)
1288 { 1297 {
1289 ++grp->length; 1298 ++grp->length;
1290 req->grp = grp; 1299 req->grp = grp;
1300 } 1309 }
1301} 1310}
1302 1311
1303void 1312void
1304result (aio_req grp, ...) 1313result (aio_req grp, ...)
1305 CODE: 1314 CODE:
1306{ 1315{
1307 int i; 1316 int i;
1308 AV *av = newAV (); 1317 AV *av = newAV ();
1309 1318
1310 for (i = 1; i < items; ++i ) 1319 for (i = 1; i < items; ++i )
1311 av_push (av, newSVsv (ST (i))); 1320 av_push (av, newSVsv (ST (i)));
1312 1321
1313 SvREFCNT_dec (grp->data); 1322 SvREFCNT_dec (grp->data);
1314 grp->data = (SV *)av; 1323 grp->data = (SV *)av;
1315} 1324}
1325
1326void
1327lock (aio_req grp)
1328 CODE:
1329 ++grp->length;
1330
1331void
1332unlock (aio_req grp)
1333 CODE:
1334 aio_grp_dec (grp);
1316 1335
1317void 1336void
1318feeder_limit (aio_req grp, int limit) 1337feeder_limit (aio_req grp, int limit)
1319 CODE: 1338 CODE:
1320 grp->fd2 = limit; 1339 grp->fd2 = limit;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines