ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.4 by root, Thu Jun 25 17:05:07 2015 UTC vs.
Revision 1.5 by root, Thu Jun 25 17:40:24 2015 UTC

106 } 106 }
107 107
108 return buf->ptr; 108 return buf->ptr;
109} 109}
110 110
111/*
112 * a somewhat faster data structure might be nice, but
113 * with 8 priorities this actually needs <20 insns
114 * per shift, the most expensive operation.
115 */
116typedef struct
117{
118 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
119 int size;
120} etp_reqq;
121
122struct etp_pool
123{
124 etp_reqq req_queue;
125 etp_reqq res_queue;
126};
127
128typedef struct etp_pool *etp_pool;
129
111typedef struct etp_worker 130typedef struct etp_worker
112{ 131{
132 etp_pool pool;
133
113 struct etp_tmpbuf tmpbuf; 134 struct etp_tmpbuf tmpbuf;
114 135
115 /* locked by wrklock */ 136 /* locked by wrklock */
116 struct etp_worker *prev, *next; 137 struct etp_worker *prev, *next;
117 138
144 165
145 free (wrk); 166 free (wrk);
146} 167}
147 168
148ETP_API_DECL unsigned int 169ETP_API_DECL unsigned int
149etp_nreqs (void) 170etp_nreqs (etp_pool pool)
150{ 171{
151 int retval; 172 int retval;
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 173 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = nreqs; 174 retval = nreqs;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 175 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155 return retval; 176 return retval;
156} 177}
157 178
158ETP_API_DECL unsigned int 179ETP_API_DECL unsigned int
159etp_nready (void) 180etp_nready (etp_pool pool)
160{ 181{
161 unsigned int retval; 182 unsigned int retval;
162 183
163 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 184 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
164 retval = nready; 185 retval = nready;
166 187
167 return retval; 188 return retval;
168} 189}
169 190
170ETP_API_DECL unsigned int 191ETP_API_DECL unsigned int
171etp_npending (void) 192etp_npending (etp_pool pool)
172{ 193{
173 unsigned int retval; 194 unsigned int retval;
174 195
175 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 196 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
176 retval = npending; 197 retval = npending;
178 199
179 return retval; 200 return retval;
180} 201}
181 202
182ETP_API_DECL unsigned int 203ETP_API_DECL unsigned int
183etp_nthreads (void) 204etp_nthreads (etp_pool pool)
184{ 205{
185 unsigned int retval; 206 unsigned int retval;
186 207
187 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 208 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
188 retval = started; 209 retval = started;
189 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 210 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
190 211
191 return retval; 212 return retval;
192} 213}
193
194/*
195 * a somewhat faster data structure might be nice, but
196 * with 8 priorities this actually needs <20 insns
197 * per shift, the most expensive operation.
198 */
199typedef struct {
200 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
201 int size;
202} etp_reqq;
203
204static etp_reqq req_queue;
205static etp_reqq res_queue;
206 214
207static void ecb_noinline ecb_cold 215static void ecb_noinline ecb_cold
208reqq_init (etp_reqq *q) 216reqq_init (etp_reqq *q)
209{ 217{
210 int pri; 218 int pri;
257 265
258 abort (); 266 abort ();
259} 267}
260 268
261ETP_API_DECL int ecb_cold 269ETP_API_DECL int ecb_cold
262etp_init (void (*want_poll)(void), void (*done_poll)(void)) 270etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void))
263{ 271{
264 X_MUTEX_CREATE (wrklock); 272 X_MUTEX_CREATE (wrklock);
265 X_MUTEX_CREATE (reslock); 273 X_MUTEX_CREATE (reslock);
266 X_MUTEX_CREATE (reqlock); 274 X_MUTEX_CREATE (reqlock);
267 X_COND_CREATE (reqwait); 275 X_COND_CREATE (reqwait);
268 276
269 reqq_init (&req_queue); 277 reqq_init (&pool->req_queue);
270 reqq_init (&res_queue); 278 reqq_init (&pool->res_queue);
271 279
272 wrk_first.next = 280 wrk_first.next =
273 wrk_first.prev = &wrk_first; 281 wrk_first.prev = &wrk_first;
274 282
275 started = 0; 283 started = 0;
304X_THREAD_PROC (etp_proc) 312X_THREAD_PROC (etp_proc)
305{ 313{
306 ETP_REQ *req; 314 ETP_REQ *req;
307 struct timespec ts; 315 struct timespec ts;
308 etp_worker *self = (etp_worker *)thr_arg; 316 etp_worker *self = (etp_worker *)thr_arg;
317 etp_pool pool = self->pool;
309 318
310 etp_proc_init (); 319 etp_proc_init ();
311 320
312 /* try to distribute timeouts somewhat evenly */ 321 /* try to distribute timeouts somewhat evenly */
313 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 322 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
318 327
319 X_LOCK (reqlock); 328 X_LOCK (reqlock);
320 329
321 for (;;) 330 for (;;)
322 { 331 {
323 req = reqq_shift (&req_queue); 332 req = reqq_shift (&pool->req_queue);
324 333
325 if (req) 334 if (req)
326 break; 335 break;
327 336
328 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 337 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
363 372
364 X_LOCK (reslock); 373 X_LOCK (reslock);
365 374
366 ++npending; 375 ++npending;
367 376
368 if (!reqq_push (&res_queue, req) && want_poll_cb) 377 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
369 want_poll_cb (); 378 want_poll_cb ();
370 379
371 etp_worker_clear (self); 380 etp_worker_clear (self);
372 381
373 X_UNLOCK (reslock); 382 X_UNLOCK (reslock);
382 391
383 return 0; 392 return 0;
384} 393}
385 394
386static void ecb_cold 395static void ecb_cold
387etp_start_thread (void) 396etp_start_thread (etp_pool pool)
388{ 397{
389 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 398 etp_worker *wrk = calloc (1, sizeof (etp_worker));
390 399
391 /*TODO*/ 400 /*TODO*/
392 assert (("unable to allocate worker thread data", wrk)); 401 assert (("unable to allocate worker thread data", wrk));
402
403 wrk->pool = pool;
393 404
394 X_LOCK (wrklock); 405 X_LOCK (wrklock);
395 406
396 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 407 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
397 { 408 {
406 417
407 X_UNLOCK (wrklock); 418 X_UNLOCK (wrklock);
408} 419}
409 420
410static void 421static void
411etp_maybe_start_thread (void) 422etp_maybe_start_thread (etp_pool pool)
412{ 423{
413 if (ecb_expect_true (etp_nthreads () >= wanted)) 424 if (ecb_expect_true (etp_nthreads (pool) >= wanted))
414 return; 425 return;
415 426
416 /* todo: maybe use idle here, but might be less exact */ 427 /* todo: maybe use idle here, but might be less exact */
417 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 428 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
418 return; 429 return;
419 430
420 etp_start_thread (); 431 etp_start_thread (pool);
421} 432}
422 433
423static void ecb_cold 434static void ecb_cold
424etp_end_thread (void) 435etp_end_thread (etp_pool pool)
425{ 436{
426 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 437 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
427 438
428 req->type = ETP_TYPE_QUIT; 439 req->type = ETP_TYPE_QUIT;
429 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 440 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
430 441
431 X_LOCK (reqlock); 442 X_LOCK (reqlock);
432 reqq_push (&req_queue, req); 443 reqq_push (&pool->req_queue, req);
433 X_COND_SIGNAL (reqwait); 444 X_COND_SIGNAL (reqwait);
434 X_UNLOCK (reqlock); 445 X_UNLOCK (reqlock);
435 446
436 X_LOCK (wrklock); 447 X_LOCK (wrklock);
437 --started; 448 --started;
438 X_UNLOCK (wrklock); 449 X_UNLOCK (wrklock);
439} 450}
440 451
441ETP_API_DECL int 452ETP_API_DECL int
442etp_poll (void) 453etp_poll (etp_pool pool)
443{ 454{
444 unsigned int maxreqs; 455 unsigned int maxreqs;
445 unsigned int maxtime; 456 unsigned int maxtime;
446 struct timeval tv_start, tv_now; 457 struct timeval tv_start, tv_now;
447 458
455 466
456 for (;;) 467 for (;;)
457 { 468 {
458 ETP_REQ *req; 469 ETP_REQ *req;
459 470
460 etp_maybe_start_thread (); 471 etp_maybe_start_thread (pool);
461 472
462 X_LOCK (reslock); 473 X_LOCK (reslock);
463 req = reqq_shift (&res_queue); 474 req = reqq_shift (&pool->res_queue);
464 475
465 if (req) 476 if (req)
466 { 477 {
467 --npending; 478 --npending;
468 479
469 if (!res_queue.size && done_poll_cb) 480 if (!pool->res_queue.size && done_poll_cb)
470 done_poll_cb (); 481 done_poll_cb ();
471 } 482 }
472 483
473 X_UNLOCK (reslock); 484 X_UNLOCK (reslock);
474 485
506 errno = EAGAIN; 517 errno = EAGAIN;
507 return -1; 518 return -1;
508} 519}
509 520
510ETP_API_DECL void 521ETP_API_DECL void
511etp_grp_cancel (ETP_REQ *grp); 522etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
512 523
513ETP_API_DECL void 524ETP_API_DECL void
514etp_cancel (ETP_REQ *req) 525etp_cancel (etp_pool pool, ETP_REQ *req)
515{ 526{
516 req->cancelled = 1; 527 req->cancelled = 1;
517 528
518 etp_grp_cancel (req); 529 etp_grp_cancel (pool, req);
519} 530}
520 531
521ETP_API_DECL void 532ETP_API_DECL void
522etp_grp_cancel (ETP_REQ *grp) 533etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
523{ 534{
524 for (grp = grp->grp_first; grp; grp = grp->grp_next) 535 for (grp = grp->grp_first; grp; grp = grp->grp_next)
525 etp_cancel (grp); 536 etp_cancel (pool, grp);
526} 537}
527 538
528ETP_API_DECL void 539ETP_API_DECL void
529etp_submit (ETP_REQ *req) 540etp_submit (etp_pool pool, ETP_REQ *req)
530{ 541{
531 req->pri -= ETP_PRI_MIN; 542 req->pri -= ETP_PRI_MIN;
532 543
533 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 544 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
534 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 545 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
542 553
543 X_LOCK (reslock); 554 X_LOCK (reslock);
544 555
545 ++npending; 556 ++npending;
546 557
547 if (!reqq_push (&res_queue, req) && want_poll_cb) 558 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
548 want_poll_cb (); 559 want_poll_cb ();
549 560
550 X_UNLOCK (reslock); 561 X_UNLOCK (reslock);
551 } 562 }
552 else 563 else
553 { 564 {
554 X_LOCK (reqlock); 565 X_LOCK (reqlock);
555 ++nreqs; 566 ++nreqs;
556 ++nready; 567 ++nready;
557 reqq_push (&req_queue, req); 568 reqq_push (&pool->req_queue, req);
558 X_COND_SIGNAL (reqwait); 569 X_COND_SIGNAL (reqwait);
559 X_UNLOCK (reqlock); 570 X_UNLOCK (reqlock);
560 571
561 etp_maybe_start_thread (); 572 etp_maybe_start_thread (pool);
562 } 573 }
563} 574}
564 575
565ETP_API_DECL void ecb_cold 576ETP_API_DECL void ecb_cold
566etp_set_max_poll_time (double nseconds) 577etp_set_max_poll_time (etp_pool pool, double nseconds)
567{ 578{
568 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 579 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
569 max_poll_time = nseconds * ETP_TICKS; 580 max_poll_time = nseconds * ETP_TICKS;
570 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 581 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
571} 582}
572 583
573ETP_API_DECL void ecb_cold 584ETP_API_DECL void ecb_cold
574etp_set_max_poll_reqs (unsigned int maxreqs) 585etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
575{ 586{
576 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 587 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
577 max_poll_reqs = maxreqs; 588 max_poll_reqs = maxreqs;
578 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 589 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
579} 590}
580 591
581ETP_API_DECL void ecb_cold 592ETP_API_DECL void ecb_cold
582etp_set_max_idle (unsigned int nthreads) 593etp_set_max_idle (etp_pool pool, unsigned int nthreads)
583{ 594{
584 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 595 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
585 max_idle = nthreads; 596 max_idle = nthreads;
586 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 597 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
587} 598}
588 599
589ETP_API_DECL void ecb_cold 600ETP_API_DECL void ecb_cold
590etp_set_idle_timeout (unsigned int seconds) 601etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
591{ 602{
592 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 603 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
593 idle_timeout = seconds; 604 idle_timeout = seconds;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 605 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
595} 606}
596 607
597ETP_API_DECL void ecb_cold 608ETP_API_DECL void ecb_cold
598etp_set_min_parallel (unsigned int nthreads) 609etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
599{ 610{
600 if (wanted < nthreads) 611 if (wanted < nthreads)
601 wanted = nthreads; 612 wanted = nthreads;
602} 613}
603 614
604ETP_API_DECL void ecb_cold 615ETP_API_DECL void ecb_cold
605etp_set_max_parallel (unsigned int nthreads) 616etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
606{ 617{
607 if (wanted > nthreads) 618 if (wanted > nthreads)
608 wanted = nthreads; 619 wanted = nthreads;
609 620
610 while (started > wanted) 621 while (started > wanted)
611 etp_end_thread (); 622 etp_end_thread (pool);
612} 623}
613 624

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines