ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.53 by root, Mon Oct 23 00:17:07 2006 UTC vs.
Revision 1.58 by root, Mon Oct 23 18:48:08 2006 UTC

55 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 55 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
56 REQ_FSYNC, REQ_FDATASYNC, 56 REQ_FSYNC, REQ_FDATASYNC,
57 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 57 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
58 REQ_READDIR, 58 REQ_READDIR,
59 REQ_LINK, REQ_SYMLINK, 59 REQ_LINK, REQ_SYMLINK,
60 REQ_GROUP, REQ_NOP,
60 REQ_SLEEP, 61 REQ_SLEEP,
61 REQ_GROUP,
62}; 62};
63 63
64#define AIO_REQ_KLASS "IO::AIO::REQ" 64#define AIO_REQ_KLASS "IO::AIO::REQ"
65#define AIO_GRP_KLASS "IO::AIO::GRP" 65#define AIO_GRP_KLASS "IO::AIO::GRP"
66 66
83 int type; 83 int type;
84 int fd, fd2; 84 int fd, fd2;
85 int errorno; 85 int errorno;
86 STRLEN dataoffset; 86 STRLEN dataoffset;
87 mode_t mode; /* open */ 87 mode_t mode; /* open */
88 unsigned char pri;
88 unsigned char cancelled; 89 unsigned char flags;
89} aio_cb; 90} aio_cb;
91
92enum {
93 FLAG_CANCELLED = 0x01,
94};
90 95
91typedef aio_cb *aio_req; 96typedef aio_cb *aio_req;
92typedef aio_cb *aio_req_ornot; 97typedef aio_cb *aio_req_ornot;
93 98
94static int started, wanted; 99static int started, wanted;
130 return mg ? (aio_req)mg->mg_ptr : 0; 135 return mg ? (aio_req)mg->mg_ptr : 0;
131} 136}
132 137
133static void aio_grp_feed (aio_req grp) 138static void aio_grp_feed (aio_req grp)
134{ 139{
135 while (grp->length < grp->fd2) 140 while (grp->length < grp->fd2 && !(grp->flags & FLAG_CANCELLED))
136 { 141 {
137 int old_len = grp->length; 142 int old_len = grp->length;
138 143
139 if (grp->fh2 && SvOK (grp->fh2)) 144 if (grp->fh2 && SvOK (grp->fh2))
140 { 145 {
191static void req_invoke (aio_req req) 196static void req_invoke (aio_req req)
192{ 197{
193 dSP; 198 dSP;
194 int errorno = errno; 199 int errorno = errno;
195 200
196 if (req->cancelled || !SvOK (req->callback)) 201 if (req->flags & FLAG_CANCELLED || !SvOK (req->callback))
197 return; 202 return;
198 203
199 errno = req->errorno; 204 errno = req->errorno;
200 205
201 ENTER; 206 ENTER;
259 for (i = 0; i <= AvFILL (av); ++i) 264 for (i = 0; i <= AvFILL (av); ++i)
260 PUSHs (*av_fetch (av, i, 0)); 265 PUSHs (*av_fetch (av, i, 0));
261 } 266 }
262 break; 267 break;
263 268
269 case REQ_NOP:
264 case REQ_SLEEP: 270 case REQ_SLEEP:
265 break; 271 break;
266 272
267 default: 273 default:
268 PUSHs (sv_2mortal (newSViv (req->result))); 274 PUSHs (sv_2mortal (newSViv (req->result)));
320 Safefree (req); 326 Safefree (req);
321} 327}
322 328
323static void req_cancel (aio_req req) 329static void req_cancel (aio_req req)
324{ 330{
325 req->cancelled = 1; 331 req->flags |= FLAG_CANCELLED;
326 332
327 if (req->type == REQ_GROUP) 333 if (req->type == REQ_GROUP)
328 { 334 {
329 aio_req sub; 335 aio_req sub;
330 336
785 791
786 pthread_mutex_unlock (&reqlock); 792 pthread_mutex_unlock (&reqlock);
787 793
788 errno = 0; /* strictly unnecessary */ 794 errno = 0; /* strictly unnecessary */
789 795
790 if (!req->cancelled) 796 if (!(req->flags & FLAG_CANCELLED))
791 switch (type = req->type) /* remember type for QUIT check */ 797 switch (type = req->type) /* remember type for QUIT check */
792 { 798 {
793 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 799 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
794 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 800 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
795 801
820 tv.tv_usec = req->fd2; 826 tv.tv_usec = req->fd2;
821 827
822 req->result = select (0, 0, 0, 0, &tv); 828 req->result = select (0, 0, 0, 0, &tv);
823 } 829 }
824 830
831 case REQ_GROUP:
832 case REQ_NOP:
825 case REQ_QUIT: 833 case REQ_QUIT:
826 break; 834 break;
827 835
828 default: 836 default:
829 req->result = ENOSYS; 837 req->result = ENOSYS;
1212 req_send (req); 1220 req_send (req);
1213 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1221 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1214} 1222}
1215 1223
1216void 1224void
1225aio_nop (callback=&PL_sv_undef)
1226 SV * callback
1227 PPCODE:
1228{
1229 dREQ;
1230
1231 req->type = REQ_NOP;
1232
1233 REQ_SEND;
1234}
1235
1236void
1217flush () 1237flush ()
1218 PROTOTYPE: 1238 PROTOTYPE:
1219 CODE: 1239 CODE:
1220 while (nreqs) 1240 while (nreqs)
1221 { 1241 {
1272cancel (aio_req_ornot req) 1292cancel (aio_req_ornot req)
1273 PROTOTYPE: 1293 PROTOTYPE:
1274 CODE: 1294 CODE:
1275 req_cancel (req); 1295 req_cancel (req);
1276 1296
1297void
1298cb (aio_req req, SV *callback=&PL_sv_undef)
1299 CODE:
1300 SvREFCNT_dec (req->callback);
1301 req->callback = newSVsv (callback);
1302
1277MODULE = IO::AIO PACKAGE = IO::AIO::GRP 1303MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1278 1304
1279void 1305void
1280add (aio_req grp, ...) 1306add (aio_req grp, ...)
1281 PPCODE: 1307 PPCODE:
1322 SvREFCNT_dec (grp->data); 1348 SvREFCNT_dec (grp->data);
1323 grp->data = (SV *)av; 1349 grp->data = (SV *)av;
1324} 1350}
1325 1351
1326void 1352void
1327lock (aio_req grp)
1328 CODE:
1329 ++grp->length;
1330
1331void
1332unlock (aio_req grp)
1333 CODE:
1334 aio_grp_dec (grp);
1335
1336void
1337feeder_limit (aio_req grp, int limit) 1353feed_limit (aio_req grp, int limit)
1338 CODE: 1354 CODE:
1339 grp->fd2 = limit; 1355 grp->fd2 = limit;
1340 aio_grp_feed (grp); 1356 aio_grp_feed (grp);
1341 1357
1342void 1358void
1343set_feeder (aio_req grp, SV *callback=&PL_sv_undef) 1359feed (aio_req grp, SV *callback=&PL_sv_undef)
1344 CODE: 1360 CODE:
1345{ 1361{
1346 SvREFCNT_dec (grp->fh2); 1362 SvREFCNT_dec (grp->fh2);
1347 grp->fh2 = newSVsv (callback); 1363 grp->fh2 = newSVsv (callback);
1348 1364

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines