ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.1
Committed: Sun Apr 14 09:43:19 2013 UTC (11 years, 1 month ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-4_2, rel-4_3, rel-4_31
Log Message:
move etp to etp.c, still not independent

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
/* linkage used for the etp_* API functions; file-local unless overridden */
#if !defined ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range; a single priority level unless the includer defines one */
#if !defined ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that tells a worker to quit */
#if !defined ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type that marks a group request */
#if !defined ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* number of distinct priority levels, i.e. the number of per-priority queues */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* number of 1024-microsecond ticks per second, rounded up */
#define ETP_TICKS ((1000000 + 1023) >> 10)
60
61 /* calculate time difference in ~1/ETP_TICKS of a second */
62 ecb_inline int
63 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64 {
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
66 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
67 }
68
69 static unsigned int started, idle, wanted = 4;
70
71 static void (*want_poll_cb) (void);
72 static void (*done_poll_cb) (void);
73
74 static unsigned int max_poll_time; /* reslock */
75 static unsigned int max_poll_reqs; /* reslock */
76
77 static unsigned int nreqs; /* reqlock */
78 static unsigned int nready; /* reqlock */
79 static unsigned int npending; /* reqlock */
80 static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
81 static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
82
83 static xmutex_t wrklock;
84 static xmutex_t reslock;
85 static xmutex_t reqlock;
86 static xcond_t reqwait;
87
88 typedef struct etp_worker
89 {
90 struct tmpbuf tmpbuf;
91
92 /* locked by wrklock */
93 struct etp_worker *prev, *next;
94
95 xthread_t tid;
96
97 #ifdef ETP_WORKER_COMMON
98 ETP_WORKER_COMMON
99 #endif
100 } etp_worker;
101
102 static etp_worker wrk_first; /* NOT etp */
103
104 #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
105 #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
106
107 /* worker threads management */
108
109 static void
110 etp_worker_clear (etp_worker *wrk)
111 {
112 }
113
114 static void ecb_cold
115 etp_worker_free (etp_worker *wrk)
116 {
117 free (wrk->tmpbuf.ptr);
118
119 wrk->next->prev = wrk->prev;
120 wrk->prev->next = wrk->next;
121
122 free (wrk);
123 }
124
125 ETP_API_DECL unsigned int
126 etp_nreqs (void)
127 {
128 int retval;
129 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
130 retval = nreqs;
131 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
132 return retval;
133 }
134
135 ETP_API_DECL unsigned int
136 etp_nready (void)
137 {
138 unsigned int retval;
139
140 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
141 retval = nready;
142 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
143
144 return retval;
145 }
146
147 ETP_API_DECL unsigned int
148 etp_npending (void)
149 {
150 unsigned int retval;
151
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = npending;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155
156 return retval;
157 }
158
159 ETP_API_DECL unsigned int
160 etp_nthreads (void)
161 {
162 unsigned int retval;
163
164 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
165 retval = started;
166 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
167
168 return retval;
169 }
170
171 /*
172 * a somewhat faster data structure might be nice, but
173 * with 8 priorities this actually needs <20 insns
174 * per shift, the most expensive operation.
175 */
176 typedef struct {
177 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
178 int size;
179 } etp_reqq;
180
181 static etp_reqq req_queue;
182 static etp_reqq res_queue;
183
184 static void ecb_noinline ecb_cold
185 reqq_init (etp_reqq *q)
186 {
187 int pri;
188
189 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
190 q->qs[pri] = q->qe[pri] = 0;
191
192 q->size = 0;
193 }
194
195 static int ecb_noinline
196 reqq_push (etp_reqq *q, ETP_REQ *req)
197 {
198 int pri = req->pri;
199 req->next = 0;
200
201 if (q->qe[pri])
202 {
203 q->qe[pri]->next = req;
204 q->qe[pri] = req;
205 }
206 else
207 q->qe[pri] = q->qs[pri] = req;
208
209 return q->size++;
210 }
211
212 static ETP_REQ * ecb_noinline
213 reqq_shift (etp_reqq *q)
214 {
215 int pri;
216
217 if (!q->size)
218 return 0;
219
220 --q->size;
221
222 for (pri = ETP_NUM_PRI; pri--; )
223 {
224 eio_req *req = q->qs[pri];
225
226 if (req)
227 {
228 if (!(q->qs[pri] = (eio_req *)req->next))
229 q->qe[pri] = 0;
230
231 return req;
232 }
233 }
234
235 abort ();
236 }
237
238 ETP_API_DECL int ecb_cold
239 etp_init (void (*want_poll)(void), void (*done_poll)(void))
240 {
241 X_MUTEX_CREATE (wrklock);
242 X_MUTEX_CREATE (reslock);
243 X_MUTEX_CREATE (reqlock);
244 X_COND_CREATE (reqwait);
245
246 reqq_init (&req_queue);
247 reqq_init (&res_queue);
248
249 wrk_first.next =
250 wrk_first.prev = &wrk_first;
251
252 started = 0;
253 idle = 0;
254 nreqs = 0;
255 nready = 0;
256 npending = 0;
257
258 want_poll_cb = want_poll;
259 done_poll_cb = done_poll;
260
261 return 0;
262 }
263
264 /* not yet in etp.c */
265 X_THREAD_PROC (etp_proc);
266
267 static void ecb_cold
268 etp_start_thread (void)
269 {
270 etp_worker *wrk = calloc (1, sizeof (etp_worker));
271
272 /*TODO*/
273 assert (("unable to allocate worker thread data", wrk));
274
275 X_LOCK (wrklock);
276
277 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
278 {
279 wrk->prev = &wrk_first;
280 wrk->next = wrk_first.next;
281 wrk_first.next->prev = wrk;
282 wrk_first.next = wrk;
283 ++started;
284 }
285 else
286 free (wrk);
287
288 X_UNLOCK (wrklock);
289 }
290
291 static void
292 etp_maybe_start_thread (void)
293 {
294 if (ecb_expect_true (etp_nthreads () >= wanted))
295 return;
296
297 /* todo: maybe use idle here, but might be less exact */
298 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
299 return;
300
301 etp_start_thread ();
302 }
303
304 static void ecb_cold
305 etp_end_thread (void)
306 {
307 eio_req *req = calloc (1, sizeof (eio_req)); /* will be freed by worker */
308
309 req->type = ETP_TYPE_QUIT;
310 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
311
312 X_LOCK (reqlock);
313 reqq_push (&req_queue, req);
314 X_COND_SIGNAL (reqwait);
315 X_UNLOCK (reqlock);
316
317 X_LOCK (wrklock);
318 --started;
319 X_UNLOCK (wrklock);
320 }
321
322 ETP_API_DECL int
323 etp_poll (void)
324 {
325 unsigned int maxreqs;
326 unsigned int maxtime;
327 struct timeval tv_start, tv_now;
328
329 X_LOCK (reslock);
330 maxreqs = max_poll_reqs;
331 maxtime = max_poll_time;
332 X_UNLOCK (reslock);
333
334 if (maxtime)
335 gettimeofday (&tv_start, 0);
336
337 for (;;)
338 {
339 ETP_REQ *req;
340
341 etp_maybe_start_thread ();
342
343 X_LOCK (reslock);
344 req = reqq_shift (&res_queue);
345
346 if (req)
347 {
348 --npending;
349
350 if (!res_queue.size && done_poll_cb)
351 done_poll_cb ();
352 }
353
354 X_UNLOCK (reslock);
355
356 if (!req)
357 return 0;
358
359 X_LOCK (reqlock);
360 --nreqs;
361 X_UNLOCK (reqlock);
362
363 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
364 {
365 req->int1 = 1; /* mark request as delayed */
366 continue;
367 }
368 else
369 {
370 int res = ETP_FINISH (req);
371 if (ecb_expect_false (res))
372 return res;
373 }
374
375 if (ecb_expect_false (maxreqs && !--maxreqs))
376 break;
377
378 if (maxtime)
379 {
380 gettimeofday (&tv_now, 0);
381
382 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
383 break;
384 }
385 }
386
387 errno = EAGAIN;
388 return -1;
389 }
390
391 ETP_API_DECL void
392 etp_grp_cancel (ETP_REQ *grp);
393
394 ETP_API_DECL void
395 etp_cancel (ETP_REQ *req)
396 {
397 req->cancelled = 1;
398
399 etp_grp_cancel (req);
400 }
401
402 ETP_API_DECL void
403 etp_grp_cancel (ETP_REQ *grp)
404 {
405 for (grp = grp->grp_first; grp; grp = grp->grp_next)
406 etp_cancel (grp);
407 }
408
409 ETP_API_DECL void
410 etp_submit (ETP_REQ *req)
411 {
412 req->pri -= ETP_PRI_MIN;
413
414 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
415 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
416
417 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
418 {
419 /* I hope this is worth it :/ */
420 X_LOCK (reqlock);
421 ++nreqs;
422 X_UNLOCK (reqlock);
423
424 X_LOCK (reslock);
425
426 ++npending;
427
428 if (!reqq_push (&res_queue, req) && want_poll_cb)
429 want_poll_cb ();
430
431 X_UNLOCK (reslock);
432 }
433 else
434 {
435 X_LOCK (reqlock);
436 ++nreqs;
437 ++nready;
438 reqq_push (&req_queue, req);
439 X_COND_SIGNAL (reqwait);
440 X_UNLOCK (reqlock);
441
442 etp_maybe_start_thread ();
443 }
444 }
445
446 ETP_API_DECL void ecb_cold
447 etp_set_max_poll_time (double nseconds)
448 {
449 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
450 max_poll_time = nseconds * ETP_TICKS;
451 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
452 }
453
454 ETP_API_DECL void ecb_cold
455 etp_set_max_poll_reqs (unsigned int maxreqs)
456 {
457 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
458 max_poll_reqs = maxreqs;
459 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
460 }
461
462 ETP_API_DECL void ecb_cold
463 etp_set_max_idle (unsigned int nthreads)
464 {
465 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
466 max_idle = nthreads;
467 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
468 }
469
470 ETP_API_DECL void ecb_cold
471 etp_set_idle_timeout (unsigned int seconds)
472 {
473 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
474 idle_timeout = seconds;
475 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
476 }
477
478 ETP_API_DECL void ecb_cold
479 etp_set_min_parallel (unsigned int nthreads)
480 {
481 if (wanted < nthreads)
482 wanted = nthreads;
483 }
484
485 ETP_API_DECL void ecb_cold
486 etp_set_max_parallel (unsigned int nthreads)
487 {
488 if (wanted > nthreads)
489 wanted = nthreads;
490
491 while (started > wanted)
492 etp_end_thread ();
493 }
494