ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.1
Committed: Sun Apr 14 09:43:19 2013 UTC (11 years, 4 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-4_2, rel-4_3, rel-4_31
Log Message:
move etp to etp.c, still not independent

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4     * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5     * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
40     #ifndef ETP_API_DECL /* linkage prefix for the etp_* API functions; file-local unless overridden */
41     # define ETP_API_DECL static
42     #endif
43    
44     #ifndef ETP_PRI_MIN /* priority range; defaults to a single priority level */
45     # define ETP_PRI_MIN 0
46     # define ETP_PRI_MAX 0
47     #endif
48    
49     #ifndef ETP_TYPE_QUIT /* request type that etp_end_thread queues to stop a worker */
50     # define ETP_TYPE_QUIT 0
51     #endif
52    
53     #ifndef ETP_TYPE_GROUP /* request type that etp_submit places directly on the result queue */
54     # define ETP_TYPE_GROUP 1
55     #endif
56    
57     #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) /* number of per-priority lists in an etp_reqq */
58    
59     #define ETP_TICKS ((1000000 + 1023) >> 10) /* 1024-microsecond ticks per second, rounded up (977) */
60    
61     /* calculate time difference in ~1/ETP_TICKS of a second */
62     ecb_inline int
63     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64     {
65     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
66     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
67     }
68    
       /*
        * started: worker count; incremented by etp_start_thread and decremented
        *          by etp_end_thread, both while holding wrklock.
        * idle:    reset to 0 by etp_init; not otherwise touched in this file.
        * wanted:  thread target, adjusted by etp_set_min_parallel/etp_set_max_parallel.
        */
69     static unsigned int started, idle, wanted = 4;
70    
71     static void (*want_poll_cb) (void); /* called by etp_submit when a group request lands on an empty result queue */
72     static void (*done_poll_cb) (void); /* called by etp_poll when it removes the last queued result */
73    
74     static unsigned int max_poll_time; /* reslock; limit in ETP_TICKS units, 0 disables the check in etp_poll */
75     static unsigned int max_poll_reqs; /* reslock; 0 disables the check in etp_poll */
76    
77     static unsigned int nreqs; /* reqlock */
78     static unsigned int nready; /* reqlock */
79     static unsigned int npending; /* reqlock for reads in etp_npending; NOTE(review): etp_poll and etp_submit update it under reslock */
80     static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
81     static unsigned int idle_timeout = 10; /* number of seconds after which idle threads exit */
82    
83     static xmutex_t wrklock; /* guards the worker ring and updates of started */
84     static xmutex_t reslock; /* guards res_queue and the poll limits */
85     static xmutex_t reqlock; /* guards req_queue and the request counters */
86     static xcond_t reqwait; /* signalled whenever a request is pushed onto req_queue */
87    
88     typedef struct etp_worker /* per-thread record, allocated in etp_start_thread */
89     {
90     struct tmpbuf tmpbuf; /* tmpbuf.ptr is released by etp_worker_free */
91    
92     /* locked by wrklock */
93     struct etp_worker *prev, *next; /* links in the doubly linked ring anchored at wrk_first */
94    
95     xthread_t tid; /* filled in by xthread_create */
96    
97     #ifdef ETP_WORKER_COMMON /* optional extra members supplied by the including file */
98     ETP_WORKER_COMMON
99     #endif
100     } etp_worker;
101    
102     static etp_worker wrk_first; /* NOT etp */ /* ring sentinel; etp_init links it to itself */
103    
        /* both macros ignore their argument and operate on the single global wrklock */
104     #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
105     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
106    
107     /* worker threads management */
108    
/* Placeholder hook: currently does nothing with the given worker. */
109     static void
110     etp_worker_clear (etp_worker *wrk)
111     {
112     }
113    
114     static void ecb_cold
115     etp_worker_free (etp_worker *wrk)
116     {
117     free (wrk->tmpbuf.ptr);
118    
119     wrk->next->prev = wrk->prev;
120     wrk->prev->next = wrk->next;
121    
122     free (wrk);
123     }
124    
125     ETP_API_DECL unsigned int
126     etp_nreqs (void)
127     {
128     int retval;
129     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
130     retval = nreqs;
131     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
132     return retval;
133     }
134    
135     ETP_API_DECL unsigned int
136     etp_nready (void)
137     {
138     unsigned int retval;
139    
140     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
141     retval = nready;
142     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
143    
144     return retval;
145     }
146    
147     ETP_API_DECL unsigned int
148     etp_npending (void)
149     {
150     unsigned int retval;
151    
152     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153     retval = npending;
154     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155    
156     return retval;
157     }
158    
159     ETP_API_DECL unsigned int
160     etp_nthreads (void)
161     {
162     unsigned int retval;
163    
164     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
165     retval = started;
166     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
167    
168     return retval;
169     }
170    
171     /*
172     * a somewhat faster data structure might be nice, but
173     * with 8 priorities this actually needs <20 insns
174     * per shift, the most expensive operation.
175     */
176     typedef struct { /* one singly linked FIFO list per priority level */
177     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
178     int size; /* total number of queued requests across all priorities */
179     } etp_reqq;
180    
181     static etp_reqq req_queue; /* filled by etp_submit (non-group requests) and etp_end_thread, under reqlock */
182     static etp_reqq res_queue; /* drained by etp_poll, under reslock; etp_submit pushes group requests here */
183    
184     static void ecb_noinline ecb_cold
185     reqq_init (etp_reqq *q)
186     {
187     int pri;
188    
189     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
190     q->qs[pri] = q->qe[pri] = 0;
191    
192     q->size = 0;
193     }
194    
195     static int ecb_noinline
196     reqq_push (etp_reqq *q, ETP_REQ *req)
197     {
198     int pri = req->pri;
199     req->next = 0;
200    
201     if (q->qe[pri])
202     {
203     q->qe[pri]->next = req;
204     q->qe[pri] = req;
205     }
206     else
207     q->qe[pri] = q->qs[pri] = req;
208    
209     return q->size++;
210     }
211    
212     static ETP_REQ * ecb_noinline
213     reqq_shift (etp_reqq *q)
214     {
215     int pri;
216    
217     if (!q->size)
218     return 0;
219    
220     --q->size;
221    
222     for (pri = ETP_NUM_PRI; pri--; )
223     {
224     eio_req *req = q->qs[pri];
225    
226     if (req)
227     {
228     if (!(q->qs[pri] = (eio_req *)req->next))
229     q->qe[pri] = 0;
230    
231     return req;
232     }
233     }
234    
235     abort ();
236     }
237    
238     ETP_API_DECL int ecb_cold
239     etp_init (void (*want_poll)(void), void (*done_poll)(void))
240     {
241     X_MUTEX_CREATE (wrklock);
242     X_MUTEX_CREATE (reslock);
243     X_MUTEX_CREATE (reqlock);
244     X_COND_CREATE (reqwait);
245    
246     reqq_init (&req_queue);
247     reqq_init (&res_queue);
248    
249     wrk_first.next =
250     wrk_first.prev = &wrk_first;
251    
252     started = 0;
253     idle = 0;
254     nreqs = 0;
255     nready = 0;
256     npending = 0;
257    
258     want_poll_cb = want_poll;
259     done_poll_cb = done_poll;
260    
261     return 0;
262     }
263    
264     /* not yet in etp.c */
265     X_THREAD_PROC (etp_proc);
266    
267     static void ecb_cold
268     etp_start_thread (void)
269     {
270     etp_worker *wrk = calloc (1, sizeof (etp_worker));
271    
272     /*TODO*/
273     assert (("unable to allocate worker thread data", wrk));
274    
275     X_LOCK (wrklock);
276    
277     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
278     {
279     wrk->prev = &wrk_first;
280     wrk->next = wrk_first.next;
281     wrk_first.next->prev = wrk;
282     wrk_first.next = wrk;
283     ++started;
284     }
285     else
286     free (wrk);
287    
288     X_UNLOCK (wrklock);
289     }
290    
291     static void
292     etp_maybe_start_thread (void)
293     {
294     if (ecb_expect_true (etp_nthreads () >= wanted))
295     return;
296    
297     /* todo: maybe use idle here, but might be less exact */
298     if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
299     return;
300    
301     etp_start_thread ();
302     }
303    
304     static void ecb_cold
305     etp_end_thread (void)
306     {
307     eio_req *req = calloc (1, sizeof (eio_req)); /* will be freed by worker */
308    
309     req->type = ETP_TYPE_QUIT;
310     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
311    
312     X_LOCK (reqlock);
313     reqq_push (&req_queue, req);
314     X_COND_SIGNAL (reqwait);
315     X_UNLOCK (reqlock);
316    
317     X_LOCK (wrklock);
318     --started;
319     X_UNLOCK (wrklock);
320     }
321    
/*
 * Drain the result queue, finishing each request with ETP_FINISH.
 *
 * Returns 0 once the result queue is empty, the ETP_FINISH result as soon
 * as one is non-zero, or -1 with errno set to EAGAIN when the request-count
 * or time limit (max_poll_reqs / max_poll_time, 0 = no limit) stops the loop.
 */
322     ETP_API_DECL int
323     etp_poll (void)
324     {
325     unsigned int maxreqs;
326     unsigned int maxtime;
327     struct timeval tv_start, tv_now;
328    
        /* take one consistent copy of both limits */
329     X_LOCK (reslock);
330     maxreqs = max_poll_reqs;
331     maxtime = max_poll_time;
332     X_UNLOCK (reslock);
333    
        /* the start time is only needed when a time limit is set */
334     if (maxtime)
335     gettimeofday (&tv_start, 0);
336    
337     for (;;)
338     {
339     ETP_REQ *req;
340    
341     etp_maybe_start_thread ();
342    
343     X_LOCK (reslock);
344     req = reqq_shift (&res_queue);
345    
346     if (req)
347     {
348     --npending; /* updated under reslock here */
349    
        /* the last queued result was just taken: notify, still holding reslock */
350     if (!res_queue.size && done_poll_cb)
351     done_poll_cb ();
352     }
353    
354     X_UNLOCK (reslock);
355    
356     if (!req)
357     return 0; /* nothing left to process */
358    
359     X_LOCK (reqlock);
360     --nreqs;
361     X_UNLOCK (reqlock);
362    
        /* a group request with non-zero size is not finished here; it is only flagged */
363     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
364     {
365     req->int1 = 1; /* mark request as delayed */
366     continue; /* skips the limit checks below */
367     }
368     else
369     {
370     int res = ETP_FINISH (req);
371     if (ecb_expect_false (res))
372     return res; /* a non-zero result ends polling immediately */
373     }
374    
        /* request-count limit: stop when the countdown reaches zero */
375     if (ecb_expect_false (maxreqs && !--maxreqs))
376     break;
377    
378     if (maxtime)
379     {
380     gettimeofday (&tv_now, 0);
381    
        /* NOTE(review): int result compared against unsigned maxtime; a negative difference converts to a large unsigned value */
382     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
383     break;
384     }
385     }
386    
        /* a limit was hit before the result queue was seen empty */
387     errno = EAGAIN;
388     return -1;
389     }
390    
391     ETP_API_DECL void
392     etp_grp_cancel (ETP_REQ *grp);
393    
394     ETP_API_DECL void
395     etp_cancel (ETP_REQ *req)
396     {
397     req->cancelled = 1;
398    
399     etp_grp_cancel (req);
400     }
401    
402     ETP_API_DECL void
403     etp_grp_cancel (ETP_REQ *grp)
404     {
405     for (grp = grp->grp_first; grp; grp = grp->grp_next)
406     etp_cancel (grp);
407     }
408    
409     ETP_API_DECL void
410     etp_submit (ETP_REQ *req)
411     {
412     req->pri -= ETP_PRI_MIN;
413    
414     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
415     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
416    
417     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
418     {
419     /* I hope this is worth it :/ */
420     X_LOCK (reqlock);
421     ++nreqs;
422     X_UNLOCK (reqlock);
423    
424     X_LOCK (reslock);
425    
426     ++npending;
427    
428     if (!reqq_push (&res_queue, req) && want_poll_cb)
429     want_poll_cb ();
430    
431     X_UNLOCK (reslock);
432     }
433     else
434     {
435     X_LOCK (reqlock);
436     ++nreqs;
437     ++nready;
438     reqq_push (&req_queue, req);
439     X_COND_SIGNAL (reqwait);
440     X_UNLOCK (reqlock);
441    
442     etp_maybe_start_thread ();
443     }
444     }
445    
446     ETP_API_DECL void ecb_cold
447     etp_set_max_poll_time (double nseconds)
448     {
449     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
450     max_poll_time = nseconds * ETP_TICKS;
451     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
452     }
453    
454     ETP_API_DECL void ecb_cold
455     etp_set_max_poll_reqs (unsigned int maxreqs)
456     {
457     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
458     max_poll_reqs = maxreqs;
459     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
460     }
461    
462     ETP_API_DECL void ecb_cold
463     etp_set_max_idle (unsigned int nthreads)
464     {
465     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
466     max_idle = nthreads;
467     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
468     }
469    
470     ETP_API_DECL void ecb_cold
471     etp_set_idle_timeout (unsigned int seconds)
472     {
473     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
474     idle_timeout = seconds;
475     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
476     }
477    
478     ETP_API_DECL void ecb_cold
479     etp_set_min_parallel (unsigned int nthreads)
480     {
481     if (wanted < nthreads)
482     wanted = nthreads;
483     }
484    
485     ETP_API_DECL void ecb_cold
486     etp_set_max_parallel (unsigned int nthreads)
487     {
488     if (wanted > nthreads)
489     wanted = nthreads;
490    
491     while (started > wanted)
492     etp_end_thread ();
493     }
494