ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.29 by root, Wed Aug 17 06:12:10 2005 UTC vs.
Revision 1.30 by root, Thu Aug 18 16:32:10 2005 UTC

55 Stat_t *statdata; 55 Stat_t *statdata;
56} aio_cb; 56} aio_cb;
57 57
58typedef aio_cb *aio_req; 58typedef aio_cb *aio_req;
59 59
60static int started; 60static int started, wanted;
61static volatile int nreqs; 61static volatile int nreqs;
62static int max_outstanding = 1<<30; 62static int max_outstanding = 1<<30;
63static int respipe [2]; 63static int respipe [2];
64 64
65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 65static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
116 { 116 {
117 ress = req->next; 117 ress = req->next;
118 118
119 if (!ress) 119 if (!ress)
120 { 120 {
121 rese = 0;
122
123 /* read any signals sent by the worker threads */ 121 /* read any signals sent by the worker threads */
124 char buf [32]; 122 char buf [32];
125 while (read (respipe [0], buf, 32) == 32) 123 while (read (respipe [0], buf, 32) == 32)
126 ; 124 ;
125
126 rese = 0;
127 } 127 }
128 } 128 }
129 129
130 pthread_mutex_unlock (&reslock); 130 pthread_mutex_unlock (&reslock);
131 131
207 pthread_t tid; 207 pthread_t tid;
208 pthread_attr_t attr; 208 pthread_attr_t attr;
209 209
210 pthread_attr_init (&attr); 210 pthread_attr_init (&attr);
211 pthread_attr_setstacksize (&attr, STACKSIZE); 211 pthread_attr_setstacksize (&attr, STACKSIZE);
212 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); 212// pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
213 213
214 sigfillset (&fullsigset); 214 sigfillset (&fullsigset);
215 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); 215 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
216 216
217 if (pthread_create (&tid, &attr, aio_proc, 0) == 0) 217 if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
221} 221}
222 222
223static void 223static void
224send_req (aio_req req) 224send_req (aio_req req)
225{ 225{
226 while (started < wanted && nreqs >= started)
227 start_thread ();
228
226 nreqs++; 229 nreqs++;
227 230
228 pthread_mutex_lock (&reqlock); 231 pthread_mutex_lock (&reqlock);
229 232
230 req->next = 0; 233 req->next = 0;
262 send_req (req); 265 send_req (req);
263} 266}
264 267
265static void min_parallel (int nthreads) 268static void min_parallel (int nthreads)
266{ 269{
267 while (nthreads > started) 270 if (wanted < nthreads)
268 start_thread (); 271 wanted = nthreads;
269} 272}
270 273
271static void max_parallel (int nthreads) 274static void max_parallel (int nthreads)
272{ 275{
273 int cur = started; 276 int cur = started;
274 277
278 if (wanted > nthreads)
279 wanted = nthreads;
280
275 while (cur > nthreads) 281 while (cur > wanted)
276 { 282 {
277 end_thread (); 283 end_thread ();
278 cur--; 284 cur--;
279 } 285 }
280 286
281 while (started > nthreads) 287 while (started > wanted)
282 { 288 {
283 poll_wait (); 289 poll_wait ();
284 poll_cb (); 290 poll_cb ();
285 } 291 }
286} 292}
311 317
312static void atfork_child (void) 318static void atfork_child (void)
313{ 319{
314 aio_req prv; 320 aio_req prv;
315 321
316 int restart = started;
317 started = 0; 322 started = 0;
318 323
319 while (reqs) 324 while (reqs)
320 { 325 {
321 prv = reqs; 326 prv = reqs;
337 close (respipe [0]); 342 close (respipe [0]);
338 close (respipe [1]); 343 close (respipe [1]);
339 create_pipe (); 344 create_pipe ();
340 345
341 atfork_parent (); 346 atfork_parent ();
342
343 min_parallel (restart);
344} 347}
345 348
346/*****************************************************************************/ 349/*****************************************************************************/
347/* work around various missing functions */ 350/* work around various missing functions */
348 351

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines