ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.14
Committed: Tue Aug 14 11:47:01 2018 UTC (5 years, 8 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-4_6, rel-4_7, rel-4_81, rel-4_80, rel-4_78, rel-4_79, rel-4_54, rel-4_74, rel-4_75, rel-4_76, rel-4_77, rel-4_71, rel-4_72, rel-4_73, HEAD
Changes since 1.13: +5 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
40 #if HAVE_SYS_PRCTL_H
41 # include <sys/prctl.h>
42 #endif
43
44 #ifdef EIO_STACKSIZE
45 # define X_STACKSIZE EIO_STACKSIZE
46 #endif
47 #include "xthread.h"
48
49 #ifndef ETP_API_DECL
50 # define ETP_API_DECL static
51 #endif
52
53 #ifndef ETP_PRI_MIN
54 # define ETP_PRI_MIN 0
55 # define ETP_PRI_MAX 0
56 #endif
57
58 #ifndef ETP_TYPE_QUIT
59 # define ETP_TYPE_QUIT 0
60 #endif
61
62 #ifndef ETP_TYPE_GROUP
63 # define ETP_TYPE_GROUP 1
64 #endif
65
66 #ifndef ETP_WANT_POLL
67 # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
68 #endif
69 #ifndef ETP_DONE_POLL
70 # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
71 #endif
72
73 #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
74
75 #define ETP_TICKS ((1000000 + 1023) >> 10)
76
77 enum {
78 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
79 ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
80 };
81
82 /* calculate time difference in ~1/ETP_TICKS of a second */
83 ecb_inline int
84 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
85 {
86 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
87 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
88 }
89
90 struct etp_tmpbuf
91 {
92 void *ptr;
93 int len;
94 };
95
96 static void *
97 etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
98 {
99 if (buf->len < len)
100 {
101 free (buf->ptr);
102 buf->ptr = malloc (buf->len = len);
103 }
104
105 return buf->ptr;
106 }
107
108 /*
109 * a somewhat faster data structure might be nice, but
110 * with 8 priorities this actually needs <20 insns
111 * per shift, the most expensive operation.
112 */
113 typedef struct
114 {
115 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
116 int size;
117 } etp_reqq;
118
119 typedef struct etp_pool *etp_pool;
120
121 typedef struct etp_worker
122 {
123 etp_pool pool;
124
125 struct etp_tmpbuf tmpbuf;
126
127 /* locked by pool->wrklock */
128 struct etp_worker *prev, *next;
129
130 xthread_t tid;
131
132 #ifdef ETP_WORKER_COMMON
133 ETP_WORKER_COMMON
134 #endif
135 } etp_worker;
136
137 struct etp_pool
138 {
139 void *userdata;
140
141 etp_reqq req_queue;
142 etp_reqq res_queue;
143
144 unsigned int started, idle, wanted;
145
146 unsigned int max_poll_time; /* pool->reslock */
147 unsigned int max_poll_reqs; /* pool->reslock */
148
149 unsigned int nreqs; /* pool->reqlock */
150 unsigned int nready; /* pool->reqlock */
151 unsigned int npending; /* pool->reqlock */
152 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
153 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
154
155 void (*want_poll_cb) (void *userdata);
156 void (*done_poll_cb) (void *userdata);
157
158 xmutex_t wrklock;
159 xmutex_t reslock;
160 xmutex_t reqlock;
161 xcond_t reqwait;
162
163 etp_worker wrk_first;
164 };
165
166 #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
167 #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
168
169 /* worker threads management */
170
171 static void
172 etp_worker_clear (etp_worker *wrk)
173 {
174 }
175
176 static void ecb_cold
177 etp_worker_free (etp_worker *wrk)
178 {
179 free (wrk->tmpbuf.ptr);
180
181 wrk->next->prev = wrk->prev;
182 wrk->prev->next = wrk->next;
183
184 free (wrk);
185 }
186
187 ETP_API_DECL unsigned int
188 etp_nreqs (etp_pool pool)
189 {
190 int retval;
191 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
192 retval = pool->nreqs;
193 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
194 return retval;
195 }
196
197 ETP_API_DECL unsigned int
198 etp_nready (etp_pool pool)
199 {
200 unsigned int retval;
201
202 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
203 retval = pool->nready;
204 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
205
206 return retval;
207 }
208
209 ETP_API_DECL unsigned int
210 etp_npending (etp_pool pool)
211 {
212 unsigned int retval;
213
214 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
215 retval = pool->npending;
216 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
217
218 return retval;
219 }
220
221 ETP_API_DECL unsigned int
222 etp_nthreads (etp_pool pool)
223 {
224 unsigned int retval;
225
226 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
227 retval = pool->started;
228 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
229
230 return retval;
231 }
232
233 static void ecb_noinline ecb_cold
234 reqq_init (etp_reqq *q)
235 {
236 int pri;
237
238 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
239 q->qs[pri] = q->qe[pri] = 0;
240
241 q->size = 0;
242 }
243
244 static int ecb_noinline
245 reqq_push (etp_reqq *q, ETP_REQ *req)
246 {
247 int pri = req->pri;
248 req->next = 0;
249
250 if (q->qe[pri])
251 {
252 q->qe[pri]->next = req;
253 q->qe[pri] = req;
254 }
255 else
256 q->qe[pri] = q->qs[pri] = req;
257
258 return q->size++;
259 }
260
261 static ETP_REQ * ecb_noinline
262 reqq_shift (etp_reqq *q)
263 {
264 int pri;
265
266 if (!q->size)
267 return 0;
268
269 --q->size;
270
271 for (pri = ETP_NUM_PRI; pri--; )
272 {
273 ETP_REQ *req = q->qs[pri];
274
275 if (req)
276 {
277 if (!(q->qs[pri] = (ETP_REQ *)req->next))
278 q->qe[pri] = 0;
279
280 return req;
281 }
282 }
283
284 abort ();
285 }
286
287 ETP_API_DECL int ecb_cold
288 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
289 {
290 X_MUTEX_CREATE (pool->wrklock);
291 X_MUTEX_CREATE (pool->reslock);
292 X_MUTEX_CREATE (pool->reqlock);
293 X_COND_CREATE (pool->reqwait);
294
295 reqq_init (&pool->req_queue);
296 reqq_init (&pool->res_queue);
297
298 pool->wrk_first.next =
299 pool->wrk_first.prev = &pool->wrk_first;
300
301 pool->started = 0;
302 pool->idle = 0;
303 pool->nreqs = 0;
304 pool->nready = 0;
305 pool->npending = 0;
306 pool->wanted = 4;
307
308 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
309 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
310
311 pool->userdata = userdata;
312 pool->want_poll_cb = want_poll;
313 pool->done_poll_cb = done_poll;
314
315 return 0;
316 }
317
318 static void ecb_noinline ecb_cold
319 etp_proc_init (void)
320 {
321 #if HAVE_PRCTL_SET_NAME
322 /* provide a more sensible "thread name" */
323 char name[16 + 1];
324 const int namelen = sizeof (name) - 1;
325 int len;
326
327 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
328 name [namelen] = 0;
329 len = strlen (name);
330 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
331 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
332 #endif
333 }
334
335 X_THREAD_PROC (etp_proc)
336 {
337 ETP_REQ *req;
338 struct timespec ts;
339 etp_worker *self = (etp_worker *)thr_arg;
340 etp_pool pool = self->pool;
341
342 etp_proc_init ();
343
344 /* try to distribute timeouts somewhat evenly */
345 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
346
347 for (;;)
348 {
349 ts.tv_sec = 0;
350
351 X_LOCK (pool->reqlock);
352
353 for (;;)
354 {
355 req = reqq_shift (&pool->req_queue);
356
357 if (ecb_expect_true (req))
358 break;
359
360 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
361 {
362 X_UNLOCK (pool->reqlock);
363 X_LOCK (pool->wrklock);
364 --pool->started;
365 X_UNLOCK (pool->wrklock);
366 goto quit;
367 }
368
369 ++pool->idle;
370
371 if (pool->idle <= pool->max_idle)
372 /* we are allowed to pool->idle, so do so without any timeout */
373 X_COND_WAIT (pool->reqwait, pool->reqlock);
374 else
375 {
376 /* initialise timeout once */
377 if (!ts.tv_sec)
378 ts.tv_sec = time (0) + pool->idle_timeout;
379
380 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
381 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
382 }
383
384 --pool->idle;
385 }
386
387 --pool->nready;
388
389 X_UNLOCK (pool->reqlock);
390
391 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
392 goto quit;
393
394 ETP_EXECUTE (self, req);
395
396 X_LOCK (pool->reslock);
397
398 ++pool->npending;
399
400 if (!reqq_push (&pool->res_queue, req))
401 ETP_WANT_POLL (pool);
402
403 etp_worker_clear (self);
404
405 X_UNLOCK (pool->reslock);
406 }
407
408 quit:
409 free (req);
410
411 X_LOCK (pool->wrklock);
412 etp_worker_free (self);
413 X_UNLOCK (pool->wrklock);
414
415 return 0;
416 }
417
418 static void ecb_cold
419 etp_start_thread (etp_pool pool)
420 {
421 etp_worker *wrk = calloc (1, sizeof (etp_worker));
422
423 /*TODO*/
424 assert (("unable to allocate worker thread data", wrk));
425
426 wrk->pool = pool;
427
428 X_LOCK (pool->wrklock);
429
430 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
431 {
432 wrk->prev = &pool->wrk_first;
433 wrk->next = pool->wrk_first.next;
434 pool->wrk_first.next->prev = wrk;
435 pool->wrk_first.next = wrk;
436 ++pool->started;
437 }
438 else
439 free (wrk);
440
441 X_UNLOCK (pool->wrklock);
442 }
443
444 static void
445 etp_maybe_start_thread (etp_pool pool)
446 {
447 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
448 return;
449
450 /* todo: maybe use pool->idle here, but might be less exact */
451 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
452 return;
453
454 etp_start_thread (pool);
455 }
456
457 static void ecb_cold
458 etp_end_thread (etp_pool pool)
459 {
460 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
461
462 req->type = ETP_TYPE_QUIT;
463 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
464
465 X_LOCK (pool->reqlock);
466 reqq_push (&pool->req_queue, req);
467 X_COND_SIGNAL (pool->reqwait);
468 X_UNLOCK (pool->reqlock);
469
470 X_LOCK (pool->wrklock);
471 --pool->started;
472 X_UNLOCK (pool->wrklock);
473 }
474
475 ETP_API_DECL int
476 etp_poll (etp_pool pool)
477 {
478 unsigned int maxreqs;
479 unsigned int maxtime;
480 struct timeval tv_start, tv_now;
481
482 X_LOCK (pool->reslock);
483 maxreqs = pool->max_poll_reqs;
484 maxtime = pool->max_poll_time;
485 X_UNLOCK (pool->reslock);
486
487 if (maxtime)
488 gettimeofday (&tv_start, 0);
489
490 for (;;)
491 {
492 ETP_REQ *req;
493
494 etp_maybe_start_thread (pool);
495
496 X_LOCK (pool->reslock);
497 req = reqq_shift (&pool->res_queue);
498
499 if (ecb_expect_true (req))
500 {
501 --pool->npending;
502
503 if (!pool->res_queue.size)
504 ETP_DONE_POLL (pool);
505 }
506
507 X_UNLOCK (pool->reslock);
508
509 if (ecb_expect_false (!req))
510 return 0;
511
512 X_LOCK (pool->reqlock);
513 --pool->nreqs;
514 X_UNLOCK (pool->reqlock);
515
516 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
517 {
518 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
519 continue;
520 }
521 else
522 {
523 int res = ETP_FINISH (req);
524 if (ecb_expect_false (res))
525 return res;
526 }
527
528 if (ecb_expect_false (maxreqs && !--maxreqs))
529 break;
530
531 if (maxtime)
532 {
533 gettimeofday (&tv_now, 0);
534
535 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
536 break;
537 }
538 }
539
540 errno = EAGAIN;
541 return -1;
542 }
543
544 ETP_API_DECL void
545 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
546
547 ETP_API_DECL void
548 etp_cancel (etp_pool pool, ETP_REQ *req)
549 {
550 req->cancelled = 1;
551
552 etp_grp_cancel (pool, req);
553 }
554
555 ETP_API_DECL void
556 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
557 {
558 for (grp = grp->grp_first; grp; grp = grp->grp_next)
559 etp_cancel (pool, grp);
560 }
561
562 ETP_API_DECL void
563 etp_submit (etp_pool pool, ETP_REQ *req)
564 {
565 req->pri -= ETP_PRI_MIN;
566
567 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
568 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
569
570 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
571 {
572 /* I hope this is worth it :/ */
573 X_LOCK (pool->reqlock);
574 ++pool->nreqs;
575 X_UNLOCK (pool->reqlock);
576
577 X_LOCK (pool->reslock);
578
579 ++pool->npending;
580
581 if (!reqq_push (&pool->res_queue, req))
582 ETP_WANT_POLL (pool);
583
584 X_UNLOCK (pool->reslock);
585 }
586 else
587 {
588 X_LOCK (pool->reqlock);
589 ++pool->nreqs;
590 ++pool->nready;
591 reqq_push (&pool->req_queue, req);
592 X_COND_SIGNAL (pool->reqwait);
593 X_UNLOCK (pool->reqlock);
594
595 etp_maybe_start_thread (pool);
596 }
597 }
598
599 ETP_API_DECL void ecb_cold
600 etp_set_max_poll_time (etp_pool pool, double seconds)
601 {
602 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
603 pool->max_poll_time = seconds * ETP_TICKS;
604 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
605 }
606
607 ETP_API_DECL void ecb_cold
608 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
609 {
610 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
611 pool->max_poll_reqs = maxreqs;
612 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
613 }
614
615 ETP_API_DECL void ecb_cold
616 etp_set_max_idle (etp_pool pool, unsigned int threads)
617 {
618 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
619 pool->max_idle = threads;
620 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
621 }
622
623 ETP_API_DECL void ecb_cold
624 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
625 {
626 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
627 pool->idle_timeout = seconds;
628 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
629 }
630
631 ETP_API_DECL void ecb_cold
632 etp_set_min_parallel (etp_pool pool, unsigned int threads)
633 {
634 if (pool->wanted < threads)
635 pool->wanted = threads;
636 }
637
638 ETP_API_DECL void ecb_cold
639 etp_set_max_parallel (etp_pool pool, unsigned int threads)
640 {
641 if (pool->wanted > threads)
642 pool->wanted = threads;
643
644 while (pool->started > pool->wanted)
645 etp_end_thread (pool);
646 }
647