ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.13
Committed: Tue Aug 14 11:44:53 2018 UTC (5 years, 9 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.12: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
40 #if HAVE_SYS_PRCTL_H
41 # include <sys/prctl.h>
42 #endif
43
/*
 * Compile-time configuration. Every macro guarded by #ifndef below is only
 * a fallback and can be predefined by whatever includes this file.
 */

/* storage class / linkage prefix put in front of the API functions */
44 #ifndef ETP_API_DECL
45 # define ETP_API_DECL static
46 #endif
47
/* priority range; the fallback is a single priority level */
48 #ifndef ETP_PRI_MIN
49 # define ETP_PRI_MIN 0
50 # define ETP_PRI_MAX 0
51 #endif
52
/* request type that makes a worker thread exit (see etp_proc) */
53 #ifndef ETP_TYPE_QUIT
54 # define ETP_TYPE_QUIT 0
55 #endif
56
/* request type that is queued directly on the result queue (see etp_submit) */
57 #ifndef ETP_TYPE_GROUP
58 # define ETP_TYPE_GROUP 1
59 #endif
60
/* notification hooks; the fallbacks call the callbacks stored in the pool */
61 #ifndef ETP_WANT_POLL
62 # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
63 #endif
64 #ifndef ETP_DONE_POLL
65 # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
66 #endif
67
/* number of distinct priority levels, i.e. the size of the per-priority arrays */
68 #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
69
/* number of 1024-microsecond units per second, rounded up (evaluates to 977) */
70 #define ETP_TICKS ((1000000 + 1023) >> 10)
71
/* bits for the flags member of a request */
72 enum {
73 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
74 ETP_FLAG_DELAYED = 0x08, /* group request has been delayed (set by etp_poll) */
75 };
76
77 /* calculate time difference in ~1/ETP_TICKS of a second */
78 ecb_inline int
79 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
80 {
81 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
82 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
83 }
84
/* a reusable scratch buffer that only ever grows */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, or 0 if none */
  int len;   /* usable size of ptr in bytes, 0 if ptr is 0 */
};

/*
 * Return a buffer of at least len bytes, reusing the previous allocation
 * when it is already large enough. The old contents are NOT preserved when
 * the buffer has to grow.
 *
 * Returns 0 if the allocation fails. In that case the buffer is reset to
 * the empty state (ptr 0, len 0) so that a later call tries to allocate
 * again instead of handing out a null pointer for a size it believes it
 * already has.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
102
103 /*
104 * a somewhat faster data structure might be nice, but
105 * with 8 priorities this actually needs <20 insns
106 * per shift, the most expensive operation.
107 */
108 typedef struct
109 {
110 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
111 int size;
112 } etp_reqq;
113
114 typedef struct etp_pool *etp_pool;
115
116 typedef struct etp_worker
117 {
118 etp_pool pool;
119
120 struct etp_tmpbuf tmpbuf;
121
122 /* locked by pool->wrklock */
123 struct etp_worker *prev, *next;
124
125 xthread_t tid;
126
127 #ifdef ETP_WORKER_COMMON
128 ETP_WORKER_COMMON
129 #endif
130 } etp_worker;
131
132 struct etp_pool
133 {
134 void *userdata;
135
136 etp_reqq req_queue;
137 etp_reqq res_queue;
138
139 unsigned int started, idle, wanted;
140
141 unsigned int max_poll_time; /* pool->reslock */
142 unsigned int max_poll_reqs; /* pool->reslock */
143
144 unsigned int nreqs; /* pool->reqlock */
145 unsigned int nready; /* pool->reqlock */
146 unsigned int npending; /* pool->reqlock */
147 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
148 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
149
150 void (*want_poll_cb) (void *userdata);
151 void (*done_poll_cb) (void *userdata);
152
153 xmutex_t wrklock;
154 xmutex_t reslock;
155 xmutex_t reqlock;
156 xcond_t reqwait;
157
158 etp_worker wrk_first;
159 };
160
/*
 * Lock / unlock the worker list mutex.
 * NOTE(review): the wrk argument is not used by the expansion; both macros
 * rely on an identifier named "pool" being in scope at the place of use.
 */
161 #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
162 #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
163
164 /* worker threads management */
165
/*
 * Per-request cleanup hook for a worker; called by etp_proc after each
 * executed request. Currently intentionally empty.
 */
166 static void
167 etp_worker_clear (etp_worker *wrk)
168 {
169 }
170
171 static void ecb_cold
172 etp_worker_free (etp_worker *wrk)
173 {
174 free (wrk->tmpbuf.ptr);
175
176 wrk->next->prev = wrk->prev;
177 wrk->prev->next = wrk->next;
178
179 free (wrk);
180 }
181
182 ETP_API_DECL unsigned int
183 etp_nreqs (etp_pool pool)
184 {
185 int retval;
186 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
187 retval = pool->nreqs;
188 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
189 return retval;
190 }
191
192 ETP_API_DECL unsigned int
193 etp_nready (etp_pool pool)
194 {
195 unsigned int retval;
196
197 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
198 retval = pool->nready;
199 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
200
201 return retval;
202 }
203
204 ETP_API_DECL unsigned int
205 etp_npending (etp_pool pool)
206 {
207 unsigned int retval;
208
209 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
210 retval = pool->npending;
211 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
212
213 return retval;
214 }
215
216 ETP_API_DECL unsigned int
217 etp_nthreads (etp_pool pool)
218 {
219 unsigned int retval;
220
221 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
222 retval = pool->started;
223 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
224
225 return retval;
226 }
227
228 static void ecb_noinline ecb_cold
229 reqq_init (etp_reqq *q)
230 {
231 int pri;
232
233 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
234 q->qs[pri] = q->qe[pri] = 0;
235
236 q->size = 0;
237 }
238
239 static int ecb_noinline
240 reqq_push (etp_reqq *q, ETP_REQ *req)
241 {
242 int pri = req->pri;
243 req->next = 0;
244
245 if (q->qe[pri])
246 {
247 q->qe[pri]->next = req;
248 q->qe[pri] = req;
249 }
250 else
251 q->qe[pri] = q->qs[pri] = req;
252
253 return q->size++;
254 }
255
256 static ETP_REQ * ecb_noinline
257 reqq_shift (etp_reqq *q)
258 {
259 int pri;
260
261 if (!q->size)
262 return 0;
263
264 --q->size;
265
266 for (pri = ETP_NUM_PRI; pri--; )
267 {
268 ETP_REQ *req = q->qs[pri];
269
270 if (req)
271 {
272 if (!(q->qs[pri] = (ETP_REQ *)req->next))
273 q->qe[pri] = 0;
274
275 return req;
276 }
277 }
278
279 abort ();
280 }
281
282 ETP_API_DECL int ecb_cold
283 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
284 {
285 X_MUTEX_CREATE (pool->wrklock);
286 X_MUTEX_CREATE (pool->reslock);
287 X_MUTEX_CREATE (pool->reqlock);
288 X_COND_CREATE (pool->reqwait);
289
290 reqq_init (&pool->req_queue);
291 reqq_init (&pool->res_queue);
292
293 pool->wrk_first.next =
294 pool->wrk_first.prev = &pool->wrk_first;
295
296 pool->started = 0;
297 pool->idle = 0;
298 pool->nreqs = 0;
299 pool->nready = 0;
300 pool->npending = 0;
301 pool->wanted = 4;
302
303 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
304 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
305
306 pool->userdata = userdata;
307 pool->want_poll_cb = want_poll;
308 pool->done_poll_cb = done_poll;
309
310 return 0;
311 }
312
313 static void ecb_noinline ecb_cold
314 etp_proc_init (void)
315 {
316 #if HAVE_PRCTL_SET_NAME
317 /* provide a more sensible "thread name" */
318 char name[16 + 1];
319 const int namelen = sizeof (name) - 1;
320 int len;
321
322 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
323 name [namelen] = 0;
324 len = strlen (name);
325 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
326 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
327 #endif
328 }
329
/*
 * Worker thread main function. thr_arg is the etp_worker of this thread.
 *
 * Loops forever: takes the next request from pool->req_queue (waiting on
 * pool->reqwait while the queue is empty), executes it via ETP_EXECUTE and
 * pushes it onto pool->res_queue. The thread leaves the loop when it
 * dequeues an ETP_TYPE_QUIT request or when a timed wait expires without a
 * request becoming available; it then frees its worker structure and
 * returns 0.
 */
