ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.12
Committed: Tue Aug 14 09:29:50 2018 UTC (5 years, 9 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-4_53
Changes since 1.11: +1 -10 lines
Log Message:
4.53

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4 root 1.9 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 root 1.1 * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
40     #ifndef ETP_API_DECL
41     # define ETP_API_DECL static
42     #endif
43    
44     #ifndef ETP_PRI_MIN
45     # define ETP_PRI_MIN 0
46     # define ETP_PRI_MAX 0
47     #endif
48    
49     #ifndef ETP_TYPE_QUIT
50     # define ETP_TYPE_QUIT 0
51     #endif
52    
53     #ifndef ETP_TYPE_GROUP
54     # define ETP_TYPE_GROUP 1
55     #endif
56    
57 root 1.6 #ifndef ETP_WANT_POLL
58     # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
59     #endif
60     #ifndef ETP_DONE_POLL
61     # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
62     #endif
63    
64 root 1.1 #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
65    
66     #define ETP_TICKS ((1000000 + 1023) >> 10)
67    
68 root 1.4 enum {
69     ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
70     ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
71     };
72    
73 root 1.1 /* calculate time difference in ~1/ETP_TICKS of a second */
74     ecb_inline int
75     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
76     {
77     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
78     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
79     }
80    
81 root 1.3 struct etp_tmpbuf
82     {
83     void *ptr;
84     int len;
85     };
86    
87     static void *
88     etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
89     {
90     if (buf->len < len)
91     {
92     free (buf->ptr);
93     buf->ptr = malloc (buf->len = len);
94     }
95    
96     return buf->ptr;
97     }
98    
99 root 1.5 /*
100     * a somewhat faster data structure might be nice, but
101     * with 8 priorities this actually needs <20 insns
102     * per shift, the most expensive operation.
103     */
104     typedef struct
105     {
106     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107     int size;
108     } etp_reqq;
109    
110 root 1.8 typedef struct etp_pool *etp_pool;
111    
112     typedef struct etp_worker
113     {
114     etp_pool pool;
115    
116     struct etp_tmpbuf tmpbuf;
117    
118     /* locked by pool->wrklock */
119     struct etp_worker *prev, *next;
120    
121     xthread_t tid;
122    
123     #ifdef ETP_WORKER_COMMON
124     ETP_WORKER_COMMON
125     #endif
126     } etp_worker;
127    
128 root 1.5 struct etp_pool
129     {
130 root 1.6 void *userdata;
131    
132 root 1.5 etp_reqq req_queue;
133     etp_reqq res_queue;
134 root 1.6
135     unsigned int started, idle, wanted;
136    
137     unsigned int max_poll_time; /* pool->reslock */
138     unsigned int max_poll_reqs; /* pool->reslock */
139    
140     unsigned int nreqs; /* pool->reqlock */
141     unsigned int nready; /* pool->reqlock */
142     unsigned int npending; /* pool->reqlock */
143     unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
144     unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
145    
146     void (*want_poll_cb) (void *userdata);
147     void (*done_poll_cb) (void *userdata);
148    
149     xmutex_t wrklock;
150     xmutex_t reslock;
151     xmutex_t reqlock;
152     xcond_t reqwait;
153 root 1.7
154     etp_worker wrk_first;
155 root 1.5 };
156    
157 root 1.6 #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 root 1.1
160     /* worker threads management */
161    
162     static void
163     etp_worker_clear (etp_worker *wrk)
164     {
165     }
166    
167     static void ecb_cold
168     etp_worker_free (etp_worker *wrk)
169     {
170     free (wrk->tmpbuf.ptr);
171    
172     wrk->next->prev = wrk->prev;
173     wrk->prev->next = wrk->next;
174    
175     free (wrk);
176     }
177    
178     ETP_API_DECL unsigned int
179 root 1.5 etp_nreqs (etp_pool pool)
180 root 1.1 {
181     int retval;
182 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
183     retval = pool->nreqs;
184     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
185 root 1.1 return retval;
186     }
187    
188     ETP_API_DECL unsigned int
189 root 1.5 etp_nready (etp_pool pool)
190 root 1.1 {
191     unsigned int retval;
192    
193 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
194     retval = pool->nready;
195     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
196 root 1.1
197     return retval;
198     }
199    
200     ETP_API_DECL unsigned int
201 root 1.5 etp_npending (etp_pool pool)
202 root 1.1 {
203     unsigned int retval;
204    
205 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
206     retval = pool->npending;
207     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
208 root 1.1
209     return retval;
210     }
211    
212     ETP_API_DECL unsigned int
213 root 1.5 etp_nthreads (etp_pool pool)
214 root 1.1 {
215     unsigned int retval;
216    
217 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
218     retval = pool->started;
219     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
220 root 1.1
221     return retval;
222     }
223    
224     static void ecb_noinline ecb_cold
225     reqq_init (etp_reqq *q)
226     {
227     int pri;
228    
229     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
230     q->qs[pri] = q->qe[pri] = 0;
231    
232     q->size = 0;
233     }
234    
235     static int ecb_noinline
236     reqq_push (etp_reqq *q, ETP_REQ *req)
237     {
238     int pri = req->pri;
239     req->next = 0;
240    
241     if (q->qe[pri])
242     {
243     q->qe[pri]->next = req;
244     q->qe[pri] = req;
245     }
246     else
247     q->qe[pri] = q->qs[pri] = req;
248    
249     return q->size++;
250     }
251    
252     static ETP_REQ * ecb_noinline
253     reqq_shift (etp_reqq *q)
254     {
255     int pri;
256    
257     if (!q->size)
258     return 0;
259    
260     --q->size;
261    
262     for (pri = ETP_NUM_PRI; pri--; )
263     {
264 root 1.2 ETP_REQ *req = q->qs[pri];
265 root 1.1
266     if (req)
267     {
268 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
269 root 1.1 q->qe[pri] = 0;
270    
271     return req;
272     }
273     }
274    
275     abort ();
276     }
277    
278     ETP_API_DECL int ecb_cold
279 root 1.7 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
280 root 1.1 {
281 root 1.6 X_MUTEX_CREATE (pool->wrklock);
282     X_MUTEX_CREATE (pool->reslock);
283     X_MUTEX_CREATE (pool->reqlock);
284     X_COND_CREATE (pool->reqwait);
285 root 1.1
286 root 1.5 reqq_init (&pool->req_queue);
287     reqq_init (&pool->res_queue);
288 root 1.1
289 root 1.7 pool->wrk_first.next =
290     pool->wrk_first.prev = &pool->wrk_first;
291 root 1.1
292 root 1.6 pool->started = 0;
293     pool->idle = 0;
294     pool->nreqs = 0;
295     pool->nready = 0;
296     pool->npending = 0;
297     pool->wanted = 4;
298    
299     pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300     pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301 root 1.1
302 root 1.7 pool->userdata = userdata;
303 root 1.6 pool->want_poll_cb = want_poll;
304     pool->done_poll_cb = done_poll;
305 root 1.1
306     return 0;
307     }
308    
309 root 1.4 static void ecb_noinline ecb_cold
310     etp_proc_init (void)
311     {
312     #if HAVE_PRCTL_SET_NAME
313     /* provide a more sensible "thread name" */
314 root 1.12 char name[16 + 1];
315 root 1.4 const int namelen = sizeof (name) - 1;
316     int len;
317    
318     prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
319     name [namelen] = 0;
320     len = strlen (name);
321     strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
322     prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
323     #endif
324     }
325    
326     X_THREAD_PROC (etp_proc)
327     {
328     ETP_REQ *req;
329     struct timespec ts;
330     etp_worker *self = (etp_worker *)thr_arg;
331 root 1.5 etp_pool pool = self->pool;
332 root 1.4
333     etp_proc_init ();
334    
335     /* try to distribute timeouts somewhat evenly */
336     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
337    
338     for (;;)
339     {
340     ts.tv_sec = 0;
341    
342 root 1.6 X_LOCK (pool->reqlock);
343 root 1.4
344     for (;;)
345     {
346 root 1.5 req = reqq_shift (&pool->req_queue);
347 root 1.4
348 root 1.9 if (ecb_expect_true (req))
349 root 1.4 break;
350    
351     if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
352     {
353 root 1.6 X_UNLOCK (pool->reqlock);
354     X_LOCK (pool->wrklock);
355     --pool->started;
356     X_UNLOCK (pool->wrklock);
357 root 1.4 goto quit;
358     }
359    
360 root 1.6 ++pool->idle;
361 root 1.4
362 root 1.6 if (pool->idle <= pool->max_idle)
363     /* we are allowed to pool->idle, so do so without any timeout */
364     X_COND_WAIT (pool->reqwait, pool->reqlock);
365 root 1.4 else
366     {
367     /* initialise timeout once */
368     if (!ts.tv_sec)
369 root 1.6 ts.tv_sec = time (0) + pool->idle_timeout;
370 root 1.4
371 root 1.6 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
372 root 1.4 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
373     }
374    
375 root 1.6 --pool->idle;
376 root 1.4 }
377    
378 root 1.6 --pool->nready;
379 root 1.4
380 root 1.6 X_UNLOCK (pool->reqlock);
381 root 1.4
382 root 1.9 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
383 root 1.4 goto quit;
384    
385     ETP_EXECUTE (self, req);
386    
387 root 1.6 X_LOCK (pool->reslock);
388 root 1.4
389 root 1.6 ++pool->npending;
390 root 1.4
391 root 1.6 if (!reqq_push (&pool->res_queue, req))
392 root 1.9 ETP_WANT_POLL (pool);
393 root 1.4
394     etp_worker_clear (self);
395    
396 root 1.6 X_UNLOCK (pool->reslock);
397 root 1.4 }
398    
399     quit:
400     free (req);
401    
402 root 1.6 X_LOCK (pool->wrklock);
403 root 1.4 etp_worker_free (self);
404 root 1.6 X_UNLOCK (pool->wrklock);
405 root 1.4
406     return 0;
407     }
408 root 1.1
409     static void ecb_cold
410 root 1.5 etp_start_thread (etp_pool pool)
411 root 1.1 {
412     etp_worker *wrk = calloc (1, sizeof (etp_worker));
413    
414     /*TODO*/
415     assert (("unable to allocate worker thread data", wrk));
416    
417 root 1.5 wrk->pool = pool;
418    
419 root 1.6 X_LOCK (pool->wrklock);
420 root 1.1
421     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
422     {
423 root 1.8 wrk->prev = &pool->wrk_first;
424 root 1.7 wrk->next = pool->wrk_first.next;
425     pool->wrk_first.next->prev = wrk;
426     pool->wrk_first.next = wrk;
427 root 1.6 ++pool->started;
428 root 1.1 }
429     else
430     free (wrk);
431    
432 root 1.6 X_UNLOCK (pool->wrklock);
433 root 1.1 }
434    
435     static void
436 root 1.5 etp_maybe_start_thread (etp_pool pool)
437 root 1.1 {
438 root 1.6 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
439 root 1.1 return;
440    
441 root 1.6 /* todo: maybe use pool->idle here, but might be less exact */
442 root 1.5 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
443 root 1.1 return;
444    
445 root 1.5 etp_start_thread (pool);
446 root 1.1 }
447    
448     static void ecb_cold
449 root 1.5 etp_end_thread (etp_pool pool)
450 root 1.1 {
451 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
452 root 1.1
453     req->type = ETP_TYPE_QUIT;
454     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
455    
456 root 1.6 X_LOCK (pool->reqlock);
457 root 1.5 reqq_push (&pool->req_queue, req);
458 root 1.6 X_COND_SIGNAL (pool->reqwait);
459     X_UNLOCK (pool->reqlock);
460 root 1.1
461 root 1.6 X_LOCK (pool->wrklock);
462     --pool->started;
463     X_UNLOCK (pool->wrklock);
464 root 1.1 }
465    
466     ETP_API_DECL int
467 root 1.5 etp_poll (etp_pool pool)
468 root 1.1 {
469     unsigned int maxreqs;
470     unsigned int maxtime;
471     struct timeval tv_start, tv_now;
472    
473 root 1.6 X_LOCK (pool->reslock);
474     maxreqs = pool->max_poll_reqs;
475     maxtime = pool->max_poll_time;
476     X_UNLOCK (pool->reslock);
477 root 1.1
478     if (maxtime)
479     gettimeofday (&tv_start, 0);
480    
481     for (;;)
482     {
483     ETP_REQ *req;
484    
485 root 1.5 etp_maybe_start_thread (pool);
486 root 1.1
487 root 1.6 X_LOCK (pool->reslock);
488 root 1.5 req = reqq_shift (&pool->res_queue);
489 root 1.1
490 root 1.9 if (ecb_expect_true (req))
491 root 1.1 {
492 root 1.6 --pool->npending;
493 root 1.1
494 root 1.6 if (!pool->res_queue.size)
495 root 1.9 ETP_DONE_POLL (pool);
496 root 1.1 }
497    
498 root 1.6 X_UNLOCK (pool->reslock);
499 root 1.1
500 root 1.9 if (ecb_expect_false (!req))
501 root 1.1 return 0;
502    
503 root 1.6 X_LOCK (pool->reqlock);
504     --pool->nreqs;
505     X_UNLOCK (pool->reqlock);
506 root 1.1
507     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
508     {
509 root 1.4 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
510 root 1.1 continue;
511     }
512     else
513     {
514     int res = ETP_FINISH (req);
515     if (ecb_expect_false (res))
516     return res;
517     }
518    
519     if (ecb_expect_false (maxreqs && !--maxreqs))
520     break;
521    
522     if (maxtime)
523     {
524     gettimeofday (&tv_now, 0);
525    
526     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
527     break;
528     }
529     }
530    
531     errno = EAGAIN;
532     return -1;
533     }
534    
535     ETP_API_DECL void
536 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
537 root 1.1
538     ETP_API_DECL void
539 root 1.5 etp_cancel (etp_pool pool, ETP_REQ *req)
540 root 1.1 {
541     req->cancelled = 1;
542    
543 root 1.5 etp_grp_cancel (pool, req);
544 root 1.1 }
545    
546     ETP_API_DECL void
547 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
548 root 1.1 {
549     for (grp = grp->grp_first; grp; grp = grp->grp_next)
550 root 1.5 etp_cancel (pool, grp);
551 root 1.1 }
552    
553     ETP_API_DECL void
554 root 1.5 etp_submit (etp_pool pool, ETP_REQ *req)
555 root 1.1 {
556     req->pri -= ETP_PRI_MIN;
557    
558     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
559     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
560    
561     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
562     {
563     /* I hope this is worth it :/ */
564 root 1.6 X_LOCK (pool->reqlock);
565     ++pool->nreqs;
566     X_UNLOCK (pool->reqlock);
567 root 1.1
568 root 1.6 X_LOCK (pool->reslock);
569 root 1.1
570 root 1.6 ++pool->npending;
571 root 1.1
572 root 1.6 if (!reqq_push (&pool->res_queue, req))
573     ETP_WANT_POLL (pool);
574 root 1.1
575 root 1.6 X_UNLOCK (pool->reslock);
576 root 1.1 }
577     else
578     {
579 root 1.6 X_LOCK (pool->reqlock);
580     ++pool->nreqs;
581     ++pool->nready;
582 root 1.5 reqq_push (&pool->req_queue, req);
583 root 1.6 X_COND_SIGNAL (pool->reqwait);
584     X_UNLOCK (pool->reqlock);
585 root 1.1
586 root 1.5 etp_maybe_start_thread (pool);
587 root 1.1 }
588     }
589    
590     ETP_API_DECL void ecb_cold
591 root 1.10 etp_set_max_poll_time (etp_pool pool, double seconds)
592 root 1.1 {
593 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
594 root 1.10 pool->max_poll_time = seconds * ETP_TICKS;
595 root 1.6 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
596 root 1.1 }
597    
598     ETP_API_DECL void ecb_cold
599 root 1.5 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
600 root 1.1 {
601 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
602     pool->max_poll_reqs = maxreqs;
603     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
604 root 1.1 }
605    
606     ETP_API_DECL void ecb_cold
607 root 1.10 etp_set_max_idle (etp_pool pool, unsigned int threads)
608 root 1.1 {
609 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
610 root 1.10 pool->max_idle = threads;
611 root 1.6 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
612 root 1.1 }
613    
614     ETP_API_DECL void ecb_cold
615 root 1.5 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
616 root 1.1 {
617 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
618     pool->idle_timeout = seconds;
619     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
620 root 1.1 }
621    
622     ETP_API_DECL void ecb_cold
623 root 1.10 etp_set_min_parallel (etp_pool pool, unsigned int threads)
624 root 1.1 {
625 root 1.10 if (pool->wanted < threads)
626     pool->wanted = threads;
627 root 1.1 }
628    
629     ETP_API_DECL void ecb_cold
630 root 1.10 etp_set_max_parallel (etp_pool pool, unsigned int threads)
631 root 1.1 {
632 root 1.10 if (pool->wanted > threads)
633     pool->wanted = threads;
634 root 1.1
635 root 1.6 while (pool->started > pool->wanted)
636 root 1.5 etp_end_thread (pool);
637 root 1.1 }
638