ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.4
Committed: Thu Jun 25 17:05:07 2015 UTC (8 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.3: +104 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4     * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5     * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
/* linkage of the API functions; the including file may override it */
#if !defined (ETP_API_DECL)
# define ETP_API_DECL static
#endif

/* priority range; a single priority level unless the includer defines one */
#if !defined (ETP_PRI_MIN)
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that makes a worker thread exit */
#if !defined (ETP_TYPE_QUIT)
# define ETP_TYPE_QUIT 0
#endif

/* request type of group requests */
#if !defined (ETP_TYPE_GROUP)
# define ETP_TYPE_GROUP 1
#endif

/* number of distinct priority levels */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second, where one tick is 1024 microseconds (rounded up) */
#define ETP_TICKS ((1000000 + 1023) >> 10)
60    
/* bits used in the flags member of a request */
enum {
  ETP_FLAG_GROUPADD = 1 << 2, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 1 << 3, /* group request has been delayed */
};
65    
66 root 1.1 /* calculate time difference in ~1/ETP_TICKS of a second */
67     ecb_inline int
68     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
69     {
70     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72     }
73    
74     static unsigned int started, idle, wanted = 4;
75    
76     static void (*want_poll_cb) (void);
77     static void (*done_poll_cb) (void);
78    
79     static unsigned int max_poll_time; /* reslock */
80     static unsigned int max_poll_reqs; /* reslock */
81    
82     static unsigned int nreqs; /* reqlock */
83     static unsigned int nready; /* reqlock */
84     static unsigned int npending; /* reqlock */
85     static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86     static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87    
88     static xmutex_t wrklock;
89     static xmutex_t reslock;
90     static xmutex_t reqlock;
91     static xcond_t reqwait;
92    
/* a reusable scratch buffer that is only ever reallocated to grow */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, or 0 if there is none */
  int len;   /* size in bytes that ptr is known to provide */
};

/*
 * Returns a buffer of at least len bytes, reusing the current
 * allocation when it is already large enough. When the buffer has to
 * grow, the old allocation is freed first, so previous contents are
 * not preserved.
 *
 * Returns 0 if the allocation fails. In that case the recorded length
 * is reset to 0, so a later call retries the allocation instead of
 * returning a null pointer for a size the buffer claims to have.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
110    
111 root 1.1 typedef struct etp_worker
112     {
113 root 1.3 struct etp_tmpbuf tmpbuf;
114 root 1.1
115     /* locked by wrklock */
116     struct etp_worker *prev, *next;
117    
118     xthread_t tid;
119    
120     #ifdef ETP_WORKER_COMMON
121     ETP_WORKER_COMMON
122     #endif
123     } etp_worker;
124    
125     static etp_worker wrk_first; /* NOT etp */
126    
127     #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
128     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
129    
130     /* worker threads management */
131    
132     static void
133     etp_worker_clear (etp_worker *wrk)
134     {
135     }
136    
137     static void ecb_cold
138     etp_worker_free (etp_worker *wrk)
139     {
140     free (wrk->tmpbuf.ptr);
141    
142     wrk->next->prev = wrk->prev;
143     wrk->prev->next = wrk->next;
144    
145     free (wrk);
146     }
147    
148     ETP_API_DECL unsigned int
149     etp_nreqs (void)
150     {
151     int retval;
152     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153     retval = nreqs;
154     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155     return retval;
156     }
157    
158     ETP_API_DECL unsigned int
159     etp_nready (void)
160     {
161     unsigned int retval;
162    
163     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
164     retval = nready;
165     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
166    
167     return retval;
168     }
169    
170     ETP_API_DECL unsigned int
171     etp_npending (void)
172     {
173     unsigned int retval;
174    
175     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
176     retval = npending;
177     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
178    
179     return retval;
180     }
181    
182     ETP_API_DECL unsigned int
183     etp_nthreads (void)
184     {
185     unsigned int retval;
186    
187     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
188     retval = started;
189     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
190    
191     return retval;
192     }
193    
194     /*
195     * a somewhat faster data structure might be nice, but
196     * with 8 priorities this actually needs <20 insns
197     * per shift, the most expensive operation.
198     */
199     typedef struct {
200     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
201     int size;
202     } etp_reqq;
203    
204     static etp_reqq req_queue;
205     static etp_reqq res_queue;
206    
207     static void ecb_noinline ecb_cold
208     reqq_init (etp_reqq *q)
209     {
210     int pri;
211    
212     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
213     q->qs[pri] = q->qe[pri] = 0;
214    
215     q->size = 0;
216     }
217    
218     static int ecb_noinline
219     reqq_push (etp_reqq *q, ETP_REQ *req)
220     {
221     int pri = req->pri;
222     req->next = 0;
223    
224     if (q->qe[pri])
225     {
226     q->qe[pri]->next = req;
227     q->qe[pri] = req;
228     }
229     else
230     q->qe[pri] = q->qs[pri] = req;
231    
232     return q->size++;
233     }
234    
235     static ETP_REQ * ecb_noinline
236     reqq_shift (etp_reqq *q)
237     {
238     int pri;
239    
240     if (!q->size)
241     return 0;
242    
243     --q->size;
244    
245     for (pri = ETP_NUM_PRI; pri--; )
246     {
247 root 1.2 ETP_REQ *req = q->qs[pri];
248 root 1.1
249     if (req)
250     {
251 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
252 root 1.1 q->qe[pri] = 0;
253    
254     return req;
255     }
256     }
257    
258     abort ();
259     }
260    
261     ETP_API_DECL int ecb_cold
262     etp_init (void (*want_poll)(void), void (*done_poll)(void))
263     {
264     X_MUTEX_CREATE (wrklock);
265     X_MUTEX_CREATE (reslock);
266     X_MUTEX_CREATE (reqlock);
267     X_COND_CREATE (reqwait);
268    
269     reqq_init (&req_queue);
270     reqq_init (&res_queue);
271    
272     wrk_first.next =
273     wrk_first.prev = &wrk_first;
274    
275     started = 0;
276     idle = 0;
277     nreqs = 0;
278     nready = 0;
279     npending = 0;
280    
281     want_poll_cb = want_poll;
282     done_poll_cb = done_poll;
283    
284     return 0;
285     }
286    
287 root 1.4 static void ecb_noinline ecb_cold
288     etp_proc_init (void)
289     {
290     #if HAVE_PRCTL_SET_NAME
291     /* provide a more sensible "thread name" */
292     char name[16 + 1];
293     const int namelen = sizeof (name) - 1;
294     int len;
295    
296     prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
297     name [namelen] = 0;
298     len = strlen (name);
299     strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
300     prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
301     #endif
302     }
303    
304     X_THREAD_PROC (etp_proc)
305     {
306     ETP_REQ *req;
307     struct timespec ts;
308     etp_worker *self = (etp_worker *)thr_arg;
309    
310     etp_proc_init ();
311    
312     /* try to distribute timeouts somewhat evenly */
313     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
314    
315     for (;;)
316     {
317     ts.tv_sec = 0;
318    
319     X_LOCK (reqlock);
320    
321     for (;;)
322     {
323     req = reqq_shift (&req_queue);
324    
325     if (req)
326     break;
327    
328     if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
329     {
330     X_UNLOCK (reqlock);
331     X_LOCK (wrklock);
332     --started;
333     X_UNLOCK (wrklock);
334     goto quit;
335     }
336    
337     ++idle;
338    
339     if (idle <= max_idle)
340     /* we are allowed to idle, so do so without any timeout */
341     X_COND_WAIT (reqwait, reqlock);
342     else
343     {
344     /* initialise timeout once */
345     if (!ts.tv_sec)
346     ts.tv_sec = time (0) + idle_timeout;
347    
348     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
349     ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
350     }
351    
352     --idle;
353     }
354    
355     --nready;
356    
357     X_UNLOCK (reqlock);
358    
359     if (req->type == ETP_TYPE_QUIT)
360     goto quit;
361    
362     ETP_EXECUTE (self, req);
363    
364     X_LOCK (reslock);
365    
366     ++npending;
367    
368     if (!reqq_push (&res_queue, req) && want_poll_cb)
369     want_poll_cb ();
370    
371     etp_worker_clear (self);
372    
373     X_UNLOCK (reslock);
374     }
375    
376     quit:
377     free (req);
378    
379     X_LOCK (wrklock);
380     etp_worker_free (self);
381     X_UNLOCK (wrklock);
382    
383     return 0;
384     }
385 root 1.1
386     static void ecb_cold
387     etp_start_thread (void)
388     {
389     etp_worker *wrk = calloc (1, sizeof (etp_worker));
390    
391     /*TODO*/
392     assert (("unable to allocate worker thread data", wrk));
393    
394     X_LOCK (wrklock);
395    
396     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
397     {
398     wrk->prev = &wrk_first;
399     wrk->next = wrk_first.next;
400     wrk_first.next->prev = wrk;
401     wrk_first.next = wrk;
402     ++started;
403     }
404     else
405     free (wrk);
406    
407     X_UNLOCK (wrklock);
408     }
409    
410     static void
411     etp_maybe_start_thread (void)
412     {
413     if (ecb_expect_true (etp_nthreads () >= wanted))
414     return;
415    
416     /* todo: maybe use idle here, but might be less exact */
417     if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
418     return;
419    
420     etp_start_thread ();
421     }
422    
423     static void ecb_cold
424     etp_end_thread (void)
425     {
426 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
427 root 1.1
428     req->type = ETP_TYPE_QUIT;
429     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
430    
431     X_LOCK (reqlock);
432     reqq_push (&req_queue, req);
433     X_COND_SIGNAL (reqwait);
434     X_UNLOCK (reqlock);
435    
436     X_LOCK (wrklock);
437     --started;
438     X_UNLOCK (wrklock);
439     }
440    
441     ETP_API_DECL int
442     etp_poll (void)
443     {
444     unsigned int maxreqs;
445     unsigned int maxtime;
446     struct timeval tv_start, tv_now;
447    
448     X_LOCK (reslock);
449     maxreqs = max_poll_reqs;
450     maxtime = max_poll_time;
451     X_UNLOCK (reslock);
452    
453     if (maxtime)
454     gettimeofday (&tv_start, 0);
455    
456     for (;;)
457     {
458     ETP_REQ *req;
459    
460     etp_maybe_start_thread ();
461    
462     X_LOCK (reslock);
463     req = reqq_shift (&res_queue);
464    
465     if (req)
466     {
467     --npending;
468    
469     if (!res_queue.size && done_poll_cb)
470     done_poll_cb ();
471     }
472    
473     X_UNLOCK (reslock);
474    
475     if (!req)
476     return 0;
477    
478     X_LOCK (reqlock);
479     --nreqs;
480     X_UNLOCK (reqlock);
481    
482     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
483     {
484 root 1.4 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
485 root 1.1 continue;
486     }
487     else
488     {
489     int res = ETP_FINISH (req);
490     if (ecb_expect_false (res))
491     return res;
492     }
493    
494     if (ecb_expect_false (maxreqs && !--maxreqs))
495     break;
496    
497     if (maxtime)
498     {
499     gettimeofday (&tv_now, 0);
500    
501     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
502     break;
503     }
504     }
505    
506     errno = EAGAIN;
507     return -1;
508     }
509    
510     ETP_API_DECL void
511     etp_grp_cancel (ETP_REQ *grp);
512    
513     ETP_API_DECL void
514     etp_cancel (ETP_REQ *req)
515     {
516     req->cancelled = 1;
517    
518     etp_grp_cancel (req);
519     }
520    
521     ETP_API_DECL void
522     etp_grp_cancel (ETP_REQ *grp)
523     {
524     for (grp = grp->grp_first; grp; grp = grp->grp_next)
525     etp_cancel (grp);
526     }
527    
528     ETP_API_DECL void
529     etp_submit (ETP_REQ *req)
530     {
531     req->pri -= ETP_PRI_MIN;
532    
533     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
534     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
535    
536     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
537     {
538     /* I hope this is worth it :/ */
539     X_LOCK (reqlock);
540     ++nreqs;
541     X_UNLOCK (reqlock);
542    
543     X_LOCK (reslock);
544    
545     ++npending;
546    
547     if (!reqq_push (&res_queue, req) && want_poll_cb)
548     want_poll_cb ();
549    
550     X_UNLOCK (reslock);
551     }
552     else
553     {
554     X_LOCK (reqlock);
555     ++nreqs;
556     ++nready;
557     reqq_push (&req_queue, req);
558     X_COND_SIGNAL (reqwait);
559     X_UNLOCK (reqlock);
560    
561     etp_maybe_start_thread ();
562     }
563     }
564    
565     ETP_API_DECL void ecb_cold
566     etp_set_max_poll_time (double nseconds)
567     {
568     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
569     max_poll_time = nseconds * ETP_TICKS;
570     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
571     }
572    
573     ETP_API_DECL void ecb_cold
574     etp_set_max_poll_reqs (unsigned int maxreqs)
575     {
576     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
577     max_poll_reqs = maxreqs;
578     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
579     }
580    
581     ETP_API_DECL void ecb_cold
582     etp_set_max_idle (unsigned int nthreads)
583     {
584     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
585     max_idle = nthreads;
586     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
587     }
588    
589     ETP_API_DECL void ecb_cold
590     etp_set_idle_timeout (unsigned int seconds)
591     {
592     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
593     idle_timeout = seconds;
594     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
595     }
596    
597     ETP_API_DECL void ecb_cold
598     etp_set_min_parallel (unsigned int nthreads)
599     {
600     if (wanted < nthreads)
601     wanted = nthreads;
602     }
603    
604     ETP_API_DECL void ecb_cold
605     etp_set_max_parallel (unsigned int nthreads)
606     {
607     if (wanted > nthreads)
608     wanted = nthreads;
609    
610     while (started > wanted)
611     etp_end_thread ();
612     }
613