ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.3
Committed: Thu Jun 25 15:59:57 2015 UTC (8 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.2: +19 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
/* linkage of the etp_* API functions; static unless ETP_API_DECL is predefined */
#if !defined ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority bounds; a single level (0..0) unless ETP_PRI_MIN is predefined */
#if !defined ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type queued by etp_end_thread */
#if !defined ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type that etp_submit and etp_poll treat as a group */
#if !defined ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* number of distinct priority levels, i.e. per-priority list slots in a queue */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* one second expressed in units of 1024 microseconds, rounded up */
#define ETP_TICKS ((1000000 + 1023) >> 10)
60
61 /* calculate time difference in ~1/ETP_TICKS of a second */
62 ecb_inline int
63 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64 {
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
66 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
67 }
68
69 static unsigned int started, idle, wanted = 4;
70
71 static void (*want_poll_cb) (void);
72 static void (*done_poll_cb) (void);
73
74 static unsigned int max_poll_time; /* reslock */
75 static unsigned int max_poll_reqs; /* reslock */
76
77 static unsigned int nreqs; /* reqlock */
78 static unsigned int nready; /* reqlock */
79 static unsigned int npending; /* reqlock */
80 static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
81 static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
82
83 static xmutex_t wrklock;
84 static xmutex_t reslock;
85 static xmutex_t reqlock;
86 static xcond_t reqwait;
87
/* a scratch buffer that only ever grows; len is the allocated size of ptr */
struct etp_tmpbuf
{
  void *ptr;
  int len;
};

/*
 * return a buffer of at least len bytes, reusing the current allocation
 * when it is already large enough. previous contents are NOT preserved
 * when the buffer has to grow.
 *
 * returns a null pointer if the allocation fails; in that case the
 * recorded size is reset to 0 so that a later call tries to allocate
 * again instead of handing out the null pointer as if it were a valid
 * buffer of the requested size.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
105
106 typedef struct etp_worker
107 {
108 struct etp_tmpbuf tmpbuf;
109
110 /* locked by wrklock */
111 struct etp_worker *prev, *next;
112
113 xthread_t tid;
114
115 #ifdef ETP_WORKER_COMMON
116 ETP_WORKER_COMMON
117 #endif
118 } etp_worker;
119
120 static etp_worker wrk_first; /* NOT etp */
121
122 #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
123 #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
124
125 /* worker threads management */
126
127 static void
128 etp_worker_clear (etp_worker *wrk)
129 {
130 }
131
132 static void ecb_cold
133 etp_worker_free (etp_worker *wrk)
134 {
135 free (wrk->tmpbuf.ptr);
136
137 wrk->next->prev = wrk->prev;
138 wrk->prev->next = wrk->next;
139
140 free (wrk);
141 }
142
143 ETP_API_DECL unsigned int
144 etp_nreqs (void)
145 {
146 int retval;
147 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
148 retval = nreqs;
149 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
150 return retval;
151 }
152
153 ETP_API_DECL unsigned int
154 etp_nready (void)
155 {
156 unsigned int retval;
157
158 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159 retval = nready;
160 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
161
162 return retval;
163 }
164
165 ETP_API_DECL unsigned int
166 etp_npending (void)
167 {
168 unsigned int retval;
169
170 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
171 retval = npending;
172 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
173
174 return retval;
175 }
176
177 ETP_API_DECL unsigned int
178 etp_nthreads (void)
179 {
180 unsigned int retval;
181
182 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
183 retval = started;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
185
186 return retval;
187 }
188
189 /*
190 * a somewhat faster data structure might be nice, but
191 * with 8 priorities this actually needs <20 insns
192 * per shift, the most expensive operation.
193 */
194 typedef struct {
195 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
196 int size;
197 } etp_reqq;
198
199 static etp_reqq req_queue;
200 static etp_reqq res_queue;
201
202 static void ecb_noinline ecb_cold
203 reqq_init (etp_reqq *q)
204 {
205 int pri;
206
207 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
208 q->qs[pri] = q->qe[pri] = 0;
209
210 q->size = 0;
211 }
212
213 static int ecb_noinline
214 reqq_push (etp_reqq *q, ETP_REQ *req)
215 {
216 int pri = req->pri;
217 req->next = 0;
218
219 if (q->qe[pri])
220 {
221 q->qe[pri]->next = req;
222 q->qe[pri] = req;
223 }
224 else
225 q->qe[pri] = q->qs[pri] = req;
226
227 return q->size++;
228 }
229
230 static ETP_REQ * ecb_noinline
231 reqq_shift (etp_reqq *q)
232 {
233 int pri;
234
235 if (!q->size)
236 return 0;
237
238 --q->size;
239
240 for (pri = ETP_NUM_PRI; pri--; )
241 {
242 ETP_REQ *req = q->qs[pri];
243
244 if (req)
245 {
246 if (!(q->qs[pri] = (ETP_REQ *)req->next))
247 q->qe[pri] = 0;
248
249 return req;
250 }
251 }
252
253 abort ();
254 }
255
256 ETP_API_DECL int ecb_cold
257 etp_init (void (*want_poll)(void), void (*done_poll)(void))
258 {
259 X_MUTEX_CREATE (wrklock);
260 X_MUTEX_CREATE (reslock);
261 X_MUTEX_CREATE (reqlock);
262 X_COND_CREATE (reqwait);
263
264 reqq_init (&req_queue);
265 reqq_init (&res_queue);
266
267 wrk_first.next =
268 wrk_first.prev = &wrk_first;
269
270 started = 0;
271 idle = 0;
272 nreqs = 0;
273 nready = 0;
274 npending = 0;
275
276 want_poll_cb = want_poll;
277 done_poll_cb = done_poll;
278
279 return 0;
280 }
281
282 /* not yet in etp.c */
283 X_THREAD_PROC (etp_proc);
284
285 static void ecb_cold
286 etp_start_thread (void)
287 {
288 etp_worker *wrk = calloc (1, sizeof (etp_worker));
289
290 /*TODO*/
291 assert (("unable to allocate worker thread data", wrk));
292
293 X_LOCK (wrklock);
294
295 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
296 {
297 wrk->prev = &wrk_first;
298 wrk->next = wrk_first.next;
299 wrk_first.next->prev = wrk;
300 wrk_first.next = wrk;
301 ++started;
302 }
303 else
304 free (wrk);
305
306 X_UNLOCK (wrklock);
307 }
308
309 static void
310 etp_maybe_start_thread (void)
311 {
312 if (ecb_expect_true (etp_nthreads () >= wanted))
313 return;
314
315 /* todo: maybe use idle here, but might be less exact */
316 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
317 return;
318
319 etp_start_thread ();
320 }
321
322 static void ecb_cold
323 etp_end_thread (void)
324 {
325 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
326
327 req->type = ETP_TYPE_QUIT;
328 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
329
330 X_LOCK (reqlock);
331 reqq_push (&req_queue, req);
332 X_COND_SIGNAL (reqwait);
333 X_UNLOCK (reqlock);
334
335 X_LOCK (wrklock);
336 --started;
337 X_UNLOCK (wrklock);
338 }
339
340 ETP_API_DECL int
341 etp_poll (void)
342 {
343 unsigned int maxreqs;
344 unsigned int maxtime;
345 struct timeval tv_start, tv_now;
346
347 X_LOCK (reslock);
348 maxreqs = max_poll_reqs;
349 maxtime = max_poll_time;
350 X_UNLOCK (reslock);
351
352 if (maxtime)
353 gettimeofday (&tv_start, 0);
354
355 for (;;)
356 {
357 ETP_REQ *req;
358
359 etp_maybe_start_thread ();
360
361 X_LOCK (reslock);
362 req = reqq_shift (&res_queue);
363
364 if (req)
365 {
366 --npending;
367
368 if (!res_queue.size && done_poll_cb)
369 done_poll_cb ();
370 }
371
372 X_UNLOCK (reslock);
373
374 if (!req)
375 return 0;
376
377 X_LOCK (reqlock);
378 --nreqs;
379 X_UNLOCK (reqlock);
380
381 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
382 {
383 req->int1 = 1; /* mark request as delayed */
384 continue;
385 }
386 else
387 {
388 int res = ETP_FINISH (req);
389 if (ecb_expect_false (res))
390 return res;
391 }
392
393 if (ecb_expect_false (maxreqs && !--maxreqs))
394 break;
395
396 if (maxtime)
397 {
398 gettimeofday (&tv_now, 0);
399
400 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
401 break;
402 }
403 }
404
405 errno = EAGAIN;
406 return -1;
407 }
408
409 ETP_API_DECL void
410 etp_grp_cancel (ETP_REQ *grp);
411
412 ETP_API_DECL void
413 etp_cancel (ETP_REQ *req)
414 {
415 req->cancelled = 1;
416
417 etp_grp_cancel (req);
418 }
419
420 ETP_API_DECL void
421 etp_grp_cancel (ETP_REQ *grp)
422 {
423 for (grp = grp->grp_first; grp; grp = grp->grp_next)
424 etp_cancel (grp);
425 }
426
427 ETP_API_DECL void
428 etp_submit (ETP_REQ *req)
429 {
430 req->pri -= ETP_PRI_MIN;
431
432 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
433 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
434
435 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
436 {
437 /* I hope this is worth it :/ */
438 X_LOCK (reqlock);
439 ++nreqs;
440 X_UNLOCK (reqlock);
441
442 X_LOCK (reslock);
443
444 ++npending;
445
446 if (!reqq_push (&res_queue, req) && want_poll_cb)
447 want_poll_cb ();
448
449 X_UNLOCK (reslock);
450 }
451 else
452 {
453 X_LOCK (reqlock);
454 ++nreqs;
455 ++nready;
456 reqq_push (&req_queue, req);
457 X_COND_SIGNAL (reqwait);
458 X_UNLOCK (reqlock);
459
460 etp_maybe_start_thread ();
461 }
462 }
463
464 ETP_API_DECL void ecb_cold
465 etp_set_max_poll_time (double nseconds)
466 {
467 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
468 max_poll_time = nseconds * ETP_TICKS;
469 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
470 }
471
472 ETP_API_DECL void ecb_cold
473 etp_set_max_poll_reqs (unsigned int maxreqs)
474 {
475 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
476 max_poll_reqs = maxreqs;
477 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
478 }
479
480 ETP_API_DECL void ecb_cold
481 etp_set_max_idle (unsigned int nthreads)
482 {
483 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
484 max_idle = nthreads;
485 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
486 }
487
488 ETP_API_DECL void ecb_cold
489 etp_set_idle_timeout (unsigned int seconds)
490 {
491 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
492 idle_timeout = seconds;
493 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
494 }
495
496 ETP_API_DECL void ecb_cold
497 etp_set_min_parallel (unsigned int nthreads)
498 {
499 if (wanted < nthreads)
500 wanted = nthreads;
501 }
502
503 ETP_API_DECL void ecb_cold
504 etp_set_max_parallel (unsigned int nthreads)
505 {
506 if (wanted > nthreads)
507 wanted = nthreads;
508
509 while (started > wanted)
510 etp_end_thread ();
511 }
512