1 |
/* |
2 |
* libetp implementation |
3 |
* |
4 |
* Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> |
5 |
* All rights reserved. |
6 |
* |
7 |
* Redistribution and use in source and binary forms, with or without modifica- |
8 |
* tion, are permitted provided that the following conditions are met: |
9 |
* |
10 |
* 1. Redistributions of source code must retain the above copyright notice, |
11 |
* this list of conditions and the following disclaimer. |
12 |
* |
13 |
* 2. Redistributions in binary form must reproduce the above copyright |
14 |
* notice, this list of conditions and the following disclaimer in the |
15 |
* documentation and/or other materials provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED |
18 |
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER- |
19 |
* CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO |
20 |
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE- |
21 |
* CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
22 |
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; |
23 |
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, |
24 |
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH- |
25 |
* ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED |
26 |
* OF THE POSSIBILITY OF SUCH DAMAGE. |
27 |
* |
28 |
* Alternatively, the contents of this file may be used under the terms of |
29 |
* the GNU General Public License ("GPL") version 2 or any later version, |
30 |
* in which case the provisions of the GPL are applicable instead of |
31 |
* the above. If you wish to allow the use of your version of this file |
32 |
* only under the terms of the GPL and not to allow others to use your |
33 |
* version of this file under the BSD license, indicate your decision |
34 |
* by deleting the provisions above and replace them with the notice |
35 |
* and other provisions required by the GPL. If you do not delete the |
36 |
* provisions above, a recipient may use your version of this file under |
37 |
* either the BSD or the GPL. |
38 |
*/ |
39 |
|
40 |
/* linkage of the etp_* entry points; static unless the includer overrides it */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/*
 * request priority range (inclusive). Each bound is defaulted on its own, so
 * an includer may override just one of them without the other being left
 * undefined or silently redefined.
 */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
#endif

#ifndef ETP_PRI_MAX
# define ETP_PRI_MAX 0
#endif

/* request type telling a worker thread to exit (queued by etp_end_thread) */
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type for groups; etp_submit puts these straight onto the result queue */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* number of priority levels, i.e. per-priority slots in an etp_reqq */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second, one tick being 1024 microseconds: ceil (1000000 / 1024) = 977 */
#define ETP_TICKS ((1000000 + 1023) >> 10)

enum {
  ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 0x08, /* group request has been delayed */
};
65 |
|
66 |
/* calculate time difference in ~1/ETP_TICKS of a second */ |
67 |
ecb_inline int |
68 |
etp_tvdiff (struct timeval *tv1, struct timeval *tv2) |
69 |
{ |
70 |
return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS |
71 |
+ ((tv2->tv_usec - tv1->tv_usec) >> 10); |
72 |
} |
73 |
|
74 |
/* worker thread accounting */
static unsigned int started;    /* workers started and not yet told to quit or timed out; wrklock */
static unsigned int idle;       /* workers currently waiting for a request; reqlock */
static unsigned int wanted = 4; /* etp_maybe_start_thread never starts more than this many */

/* user callbacks: want_poll_cb fires when res_queue becomes non-empty,
   done_poll_cb when etp_poll drains it */
static void (*want_poll_cb) (void);
static void (*done_poll_cb) (void);

/* etp_poll budgets (0 = unlimited) */
static unsigned int max_poll_time; /* in ETP_TICKS units; reslock */
static unsigned int max_poll_reqs; /* reslock */

/* request accounting */
static unsigned int nreqs;    /* submitted, not yet consumed by etp_poll; reqlock */
static unsigned int nready;   /* queued on req_queue, waiting for a worker; reqlock */
static unsigned int npending; /* queued on res_queue, waiting for etp_poll; reslock in writers */

/* idle policy */
static unsigned int max_idle = 4;      /* maximum number of threads that can idle indefinitely */
static unsigned int idle_timeout = 10; /* seconds after which a surplus idle thread exits */
87 |
|
88 |
static xmutex_t wrklock; |
89 |
static xmutex_t reslock; |
90 |
static xmutex_t reqlock; |
91 |
static xcond_t reqwait; |
92 |
|
93 |
/* growable scratch buffer: ptr holds at least len bytes (or is NULL with len 0) */
struct etp_tmpbuf
{
  void *ptr;
  int len;
};

/*
 * Return a buffer of at least len bytes, reusing the current one when it is
 * large enough. When it must grow, the old contents are discarded (free +
 * malloc, not realloc).
 *
 * Returns NULL if the allocation fails. In that case the recorded capacity is
 * reset to 0, so the next call tries to allocate again instead of returning a
 * NULL pointer that claims to be large enough.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
110 |
|
111 |
/*
 * Per-thread bookkeeping for one worker. Workers are kept on a circular
 * doubly-linked list whose head is the wrk_first sentinel below.
 * Field order is left untouched on purpose (layout may matter to includers).
 */
typedef struct etp_worker
{
  /* scratch buffer owned by this worker; released in etp_worker_free */
  struct etp_tmpbuf tmpbuf;

  /* list links; locked by wrklock */
  struct etp_worker *prev, *next;

  /* thread handle filled in by xthread_create in etp_start_thread */
  xthread_t tid;

  /* optional extra per-worker fields supplied by the including code
     (presumably; the macro is not defined in this file) */
#ifdef ETP_WORKER_COMMON
  ETP_WORKER_COMMON
#endif
} etp_worker;

/* list-head sentinel of the worker list (etp_init points it at itself);
   it is not a running worker */
static etp_worker wrk_first; /* NOT etp */

/* all workers share the single wrklock; the wrk argument is ignored */
#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
129 |
|
130 |
/* worker threads management */ |
131 |
|
132 |
static void |
133 |
etp_worker_clear (etp_worker *wrk) |
134 |
{ |
135 |
} |
136 |
|
137 |
static void ecb_cold |
138 |
etp_worker_free (etp_worker *wrk) |
139 |
{ |
140 |
free (wrk->tmpbuf.ptr); |
141 |
|
142 |
wrk->next->prev = wrk->prev; |
143 |
wrk->prev->next = wrk->next; |
144 |
|
145 |
free (wrk); |
146 |
} |
147 |
|
148 |
ETP_API_DECL unsigned int |
149 |
etp_nreqs (void) |
150 |
{ |
151 |
int retval; |
152 |
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); |
153 |
retval = nreqs; |
154 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); |
155 |
return retval; |
156 |
} |
157 |
|
158 |
ETP_API_DECL unsigned int |
159 |
etp_nready (void) |
160 |
{ |
161 |
unsigned int retval; |
162 |
|
163 |
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); |
164 |
retval = nready; |
165 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); |
166 |
|
167 |
return retval; |
168 |
} |
169 |
|
170 |
ETP_API_DECL unsigned int |
171 |
etp_npending (void) |
172 |
{ |
173 |
unsigned int retval; |
174 |
|
175 |
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); |
176 |
retval = npending; |
177 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); |
178 |
|
179 |
return retval; |
180 |
} |
181 |
|
182 |
ETP_API_DECL unsigned int |
183 |
etp_nthreads (void) |
184 |
{ |
185 |
unsigned int retval; |
186 |
|
187 |
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); |
188 |
retval = started; |
189 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); |
190 |
|
191 |
return retval; |
192 |
} |
193 |
|
194 |
/* |
195 |
* a somewhat faster data structure might be nice, but |
196 |
* with 8 priorities this actually needs <20 insns |
197 |
* per shift, the most expensive operation. |
198 |
*/ |
199 |
typedef struct { |
200 |
ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ |
201 |
int size; |
202 |
} etp_reqq; |
203 |
|
204 |
static etp_reqq req_queue; |
205 |
static etp_reqq res_queue; |
206 |
|
207 |
static void ecb_noinline ecb_cold |
208 |
reqq_init (etp_reqq *q) |
209 |
{ |
210 |
int pri; |
211 |
|
212 |
for (pri = 0; pri < ETP_NUM_PRI; ++pri) |
213 |
q->qs[pri] = q->qe[pri] = 0; |
214 |
|
215 |
q->size = 0; |
216 |
} |
217 |
|
218 |
static int ecb_noinline |
219 |
reqq_push (etp_reqq *q, ETP_REQ *req) |
220 |
{ |
221 |
int pri = req->pri; |
222 |
req->next = 0; |
223 |
|
224 |
if (q->qe[pri]) |
225 |
{ |
226 |
q->qe[pri]->next = req; |
227 |
q->qe[pri] = req; |
228 |
} |
229 |
else |
230 |
q->qe[pri] = q->qs[pri] = req; |
231 |
|
232 |
return q->size++; |
233 |
} |
234 |
|
235 |
static ETP_REQ * ecb_noinline |
236 |
reqq_shift (etp_reqq *q) |
237 |
{ |
238 |
int pri; |
239 |
|
240 |
if (!q->size) |
241 |
return 0; |
242 |
|
243 |
--q->size; |
244 |
|
245 |
for (pri = ETP_NUM_PRI; pri--; ) |
246 |
{ |
247 |
ETP_REQ *req = q->qs[pri]; |
248 |
|
249 |
if (req) |
250 |
{ |
251 |
if (!(q->qs[pri] = (ETP_REQ *)req->next)) |
252 |
q->qe[pri] = 0; |
253 |
|
254 |
return req; |
255 |
} |
256 |
} |
257 |
|
258 |
abort (); |
259 |
} |
260 |
|
261 |
ETP_API_DECL int ecb_cold |
262 |
etp_init (void (*want_poll)(void), void (*done_poll)(void)) |
263 |
{ |
264 |
X_MUTEX_CREATE (wrklock); |
265 |
X_MUTEX_CREATE (reslock); |
266 |
X_MUTEX_CREATE (reqlock); |
267 |
X_COND_CREATE (reqwait); |
268 |
|
269 |
reqq_init (&req_queue); |
270 |
reqq_init (&res_queue); |
271 |
|
272 |
wrk_first.next = |
273 |
wrk_first.prev = &wrk_first; |
274 |
|
275 |
started = 0; |
276 |
idle = 0; |
277 |
nreqs = 0; |
278 |
nready = 0; |
279 |
npending = 0; |
280 |
|
281 |
want_poll_cb = want_poll; |
282 |
done_poll_cb = done_poll; |
283 |
|
284 |
return 0; |
285 |
} |
286 |
|
287 |
static void ecb_noinline ecb_cold |
288 |
etp_proc_init (void) |
289 |
{ |
290 |
#if HAVE_PRCTL_SET_NAME |
291 |
/* provide a more sensible "thread name" */ |
292 |
char name[16 + 1]; |
293 |
const int namelen = sizeof (name) - 1; |
294 |
int len; |
295 |
|
296 |
prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); |
297 |
name [namelen] = 0; |
298 |
len = strlen (name); |
299 |
strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio"); |
300 |
prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0); |
301 |
#endif |
302 |
} |
303 |
|
304 |
/*
 * Worker thread body. thr_arg is presumably the etp_worker that
 * etp_start_thread passes to xthread_create (X_THREAD_PROC, which declares
 * it, is defined elsewhere).
 *
 * Loop:
 *  - take the next request from req_queue, waiting on reqwait while it is empty;
 *  - run it with ETP_EXECUTE outside of any lock;
 *  - push it onto res_queue for etp_poll.
 *
 * The thread exits in two cases:
 *  - it receives an ETP_TYPE_QUIT request; etp_end_thread has already
 *    decremented `started`;
 *  - a timed idle wait expires; it then decrements `started` itself.
 *
 * On exit it frees the quit request (req is NULL after a timeout), then
 * unlinks and frees its own worker record under wrklock.
 */
X_THREAD_PROC (etp_proc)
{
  ETP_REQ *req;
  struct timespec ts;
  etp_worker *self = (etp_worker *)thr_arg;

  etp_proc_init ();

  /* try to distribute timeouts somewhat evenly: the low 10 bits of the worker
     address pick the sub-second part of every idle deadline of this thread */
  ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);

  for (;;)
    {
      /* 0 = no idle deadline computed yet for this wait cycle */
      ts.tv_sec = 0;

      X_LOCK (reqlock);

      for (;;)
        {
          req = reqq_shift (&req_queue);

          if (req)
            break;

          if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
            {
              X_UNLOCK (reqlock);
              X_LOCK (wrklock);
              --started;
              X_UNLOCK (wrklock);
              goto quit;
            }

          ++idle;

          if (idle <= max_idle)
            /* we are allowed to idle, so do so without any timeout */
            X_COND_WAIT (reqwait, reqlock);
          else
            {
              /* surplus idler: initialise the absolute deadline once, so that
                 wakeups without a request do not push it further out */
              if (!ts.tv_sec)
                ts.tv_sec = time (0) + idle_timeout;

              /* 1 is used as a "timed out" sentinel; a real deadline computed
                 from time (0) is assumed never to equal 1 */
              if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
                ts.tv_sec = 1; /* assuming this is not a value computed above */
            }

          --idle;
        }

      /* every request taken off req_queue, QUIT included, decrements nready */
      --nready;

      X_UNLOCK (reqlock);

      if (req->type == ETP_TYPE_QUIT)
        goto quit;

      ETP_EXECUTE (self, req);

      X_LOCK (reslock);

      ++npending;

      /* reqq_push returns the previous size: 0 means res_queue was empty */
      if (!reqq_push (&res_queue, req) && want_poll_cb)
        want_poll_cb ();

      etp_worker_clear (self);

      X_UNLOCK (reslock);
    }

quit:
  /* frees the QUIT request; req is NULL (a no-op) after an idle timeout */
  free (req);

  X_LOCK (wrklock);
  etp_worker_free (self);
  X_UNLOCK (wrklock);

  return 0;
}
385 |
|
386 |
static void ecb_cold |
387 |
etp_start_thread (void) |
388 |
{ |
389 |
etp_worker *wrk = calloc (1, sizeof (etp_worker)); |
390 |
|
391 |
/*TODO*/ |
392 |
assert (("unable to allocate worker thread data", wrk)); |
393 |
|
394 |
X_LOCK (wrklock); |
395 |
|
396 |
if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) |
397 |
{ |
398 |
wrk->prev = &wrk_first; |
399 |
wrk->next = wrk_first.next; |
400 |
wrk_first.next->prev = wrk; |
401 |
wrk_first.next = wrk; |
402 |
++started; |
403 |
} |
404 |
else |
405 |
free (wrk); |
406 |
|
407 |
X_UNLOCK (wrklock); |
408 |
} |
409 |
|
410 |
static void |
411 |
etp_maybe_start_thread (void) |
412 |
{ |
413 |
if (ecb_expect_true (etp_nthreads () >= wanted)) |
414 |
return; |
415 |
|
416 |
/* todo: maybe use idle here, but might be less exact */ |
417 |
if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) |
418 |
return; |
419 |
|
420 |
etp_start_thread (); |
421 |
} |
422 |
|
423 |
static void ecb_cold |
424 |
etp_end_thread (void) |
425 |
{ |
426 |
ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ |
427 |
|
428 |
req->type = ETP_TYPE_QUIT; |
429 |
req->pri = ETP_PRI_MAX - ETP_PRI_MIN; |
430 |
|
431 |
X_LOCK (reqlock); |
432 |
reqq_push (&req_queue, req); |
433 |
X_COND_SIGNAL (reqwait); |
434 |
X_UNLOCK (reqlock); |
435 |
|
436 |
X_LOCK (wrklock); |
437 |
--started; |
438 |
X_UNLOCK (wrklock); |
439 |
} |
440 |
|
441 |
/*
 * Process finished requests from res_queue in the calling thread.
 *
 * Each iteration:
 *  - may start another worker (etp_maybe_start_thread);
 *  - takes one request off res_queue and hands it to ETP_FINISH.
 * Group requests whose size is still non-zero are only flagged
 * ETP_FLAG_DELAYED instead of finished. They do not count against the
 * request or time budget.
 *
 * Returns:
 *  - 0 once res_queue is empty;
 *  - the non-zero value of ETP_FINISH, as soon as it returns one;
 *  - -1 with errno set to EAGAIN when the max_poll_reqs or max_poll_time
 *    budget (both sampled once on entry) runs out first.
 */
ETP_API_DECL int
etp_poll (void)
{
  unsigned int maxreqs;
  unsigned int maxtime;
  struct timeval tv_start, tv_now;

  /* snapshot the budgets; 0 means unlimited */
  X_LOCK (reslock);
  maxreqs = max_poll_reqs;
  maxtime = max_poll_time;
  X_UNLOCK (reslock);

  if (maxtime)
    gettimeofday (&tv_start, 0);

  for (;;)
    {
      ETP_REQ *req;

      etp_maybe_start_thread ();

      X_LOCK (reslock);
      req = reqq_shift (&res_queue);

      if (req)
        {
          --npending;

          /* last result consumed: tell the user (called with reslock held) */
          if (!res_queue.size && done_poll_cb)
            done_poll_cb ();
        }

      X_UNLOCK (reslock);

      if (!req)
        return 0;

      X_LOCK (reqlock);
      --nreqs;
      X_UNLOCK (reqlock);

      if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
        {
          req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
          continue;
        }
      else
        {
          int res = ETP_FINISH (req);
          if (ecb_expect_false (res))
            return res;
        }

      if (ecb_expect_false (maxreqs && !--maxreqs))
        break;

      if (maxtime)
        {
          gettimeofday (&tv_now, 0);

          /* NOTE(review): int compared with unsigned; a negative difference
             (clock stepped backwards) converts to a huge value and ends the loop */
          if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
            break;
        }
    }

  errno = EAGAIN;
  return -1;
}
509 |
|
510 |
ETP_API_DECL void |
511 |
etp_grp_cancel (ETP_REQ *grp); |
512 |
|
513 |
ETP_API_DECL void |
514 |
etp_cancel (ETP_REQ *req) |
515 |
{ |
516 |
req->cancelled = 1; |
517 |
|
518 |
etp_grp_cancel (req); |
519 |
} |
520 |
|
521 |
ETP_API_DECL void |
522 |
etp_grp_cancel (ETP_REQ *grp) |
523 |
{ |
524 |
for (grp = grp->grp_first; grp; grp = grp->grp_next) |
525 |
etp_cancel (grp); |
526 |
} |
527 |
|
528 |
ETP_API_DECL void |
529 |
etp_submit (ETP_REQ *req) |
530 |
{ |
531 |
req->pri -= ETP_PRI_MIN; |
532 |
|
533 |
if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; |
534 |
if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; |
535 |
|
536 |
if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) |
537 |
{ |
538 |
/* I hope this is worth it :/ */ |
539 |
X_LOCK (reqlock); |
540 |
++nreqs; |
541 |
X_UNLOCK (reqlock); |
542 |
|
543 |
X_LOCK (reslock); |
544 |
|
545 |
++npending; |
546 |
|
547 |
if (!reqq_push (&res_queue, req) && want_poll_cb) |
548 |
want_poll_cb (); |
549 |
|
550 |
X_UNLOCK (reslock); |
551 |
} |
552 |
else |
553 |
{ |
554 |
X_LOCK (reqlock); |
555 |
++nreqs; |
556 |
++nready; |
557 |
reqq_push (&req_queue, req); |
558 |
X_COND_SIGNAL (reqwait); |
559 |
X_UNLOCK (reqlock); |
560 |
|
561 |
etp_maybe_start_thread (); |
562 |
} |
563 |
} |
564 |
|
565 |
ETP_API_DECL void ecb_cold |
566 |
etp_set_max_poll_time (double nseconds) |
567 |
{ |
568 |
if (WORDACCESS_UNSAFE) X_LOCK (reslock); |
569 |
max_poll_time = nseconds * ETP_TICKS; |
570 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); |
571 |
} |
572 |
|
573 |
ETP_API_DECL void ecb_cold |
574 |
etp_set_max_poll_reqs (unsigned int maxreqs) |
575 |
{ |
576 |
if (WORDACCESS_UNSAFE) X_LOCK (reslock); |
577 |
max_poll_reqs = maxreqs; |
578 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); |
579 |
} |
580 |
|
581 |
ETP_API_DECL void ecb_cold |
582 |
etp_set_max_idle (unsigned int nthreads) |
583 |
{ |
584 |
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); |
585 |
max_idle = nthreads; |
586 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); |
587 |
} |
588 |
|
589 |
ETP_API_DECL void ecb_cold |
590 |
etp_set_idle_timeout (unsigned int seconds) |
591 |
{ |
592 |
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); |
593 |
idle_timeout = seconds; |
594 |
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); |
595 |
} |
596 |
|
597 |
ETP_API_DECL void ecb_cold |
598 |
etp_set_min_parallel (unsigned int nthreads) |
599 |
{ |
600 |
if (wanted < nthreads) |
601 |
wanted = nthreads; |
602 |
} |
603 |
|
604 |
ETP_API_DECL void ecb_cold |
605 |
etp_set_max_parallel (unsigned int nthreads) |
606 |
{ |
607 |
if (wanted > nthreads) |
608 |
wanted = nthreads; |
609 |
|
610 |
while (started > wanted) |
611 |
etp_end_thread (); |
612 |
} |
613 |
|