330 X_THREAD_PROC (etp_proc)
331 {
332 ETP_REQ *req;
333 struct timespec ts;
334 etp_worker *self = (etp_worker *)thr_arg;
335 etp_pool pool = self->pool;
336
337 etp_proc_init ();
338
339 /* try to distribute timeouts somewhat evenly */
340 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
341
342 for (;;)
343 {
/* ts.tv_sec doubles as state: 0 = no deadline computed yet, 1 = timed wait expired */
344 ts.tv_sec = 0;
345
346 X_LOCK (pool->reqlock);
347
/* wait (with pool->reqlock held) until a request can be dequeued */
348 for (;;)
349 {
350 req = reqq_shift (&pool->req_queue);
351
352 if (ecb_expect_true (req))
353 break;
354
355 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
356 {
357 X_UNLOCK (pool->reqlock);
358 X_LOCK (pool->wrklock);
359 --pool->started;
360 X_UNLOCK (pool->wrklock);
361 goto quit;
362 }
363
364 ++pool->idle;
365
366 if (pool->idle <= pool->max_idle)
367 /* we are allowed to pool->idle, so do so without any timeout */
368 X_COND_WAIT (pool->reqwait, pool->reqlock);
369 else
370 {
371 /* initialise timeout once */
372 if (!ts.tv_sec)
373 ts.tv_sec = time (0) + pool->idle_timeout;
374
375 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
376 ts.tv_sec = 1; /* marker value, assuming this is not a deadline computed above */
377 }
378
379 --pool->idle;
380 }
381
/* note: decremented for every dequeued request, including ETP_TYPE_QUIT ones */
382 --pool->nready;
383
384 X_UNLOCK (pool->reqlock);
385
386 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
387 goto quit;
388
389 ETP_EXECUTE (self, req);
390
391 X_LOCK (pool->reslock);
392
393 ++pool->npending;
394
/* reqq_push returns the previous queue size: notify only when the result queue was empty */
395 if (!reqq_push (&pool->res_queue, req))
396 ETP_WANT_POLL (pool);
397
398 etp_worker_clear (self);
399
400 X_UNLOCK (pool->reslock);
401 }
402
403 quit:
/* req is the QUIT request here, or 0 when leaving because of the idle timeout */
404 free (req);
405
406 X_LOCK (pool->wrklock);
407 etp_worker_free (self);
408 X_UNLOCK (pool->wrklock);
409
410 return 0;
411 }
412
413 static void ecb_cold
414 etp_start_thread (etp_pool pool)
415 {
416 etp_worker *wrk = calloc (1, sizeof (etp_worker));
417
418 /*TODO*/
419 assert (("unable to allocate worker thread data", wrk));
420
421 wrk->pool = pool;
422
423 X_LOCK (pool->wrklock);
424
425 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
426 {
427 wrk->prev = &pool->wrk_first;
428 wrk->next = pool->wrk_first.next;
429 pool->wrk_first.next->prev = wrk;
430 pool->wrk_first.next = wrk;
431 ++pool->started;
432 }
433 else
434 free (wrk);
435
436 X_UNLOCK (pool->wrklock);
437 }
438
439 static void
440 etp_maybe_start_thread (etp_pool pool)
441 {
442 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
443 return;
444
445 /* todo: maybe use pool->idle here, but might be less exact */
446 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
447 return;
448
449 etp_start_thread (pool);
450 }
451
452 static void ecb_cold
453 etp_end_thread (etp_pool pool)
454 {
455 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
456
457 req->type = ETP_TYPE_QUIT;
458 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
459
460 X_LOCK (pool->reqlock);
461 reqq_push (&pool->req_queue, req);
462 X_COND_SIGNAL (pool->reqwait);
463 X_UNLOCK (pool->reqlock);
464
465 X_LOCK (pool->wrklock);
466 --pool->started;
467 X_UNLOCK (pool->wrklock);
468 }
469
/*
 * Finish requests that are waiting on the pool's result queue.
 *
 * Returns 0 once the result queue is empty, the non-zero value returned by
 * ETP_FINISH as soon as one request yields it, or -1 with errno set to
 * EAGAIN when processing stops because max_poll_reqs requests were
 * finished or max_poll_time (in etp_tvdiff units) has elapsed. A limit of
 * 0 disables the respective check.
 */
470 ETP_API_DECL int
471 etp_poll (etp_pool pool)
472 {
473 unsigned int maxreqs;
474 unsigned int maxtime;
475 struct timeval tv_start, tv_now;
476
/* take a snapshot of both limits; they stay fixed for this call */
477 X_LOCK (pool->reslock);
478 maxreqs = pool->max_poll_reqs;
479 maxtime = pool->max_poll_time;
480 X_UNLOCK (pool->reslock);
481
482 if (maxtime)
483 gettimeofday (&tv_start, 0);
484
485 for (;;)
486 {
487 ETP_REQ *req;
488
489 etp_maybe_start_thread (pool);
490
491 X_LOCK (pool->reslock);
492 req = reqq_shift (&pool->res_queue);
493
494 if (ecb_expect_true (req))
495 {
496 --pool->npending;
497
/* that was the last queued result: tell the user polling is done for now */
498 if (!pool->res_queue.size)
499 ETP_DONE_POLL (pool);
500 }
501
502 X_UNLOCK (pool->reslock);
503
504 if (ecb_expect_false (!req))
505 return 0;
506
507 X_LOCK (pool->reqlock);
508 --pool->nreqs;
509 X_UNLOCK (pool->reqlock);
510
/* a group with a non-zero size is not finished here; it does not count against the limits */
511 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
512 {
513 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
514 continue;
515 }
516 else
517 {
518 int res = ETP_FINISH (req);
519 if (ecb_expect_false (res))
520 return res;
521 }
522
523 if (ecb_expect_false (maxreqs && !--maxreqs))
524 break;
525
526 if (maxtime)
527 {
528 gettimeofday (&tv_now, 0);
529
530 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
531 break;
532 }
533 }
534
/* only reached via break, i.e. when one of the limits was hit */
535 errno = EAGAIN;
536 return -1;
537 }
538
539 ETP_API_DECL void
540 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
541
542 ETP_API_DECL void
543 etp_cancel (etp_pool pool, ETP_REQ *req)
544 {
545 req->cancelled = 1;
546
547 etp_grp_cancel (pool, req);
548 }
549
550 ETP_API_DECL void
551 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
552 {
553 for (grp = grp->grp_first; grp; grp = grp->grp_next)
554 etp_cancel (pool, grp);
555 }
556
557 ETP_API_DECL void
558 etp_submit (etp_pool pool, ETP_REQ *req)
559 {
560 req->pri -= ETP_PRI_MIN;
561
562 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
563 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
564
565 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
566 {
567 /* I hope this is worth it :/ */
568 X_LOCK (pool->reqlock);
569 ++pool->nreqs;
570 X_UNLOCK (pool->reqlock);
571
572 X_LOCK (pool->reslock);
573
574 ++pool->npending;
575
576 if (!reqq_push (&pool->res_queue, req))
577 ETP_WANT_POLL (pool);
578
579 X_UNLOCK (pool->reslock);
580 }
581 else
582 {
583 X_LOCK (pool->reqlock);
584 ++pool->nreqs;
585 ++pool->nready;
586 reqq_push (&pool->req_queue, req);
587 X_COND_SIGNAL (pool->reqwait);
588 X_UNLOCK (pool->reqlock);
589
590 etp_maybe_start_thread (pool);
591 }
592 }
593
594 ETP_API_DECL void ecb_cold
595 etp_set_max_poll_time (etp_pool pool, double seconds)
596 {
597 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
598 pool->max_poll_time = seconds * ETP_TICKS;
599 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
600 }
601
602 ETP_API_DECL void ecb_cold
603 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
604 {
605 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
606 pool->max_poll_reqs = maxreqs;
607 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
608 }
609
610 ETP_API_DECL void ecb_cold
611 etp_set_max_idle (etp_pool pool, unsigned int threads)
612 {
613 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
614 pool->max_idle = threads;
615 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
616 }
617
618 ETP_API_DECL void ecb_cold
619 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
620 {
621 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
622 pool->idle_timeout = seconds;
623 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
624 }
625
626 ETP_API_DECL void ecb_cold
627 etp_set_min_parallel (etp_pool pool, unsigned int threads)
628 {
629 if (pool->wanted < threads)
630 pool->wanted = threads;
631 }
632
633 ETP_API_DECL void ecb_cold
634 etp_set_max_parallel (etp_pool pool, unsigned int threads)
635 {
636 if (pool->wanted > threads)
637 pool->wanted = threads;
638
639 while (pool->started > pool->wanted)
640 etp_end_thread (pool);
641 }
642