ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.45 by root, Sun Oct 22 00:19:05 2006 UTC vs.
Revision 1.49 by root, Sun Oct 22 13:33:28 2006 UTC

64#define AIO_REQ_KLASS "IO::AIO::REQ" 64#define AIO_REQ_KLASS "IO::AIO::REQ"
65#define AIO_GRP_KLASS "IO::AIO::GRP" 65#define AIO_GRP_KLASS "IO::AIO::GRP"
66 66
67typedef struct aio_cb 67typedef struct aio_cb
68{ 68{
69 struct aio_cb *grp, *grp_prev, *grp_next;
70
71 struct aio_cb *volatile next; 69 struct aio_cb *volatile next;
70
71 struct aio_cb *grp, *grp_prev, *grp_next, *grp_first;
72 72
73 SV *self; /* the perl counterpart of this request, if any */ 73 SV *self; /* the perl counterpart of this request, if any */
74 74
75 SV *data, *callback; 75 SV *data, *callback;
76 SV *fh, *fh2; 76 SV *fh, *fh2;
88 unsigned char cancelled; 88 unsigned char cancelled;
89} aio_cb; 89} aio_cb;
90 90
91typedef aio_cb *aio_req; 91typedef aio_cb *aio_req;
92typedef aio_cb *aio_req_ornot; 92typedef aio_cb *aio_req_ornot;
93typedef aio_cb *aio_group;
94 93
95static int started, wanted; 94static int started, wanted;
96static volatile int nreqs; 95static volatile int nreqs;
97static int max_outstanding = 1<<30; 96static int max_outstanding = 1<<30;
98static int respipe [2]; 97static int respipe [2];
107static void req_free (aio_req req); 106static void req_free (aio_req req);
108 107
109/* must be called at most once */ 108/* must be called at most once */
110static SV *req_sv (aio_req req, const char *klass) 109static SV *req_sv (aio_req req, const char *klass)
111{ 110{
111 if (!req->self)
112 {
112 req->self = (SV *)newHV (); 113 req->self = (SV *)newHV ();
113 sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0); 114 sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
115 }
114 116
115 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1))); 117 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
116} 118}
117 119
118static aio_req SvAIO_REQ (SV *sv) 120static aio_req SvAIO_REQ (SV *sv)
121 croak ("object of class " AIO_REQ_KLASS " expected"); 123 croak ("object of class " AIO_REQ_KLASS " expected");
122 124
123 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext); 125 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
124 126
125 return mg ? (aio_req)mg->mg_ptr : 0; 127 return mg ? (aio_req)mg->mg_ptr : 0;
128}
129
130static void aio_grp_feed (aio_req grp)
131{
132 while (grp->length < grp->fd2)
133 {
134 int old_len = grp->length;
135
136 if (grp->fh2 && SvOK (grp->fh2))
137 {
138 dSP;
139
140 ENTER;
141 SAVETMPS;
142 PUSHMARK (SP);
143 XPUSHs (req_sv (grp, AIO_GRP_KLASS));
144 PUTBACK;
145 call_sv (grp->fh2, G_VOID | G_EVAL);
146 SPAGAIN;
147 FREETMPS;
148 LEAVE;
149 }
150
151 /* stop if no progress has been made */
152 if (old_len == grp->length)
153 {
154 SvREFCNT_dec (grp->fh2);
155 grp->fh2 = 0;
156 break;
157 }
158 }
126} 159}
127 160
128static void poll_wait () 161static void poll_wait ()
129{ 162{
130 if (nreqs && !ress) 163 if (nreqs && !ress)
146 return; 179 return;
147 180
148 errno = req->errorno; 181 errno = req->errorno;
149 182
150 ENTER; 183 ENTER;
184 SAVETMPS;
151 PUSHMARK (SP); 185 PUSHMARK (SP);
186 EXTEND (SP, 1);
152 187
153 switch (req->type) 188 switch (req->type)
154 { 189 {
155 case REQ_READDIR: 190 case REQ_READDIR:
156 { 191 {
171 } 206 }
172 207
173 rv = sv_2mortal (newRV_noinc ((SV *)av)); 208 rv = sv_2mortal (newRV_noinc ((SV *)av));
174 } 209 }
175 210
176 XPUSHs (rv); 211 PUSHs (rv);
177 } 212 }
178 break; 213 break;
179 214
180 case REQ_OPEN: 215 case REQ_OPEN:
181 { 216 {
182 /* convert fd to fh */ 217 /* convert fd to fh */
183 SV *fh; 218 SV *fh;
184 219
185 XPUSHs (sv_2mortal (newSViv (req->result))); 220 PUSHs (sv_2mortal (newSViv (req->result)));
186 PUTBACK; 221 PUTBACK;
187 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 222 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
188 SPAGAIN; 223 SPAGAIN;
189 224
190 fh = SvREFCNT_inc (POPs); 225 fh = SvREFCNT_inc (POPs);
192 PUSHMARK (SP); 227 PUSHMARK (SP);
193 XPUSHs (sv_2mortal (fh)); 228 XPUSHs (sv_2mortal (fh));
194 } 229 }
195 break; 230 break;
196 231
232 case REQ_GROUP:
233 req->fd = 2; /* mark group as finished */
234
235 if (req->data)
236 {
237 int i;
238 AV *av = (AV *)req->data;
239
240 EXTEND (SP, AvFILL (av) + 1);
241 for (i = 0; i <= AvFILL (av); ++i)
242 PUSHs (*av_fetch (av, i, 0));
243 }
244 break;
245
197 case REQ_SLEEP: 246 case REQ_SLEEP:
198 case REQ_GROUP:
199 break; 247 break;
200 248
201 default: 249 default:
202 XPUSHs (sv_2mortal (newSViv (req->result))); 250 PUSHs (sv_2mortal (newSViv (req->result)));
203 break; 251 break;
204 } 252 }
205 253
206 254
207 PUTBACK; 255 PUTBACK;
212 { 260 {
213 req_free (req); 261 req_free (req);
214 croak (0); 262 croak (0);
215 } 263 }
216 264
265 FREETMPS;
217 LEAVE; 266 LEAVE;
218 267
219 errno = errorno; 268 errno = errorno;
220} 269}
221 270
224 if (req->grp) 273 if (req->grp)
225 { 274 {
226 aio_req grp = req->grp; 275 aio_req grp = req->grp;
227 276
228 /* unlink request */ 277 /* unlink request */
229 req->grp_next->grp_prev = req->grp_prev; 278 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
230 req->grp_prev->grp_next = req->grp_next; 279 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
231 280
232 if (grp->grp_next == grp) 281 if (grp->grp_first == req)
282 grp->grp_first = req->grp_next;
283
284 --grp->length;
285
286 /* call feeder, if applicable */
287 aio_grp_feed (grp);
288
289 /* finish, if done */
290 if (!grp->length && grp->fd)
233 { 291 {
234 req_invoke (grp); 292 req_invoke (grp);
235 req_free (grp); 293 req_free (grp);
236 } 294 }
237 } 295 }
240 { 298 {
241 sv_unmagic (req->self, PERL_MAGIC_ext); 299 sv_unmagic (req->self, PERL_MAGIC_ext);
242 SvREFCNT_dec (req->self); 300 SvREFCNT_dec (req->self);
243 } 301 }
244 302
245 if (req->data)
246 SvREFCNT_dec (req->data); 303 SvREFCNT_dec (req->data);
247
248 if (req->fh)
249 SvREFCNT_dec (req->fh); 304 SvREFCNT_dec (req->fh);
250
251 if (req->fh2)
252 SvREFCNT_dec (req->fh2); 305 SvREFCNT_dec (req->fh2);
253
254 if (req->statdata)
255 Safefree (req->statdata);
256
257 if (req->callback)
258 SvREFCNT_dec (req->callback); 306 SvREFCNT_dec (req->callback);
307 Safefree (req->statdata);
259 308
260 if (req->type == REQ_READDIR && req->result >= 0) 309 if (req->type == REQ_READDIR && req->result >= 0)
261 free (req->data2ptr); 310 free (req->data2ptr);
262 311
263 Safefree (req); 312 Safefree (req);
269 318
270 if (req->type == REQ_GROUP) 319 if (req->type == REQ_GROUP)
271 { 320 {
272 aio_req sub; 321 aio_req sub;
273 322
274 for (sub = req->grp_next; sub != req; sub = sub->grp_next) 323 for (sub = req->grp_first; sub; sub = sub->grp_next)
275 req_cancel (sub); 324 req_cancel (sub);
276 } 325 }
277} 326}
278 327
279static int poll_cb () 328static int poll_cb ()
310 359
311 nreqs--; 360 nreqs--;
312 361
313 if (req->type == REQ_QUIT) 362 if (req->type == REQ_QUIT)
314 started--; 363 started--;
315 else if (req->type == REQ_GROUP && req->grp_next != req) 364 else if (req->type == REQ_GROUP && req->length)
365 {
366 req->fd = 1; /* mark request as delayed */
316 continue; 367 continue;
368 }
317 else 369 else
318 { 370 {
319 if (req->type == REQ_READ) 371 if (req->type == REQ_READ)
320 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0)); 372 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
321 373
1142} 1194}
1143 1195
1144void 1196void
1145aio_group (callback=&PL_sv_undef) 1197aio_group (callback=&PL_sv_undef)
1146 SV * callback 1198 SV * callback
1147 PROTOTYPE: ;& 1199 PROTOTYPE: ;$
1148 PPCODE: 1200 PPCODE:
1149{ 1201{
1150 dREQ; 1202 dREQ;
1151 req->type = REQ_GROUP; 1203 req->type = REQ_GROUP;
1152 req->grp_next = req;
1153 req->grp_prev = req;
1154
1155 req_send (req); 1204 req_send (req);
1156 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1205 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1157} 1206}
1158 1207
1159void 1208void
1205 CODE: 1254 CODE:
1206 RETVAL = nreqs; 1255 RETVAL = nreqs;
1207 OUTPUT: 1256 OUTPUT:
1208 RETVAL 1257 RETVAL
1209 1258
1259PROTOTYPES: DISABLE
1260
1210MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1261MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1211 1262
1212void 1263void
1213cancel (aio_req_ornot req) 1264cancel (aio_req_ornot req)
1214 PROTOTYPE: 1265 PROTOTYPE:
1217 1268
1218MODULE = IO::AIO PACKAGE = IO::AIO::GRP 1269MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1219 1270
1220void 1271void
1221add (aio_req grp, ...) 1272add (aio_req grp, ...)
1222 PROTOTYPE: $;@
1223 PPCODE: 1273 PPCODE:
1224{ 1274{
1225 int i; 1275 int i;
1276
1277 if (grp->fd == 2)
1278 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1226 1279
1227 for (i = 1; i < items; ++i ) 1280 for (i = 1; i < items; ++i )
1228 { 1281 {
1229 aio_req req = SvAIO_REQ (ST (i));
1230
1231 req->grp_prev = grp;
1232 req->grp_next = grp->grp_next;
1233 grp->grp_next->grp_prev = req;
1234 grp->grp_next = req;
1235
1236 req->grp = grp;
1237
1238 if (GIMME_V != G_VOID) 1282 if (GIMME_V != G_VOID)
1239 XPUSHs (sv_2mortal (newSVsv (ST (i)))); 1283 XPUSHs (sv_2mortal (newSVsv (ST (i))));
1284
1285 aio_req req = SvAIO_REQ (ST (i));
1286
1287 if (req)
1288 {
1289 ++grp->length;
1290 req->grp = grp;
1291
1292 req->grp_prev = 0;
1293 req->grp_next = grp->grp_first;
1294
1295 if (grp->grp_first)
1296 grp->grp_first->grp_prev = req;
1297
1298 grp->grp_first = req;
1299 }
1240 } 1300 }
1241} 1301}
1242 1302
1303void
1304result (aio_req grp, ...)
1305 CODE:
1306{
1307 int i;
1308 AV *av = newAV ();
1309
1310 for (i = 1; i < items; ++i )
1311 av_push (av, newSVsv (ST (i)));
1312
1313 SvREFCNT_dec (grp->data);
1314 grp->data = (SV *)av;
1315}
1316
1317void
1318feeder_limit (aio_req grp, int limit)
1319 CODE:
1320 grp->fd2 = limit;
1321 aio_grp_feed (grp);
1322
1323void
1324set_feeder (aio_req grp, SV *callback=&PL_sv_undef)
1325 CODE:
1326{
1327 SvREFCNT_dec (grp->fh2);
1328 grp->fh2 = newSVsv (callback);
1329
1330 if (grp->fd2 <= 0)
1331 grp->fd2 = 2;
1332
1333 aio_grp_feed (grp);
1334}
1335

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines