ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.7 by root, Thu Jun 25 18:12:53 2015 UTC vs.
Revision 1.11 by root, Sun May 1 17:15:45 2016 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
35 * and other provisions required by the GPL. If you do not delete the 35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under 36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL. 37 * either the BSD or the GPL.
38 */ 38 */
39 39
40#if HAVE_SYS_PRCTL_H
41# include <sys/prctl.h>
42#endif
43
44#ifdef EIO_STACKSIZE
45# define X_STACKSIZE EIO_STACKSIZE
46#endif
47#include "xthread.h"
48
40#ifndef ETP_API_DECL 49#ifndef ETP_API_DECL
41# define ETP_API_DECL static 50# define ETP_API_DECL static
42#endif 51#endif
43 52
44#ifndef ETP_PRI_MIN 53#ifndef ETP_PRI_MIN
104typedef struct 113typedef struct
105{ 114{
106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 115 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107 int size; 116 int size;
108} etp_reqq; 117} etp_reqq;
118
119typedef struct etp_pool *etp_pool;
120
121typedef struct etp_worker
122{
123 etp_pool pool;
124
125 struct etp_tmpbuf tmpbuf;
126
127 /* locked by pool->wrklock */
128 struct etp_worker *prev, *next;
129
130 xthread_t tid;
131
132#ifdef ETP_WORKER_COMMON
133 ETP_WORKER_COMMON
134#endif
135} etp_worker;
109 136
110struct etp_pool 137struct etp_pool
111{ 138{
112 void *userdata; 139 void *userdata;
113 140
134 xcond_t reqwait; 161 xcond_t reqwait;
135 162
136 etp_worker wrk_first; 163 etp_worker wrk_first;
137}; 164};
138 165
139typedef struct etp_pool *etp_pool;
140
141typedef struct etp_worker
142{
143 etp_pool pool;
144
145 struct etp_tmpbuf tmpbuf;
146
147 /* locked by pool->wrklock */
148 struct etp_worker *prev, *next;
149
150 xthread_t tid;
151
152#ifdef ETP_WORKER_COMMON
153 ETP_WORKER_COMMON
154#endif
155} etp_worker;
156
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) 166#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) 167#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 168
160/* worker threads management */ 169/* worker threads management */
161 170
309static void ecb_noinline ecb_cold 318static void ecb_noinline ecb_cold
310etp_proc_init (void) 319etp_proc_init (void)
311{ 320{
312#if HAVE_PRCTL_SET_NAME 321#if HAVE_PRCTL_SET_NAME
313 /* provide a more sensible "thread name" */ 322 /* provide a more sensible "thread name" */
314 char name[16 + 1]; 323 char name[15 + 1];
315 const int namelen = sizeof (name) - 1; 324 const int namelen = sizeof (name) - 1;
316 int len; 325 int len;
317 326
318 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); 327 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
319 name [namelen] = 0; 328 name [namelen] = 0;
343 352
344 for (;;) 353 for (;;)
345 { 354 {
346 req = reqq_shift (&pool->req_queue); 355 req = reqq_shift (&pool->req_queue);
347 356
348 if (req) 357 if (ecb_expect_true (req))
349 break; 358 break;
350 359
351 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 360 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
352 { 361 {
353 X_UNLOCK (pool->reqlock); 362 X_UNLOCK (pool->reqlock);
377 386
378 --pool->nready; 387 --pool->nready;
379 388
380 X_UNLOCK (pool->reqlock); 389 X_UNLOCK (pool->reqlock);
381 390
382 if (req->type == ETP_TYPE_QUIT) 391 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
383 goto quit; 392 goto quit;
384 393
385 ETP_EXECUTE (self, req); 394 ETP_EXECUTE (self, req);
386 395
387 X_LOCK (pool->reslock); 396 X_LOCK (pool->reslock);
388 397
389 ++pool->npending; 398 ++pool->npending;
390 399
391 if (!reqq_push (&pool->res_queue, req)) 400 if (!reqq_push (&pool->res_queue, req))
392 ETP_WANT_POLL (poll); 401 ETP_WANT_POLL (pool);
393 402
394 etp_worker_clear (self); 403 etp_worker_clear (self);
395 404
396 X_UNLOCK (pool->reslock); 405 X_UNLOCK (pool->reslock);
397 } 406 }
418 427
419 X_LOCK (pool->wrklock); 428 X_LOCK (pool->wrklock);
420 429
421 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 430 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
422 { 431 {
423 wrk->prev = &wpool->rk_first; 432 wrk->prev = &pool->wrk_first;
424 wrk->next = pool->wrk_first.next; 433 wrk->next = pool->wrk_first.next;
425 pool->wrk_first.next->prev = wrk; 434 pool->wrk_first.next->prev = wrk;
426 pool->wrk_first.next = wrk; 435 pool->wrk_first.next = wrk;
427 ++pool->started; 436 ++pool->started;
428 } 437 }
485 etp_maybe_start_thread (pool); 494 etp_maybe_start_thread (pool);
486 495
487 X_LOCK (pool->reslock); 496 X_LOCK (pool->reslock);
488 req = reqq_shift (&pool->res_queue); 497 req = reqq_shift (&pool->res_queue);
489 498
490 if (req) 499 if (ecb_expect_true (req))
491 { 500 {
492 --pool->npending; 501 --pool->npending;
493 502
494 if (!pool->res_queue.size) 503 if (!pool->res_queue.size)
495 ETP_DONE_POLL (pool->userdata); 504 ETP_DONE_POLL (pool);
496 } 505 }
497 506
498 X_UNLOCK (pool->reslock); 507 X_UNLOCK (pool->reslock);
499 508
500 if (!req) 509 if (ecb_expect_false (!req))
501 return 0; 510 return 0;
502 511
503 X_LOCK (pool->reqlock); 512 X_LOCK (pool->reqlock);
504 --pool->nreqs; 513 --pool->nreqs;
505 X_UNLOCK (pool->reqlock); 514 X_UNLOCK (pool->reqlock);
586 etp_maybe_start_thread (pool); 595 etp_maybe_start_thread (pool);
587 } 596 }
588} 597}
589 598
590ETP_API_DECL void ecb_cold 599ETP_API_DECL void ecb_cold
591etp_set_max_poll_time (etp_pool pool, double nseconds) 600etp_set_max_poll_time (etp_pool pool, double seconds)
592{ 601{
593 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); 602 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
594 pool->max_poll_time = nseconds * ETP_TICKS; 603 pool->max_poll_time = seconds * ETP_TICKS;
595 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 604 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
596} 605}
597 606
598ETP_API_DECL void ecb_cold 607ETP_API_DECL void ecb_cold
599etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) 608etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
602 pool->max_poll_reqs = maxreqs; 611 pool->max_poll_reqs = maxreqs;
603 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 612 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
604} 613}
605 614
606ETP_API_DECL void ecb_cold 615ETP_API_DECL void ecb_cold
607etp_set_max_idle (etp_pool pool, unsigned int nthreads) 616etp_set_max_idle (etp_pool pool, unsigned int threads)
608{ 617{
609 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); 618 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
610 pool->max_idle = nthreads; 619 pool->max_idle = threads;
611 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 620 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
612} 621}
613 622
614ETP_API_DECL void ecb_cold 623ETP_API_DECL void ecb_cold
615etp_set_idle_timeout (etp_pool pool, unsigned int seconds) 624etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
618 pool->idle_timeout = seconds; 627 pool->idle_timeout = seconds;
619 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 628 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
620} 629}
621 630
622ETP_API_DECL void ecb_cold 631ETP_API_DECL void ecb_cold
623etp_set_min_parallel (etp_pool pool, unsigned int nthreads) 632etp_set_min_parallel (etp_pool pool, unsigned int threads)
624{ 633{
625 if (pool->wanted < nthreads) 634 if (pool->wanted < threads)
626 pool->wanted = nthreads; 635 pool->wanted = threads;
627} 636}
628 637
629ETP_API_DECL void ecb_cold 638ETP_API_DECL void ecb_cold
630etp_set_max_parallel (etp_pool pool, unsigned int nthreads) 639etp_set_max_parallel (etp_pool pool, unsigned int threads)
631{ 640{
632 if (pool->wanted > nthreads) 641 if (pool->wanted > threads)
633 pool->wanted = nthreads; 642 pool->wanted = threads;
634 643
635 while (pool->started > pool->wanted) 644 while (pool->started > pool->wanted)
636 etp_end_thread (pool); 645 etp_end_thread (pool);
637} 646}
638 647

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines