ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.47 by root, Sun Oct 22 10:10:23 2006 UTC vs.
Revision 1.49 by root, Sun Oct 22 13:33:28 2006 UTC

64#define AIO_REQ_KLASS "IO::AIO::REQ" 64#define AIO_REQ_KLASS "IO::AIO::REQ"
65#define AIO_GRP_KLASS "IO::AIO::GRP" 65#define AIO_GRP_KLASS "IO::AIO::GRP"
66 66
67typedef struct aio_cb 67typedef struct aio_cb
68{ 68{
69 struct aio_cb *grp, *grp_prev, *grp_next;
70
71 struct aio_cb *volatile next; 69 struct aio_cb *volatile next;
70
71 struct aio_cb *grp, *grp_prev, *grp_next, *grp_first;
72 72
73 SV *self; /* the perl counterpart of this request, if any */ 73 SV *self; /* the perl counterpart of this request, if any */
74 74
75 SV *data, *callback; 75 SV *data, *callback;
76 SV *fh, *fh2; 76 SV *fh, *fh2;
106static void req_free (aio_req req); 106static void req_free (aio_req req);
107 107
108/* must be called at most once */ 108/* must be called at most once */
109static SV *req_sv (aio_req req, const char *klass) 109static SV *req_sv (aio_req req, const char *klass)
110{ 110{
111 if (!req->self)
112 {
111 req->self = (SV *)newHV (); 113 req->self = (SV *)newHV ();
112 sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0); 114 sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
115 }
113 116
114 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1))); 117 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
115} 118}
116 119
117static aio_req SvAIO_REQ (SV *sv) 120static aio_req SvAIO_REQ (SV *sv)
120 croak ("object of class " AIO_REQ_KLASS " expected"); 123 croak ("object of class " AIO_REQ_KLASS " expected");
121 124
122 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext); 125 MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
123 126
124 return mg ? (aio_req)mg->mg_ptr : 0; 127 return mg ? (aio_req)mg->mg_ptr : 0;
128}
129
130static void aio_grp_feed (aio_req grp)
131{
132 while (grp->length < grp->fd2)
133 {
134 int old_len = grp->length;
135
136 if (grp->fh2 && SvOK (grp->fh2))
137 {
138 dSP;
139
140 ENTER;
141 SAVETMPS;
142 PUSHMARK (SP);
143 XPUSHs (req_sv (grp, AIO_GRP_KLASS));
144 PUTBACK;
145 call_sv (grp->fh2, G_VOID | G_EVAL);
146 SPAGAIN;
147 FREETMPS;
148 LEAVE;
149 }
150
151 /* stop if no progress has been made */
152 if (old_len == grp->length)
153 {
154 SvREFCNT_dec (grp->fh2);
155 grp->fh2 = 0;
156 break;
157 }
158 }
125} 159}
126 160
127static void poll_wait () 161static void poll_wait ()
128{ 162{
129 if (nreqs && !ress) 163 if (nreqs && !ress)
145 return; 179 return;
146 180
147 errno = req->errorno; 181 errno = req->errorno;
148 182
149 ENTER; 183 ENTER;
184 SAVETMPS;
150 PUSHMARK (SP); 185 PUSHMARK (SP);
186 EXTEND (SP, 1);
151 187
152 switch (req->type) 188 switch (req->type)
153 { 189 {
154 case REQ_READDIR: 190 case REQ_READDIR:
155 { 191 {
170 } 206 }
171 207
172 rv = sv_2mortal (newRV_noinc ((SV *)av)); 208 rv = sv_2mortal (newRV_noinc ((SV *)av));
173 } 209 }
174 210
175 XPUSHs (rv); 211 PUSHs (rv);
176 } 212 }
177 break; 213 break;
178 214
179 case REQ_OPEN: 215 case REQ_OPEN:
180 { 216 {
181 /* convert fd to fh */ 217 /* convert fd to fh */
182 SV *fh; 218 SV *fh;
183 219
184 XPUSHs (sv_2mortal (newSViv (req->result))); 220 PUSHs (sv_2mortal (newSViv (req->result)));
185 PUTBACK; 221 PUTBACK;
186 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 222 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
187 SPAGAIN; 223 SPAGAIN;
188 224
189 fh = SvREFCNT_inc (POPs); 225 fh = SvREFCNT_inc (POPs);
191 PUSHMARK (SP); 227 PUSHMARK (SP);
192 XPUSHs (sv_2mortal (fh)); 228 XPUSHs (sv_2mortal (fh));
193 } 229 }
194 break; 230 break;
195 231
232 case REQ_GROUP:
233 req->fd = 2; /* mark group as finished */
234
235 if (req->data)
236 {
237 int i;
238 AV *av = (AV *)req->data;
239
240 EXTEND (SP, AvFILL (av) + 1);
241 for (i = 0; i <= AvFILL (av); ++i)
242 PUSHs (*av_fetch (av, i, 0));
243 }
244 break;
245
196 case REQ_SLEEP: 246 case REQ_SLEEP:
197 case REQ_GROUP:
198 break; 247 break;
199 248
200 default: 249 default:
201 XPUSHs (sv_2mortal (newSViv (req->result))); 250 PUSHs (sv_2mortal (newSViv (req->result)));
202 break; 251 break;
203 } 252 }
204 253
205 254
206 PUTBACK; 255 PUTBACK;
211 { 260 {
212 req_free (req); 261 req_free (req);
213 croak (0); 262 croak (0);
214 } 263 }
215 264
265 FREETMPS;
216 LEAVE; 266 LEAVE;
217 267
218 errno = errorno; 268 errno = errorno;
219} 269}
220 270
223 if (req->grp) 273 if (req->grp)
224 { 274 {
225 aio_req grp = req->grp; 275 aio_req grp = req->grp;
226 276
227 /* unlink request */ 277 /* unlink request */
228 req->grp_next->grp_prev = req->grp_prev; 278 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
229 req->grp_prev->grp_next = req->grp_next; 279 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
230 280
281 if (grp->grp_first == req)
282 grp->grp_first = req->grp_next;
283
284 --grp->length;
285
286 /* call feeder, if applicable */
287 aio_grp_feed (grp);
288
289 /* finish, if done */
231 if (grp->grp_next == grp && grp->fd) 290 if (!grp->length && grp->fd)
232 { 291 {
233 req_invoke (grp); 292 req_invoke (grp);
234 req_free (grp); 293 req_free (grp);
235 } 294 }
236 } 295 }
239 { 298 {
240 sv_unmagic (req->self, PERL_MAGIC_ext); 299 sv_unmagic (req->self, PERL_MAGIC_ext);
241 SvREFCNT_dec (req->self); 300 SvREFCNT_dec (req->self);
242 } 301 }
243 302
244 if (req->data)
245 SvREFCNT_dec (req->data); 303 SvREFCNT_dec (req->data);
246
247 if (req->fh)
248 SvREFCNT_dec (req->fh); 304 SvREFCNT_dec (req->fh);
249
250 if (req->fh2)
251 SvREFCNT_dec (req->fh2); 305 SvREFCNT_dec (req->fh2);
252
253 if (req->statdata)
254 Safefree (req->statdata);
255
256 if (req->callback)
257 SvREFCNT_dec (req->callback); 306 SvREFCNT_dec (req->callback);
307 Safefree (req->statdata);
258 308
259 if (req->type == REQ_READDIR && req->result >= 0) 309 if (req->type == REQ_READDIR && req->result >= 0)
260 free (req->data2ptr); 310 free (req->data2ptr);
261 311
262 Safefree (req); 312 Safefree (req);
268 318
269 if (req->type == REQ_GROUP) 319 if (req->type == REQ_GROUP)
270 { 320 {
271 aio_req sub; 321 aio_req sub;
272 322
273 for (sub = req->grp_next; sub != req; sub = sub->grp_next) 323 for (sub = req->grp_first; sub; sub = sub->grp_next)
274 req_cancel (sub); 324 req_cancel (sub);
275 } 325 }
276} 326}
277 327
278static int poll_cb () 328static int poll_cb ()
309 359
310 nreqs--; 360 nreqs--;
311 361
312 if (req->type == REQ_QUIT) 362 if (req->type == REQ_QUIT)
313 started--; 363 started--;
314 else if (req->type == REQ_GROUP && req->grp_next != req) 364 else if (req->type == REQ_GROUP && req->length)
315 { 365 {
316 req->fd = 1; /* mark request as delayed */ 366 req->fd = 1; /* mark request as delayed */
317 continue; 367 continue;
318 } 368 }
319 else 369 else
1149 PROTOTYPE: ;$ 1199 PROTOTYPE: ;$
1150 PPCODE: 1200 PPCODE:
1151{ 1201{
1152 dREQ; 1202 dREQ;
1153 req->type = REQ_GROUP; 1203 req->type = REQ_GROUP;
1154 req->grp_next = req;
1155 req->grp_prev = req;
1156
1157 req_send (req); 1204 req_send (req);
1158 XPUSHs (req_sv (req, AIO_GRP_KLASS)); 1205 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1159} 1206}
1160 1207
1161void 1208void
1207 CODE: 1254 CODE:
1208 RETVAL = nreqs; 1255 RETVAL = nreqs;
1209 OUTPUT: 1256 OUTPUT:
1210 RETVAL 1257 RETVAL
1211 1258
1259PROTOTYPES: DISABLE
1260
1212MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1261MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1213 1262
1214void 1263void
1215cancel (aio_req_ornot req) 1264cancel (aio_req_ornot req)
1216 PROTOTYPE: 1265 PROTOTYPE:
1219 1268
1220MODULE = IO::AIO PACKAGE = IO::AIO::GRP 1269MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1221 1270
1222void 1271void
1223add (aio_req grp, ...) 1272add (aio_req grp, ...)
1224 PROTOTYPE: $;@
1225 PPCODE: 1273 PPCODE:
1226{ 1274{
1227 int i; 1275 int i;
1276
1277 if (grp->fd == 2)
1278 croak ("cannot add requests to IO::AIO::GRP after the group finished");
1228 1279
1229 for (i = 1; i < items; ++i ) 1280 for (i = 1; i < items; ++i )
1230 { 1281 {
1231 if (GIMME_V != G_VOID) 1282 if (GIMME_V != G_VOID)
1232 XPUSHs (sv_2mortal (newSVsv (ST (i)))); 1283 XPUSHs (sv_2mortal (newSVsv (ST (i))));
1233 1284
1234 aio_req req = SvAIO_REQ (ST (i)); 1285 aio_req req = SvAIO_REQ (ST (i));
1235 1286
1236 if (req) 1287 if (req)
1237 { 1288 {
1238 req->grp_prev = grp; 1289 ++grp->length;
1239 req->grp_next = grp->grp_next;
1240 grp->grp_next->grp_prev = req;
1241 grp->grp_next = req;
1242
1243 req->grp = grp; 1290 req->grp = grp;
1291
1292 req->grp_prev = 0;
1293 req->grp_next = grp->grp_first;
1294
1295 if (grp->grp_first)
1296 grp->grp_first->grp_prev = req;
1297
1298 grp->grp_first = req;
1244 } 1299 }
1245 } 1300 }
1246} 1301}
1247 1302
1303void
1304result (aio_req grp, ...)
1305 CODE:
1306{
1307 int i;
1308 AV *av = newAV ();
1309
1310 for (i = 1; i < items; ++i )
1311 av_push (av, newSVsv (ST (i)));
1312
1313 SvREFCNT_dec (grp->data);
1314 grp->data = (SV *)av;
1315}
1316
1317void
1318feeder_limit (aio_req grp, int limit)
1319 CODE:
1320 grp->fd2 = limit;
1321 aio_grp_feed (grp);
1322
1323void
1324set_feeder (aio_req grp, SV *callback=&PL_sv_undef)
1325 CODE:
1326{
1327 SvREFCNT_dec (grp->fh2);
1328 grp->fh2 = newSVsv (callback);
1329
1330 if (grp->fd2 <= 0)
1331 grp->fd2 = 2;
1332
1333 aio_grp_feed (grp);
1334}
1335

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines