ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.6
Committed: Thu Jun 25 18:08:47 2015 UTC (8 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.5: +133 -120 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4     * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5     * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
40     #ifndef ETP_API_DECL
41     # define ETP_API_DECL static
42     #endif
43    
44     #ifndef ETP_PRI_MIN
45     # define ETP_PRI_MIN 0
46     # define ETP_PRI_MAX 0
47     #endif
48    
49     #ifndef ETP_TYPE_QUIT
50     # define ETP_TYPE_QUIT 0
51     #endif
52    
53     #ifndef ETP_TYPE_GROUP
54     # define ETP_TYPE_GROUP 1
55     #endif
56    
57 root 1.6 #ifndef ETP_WANT_POLL
58     # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
59     #endif
60     #ifndef ETP_DONE_POLL
61     # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
62     #endif
63    
64 root 1.1 #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
65    
66     #define ETP_TICKS ((1000000 + 1023) >> 10)
67    
68 root 1.4 enum {
69     ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
70     ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
71     };
72    
73 root 1.1 /* calculate time difference in ~1/ETP_TICKS of a second */
74     ecb_inline int
75     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
76     {
77     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
78     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
79     }
80    
81 root 1.3 struct etp_tmpbuf
82     {
83     void *ptr;
84     int len;
85     };
86    
87     static void *
88     etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
89     {
90     if (buf->len < len)
91     {
92     free (buf->ptr);
93     buf->ptr = malloc (buf->len = len);
94     }
95    
96     return buf->ptr;
97     }
98    
99 root 1.5 /*
100     * a somewhat faster data structure might be nice, but
101     * with 8 priorities this actually needs <20 insns
102     * per shift, the most expensive operation.
103     */
104     typedef struct
105     {
106     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107     int size;
108     } etp_reqq;
109    
110     struct etp_pool
111     {
112 root 1.6 void *userdata;
113    
114 root 1.5 etp_reqq req_queue;
115     etp_reqq res_queue;
116 root 1.6
117     unsigned int started, idle, wanted;
118    
119     unsigned int max_poll_time; /* pool->reslock */
120     unsigned int max_poll_reqs; /* pool->reslock */
121    
122     unsigned int nreqs; /* pool->reqlock */
123     unsigned int nready; /* pool->reqlock */
124     unsigned int npending; /* pool->reqlock */
125     unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
126     unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
127    
128     void (*want_poll_cb) (void *userdata);
129     void (*done_poll_cb) (void *userdata);
130    
131     xmutex_t wrklock;
132     xmutex_t reslock;
133     xmutex_t reqlock;
134     xcond_t reqwait;
135 root 1.5 };
136    
137     typedef struct etp_pool *etp_pool;
138    
139 root 1.1 typedef struct etp_worker
140     {
141 root 1.5 etp_pool pool;
142    
143 root 1.3 struct etp_tmpbuf tmpbuf;
144 root 1.1
145 root 1.6 /* locked by pool->wrklock */
146 root 1.1 struct etp_worker *prev, *next;
147    
148     xthread_t tid;
149    
150     #ifdef ETP_WORKER_COMMON
151     ETP_WORKER_COMMON
152     #endif
153     } etp_worker;
154    
155     static etp_worker wrk_first; /* NOT etp */
156    
157 root 1.6 #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 root 1.1
160     /* worker threads management */
161    
162     static void
163     etp_worker_clear (etp_worker *wrk)
164     {
165     }
166    
167     static void ecb_cold
168     etp_worker_free (etp_worker *wrk)
169     {
170     free (wrk->tmpbuf.ptr);
171    
172     wrk->next->prev = wrk->prev;
173     wrk->prev->next = wrk->next;
174    
175     free (wrk);
176     }
177    
178     ETP_API_DECL unsigned int
179 root 1.5 etp_nreqs (etp_pool pool)
180 root 1.1 {
181     int retval;
182 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
183     retval = pool->nreqs;
184     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
185 root 1.1 return retval;
186     }
187    
188     ETP_API_DECL unsigned int
189 root 1.5 etp_nready (etp_pool pool)
190 root 1.1 {
191     unsigned int retval;
192    
193 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
194     retval = pool->nready;
195     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
196 root 1.1
197     return retval;
198     }
199    
200     ETP_API_DECL unsigned int
201 root 1.5 etp_npending (etp_pool pool)
202 root 1.1 {
203     unsigned int retval;
204    
205 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
206     retval = pool->npending;
207     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
208 root 1.1
209     return retval;
210     }
211    
212     ETP_API_DECL unsigned int
213 root 1.5 etp_nthreads (etp_pool pool)
214 root 1.1 {
215     unsigned int retval;
216    
217 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
218     retval = pool->started;
219     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
220 root 1.1
221     return retval;
222     }
223    
224     static void ecb_noinline ecb_cold
225     reqq_init (etp_reqq *q)
226     {
227     int pri;
228    
229     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
230     q->qs[pri] = q->qe[pri] = 0;
231    
232     q->size = 0;
233     }
234    
235     static int ecb_noinline
236     reqq_push (etp_reqq *q, ETP_REQ *req)
237     {
238     int pri = req->pri;
239     req->next = 0;
240    
241     if (q->qe[pri])
242     {
243     q->qe[pri]->next = req;
244     q->qe[pri] = req;
245     }
246     else
247     q->qe[pri] = q->qs[pri] = req;
248    
249     return q->size++;
250     }
251    
252     static ETP_REQ * ecb_noinline
253     reqq_shift (etp_reqq *q)
254     {
255     int pri;
256    
257     if (!q->size)
258     return 0;
259    
260     --q->size;
261    
262     for (pri = ETP_NUM_PRI; pri--; )
263     {
264 root 1.2 ETP_REQ *req = q->qs[pri];
265 root 1.1
266     if (req)
267     {
268 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
269 root 1.1 q->qe[pri] = 0;
270    
271     return req;
272     }
273     }
274    
275     abort ();
276     }
277    
278     ETP_API_DECL int ecb_cold
279 root 1.6 etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
280 root 1.1 {
281 root 1.6 X_MUTEX_CREATE (pool->wrklock);
282     X_MUTEX_CREATE (pool->reslock);
283     X_MUTEX_CREATE (pool->reqlock);
284     X_COND_CREATE (pool->reqwait);
285 root 1.1
286 root 1.5 reqq_init (&pool->req_queue);
287     reqq_init (&pool->res_queue);
288 root 1.1
289     wrk_first.next =
290     wrk_first.prev = &wrk_first;
291    
292 root 1.6 pool->started = 0;
293     pool->idle = 0;
294     pool->nreqs = 0;
295     pool->nready = 0;
296     pool->npending = 0;
297     pool->wanted = 4;
298    
299     pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300     pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301 root 1.1
302 root 1.6 pool->want_poll_cb = want_poll;
303     pool->done_poll_cb = done_poll;
304 root 1.1
305     return 0;
306     }
307    
308 root 1.4 static void ecb_noinline ecb_cold
309     etp_proc_init (void)
310     {
311     #if HAVE_PRCTL_SET_NAME
312     /* provide a more sensible "thread name" */
313     char name[16 + 1];
314     const int namelen = sizeof (name) - 1;
315     int len;
316    
317     prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
318     name [namelen] = 0;
319     len = strlen (name);
320     strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
321     prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
322     #endif
323     }
324    
325     X_THREAD_PROC (etp_proc)
326     {
327     ETP_REQ *req;
328     struct timespec ts;
329     etp_worker *self = (etp_worker *)thr_arg;
330 root 1.5 etp_pool pool = self->pool;
331 root 1.4
332     etp_proc_init ();
333    
334     /* try to distribute timeouts somewhat evenly */
335     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
336    
337     for (;;)
338     {
339     ts.tv_sec = 0;
340    
341 root 1.6 X_LOCK (pool->reqlock);
342 root 1.4
343     for (;;)
344     {
345 root 1.5 req = reqq_shift (&pool->req_queue);
346 root 1.4
347     if (req)
348     break;
349    
350     if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
351     {
352 root 1.6 X_UNLOCK (pool->reqlock);
353     X_LOCK (pool->wrklock);
354     --pool->started;
355     X_UNLOCK (pool->wrklock);
356 root 1.4 goto quit;
357     }
358    
359 root 1.6 ++pool->idle;
360 root 1.4
361 root 1.6 if (pool->idle <= pool->max_idle)
362     /* we are allowed to pool->idle, so do so without any timeout */
363     X_COND_WAIT (pool->reqwait, pool->reqlock);
364 root 1.4 else
365     {
366     /* initialise timeout once */
367     if (!ts.tv_sec)
368 root 1.6 ts.tv_sec = time (0) + pool->idle_timeout;
369 root 1.4
370 root 1.6 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
371 root 1.4 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
372     }
373    
374 root 1.6 --pool->idle;
375 root 1.4 }
376    
377 root 1.6 --pool->nready;
378 root 1.4
379 root 1.6 X_UNLOCK (pool->reqlock);
380 root 1.4
381     if (req->type == ETP_TYPE_QUIT)
382     goto quit;
383    
384     ETP_EXECUTE (self, req);
385    
386 root 1.6 X_LOCK (pool->reslock);
387 root 1.4
388 root 1.6 ++pool->npending;
389 root 1.4
390 root 1.6 if (!reqq_push (&pool->res_queue, req))
391     ETP_WANT_POLL (poll);
392 root 1.4
393     etp_worker_clear (self);
394    
395 root 1.6 X_UNLOCK (pool->reslock);
396 root 1.4 }
397    
398     quit:
399     free (req);
400    
401 root 1.6 X_LOCK (pool->wrklock);
402 root 1.4 etp_worker_free (self);
403 root 1.6 X_UNLOCK (pool->wrklock);
404 root 1.4
405     return 0;
406     }
407 root 1.1
408     static void ecb_cold
409 root 1.5 etp_start_thread (etp_pool pool)
410 root 1.1 {
411     etp_worker *wrk = calloc (1, sizeof (etp_worker));
412    
413     /*TODO*/
414     assert (("unable to allocate worker thread data", wrk));
415    
416 root 1.5 wrk->pool = pool;
417    
418 root 1.6 X_LOCK (pool->wrklock);
419 root 1.1
420     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
421     {
422     wrk->prev = &wrk_first;
423     wrk->next = wrk_first.next;
424     wrk_first.next->prev = wrk;
425     wrk_first.next = wrk;
426 root 1.6 ++pool->started;
427 root 1.1 }
428     else
429     free (wrk);
430    
431 root 1.6 X_UNLOCK (pool->wrklock);
432 root 1.1 }
433    
434     static void
435 root 1.5 etp_maybe_start_thread (etp_pool pool)
436 root 1.1 {
437 root 1.6 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
438 root 1.1 return;
439    
440 root 1.6 /* todo: maybe use pool->idle here, but might be less exact */
441 root 1.5 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
442 root 1.1 return;
443    
444 root 1.5 etp_start_thread (pool);
445 root 1.1 }
446    
447     static void ecb_cold
448 root 1.5 etp_end_thread (etp_pool pool)
449 root 1.1 {
450 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
451 root 1.1
452     req->type = ETP_TYPE_QUIT;
453     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
454    
455 root 1.6 X_LOCK (pool->reqlock);
456 root 1.5 reqq_push (&pool->req_queue, req);
457 root 1.6 X_COND_SIGNAL (pool->reqwait);
458     X_UNLOCK (pool->reqlock);
459 root 1.1
460 root 1.6 X_LOCK (pool->wrklock);
461     --pool->started;
462     X_UNLOCK (pool->wrklock);
463 root 1.1 }
464    
465     ETP_API_DECL int
466 root 1.5 etp_poll (etp_pool pool)
467 root 1.1 {
468     unsigned int maxreqs;
469     unsigned int maxtime;
470     struct timeval tv_start, tv_now;
471    
472 root 1.6 X_LOCK (pool->reslock);
473     maxreqs = pool->max_poll_reqs;
474     maxtime = pool->max_poll_time;
475     X_UNLOCK (pool->reslock);
476 root 1.1
477     if (maxtime)
478     gettimeofday (&tv_start, 0);
479    
480     for (;;)
481     {
482     ETP_REQ *req;
483    
484 root 1.5 etp_maybe_start_thread (pool);
485 root 1.1
486 root 1.6 X_LOCK (pool->reslock);
487 root 1.5 req = reqq_shift (&pool->res_queue);
488 root 1.1
489     if (req)
490     {
491 root 1.6 --pool->npending;
492 root 1.1
493 root 1.6 if (!pool->res_queue.size)
494     ETP_DONE_POLL (pool->userdata);
495 root 1.1 }
496    
497 root 1.6 X_UNLOCK (pool->reslock);
498 root 1.1
499     if (!req)
500     return 0;
501    
502 root 1.6 X_LOCK (pool->reqlock);
503     --pool->nreqs;
504     X_UNLOCK (pool->reqlock);
505 root 1.1
506     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
507     {
508 root 1.4 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
509 root 1.1 continue;
510     }
511     else
512     {
513     int res = ETP_FINISH (req);
514     if (ecb_expect_false (res))
515     return res;
516     }
517    
518     if (ecb_expect_false (maxreqs && !--maxreqs))
519     break;
520    
521     if (maxtime)
522     {
523     gettimeofday (&tv_now, 0);
524    
525     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
526     break;
527     }
528     }
529    
530     errno = EAGAIN;
531     return -1;
532     }
533    
534     ETP_API_DECL void
535 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
536 root 1.1
537     ETP_API_DECL void
538 root 1.5 etp_cancel (etp_pool pool, ETP_REQ *req)
539 root 1.1 {
540     req->cancelled = 1;
541    
542 root 1.5 etp_grp_cancel (pool, req);
543 root 1.1 }
544    
545     ETP_API_DECL void
546 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
547 root 1.1 {
548     for (grp = grp->grp_first; grp; grp = grp->grp_next)
549 root 1.5 etp_cancel (pool, grp);
550 root 1.1 }
551    
552     ETP_API_DECL void
553 root 1.5 etp_submit (etp_pool pool, ETP_REQ *req)
554 root 1.1 {
555     req->pri -= ETP_PRI_MIN;
556    
557     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
558     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
559    
560     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
561     {
562     /* I hope this is worth it :/ */
563 root 1.6 X_LOCK (pool->reqlock);
564     ++pool->nreqs;
565     X_UNLOCK (pool->reqlock);
566 root 1.1
567 root 1.6 X_LOCK (pool->reslock);
568 root 1.1
569 root 1.6 ++pool->npending;
570 root 1.1
571 root 1.6 if (!reqq_push (&pool->res_queue, req))
572     ETP_WANT_POLL (pool);
573 root 1.1
574 root 1.6 X_UNLOCK (pool->reslock);
575 root 1.1 }
576     else
577     {
578 root 1.6 X_LOCK (pool->reqlock);
579     ++pool->nreqs;
580     ++pool->nready;
581 root 1.5 reqq_push (&pool->req_queue, req);
582 root 1.6 X_COND_SIGNAL (pool->reqwait);
583     X_UNLOCK (pool->reqlock);
584 root 1.1
585 root 1.5 etp_maybe_start_thread (pool);
586 root 1.1 }
587     }
588    
589     ETP_API_DECL void ecb_cold
590 root 1.5 etp_set_max_poll_time (etp_pool pool, double nseconds)
591 root 1.1 {
592 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
593     pool->max_poll_time = nseconds * ETP_TICKS;
594     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
595 root 1.1 }
596    
597     ETP_API_DECL void ecb_cold
598 root 1.5 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
599 root 1.1 {
600 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
601     pool->max_poll_reqs = maxreqs;
602     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
603 root 1.1 }
604    
605     ETP_API_DECL void ecb_cold
606 root 1.5 etp_set_max_idle (etp_pool pool, unsigned int nthreads)
607 root 1.1 {
608 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
609     pool->max_idle = nthreads;
610     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
611 root 1.1 }
612    
613     ETP_API_DECL void ecb_cold
614 root 1.5 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
615 root 1.1 {
616 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
617     pool->idle_timeout = seconds;
618     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
619 root 1.1 }
620    
621     ETP_API_DECL void ecb_cold
622 root 1.5 etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
623 root 1.1 {
624 root 1.6 if (pool->wanted < nthreads)
625     pool->wanted = nthreads;
626 root 1.1 }
627    
628     ETP_API_DECL void ecb_cold
629 root 1.5 etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
630 root 1.1 {
631 root 1.6 if (pool->wanted > nthreads)
632     pool->wanted = nthreads;
633 root 1.1
634 root 1.6 while (pool->started > pool->wanted)
635 root 1.5 etp_end_thread (pool);
636 root 1.1 }
637