ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.49 by root, Sun Oct 22 13:33:28 2006 UTC vs.
Revision 1.54 by root, Mon Oct 23 00:34:36 2006 UTC

55 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 55 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
56 REQ_FSYNC, REQ_FDATASYNC, 56 REQ_FSYNC, REQ_FDATASYNC,
57 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 57 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
58 REQ_READDIR, 58 REQ_READDIR,
59 REQ_LINK, REQ_SYMLINK, 59 REQ_LINK, REQ_SYMLINK,
60 REQ_GROUP, REQ_NOP,
60 REQ_SLEEP, 61 REQ_SLEEP,
61 REQ_GROUP,
62}; 62};
63 63
64#define AIO_REQ_KLASS "IO::AIO::REQ" 64#define AIO_REQ_KLASS "IO::AIO::REQ"
65#define AIO_GRP_KLASS "IO::AIO::GRP" 65#define AIO_GRP_KLASS "IO::AIO::GRP"
66 66
101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 101static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
102 102
103static volatile aio_req reqs, reqe; /* queue start, queue end */ 103static volatile aio_req reqs, reqe; /* queue start, queue end */
104static volatile aio_req ress, rese; /* queue start, queue end */ 104static volatile aio_req ress, rese; /* queue start, queue end */
105 105
106static void req_invoke (aio_req req);
106static void req_free (aio_req req); 107static void req_free (aio_req req);
107 108
108/* must be called at most once */ 109/* must be called at most once */
109static SV *req_sv (aio_req req, const char *klass) 110static SV *req_sv (aio_req req, const char *klass)
110{ 111{
117 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1))); 118 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
118} 119}
119 120
120static aio_req SvAIO_REQ (SV *sv) 121static aio_req SvAIO_REQ (SV *sv)
121{ 122{
123 MAGIC *mg;
124
122 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv)) 125 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
123 croak ("object of class " AIO_REQ_KLASS " expected"); 126 croak ("object of class " AIO_REQ_KLASS " expected");
124 127
125 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext); 128 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
126 129
127 return mg ? (aio_req)mg->mg_ptr : 0; 130 return mg ? (aio_req)mg->mg_ptr : 0;
128} 131}
129 132
130static void aio_grp_feed (aio_req grp) 133static void aio_grp_feed (aio_req grp)
156 break; 159 break;
157 } 160 }
158 } 161 }
159} 162}
160 163
164static void aio_grp_dec (aio_req grp)
165{
166 --grp->length;
167
168 /* call feeder, if applicable */
169 aio_grp_feed (grp);
170
171 /* finish, if done */
172 if (!grp->length && grp->fd)
173 {
174 req_invoke (grp);
175 req_free (grp);
176 }
177}
178
161static void poll_wait () 179static void poll_wait ()
162{ 180{
163 if (nreqs && !ress) 181 if (nreqs && !ress)
164 { 182 {
165 fd_set rfd; 183 fd_set rfd;
241 for (i = 0; i <= AvFILL (av); ++i) 259 for (i = 0; i <= AvFILL (av); ++i)
242 PUSHs (*av_fetch (av, i, 0)); 260 PUSHs (*av_fetch (av, i, 0));
243 } 261 }
244 break; 262 break;
245 263
264 case REQ_NOP:
246 case REQ_SLEEP: 265 case REQ_SLEEP:
247 break; 266 break;
248 267
249 default: 268 default:
250 PUSHs (sv_2mortal (newSViv (req->result))); 269 PUSHs (sv_2mortal (newSViv (req->result)));
254 273
255 PUTBACK; 274 PUTBACK;
256 call_sv (req->callback, G_VOID | G_EVAL); 275 call_sv (req->callback, G_VOID | G_EVAL);
257 SPAGAIN; 276 SPAGAIN;
258 277
278 FREETMPS;
279 LEAVE;
280
281 errno = errorno;
282
259 if (SvTRUE (ERRSV)) 283 if (SvTRUE (ERRSV))
260 { 284 {
261 req_free (req); 285 req_free (req);
262 croak (0); 286 croak (0);
263 } 287 }
264
265 FREETMPS;
266 LEAVE;
267
268 errno = errorno;
269} 288}
270 289
271static void req_free (aio_req req) 290static void req_free (aio_req req)
272{ 291{
273 if (req->grp) 292 if (req->grp)
279 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next; 298 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
280 299
281 if (grp->grp_first == req) 300 if (grp->grp_first == req)
282 grp->grp_first = req->grp_next; 301 grp->grp_first = req->grp_next;
283 302
284 --grp->length;
285
286 /* call feeder, if applicable */
287 aio_grp_feed (grp); 303 aio_grp_dec (grp);
288
289 /* finish, if done */
290 if (!grp->length && grp->fd)
291 {
292 req_invoke (grp);
293 req_free (grp);
294 }
295 } 304 }
296 305
297 if (req->self) 306 if (req->self)
298 { 307 {
299 sv_unmagic (req->self, PERL_MAGIC_ext); 308 sv_unmagic (req->self, PERL_MAGIC_ext);
355 pthread_mutex_unlock (&reslock); 364 pthread_mutex_unlock (&reslock);
356 365
357 if (!req) 366 if (!req)
358 break; 367 break;
359 368
360 nreqs--; 369 --nreqs;
361 370
362 if (req->type == REQ_QUIT) 371 if (req->type == REQ_QUIT)
363 started--; 372 started--;
364 else if (req->type == REQ_GROUP && req->length) 373 else if (req->type == REQ_GROUP && req->length)
365 { 374 {
416static void req_send (aio_req req) 425static void req_send (aio_req req)
417{ 426{
418 while (started < wanted && nreqs >= started) 427 while (started < wanted && nreqs >= started)
419 start_thread (); 428 start_thread ();
420 429
421 nreqs++; 430 ++nreqs;
422 431
423 pthread_mutex_lock (&reqlock); 432 pthread_mutex_lock (&reqlock);
424 433
425 req->next = 0; 434 req->next = 0;
426 435
778 pthread_mutex_unlock (&reqlock); 787 pthread_mutex_unlock (&reqlock);
779 788
780 errno = 0; /* strictly unnecessary */ 789 errno = 0; /* strictly unnecessary */
781 790
782 if (!req->cancelled) 791 if (!req->cancelled)
783 switch (req->type) 792 switch (type = req->type) /* remember type for QUIT check */
784 { 793 {
785 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 794 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
786 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 795 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
787 796
788 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 797 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
812 tv.tv_usec = req->fd2; 821 tv.tv_usec = req->fd2;
813 822
814 req->result = select (0, 0, 0, 0, &tv); 823 req->result = select (0, 0, 0, 0, &tv);
815 } 824 }
816 825
826 case REQ_GROUP: /*TODO: should not be handled here */
827 case REQ_NOP: /*TODO: should not be handled here */
817 case REQ_QUIT: 828 case REQ_QUIT:
818 break; 829 break;
819 830
820 default: 831 default:
821 req->result = ENOSYS; 832 req->result = ENOSYS;
1204 req_send (req); 1215 req_send (req);
1205 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1216 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1206} 1217}
1207 1218
1208void 1219void
1220aio_nop (callback=&PL_sv_undef)
1221 SV * callback
1222 PPCODE:
1223{
1224 dREQ;
1225
1226 req->type = REQ_NOP;
1227
1228 REQ_SEND;
1229}
1230
1231void
1209flush () 1232flush ()
1210 PROTOTYPE: 1233 PROTOTYPE:
1211 CODE: 1234 CODE:
1212 while (nreqs) 1235 while (nreqs)
1213 { 1236 {
1271void 1294void
1272add (aio_req grp, ...) 1295add (aio_req grp, ...)
1273 PPCODE: 1296 PPCODE:
1274{ 1297{
1275 int i; 1298 int i;
1299 aio_req req;
1276 1300
1277 if (grp->fd == 2) 1301 if (grp->fd == 2)
1278 croak ("cannot add requests to IO::AIO::GRP after the group finished"); 1302 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1279 1303
1280 for (i = 1; i < items; ++i ) 1304 for (i = 1; i < items; ++i )
1281 { 1305 {
1282 if (GIMME_V != G_VOID) 1306 if (GIMME_V != G_VOID)
1283 XPUSHs (sv_2mortal (newSVsv (ST (i)))); 1307 XPUSHs (sv_2mortal (newSVsv (ST (i))));
1284 1308
1285 aio_req req = SvAIO_REQ (ST (i)); 1309 req = SvAIO_REQ (ST (i));
1286 1310
1287 if (req) 1311 if (req)
1288 { 1312 {
1289 ++grp->length; 1313 ++grp->length;
1290 req->grp = grp; 1314 req->grp = grp;
1300 } 1324 }
1301} 1325}
1302 1326
1303void 1327void
1304result (aio_req grp, ...) 1328result (aio_req grp, ...)
1305 CODE: 1329 CODE:
1306{ 1330{
1307 int i; 1331 int i;
1308 AV *av = newAV (); 1332 AV *av = newAV ();
1309 1333
1310 for (i = 1; i < items; ++i ) 1334 for (i = 1; i < items; ++i )
1311 av_push (av, newSVsv (ST (i))); 1335 av_push (av, newSVsv (ST (i)));
1312 1336
1313 SvREFCNT_dec (grp->data); 1337 SvREFCNT_dec (grp->data);
1314 grp->data = (SV *)av; 1338 grp->data = (SV *)av;
1315} 1339}
1340
1341void
1342lock (aio_req grp)
1343 CODE:
1344 ++grp->length;
1345
1346void
1347unlock (aio_req grp)
1348 CODE:
1349 aio_grp_dec (grp);
1316 1350
1317void 1351void
1318feeder_limit (aio_req grp, int limit) 1352feeder_limit (aio_req grp, int limit)
1319 CODE: 1353 CODE:
1320 grp->fd2 = limit; 1354 grp->fd2 = limit;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines