ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.13
Committed: Tue Aug 14 11:44:53 2018 UTC (5 years, 9 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.12: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4 root 1.9 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 root 1.1 * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
40 root 1.13 #if HAVE_SYS_PRCTL_H
41     # include <sys/prctl.h>
42     #endif
43    
44 root 1.1 #ifndef ETP_API_DECL
45     # define ETP_API_DECL static
46     #endif
47    
48     #ifndef ETP_PRI_MIN
49     # define ETP_PRI_MIN 0
50     # define ETP_PRI_MAX 0
51     #endif
52    
53     #ifndef ETP_TYPE_QUIT
54     # define ETP_TYPE_QUIT 0
55     #endif
56    
57     #ifndef ETP_TYPE_GROUP
58     # define ETP_TYPE_GROUP 1
59     #endif
60    
61 root 1.6 #ifndef ETP_WANT_POLL
62     # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
63     #endif
64     #ifndef ETP_DONE_POLL
65     # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
66     #endif
67    
68 root 1.1 #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
69    
70     #define ETP_TICKS ((1000000 + 1023) >> 10)
71    
72 root 1.4 enum {
73     ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
74     ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
75     };
76    
77 root 1.1 /* calculate time difference in ~1/ETP_TICKS of a second */
78     ecb_inline int
79     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
80     {
81     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
82     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
83     }
84    
85 root 1.3 struct etp_tmpbuf
86     {
87     void *ptr;
88     int len;
89     };
90    
91     static void *
92     etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
93     {
94     if (buf->len < len)
95     {
96     free (buf->ptr);
97     buf->ptr = malloc (buf->len = len);
98     }
99    
100     return buf->ptr;
101     }
102    
103 root 1.5 /*
104     * a somewhat faster data structure might be nice, but
105     * with 8 priorities this actually needs <20 insns
106     * per shift, the most expensive operation.
107     */
108     typedef struct
109     {
110     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
111     int size;
112     } etp_reqq;
113    
114 root 1.8 typedef struct etp_pool *etp_pool;
115    
116     typedef struct etp_worker
117     {
118     etp_pool pool;
119    
120     struct etp_tmpbuf tmpbuf;
121    
122     /* locked by pool->wrklock */
123     struct etp_worker *prev, *next;
124    
125     xthread_t tid;
126    
127     #ifdef ETP_WORKER_COMMON
128     ETP_WORKER_COMMON
129     #endif
130     } etp_worker;
131    
132 root 1.5 struct etp_pool
133     {
134 root 1.6 void *userdata;
135    
136 root 1.5 etp_reqq req_queue;
137     etp_reqq res_queue;
138 root 1.6
139     unsigned int started, idle, wanted;
140    
141     unsigned int max_poll_time; /* pool->reslock */
142     unsigned int max_poll_reqs; /* pool->reslock */
143    
144     unsigned int nreqs; /* pool->reqlock */
145     unsigned int nready; /* pool->reqlock */
146     unsigned int npending; /* pool->reqlock */
147     unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
148     unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
149    
150     void (*want_poll_cb) (void *userdata);
151     void (*done_poll_cb) (void *userdata);
152    
153     xmutex_t wrklock;
154     xmutex_t reslock;
155     xmutex_t reqlock;
156     xcond_t reqwait;
157 root 1.7
158     etp_worker wrk_first;
159 root 1.5 };
160    
161 root 1.6 #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
162     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
163 root 1.1
164     /* worker threads management */
165    
166     static void
167     etp_worker_clear (etp_worker *wrk)
168     {
169     }
170    
171     static void ecb_cold
172     etp_worker_free (etp_worker *wrk)
173     {
174     free (wrk->tmpbuf.ptr);
175    
176     wrk->next->prev = wrk->prev;
177     wrk->prev->next = wrk->next;
178    
179     free (wrk);
180     }
181    
182     ETP_API_DECL unsigned int
183 root 1.5 etp_nreqs (etp_pool pool)
184 root 1.1 {
185     int retval;
186 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
187     retval = pool->nreqs;
188     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
189 root 1.1 return retval;
190     }
191    
192     ETP_API_DECL unsigned int
193 root 1.5 etp_nready (etp_pool pool)
194 root 1.1 {
195     unsigned int retval;
196    
197 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
198     retval = pool->nready;
199     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
200 root 1.1
201     return retval;
202     }
203    
204     ETP_API_DECL unsigned int
205 root 1.5 etp_npending (etp_pool pool)
206 root 1.1 {
207     unsigned int retval;
208    
209 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
210     retval = pool->npending;
211     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
212 root 1.1
213     return retval;
214     }
215    
216     ETP_API_DECL unsigned int
217 root 1.5 etp_nthreads (etp_pool pool)
218 root 1.1 {
219     unsigned int retval;
220    
221 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
222     retval = pool->started;
223     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
224 root 1.1
225     return retval;
226     }
227    
228     static void ecb_noinline ecb_cold
229     reqq_init (etp_reqq *q)
230     {
231     int pri;
232    
233     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
234     q->qs[pri] = q->qe[pri] = 0;
235    
236     q->size = 0;
237     }
238    
239     static int ecb_noinline
240     reqq_push (etp_reqq *q, ETP_REQ *req)
241     {
242     int pri = req->pri;
243     req->next = 0;
244    
245     if (q->qe[pri])
246     {
247     q->qe[pri]->next = req;
248     q->qe[pri] = req;
249     }
250     else
251     q->qe[pri] = q->qs[pri] = req;
252    
253     return q->size++;
254     }
255    
256     static ETP_REQ * ecb_noinline
257     reqq_shift (etp_reqq *q)
258     {
259     int pri;
260    
261     if (!q->size)
262     return 0;
263    
264     --q->size;
265    
266     for (pri = ETP_NUM_PRI; pri--; )
267     {
268 root 1.2 ETP_REQ *req = q->qs[pri];
269 root 1.1
270     if (req)
271     {
272 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
273 root 1.1 q->qe[pri] = 0;
274    
275     return req;
276     }
277     }
278    
279     abort ();
280     }
281    
282     ETP_API_DECL int ecb_cold
283 root 1.7 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
284 root 1.1 {
285 root 1.6 X_MUTEX_CREATE (pool->wrklock);
286     X_MUTEX_CREATE (pool->reslock);
287     X_MUTEX_CREATE (pool->reqlock);
288     X_COND_CREATE (pool->reqwait);
289 root 1.1
290 root 1.5 reqq_init (&pool->req_queue);
291     reqq_init (&pool->res_queue);
292 root 1.1
293 root 1.7 pool->wrk_first.next =
294     pool->wrk_first.prev = &pool->wrk_first;
295 root 1.1
296 root 1.6 pool->started = 0;
297     pool->idle = 0;
298     pool->nreqs = 0;
299     pool->nready = 0;
300     pool->npending = 0;
301     pool->wanted = 4;
302    
303     pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
304     pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
305 root 1.1
306 root 1.7 pool->userdata = userdata;
307 root 1.6 pool->want_poll_cb = want_poll;
308     pool->done_poll_cb = done_poll;
309 root 1.1
310     return 0;
311     }
312    
313 root 1.4 static void ecb_noinline ecb_cold
314     etp_proc_init (void)
315     {
316     #if HAVE_PRCTL_SET_NAME
317     /* provide a more sensible "thread name" */
318 root 1.12 char name[16 + 1];
319 root 1.4 const int namelen = sizeof (name) - 1;
320     int len;
321    
322     prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
323     name [namelen] = 0;
324     len = strlen (name);
325     strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
326     prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
327     #endif
328     }
329    
330     X_THREAD_PROC (etp_proc)
331     {
332     ETP_REQ *req;
333     struct timespec ts;
334     etp_worker *self = (etp_worker *)thr_arg;
335 root 1.5 etp_pool pool = self->pool;
336 root 1.4
337     etp_proc_init ();
338    
339     /* try to distribute timeouts somewhat evenly */
340     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
341    
342     for (;;)
343     {
344     ts.tv_sec = 0;
345    
346 root 1.6 X_LOCK (pool->reqlock);
347 root 1.4
348     for (;;)
349     {
350 root 1.5 req = reqq_shift (&pool->req_queue);
351 root 1.4
352 root 1.9 if (ecb_expect_true (req))
353 root 1.4 break;
354    
355     if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
356     {
357 root 1.6 X_UNLOCK (pool->reqlock);
358     X_LOCK (pool->wrklock);
359     --pool->started;
360     X_UNLOCK (pool->wrklock);
361 root 1.4 goto quit;
362     }
363    
364 root 1.6 ++pool->idle;
365 root 1.4
366 root 1.6 if (pool->idle <= pool->max_idle)
367     /* we are allowed to pool->idle, so do so without any timeout */
368     X_COND_WAIT (pool->reqwait, pool->reqlock);
369 root 1.4 else
370     {
371     /* initialise timeout once */
372     if (!ts.tv_sec)
373 root 1.6 ts.tv_sec = time (0) + pool->idle_timeout;
374 root 1.4
375 root 1.6 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
376 root 1.4 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
377     }
378    
379 root 1.6 --pool->idle;
380 root 1.4 }
381    
382 root 1.6 --pool->nready;
383 root 1.4
384 root 1.6 X_UNLOCK (pool->reqlock);
385 root 1.4
386 root 1.9 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
387 root 1.4 goto quit;
388    
389     ETP_EXECUTE (self, req);
390    
391 root 1.6 X_LOCK (pool->reslock);
392 root 1.4
393 root 1.6 ++pool->npending;
394 root 1.4
395 root 1.6 if (!reqq_push (&pool->res_queue, req))
396 root 1.9 ETP_WANT_POLL (pool);
397 root 1.4
398     etp_worker_clear (self);
399    
400 root 1.6 X_UNLOCK (pool->reslock);
401 root 1.4 }
402    
403     quit:
404     free (req);
405    
406 root 1.6 X_LOCK (pool->wrklock);
407 root 1.4 etp_worker_free (self);
408 root 1.6 X_UNLOCK (pool->wrklock);
409 root 1.4
410     return 0;
411     }
412 root 1.1
413     static void ecb_cold
414 root 1.5 etp_start_thread (etp_pool pool)
415 root 1.1 {
416     etp_worker *wrk = calloc (1, sizeof (etp_worker));
417    
418     /*TODO*/
419     assert (("unable to allocate worker thread data", wrk));
420    
421 root 1.5 wrk->pool = pool;
422    
423 root 1.6 X_LOCK (pool->wrklock);
424 root 1.1
425     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
426     {
427 root 1.8 wrk->prev = &pool->wrk_first;
428 root 1.7 wrk->next = pool->wrk_first.next;
429     pool->wrk_first.next->prev = wrk;
430     pool->wrk_first.next = wrk;
431 root 1.6 ++pool->started;
432 root 1.1 }
433     else
434     free (wrk);
435    
436 root 1.6 X_UNLOCK (pool->wrklock);
437 root 1.1 }
438    
439     static void
440 root 1.5 etp_maybe_start_thread (etp_pool pool)
441 root 1.1 {
442 root 1.6 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
443 root 1.1 return;
444    
445 root 1.6 /* todo: maybe use pool->idle here, but might be less exact */
446 root 1.5 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
447 root 1.1 return;
448    
449 root 1.5 etp_start_thread (pool);
450 root 1.1 }
451    
452     static void ecb_cold
453 root 1.5 etp_end_thread (etp_pool pool)
454 root 1.1 {
455 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
456 root 1.1
457     req->type = ETP_TYPE_QUIT;
458     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
459    
460 root 1.6 X_LOCK (pool->reqlock);
461 root 1.5 reqq_push (&pool->req_queue, req);
462 root 1.6 X_COND_SIGNAL (pool->reqwait);
463     X_UNLOCK (pool->reqlock);
464 root 1.1
465 root 1.6 X_LOCK (pool->wrklock);
466     --pool->started;
467     X_UNLOCK (pool->wrklock);
468 root 1.1 }
469    
470     ETP_API_DECL int
471 root 1.5 etp_poll (etp_pool pool)
472 root 1.1 {
473     unsigned int maxreqs;
474     unsigned int maxtime;
475     struct timeval tv_start, tv_now;
476    
477 root 1.6 X_LOCK (pool->reslock);
478     maxreqs = pool->max_poll_reqs;
479     maxtime = pool->max_poll_time;
480     X_UNLOCK (pool->reslock);
481 root 1.1
482     if (maxtime)
483     gettimeofday (&tv_start, 0);
484    
485     for (;;)
486     {
487     ETP_REQ *req;
488    
489 root 1.5 etp_maybe_start_thread (pool);
490 root 1.1
491 root 1.6 X_LOCK (pool->reslock);
492 root 1.5 req = reqq_shift (&pool->res_queue);
493 root 1.1
494 root 1.9 if (ecb_expect_true (req))
495 root 1.1 {
496 root 1.6 --pool->npending;
497 root 1.1
498 root 1.6 if (!pool->res_queue.size)
499 root 1.9 ETP_DONE_POLL (pool);
500 root 1.1 }
501    
502 root 1.6 X_UNLOCK (pool->reslock);
503 root 1.1
504 root 1.9 if (ecb_expect_false (!req))
505 root 1.1 return 0;
506    
507 root 1.6 X_LOCK (pool->reqlock);
508     --pool->nreqs;
509     X_UNLOCK (pool->reqlock);
510 root 1.1
511     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
512     {
513 root 1.4 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
514 root 1.1 continue;
515     }
516     else
517     {
518     int res = ETP_FINISH (req);
519     if (ecb_expect_false (res))
520     return res;
521     }
522    
523     if (ecb_expect_false (maxreqs && !--maxreqs))
524     break;
525    
526     if (maxtime)
527     {
528     gettimeofday (&tv_now, 0);
529    
530     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
531     break;
532     }
533     }
534    
535     errno = EAGAIN;
536     return -1;
537     }
538    
539     ETP_API_DECL void
540 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
541 root 1.1
542     ETP_API_DECL void
543 root 1.5 etp_cancel (etp_pool pool, ETP_REQ *req)
544 root 1.1 {
545     req->cancelled = 1;
546    
547 root 1.5 etp_grp_cancel (pool, req);
548 root 1.1 }
549    
550     ETP_API_DECL void
551 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
552 root 1.1 {
553     for (grp = grp->grp_first; grp; grp = grp->grp_next)
554 root 1.5 etp_cancel (pool, grp);
555 root 1.1 }
556    
557     ETP_API_DECL void
558 root 1.5 etp_submit (etp_pool pool, ETP_REQ *req)
559 root 1.1 {
560     req->pri -= ETP_PRI_MIN;
561    
562     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
563     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
564    
565     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
566     {
567     /* I hope this is worth it :/ */
568 root 1.6 X_LOCK (pool->reqlock);
569     ++pool->nreqs;
570     X_UNLOCK (pool->reqlock);
571 root 1.1
572 root 1.6 X_LOCK (pool->reslock);
573 root 1.1
574 root 1.6 ++pool->npending;
575 root 1.1
576 root 1.6 if (!reqq_push (&pool->res_queue, req))
577     ETP_WANT_POLL (pool);
578 root 1.1
579 root 1.6 X_UNLOCK (pool->reslock);
580 root 1.1 }
581     else
582     {
583 root 1.6 X_LOCK (pool->reqlock);
584     ++pool->nreqs;
585     ++pool->nready;
586 root 1.5 reqq_push (&pool->req_queue, req);
587 root 1.6 X_COND_SIGNAL (pool->reqwait);
588     X_UNLOCK (pool->reqlock);
589 root 1.1
590 root 1.5 etp_maybe_start_thread (pool);
591 root 1.1 }
592     }
593    
594     ETP_API_DECL void ecb_cold
595 root 1.10 etp_set_max_poll_time (etp_pool pool, double seconds)
596 root 1.1 {
597 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
598 root 1.10 pool->max_poll_time = seconds * ETP_TICKS;
599 root 1.6 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
600 root 1.1 }
601    
602     ETP_API_DECL void ecb_cold
603 root 1.5 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
604 root 1.1 {
605 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
606     pool->max_poll_reqs = maxreqs;
607     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
608 root 1.1 }
609    
610     ETP_API_DECL void ecb_cold
611 root 1.10 etp_set_max_idle (etp_pool pool, unsigned int threads)
612 root 1.1 {
613 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
614 root 1.10 pool->max_idle = threads;
615 root 1.6 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
616 root 1.1 }
617    
618     ETP_API_DECL void ecb_cold
619 root 1.5 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
620 root 1.1 {
621 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
622     pool->idle_timeout = seconds;
623     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
624 root 1.1 }
625    
626     ETP_API_DECL void ecb_cold
627 root 1.10 etp_set_min_parallel (etp_pool pool, unsigned int threads)
628 root 1.1 {
629 root 1.10 if (pool->wanted < threads)
630     pool->wanted = threads;
631 root 1.1 }
632    
633     ETP_API_DECL void ecb_cold
634 root 1.10 etp_set_max_parallel (etp_pool pool, unsigned int threads)
635 root 1.1 {
636 root 1.10 if (pool->wanted > threads)
637     pool->wanted = threads;
638 root 1.1
639 root 1.6 while (pool->started > pool->wanted)
640 root 1.5 etp_end_thread (pool);
641 root 1.1 }
642