ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.49 by root, Sun Oct 22 13:33:28 2006 UTC vs.
Revision 1.57 by root, Mon Oct 23 18:38:15 2006 UTC

55 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 55 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
56 REQ_FSYNC, REQ_FDATASYNC, 56 REQ_FSYNC, REQ_FDATASYNC,
57 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 57 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
58 REQ_READDIR, 58 REQ_READDIR,
59 REQ_LINK, REQ_SYMLINK, 59 REQ_LINK, REQ_SYMLINK,
60 REQ_GROUP, REQ_NOP,
60 REQ_SLEEP, 61 REQ_SLEEP,
61 REQ_GROUP,
62}; 62};
63 63
64#define AIO_REQ_KLASS "IO::AIO::REQ" 64#define AIO_REQ_KLASS "IO::AIO::REQ"
65#define AIO_GRP_KLASS "IO::AIO::GRP" 65#define AIO_GRP_KLASS "IO::AIO::GRP"
66 66
101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
102 102
103static volatile aio_req reqs, reqe; /* queue start, queue end */ 103static volatile aio_req reqs, reqe; /* queue start, queue end */
104static volatile aio_req ress, rese; /* queue start, queue end */ 104static volatile aio_req ress, rese; /* queue start, queue end */
105 105
106static void req_invoke (aio_req req);
106static void req_free (aio_req req); 107static void req_free (aio_req req);
107 108
108/* must be called at most once */ 109/* must be called at most once */
109static SV *req_sv (aio_req req, const char *klass) 110static SV *req_sv (aio_req req, const char *klass)
110{ 111{
117 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1))); 118 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
118} 119}
119 120
120static aio_req SvAIO_REQ (SV *sv) 121static aio_req SvAIO_REQ (SV *sv)
121{ 122{
123 MAGIC *mg;
124
122 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv)) 125 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
123 croak ("object of class " AIO_REQ_KLASS " expected"); 126 croak ("object of class " AIO_REQ_KLASS " expected");
124 127
125 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext); 128 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
126 129
127 return mg ? (aio_req)mg->mg_ptr : 0; 130 return mg ? (aio_req)mg->mg_ptr : 0;
128} 131}
129 132
130static void aio_grp_feed (aio_req grp) 133static void aio_grp_feed (aio_req grp)
131{ 134{
132 while (grp->length < grp->fd2) 135 while (grp->length < grp->fd2 && !grp->cancelled)
133 { 136 {
134 int old_len = grp->length; 137 int old_len = grp->length;
135 138
136 if (grp->fh2 && SvOK (grp->fh2)) 139 if (grp->fh2 && SvOK (grp->fh2))
137 { 140 {
156 break; 159 break;
157 } 160 }
158 } 161 }
159} 162}
160 163
164static void aio_grp_dec (aio_req grp)
165{
166 --grp->length;
167
168 /* call feeder, if applicable */
169 aio_grp_feed (grp);
170
171 /* finish, if done */
172 if (!grp->length && grp->fd)
173 {
174 req_invoke (grp);
175 req_free (grp);
176 }
177}
178
161static void poll_wait () 179static void poll_wait ()
162{ 180{
163 if (nreqs && !ress) 181 if (nreqs && !ress)
164 { 182 {
165 fd_set rfd; 183 fd_set rfd;
241 for (i = 0; i <= AvFILL (av); ++i) 259 for (i = 0; i <= AvFILL (av); ++i)
242 PUSHs (*av_fetch (av, i, 0)); 260 PUSHs (*av_fetch (av, i, 0));
243 } 261 }
244 break; 262 break;
245 263
264 case REQ_NOP:
246 case REQ_SLEEP: 265 case REQ_SLEEP:
247 break; 266 break;
248 267
249 default: 268 default:
250 PUSHs (sv_2mortal (newSViv (req->result))); 269 PUSHs (sv_2mortal (newSViv (req->result)));
254 273
255 PUTBACK; 274 PUTBACK;
256 call_sv (req->callback, G_VOID | G_EVAL); 275 call_sv (req->callback, G_VOID | G_EVAL);
257 SPAGAIN; 276 SPAGAIN;
258 277
278 FREETMPS;
279 LEAVE;
280
281 errno = errorno;
282
259 if (SvTRUE (ERRSV)) 283 if (SvTRUE (ERRSV))
260 { 284 {
261 req_free (req); 285 req_free (req);
262 croak (0); 286 croak (0);
263 } 287 }
264
265 FREETMPS;
266 LEAVE;
267
268 errno = errorno;
269} 288}
270 289
271static void req_free (aio_req req) 290static void req_free (aio_req req)
272{ 291{
273 if (req->grp) 292 if (req->grp)
279 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next; 298 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
280 299
281 if (grp->grp_first == req) 300 if (grp->grp_first == req)
282 grp->grp_first = req->grp_next; 301 grp->grp_first = req->grp_next;
283 302
284 --grp->length;
285
286 /* call feeder, if applicable */
287 aio_grp_feed (grp); 303 aio_grp_dec (grp);
288
289 /* finish, if done */
290 if (!grp->length && grp->fd)
291 {
292 req_invoke (grp);
293 req_free (grp);
294 }
295 } 304 }
296 305
297 if (req->self) 306 if (req->self)
298 { 307 {
299 sv_unmagic (req->self, PERL_MAGIC_ext); 308 sv_unmagic (req->self, PERL_MAGIC_ext);
355 pthread_mutex_unlock (&reslock); 364 pthread_mutex_unlock (&reslock);
356 365
357 if (!req) 366 if (!req)
358 break; 367 break;
359 368
360 nreqs--; 369 --nreqs;
361 370
362 if (req->type == REQ_QUIT) 371 if (req->type == REQ_QUIT)
363 started--; 372 started--;
364 else if (req->type == REQ_GROUP && req->length) 373 else if (req->type == REQ_GROUP && req->length)
365 { 374 {
416static void req_send (aio_req req) 425static void req_send (aio_req req)
417{ 426{
418 while (started < wanted && nreqs >= started) 427 while (started < wanted && nreqs >= started)
419 start_thread (); 428 start_thread ();
420 429
421 nreqs++; 430 ++nreqs;
422 431
423 pthread_mutex_lock (&reqlock); 432 pthread_mutex_lock (&reqlock);
424 433
425 req->next = 0; 434 req->next = 0;
426 435
778 pthread_mutex_unlock (&reqlock); 787 pthread_mutex_unlock (&reqlock);
779 788
780 errno = 0; /* strictly unnecessary */ 789 errno = 0; /* strictly unnecessary */
781 790
782 if (!req->cancelled) 791 if (!req->cancelled)
783 switch (req->type) 792 switch (type = req->type) /* remember type for QUIT check */
784 { 793 {
785 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 794 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
786 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 795 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
787 796
788 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 797 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
812 tv.tv_usec = req->fd2; 821 tv.tv_usec = req->fd2;
813 822
814 req->result = select (0, 0, 0, 0, &tv); 823 req->result = select (0, 0, 0, 0, &tv);
815 } 824 }
816 825
826 case REQ_GROUP:
827 case REQ_NOP:
817 case REQ_QUIT: 828 case REQ_QUIT:
818 break; 829 break;
819 830
820 default: 831 default:
821 req->result = ENOSYS; 832 req->result = ENOSYS;
1204 req_send (req); 1215 req_send (req);
1205 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1216 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1206} 1217}
1207 1218
1208void 1219void
1220aio_nop (callback=&PL_sv_undef)
1221 SV * callback
1222 PPCODE:
1223{
1224 dREQ;
1225
1226 req->type = REQ_NOP;
1227
1228 REQ_SEND;
1229}
1230
1231void
1209flush () 1232flush ()
1210 PROTOTYPE: 1233 PROTOTYPE:
1211 CODE: 1234 CODE:
1212 while (nreqs) 1235 while (nreqs)
1213 { 1236 {
1264cancel (aio_req_ornot req) 1287cancel (aio_req_ornot req)
1265 PROTOTYPE: 1288 PROTOTYPE:
1266 CODE: 1289 CODE:
1267 req_cancel (req); 1290 req_cancel (req);
1268 1291
1292void
1293cb (aio_req req, SV *callback=&PL_sv_undef)
1294 CODE:
1295 SvREFCNT_dec (req->callback);
1296 req->callback = newSVsv (callback);
1297
1269MODULE = IO::AIO PACKAGE = IO::AIO::GRP 1298MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1270 1299
1271void 1300void
1272add (aio_req grp, ...) 1301add (aio_req grp, ...)
1273 PPCODE: 1302 PPCODE:
1274{ 1303{
1275 int i; 1304 int i;
1305 aio_req req;
1276 1306
1277 if (grp->fd == 2) 1307 if (grp->fd == 2)
1278 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1308 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1279 1309
1280 for (i = 1; i < items; ++i ) 1310 for (i = 1; i < items; ++i )
1281 { 1311 {
1282 if (GIMME_V != G_VOID) 1312 if (GIMME_V != G_VOID)
1283 XPUSHs (sv_2mortal (newSVsv (ST (i)))); 1313 XPUSHs (sv_2mortal (newSVsv (ST (i))));
1284 1314
1285 aio_req req = SvAIO_REQ (ST (i)); 1315 req = SvAIO_REQ (ST (i));
1286 1316
1287 if (req) 1317 if (req)
1288 { 1318 {
1289 ++grp->length; 1319 ++grp->length;
1290 req->grp = grp; 1320 req->grp = grp;
1300 } 1330 }
1301} 1331}
1302 1332
1303void 1333void
1304result (aio_req grp, ...) 1334result (aio_req grp, ...)
1305 CODE: 1335 CODE:
1306{ 1336{
1307 int i; 1337 int i;
1308 AV *av = newAV (); 1338 AV *av = newAV ();
1309 1339
1310 for (i = 1; i < items; ++i ) 1340 for (i = 1; i < items; ++i )
1311 av_push (av, newSVsv (ST (i))); 1341 av_push (av, newSVsv (ST (i)));
1312 1342
1313 SvREFCNT_dec (grp->data); 1343 SvREFCNT_dec (grp->data);
1314 grp->data = (SV *)av; 1344 grp->data = (SV *)av;
1315} 1345}
1316 1346
1317void 1347void
1318feeder_limit (aio_req grp, int limit) 1348feed_limit (aio_req grp, int limit)
1319 CODE: 1349 CODE:
1320 grp->fd2 = limit; 1350 grp->fd2 = limit;
1321 aio_grp_feed (grp); 1351 aio_grp_feed (grp);
1322 1352
1323void 1353void
1324set_feeder (aio_req grp, SV *callback=&PL_sv_undef) 1354feed (aio_req grp, SV *callback=&PL_sv_undef)
1325 CODE: 1355 CODE:
1326{ 1356{
1327 SvREFCNT_dec (grp->fh2); 1357 SvREFCNT_dec (grp->fh2);
1328 grp->fh2 = newSVsv (callback); 1358 grp->fh2 = newSVsv (callback);
1329 1359

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines