ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.7 by root, Thu Jun 25 18:12:53 2015 UTC vs.
Revision 1.10 by root, Mon Sep 21 08:01:00 2015 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
105{ 105{
106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107 int size; 107 int size;
108} etp_reqq; 108} etp_reqq;
109 109
110typedef struct etp_pool *etp_pool;
111
112typedef struct etp_worker
113{
114 etp_pool pool;
115
116 struct etp_tmpbuf tmpbuf;
117
118 /* locked by pool->wrklock */
119 struct etp_worker *prev, *next;
120
121 xthread_t tid;
122
123#ifdef ETP_WORKER_COMMON
124 ETP_WORKER_COMMON
125#endif
126} etp_worker;
127
110struct etp_pool 128struct etp_pool
111{ 129{
112 void *userdata; 130 void *userdata;
113 131
114 etp_reqq req_queue; 132 etp_reqq req_queue;
134 xcond_t reqwait; 152 xcond_t reqwait;
135 153
136 etp_worker wrk_first; 154 etp_worker wrk_first;
137}; 155};
138 156
139typedef struct etp_pool *etp_pool;
140
141typedef struct etp_worker
142{
143 etp_pool pool;
144
145 struct etp_tmpbuf tmpbuf;
146
147 /* locked by pool->wrklock */
148 struct etp_worker *prev, *next;
149
150 xthread_t tid;
151
152#ifdef ETP_WORKER_COMMON
153 ETP_WORKER_COMMON
154#endif
155} etp_worker;
156
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) 157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) 158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 159
160/* worker threads management */ 160/* worker threads management */
161 161
343 343
344 for (;;) 344 for (;;)
345 { 345 {
346 req = reqq_shift (&pool->req_queue); 346 req = reqq_shift (&pool->req_queue);
347 347
348 if (req) 348 if (ecb_expect_true (req))
349 break; 349 break;
350 350
351 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 351 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
352 { 352 {
353 X_UNLOCK (pool->reqlock); 353 X_UNLOCK (pool->reqlock);
377 377
378 --pool->nready; 378 --pool->nready;
379 379
380 X_UNLOCK (pool->reqlock); 380 X_UNLOCK (pool->reqlock);
381 381
382 if (req->type == ETP_TYPE_QUIT) 382 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
383 goto quit; 383 goto quit;
384 384
385 ETP_EXECUTE (self, req); 385 ETP_EXECUTE (self, req);
386 386
387 X_LOCK (pool->reslock); 387 X_LOCK (pool->reslock);
388 388
389 ++pool->npending; 389 ++pool->npending;
390 390
391 if (!reqq_push (&pool->res_queue, req)) 391 if (!reqq_push (&pool->res_queue, req))
392 ETP_WANT_POLL (poll); 392 ETP_WANT_POLL (pool);
393 393
394 etp_worker_clear (self); 394 etp_worker_clear (self);
395 395
396 X_UNLOCK (pool->reslock); 396 X_UNLOCK (pool->reslock);
397 } 397 }
418 418
419 X_LOCK (pool->wrklock); 419 X_LOCK (pool->wrklock);
420 420
421 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 421 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
422 { 422 {
423 wrk->prev = &wpool->rk_first; 423 wrk->prev = &pool->wrk_first;
424 wrk->next = pool->wrk_first.next; 424 wrk->next = pool->wrk_first.next;
425 pool->wrk_first.next->prev = wrk; 425 pool->wrk_first.next->prev = wrk;
426 pool->wrk_first.next = wrk; 426 pool->wrk_first.next = wrk;
427 ++pool->started; 427 ++pool->started;
428 } 428 }
485 etp_maybe_start_thread (pool); 485 etp_maybe_start_thread (pool);
486 486
487 X_LOCK (pool->reslock); 487 X_LOCK (pool->reslock);
488 req = reqq_shift (&pool->res_queue); 488 req = reqq_shift (&pool->res_queue);
489 489
490 if (req) 490 if (ecb_expect_true (req))
491 { 491 {
492 --pool->npending; 492 --pool->npending;
493 493
494 if (!pool->res_queue.size) 494 if (!pool->res_queue.size)
495 ETP_DONE_POLL (pool->userdata); 495 ETP_DONE_POLL (pool);
496 } 496 }
497 497
498 X_UNLOCK (pool->reslock); 498 X_UNLOCK (pool->reslock);
499 499
500 if (!req) 500 if (ecb_expect_false (!req))
501 return 0; 501 return 0;
502 502
503 X_LOCK (pool->reqlock); 503 X_LOCK (pool->reqlock);
504 --pool->nreqs; 504 --pool->nreqs;
505 X_UNLOCK (pool->reqlock); 505 X_UNLOCK (pool->reqlock);
586 etp_maybe_start_thread (pool); 586 etp_maybe_start_thread (pool);
587 } 587 }
588} 588}
589 589
590ETP_API_DECL void ecb_cold 590ETP_API_DECL void ecb_cold
591etp_set_max_poll_time (etp_pool pool, double nseconds) 591etp_set_max_poll_time (etp_pool pool, double seconds)
592{ 592{
593 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); 593 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
594 pool->max_poll_time = nseconds * ETP_TICKS; 594 pool->max_poll_time = seconds * ETP_TICKS;
595 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 595 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
596} 596}
597 597
598ETP_API_DECL void ecb_cold 598ETP_API_DECL void ecb_cold
599etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) 599etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
602 pool->max_poll_reqs = maxreqs; 602 pool->max_poll_reqs = maxreqs;
603 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 603 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
604} 604}
605 605
606ETP_API_DECL void ecb_cold 606ETP_API_DECL void ecb_cold
607etp_set_max_idle (etp_pool pool, unsigned int nthreads) 607etp_set_max_idle (etp_pool pool, unsigned int threads)
608{ 608{
609 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); 609 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
610 pool->max_idle = nthreads; 610 pool->max_idle = threads;
611 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 611 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
612} 612}
613 613
614ETP_API_DECL void ecb_cold 614ETP_API_DECL void ecb_cold
615etp_set_idle_timeout (etp_pool pool, unsigned int seconds) 615etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
618 pool->idle_timeout = seconds; 618 pool->idle_timeout = seconds;
619 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 619 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
620} 620}
621 621
622ETP_API_DECL void ecb_cold 622ETP_API_DECL void ecb_cold
623etp_set_min_parallel (etp_pool pool, unsigned int nthreads) 623etp_set_min_parallel (etp_pool pool, unsigned int threads)
624{ 624{
625 if (pool->wanted < nthreads) 625 if (pool->wanted < threads)
626 pool->wanted = nthreads; 626 pool->wanted = threads;
627} 627}
628 628
629ETP_API_DECL void ecb_cold 629ETP_API_DECL void ecb_cold
630etp_set_max_parallel (etp_pool pool, unsigned int nthreads) 630etp_set_max_parallel (etp_pool pool, unsigned int threads)
631{ 631{
632 if (pool->wanted > nthreads) 632 if (pool->wanted > threads)
633 pool->wanted = nthreads; 633 pool->wanted = threads;
634 634
635 while (pool->started > pool->wanted) 635 while (pool->started > pool->wanted)
636 etp_end_thread (pool); 636 etp_end_thread (pool);
637} 637}
638 638

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines