ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.5
Committed: Thu Jun 25 17:40:24 2015 UTC (8 years, 10 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.4: +60 -49 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4     * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5     * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
/* storage class placed in front of every etp_* API function; file-local unless the includer defines it */
40     #ifndef ETP_API_DECL
41     # define ETP_API_DECL static
42     #endif
43    
/* priority range accepted by etp_submit; a single level (0) unless the includer defines both */
44     #ifndef ETP_PRI_MIN
45     # define ETP_PRI_MIN 0
46     # define ETP_PRI_MAX 0
47     #endif
48    
/* request type that makes a worker thread exit (checked in etp_proc) */
49     #ifndef ETP_TYPE_QUIT
50     # define ETP_TYPE_QUIT 0
51     #endif
52    
/* request type of group requests (etp_submit queues these directly as results) */
53     #ifndef ETP_TYPE_GROUP
54     # define ETP_TYPE_GROUP 1
55     #endif
56    
/* number of priority levels, i.e. the length of the per-priority arrays in etp_reqq */
57     #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58    
/* time units per second used by etp_tvdiff: 1000000 microseconds / 1024, rounded up before the shift (= 977) */
59     #define ETP_TICKS ((1000000 + 1023) >> 10)
60    
/* request flag bits; etp_poll ORs ETP_FLAG_DELAYED into req->flags */
61 root 1.4 enum {
62     ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
63     ETP_FLAG_DELAYED = 0x08, /* group request has been delayed */
64     };
65    
66 root 1.1 /* calculate time difference in ~1/ETP_TICKS of a second */
67     ecb_inline int
68     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
69     {
70     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72     }
73    
/*
 * started: worker threads accounted for (++ in etp_start_thread, -- in etp_end_thread and on idle timeout)
 * idle:    workers currently waiting on reqwait inside etp_proc
 * wanted:  thread limit checked by etp_maybe_start_thread
 */
74     static unsigned int started, idle, wanted = 4;
75    
/* want_poll_cb is called when a result is pushed onto an empty result queue, done_poll_cb when etp_poll drains it */
76     static void (*want_poll_cb) (void);
77     static void (*done_poll_cb) (void);
78    
79     static unsigned int max_poll_time; /* reslock */
80     static unsigned int max_poll_reqs; /* reslock */
81    
82     static unsigned int nreqs; /* reqlock */
83     static unsigned int nready; /* reqlock */
/* NOTE(review): npending is modified under reslock in this file and only read under reqlock (etp_npending) - confirm which lock is intended */
84     static unsigned int npending; /* reqlock */
85     static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86     static unsigned int idle_timeout = 10; /* number of seconds after which an idle thread exits */
87    
/* wrklock guards the worker list, reslock the result queue, reqlock the request queue; reqwait wakes idle workers */
88     static xmutex_t wrklock;
89     static xmutex_t reslock;
90     static xmutex_t reqlock;
91     static xcond_t reqwait;
92    
/* a scratch buffer that only ever grows; ptr is owned by the buffer */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, or 0 if none */
  int len;   /* usable size of ptr in bytes */
};

/*
 * returns a buffer of at least len bytes, reusing the current allocation
 * when it is already large enough. the previous contents are NOT preserved
 * when the buffer has to grow.
 *
 * returns 0 when the allocation fails. in that case the recorded length is
 * reset to 0, so a later call retries the allocation instead of handing out
 * a null pointer for a size the buffer only claims to have.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
110    
111 root 1.5 /*
112     * a somewhat faster data structure might be nice, but
113     * with 8 priorities this actually needs <20 insns
114     * per shift, the most expensive operation.
115     */
/* request queue: one singly linked list (linked through req->next) per priority level; size counts all queued requests */
116     typedef struct
117     {
118     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
119     int size;
120     } etp_reqq;
121    
/* a pool's queues: workers take requests from req_queue and push executed ones onto res_queue, which etp_poll drains */
122     struct etp_pool
123     {
124     etp_reqq req_queue;
125     etp_reqq res_queue;
126     };
127    
/* note: etp_pool is a pointer type */
128     typedef struct etp_pool *etp_pool;
129    
/* per-thread state of one worker; allocated in etp_start_thread, released in etp_worker_free */
130 root 1.1 typedef struct etp_worker
131     {
/* the pool whose queues this worker serves (set in etp_start_thread) */
132 root 1.5 etp_pool pool;
133    
/* scratch buffer, freed together with the worker */
134 root 1.3 struct etp_tmpbuf tmpbuf;
135 root 1.1
136     /* locked by wrklock */
137     struct etp_worker *prev, *next;
138    
139     xthread_t tid;
140    
/* optional extra members supplied by the includer */
141     #ifdef ETP_WORKER_COMMON
142     ETP_WORKER_COMMON
143     #endif
144     } etp_worker;
145    
/* head of the circular doubly linked worker list; etp_init links it to itself */
146     static etp_worker wrk_first; /* NOT etp */
147    
/* both macros ignore their argument and operate on the single global wrklock */
148     #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
149     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
150    
151     /* worker threads management */
152    
153     static void
154     etp_worker_clear (etp_worker *wrk)
155     {
156     }
157    
158     static void ecb_cold
159     etp_worker_free (etp_worker *wrk)
160     {
161     free (wrk->tmpbuf.ptr);
162    
163     wrk->next->prev = wrk->prev;
164     wrk->prev->next = wrk->next;
165    
166     free (wrk);
167     }
168    
169     ETP_API_DECL unsigned int
170 root 1.5 etp_nreqs (etp_pool pool)
171 root 1.1 {
172     int retval;
173     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
174     retval = nreqs;
175     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
176     return retval;
177     }
178    
179     ETP_API_DECL unsigned int
180 root 1.5 etp_nready (etp_pool pool)
181 root 1.1 {
182     unsigned int retval;
183    
184     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
185     retval = nready;
186     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
187    
188     return retval;
189     }
190    
191     ETP_API_DECL unsigned int
192 root 1.5 etp_npending (etp_pool pool)
193 root 1.1 {
194     unsigned int retval;
195    
196     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
197     retval = npending;
198     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
199    
200     return retval;
201     }
202    
203     ETP_API_DECL unsigned int
204 root 1.5 etp_nthreads (etp_pool pool)
205 root 1.1 {
206     unsigned int retval;
207    
208     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
209     retval = started;
210     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
211    
212     return retval;
213     }
214    
215     static void ecb_noinline ecb_cold
216     reqq_init (etp_reqq *q)
217     {
218     int pri;
219    
220     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
221     q->qs[pri] = q->qe[pri] = 0;
222    
223     q->size = 0;
224     }
225    
226     static int ecb_noinline
227     reqq_push (etp_reqq *q, ETP_REQ *req)
228     {
229     int pri = req->pri;
230     req->next = 0;
231    
232     if (q->qe[pri])
233     {
234     q->qe[pri]->next = req;
235     q->qe[pri] = req;
236     }
237     else
238     q->qe[pri] = q->qs[pri] = req;
239    
240     return q->size++;
241     }
242    
243     static ETP_REQ * ecb_noinline
244     reqq_shift (etp_reqq *q)
245     {
246     int pri;
247    
248     if (!q->size)
249     return 0;
250    
251     --q->size;
252    
253     for (pri = ETP_NUM_PRI; pri--; )
254     {
255 root 1.2 ETP_REQ *req = q->qs[pri];
256 root 1.1
257     if (req)
258     {
259 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
260 root 1.1 q->qe[pri] = 0;
261    
262     return req;
263     }
264     }
265    
266     abort ();
267     }
268    
269     ETP_API_DECL int ecb_cold
270 root 1.5 etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void))
271 root 1.1 {
272     X_MUTEX_CREATE (wrklock);
273     X_MUTEX_CREATE (reslock);
274     X_MUTEX_CREATE (reqlock);
275     X_COND_CREATE (reqwait);
276    
277 root 1.5 reqq_init (&pool->req_queue);
278     reqq_init (&pool->res_queue);
279 root 1.1
280     wrk_first.next =
281     wrk_first.prev = &wrk_first;
282    
283     started = 0;
284     idle = 0;
285     nreqs = 0;
286     nready = 0;
287     npending = 0;
288    
289     want_poll_cb = want_poll;
290     done_poll_cb = done_poll;
291    
292     return 0;
293     }
294    
295 root 1.4 static void ecb_noinline ecb_cold
296     etp_proc_init (void)
297     {
298     #if HAVE_PRCTL_SET_NAME
299     /* provide a more sensible "thread name" */
300     char name[16 + 1];
301     const int namelen = sizeof (name) - 1;
302     int len;
303    
304     prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
305     name [namelen] = 0;
306     len = strlen (name);
307     strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
308     prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
309     #endif
310     }
311    
/*
 * worker thread body. repeatedly takes a request from pool->req_queue,
 * runs it via ETP_EXECUTE and pushes it onto pool->res_queue.
 * the thread ends when it dequeues an ETP_TYPE_QUIT request, or when a
 * timed wait on reqwait times out (only workers beyond max_idle wait with
 * a timeout). on exit it frees the last request pointer and itself.
 */
312     X_THREAD_PROC (etp_proc)
313     {
314     ETP_REQ *req;
315     struct timespec ts;
316     etp_worker *self = (etp_worker *)thr_arg;
317 root 1.5 etp_pool pool = self->pool;
318 root 1.4
319     etp_proc_init ();
320    
321     /* try to distribute timeouts somewhat evenly */
322     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
323    
324     for (;;)
325     {
/* tv_sec doubles as state: 0 = no deadline computed yet, 1 = the timed wait has expired */
326     ts.tv_sec = 0;
327    
328     X_LOCK (reqlock);
329    
330     for (;;)
331     {
332 root 1.5 req = reqq_shift (&pool->req_queue);
333 root 1.4
334     if (req)
335     break;
336    
337     if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
338     {
/* req is null here, so the free () at quit: is a no-op */
339     X_UNLOCK (reqlock);
340     X_LOCK (wrklock);
341     --started;
342     X_UNLOCK (wrklock);
343     goto quit;
344     }
345    
346     ++idle;
347    
348     if (idle <= max_idle)
349     /* we are allowed to idle, so do so without any timeout */
350     X_COND_WAIT (reqwait, reqlock);
351     else
352     {
353     /* initialise timeout once */
354     if (!ts.tv_sec)
355     ts.tv_sec = time (0) + idle_timeout;
356    
357     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
358     ts.tv_sec = 1; /* assuming this is not a value computed above... */
359     }
360    
361     --idle;
362     }
363    
/* NOTE(review): this also runs for ETP_TYPE_QUIT requests, which etp_end_thread queues without incrementing nready - confirm the counter stays balanced */
364     --nready;
365    
366     X_UNLOCK (reqlock);
367    
368     if (req->type == ETP_TYPE_QUIT)
369     goto quit;
370    
371     ETP_EXECUTE (self, req);
372    
373     X_LOCK (reslock);
374    
375     ++npending;
376    
/* reqq_push returns the previous size: only notify when the result queue was empty */
377 root 1.5 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
378 root 1.4 want_poll_cb ();
379    
380     etp_worker_clear (self);
381    
382     X_UNLOCK (reslock);
383     }
384    
385     quit:
/* either the quit request itself or null (timeout path) */
386     free (req);
387    
388     X_LOCK (wrklock);
389     etp_worker_free (self);
390     X_UNLOCK (wrklock);
391    
392     return 0;
393     }
394 root 1.1
395     static void ecb_cold
396 root 1.5 etp_start_thread (etp_pool pool)
397 root 1.1 {
398     etp_worker *wrk = calloc (1, sizeof (etp_worker));
399    
400     /*TODO*/
401     assert (("unable to allocate worker thread data", wrk));
402    
403 root 1.5 wrk->pool = pool;
404    
405 root 1.1 X_LOCK (wrklock);
406    
407     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
408     {
409     wrk->prev = &wrk_first;
410     wrk->next = wrk_first.next;
411     wrk_first.next->prev = wrk;
412     wrk_first.next = wrk;
413     ++started;
414     }
415     else
416     free (wrk);
417    
418     X_UNLOCK (wrklock);
419     }
420    
421     static void
422 root 1.5 etp_maybe_start_thread (etp_pool pool)
423 root 1.1 {
424 root 1.5 if (ecb_expect_true (etp_nthreads (pool) >= wanted))
425 root 1.1 return;
426    
427     /* todo: maybe use idle here, but might be less exact */
428 root 1.5 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
429 root 1.1 return;
430    
431 root 1.5 etp_start_thread (pool);
432 root 1.1 }
433    
434     static void ecb_cold
435 root 1.5 etp_end_thread (etp_pool pool)
436 root 1.1 {
437 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
438 root 1.1
439     req->type = ETP_TYPE_QUIT;
440     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
441    
442     X_LOCK (reqlock);
443 root 1.5 reqq_push (&pool->req_queue, req);
444 root 1.1 X_COND_SIGNAL (reqwait);
445     X_UNLOCK (reqlock);
446    
447     X_LOCK (wrklock);
448     --started;
449     X_UNLOCK (wrklock);
450     }
451    
/*
 * takes finished requests off pool->res_queue and completes them with
 * ETP_FINISH.
 * returns 0 once the result queue is empty, the non-zero result of
 * ETP_FINISH as soon as one occurs, or -1 with errno set to EAGAIN when
 * the max_poll_reqs or max_poll_time limit stops the loop early.
 */
452     ETP_API_DECL int
453 root 1.5 etp_poll (etp_pool pool)
454 root 1.1 {
455     unsigned int maxreqs;
456     unsigned int maxtime;
457     struct timeval tv_start, tv_now;
458    
/* snapshot the limits once; 0 means the respective limit is not applied */
459     X_LOCK (reslock);
460     maxreqs = max_poll_reqs;
461     maxtime = max_poll_time;
462     X_UNLOCK (reslock);
463    
464     if (maxtime)
465     gettimeofday (&tv_start, 0);
466    
467     for (;;)
468     {
469     ETP_REQ *req;
470    
471 root 1.5 etp_maybe_start_thread (pool);
472 root 1.1
473     X_LOCK (reslock);
474 root 1.5 req = reqq_shift (&pool->res_queue);
475 root 1.1
476     if (req)
477     {
478     --npending;
479    
/* that was the last queued result: tell the caller polling is done for now */
480 root 1.5 if (!pool->res_queue.size && done_poll_cb)
481 root 1.1 done_poll_cb ();
482     }
483    
484     X_UNLOCK (reslock);
485    
486     if (!req)
487     return 0;
488    
489     X_LOCK (reqlock);
490     --nreqs;
491     X_UNLOCK (reqlock);
492    
/* a group with a non-zero size is not finished here; it is only flagged, and it does not count against maxreqs/maxtime */
493     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
494     {
495 root 1.4 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
496 root 1.1 continue;
497     }
498     else
499     {
500     int res = ETP_FINISH (req);
501     if (ecb_expect_false (res))
502     return res;
503     }
504    
505     if (ecb_expect_false (maxreqs && !--maxreqs))
506     break;
507    
508     if (maxtime)
509     {
510     gettimeofday (&tv_now, 0);
511    
/* maxtime is in the same ~1/ETP_TICKS units that etp_tvdiff returns */
512     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
513     break;
514     }
515     }
516    
/* stopped by a limit while results may still be queued */
517     errno = EAGAIN;
518     return -1;
519     }
520    
521     ETP_API_DECL void
522 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
523 root 1.1
524     ETP_API_DECL void
525 root 1.5 etp_cancel (etp_pool pool, ETP_REQ *req)
526 root 1.1 {
527     req->cancelled = 1;
528    
529 root 1.5 etp_grp_cancel (pool, req);
530 root 1.1 }
531    
532     ETP_API_DECL void
533 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
534 root 1.1 {
535     for (grp = grp->grp_first; grp; grp = grp->grp_next)
536 root 1.5 etp_cancel (pool, grp);
537 root 1.1 }
538    
539     ETP_API_DECL void
540 root 1.5 etp_submit (etp_pool pool, ETP_REQ *req)
541 root 1.1 {
542     req->pri -= ETP_PRI_MIN;
543    
544     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
545     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
546    
547     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
548     {
549     /* I hope this is worth it :/ */
550     X_LOCK (reqlock);
551     ++nreqs;
552     X_UNLOCK (reqlock);
553    
554     X_LOCK (reslock);
555    
556     ++npending;
557    
558 root 1.5 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
559 root 1.1 want_poll_cb ();
560    
561     X_UNLOCK (reslock);
562     }
563     else
564     {
565     X_LOCK (reqlock);
566     ++nreqs;
567     ++nready;
568 root 1.5 reqq_push (&pool->req_queue, req);
569 root 1.1 X_COND_SIGNAL (reqwait);
570     X_UNLOCK (reqlock);
571    
572 root 1.5 etp_maybe_start_thread (pool);
573 root 1.1 }
574     }
575    
576     ETP_API_DECL void ecb_cold
577 root 1.5 etp_set_max_poll_time (etp_pool pool, double nseconds)
578 root 1.1 {
579     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
580     max_poll_time = nseconds * ETP_TICKS;
581     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
582     }
583    
584     ETP_API_DECL void ecb_cold
585 root 1.5 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
586 root 1.1 {
587     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
588     max_poll_reqs = maxreqs;
589     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
590     }
591    
592     ETP_API_DECL void ecb_cold
593 root 1.5 etp_set_max_idle (etp_pool pool, unsigned int nthreads)
594 root 1.1 {
595     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
596     max_idle = nthreads;
597     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
598     }
599    
600     ETP_API_DECL void ecb_cold
601 root 1.5 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
602 root 1.1 {
603     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
604     idle_timeout = seconds;
605     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
606     }
607    
608     ETP_API_DECL void ecb_cold
609 root 1.5 etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
610 root 1.1 {
611     if (wanted < nthreads)
612     wanted = nthreads;
613     }
614    
615     ETP_API_DECL void ecb_cold
616 root 1.5 etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
617 root 1.1 {
618     if (wanted > nthreads)
619     wanted = nthreads;
620    
621     while (started > wanted)
622 root 1.5 etp_end_thread (pool);
623 root 1.1 }
624