ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.5
Committed: Thu Jun 25 17:40:24 2015 UTC (8 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.4: +60 -49 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
/* linkage of the etp_* API functions; the including file may override it */
#if !defined ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range; defaults to a single priority level */
#if !defined ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that makes the worker receiving it exit (see etp_proc) */
#if !defined ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type of group requests (see etp_submit and etp_poll) */
#if !defined ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* number of priority levels, which is also the number of per-priority queues */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second used by etp_tvdiff: 1000000 microseconds / 1024, rounded up before the shift */
#define ETP_TICKS ((1000000 + 1023) >> 10)
60
/* request flag bits; etp_poll ors ETP_FLAG_DELAYED into req->flags */
enum
{
  ETP_FLAG_GROUPADD = 1 << 2, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 1 << 3  /* group request has been delayed */
};
65
66 /* calculate time difference in ~1/ETP_TICKS of a second */
67 ecb_inline int
68 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
69 {
70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72 }
73
74 static unsigned int started, idle, wanted = 4;
75
76 static void (*want_poll_cb) (void);
77 static void (*done_poll_cb) (void);
78
79 static unsigned int max_poll_time; /* reslock */
80 static unsigned int max_poll_reqs; /* reslock */
81
82 static unsigned int nreqs; /* reqlock */
83 static unsigned int nready; /* reqlock */
84 static unsigned int npending; /* reqlock */
85 static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86 static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87
88 static xmutex_t wrklock;
89 static xmutex_t reslock;
90 static xmutex_t reqlock;
91 static xcond_t reqwait;
92
/* a scratch buffer that is reused between calls and only ever grows */
struct etp_tmpbuf
{
  void *ptr; /* start of the allocation, 0 if there is none */
  int len;   /* size of the allocation in bytes, 0 if there is none */
};

/*
 * Return a pointer to a scratch buffer of at least len bytes.
 *
 * The existing allocation is reused when it is large enough; otherwise it
 * is freed and replaced by a fresh one of exactly len bytes, so previous
 * contents are NOT preserved when the buffer grows.
 *
 * Returns 0 when the allocation fails (and also when nothing has been
 * allocated yet and len does not exceed the recorded size, e.g. len 0 on
 * a fresh buffer).
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);

      /* record the size only on success: otherwise len would claim a buffer
       * that does not exist and later, smaller requests would get 0 back
       * without another allocation attempt */
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
110
111 /*
112 * a somewhat faster data structure might be nice, but
113 * with 8 priorities this actually needs <20 insns
114 * per shift, the most expensive operation.
115 */
116 typedef struct
117 {
118 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
119 int size;
120 } etp_reqq;
121
122 struct etp_pool
123 {
124 etp_reqq req_queue;
125 etp_reqq res_queue;
126 };
127
128 typedef struct etp_pool *etp_pool;
129
130 typedef struct etp_worker
131 {
132 etp_pool pool;
133
134 struct etp_tmpbuf tmpbuf;
135
136 /* locked by wrklock */
137 struct etp_worker *prev, *next;
138
139 xthread_t tid;
140
141 #ifdef ETP_WORKER_COMMON
142 ETP_WORKER_COMMON
143 #endif
144 } etp_worker;
145
146 static etp_worker wrk_first; /* NOT etp */
147
148 #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
149 #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
150
151 /* worker threads management */
152
153 static void
154 etp_worker_clear (etp_worker *wrk)
155 {
156 }
157
158 static void ecb_cold
159 etp_worker_free (etp_worker *wrk)
160 {
161 free (wrk->tmpbuf.ptr);
162
163 wrk->next->prev = wrk->prev;
164 wrk->prev->next = wrk->next;
165
166 free (wrk);
167 }
168
169 ETP_API_DECL unsigned int
170 etp_nreqs (etp_pool pool)
171 {
172 int retval;
173 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
174 retval = nreqs;
175 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
176 return retval;
177 }
178
179 ETP_API_DECL unsigned int
180 etp_nready (etp_pool pool)
181 {
182 unsigned int retval;
183
184 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
185 retval = nready;
186 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
187
188 return retval;
189 }
190
191 ETP_API_DECL unsigned int
192 etp_npending (etp_pool pool)
193 {
194 unsigned int retval;
195
196 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
197 retval = npending;
198 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
199
200 return retval;
201 }
202
203 ETP_API_DECL unsigned int
204 etp_nthreads (etp_pool pool)
205 {
206 unsigned int retval;
207
208 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
209 retval = started;
210 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
211
212 return retval;
213 }
214
215 static void ecb_noinline ecb_cold
216 reqq_init (etp_reqq *q)
217 {
218 int pri;
219
220 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
221 q->qs[pri] = q->qe[pri] = 0;
222
223 q->size = 0;
224 }
225
226 static int ecb_noinline
227 reqq_push (etp_reqq *q, ETP_REQ *req)
228 {
229 int pri = req->pri;
230 req->next = 0;
231
232 if (q->qe[pri])
233 {
234 q->qe[pri]->next = req;
235 q->qe[pri] = req;
236 }
237 else
238 q->qe[pri] = q->qs[pri] = req;
239
240 return q->size++;
241 }
242
243 static ETP_REQ * ecb_noinline
244 reqq_shift (etp_reqq *q)
245 {
246 int pri;
247
248 if (!q->size)
249 return 0;
250
251 --q->size;
252
253 for (pri = ETP_NUM_PRI; pri--; )
254 {
255 ETP_REQ *req = q->qs[pri];
256
257 if (req)
258 {
259 if (!(q->qs[pri] = (ETP_REQ *)req->next))
260 q->qe[pri] = 0;
261
262 return req;
263 }
264 }
265
266 abort ();
267 }
268
269 ETP_API_DECL int ecb_cold
270 etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void))
271 {
272 X_MUTEX_CREATE (wrklock);
273 X_MUTEX_CREATE (reslock);
274 X_MUTEX_CREATE (reqlock);
275 X_COND_CREATE (reqwait);
276
277 reqq_init (&pool->req_queue);
278 reqq_init (&pool->res_queue);
279
280 wrk_first.next =
281 wrk_first.prev = &wrk_first;
282
283 started = 0;
284 idle = 0;
285 nreqs = 0;
286 nready = 0;
287 npending = 0;
288
289 want_poll_cb = want_poll;
290 done_poll_cb = done_poll;
291
292 return 0;
293 }
294
295 static void ecb_noinline ecb_cold
296 etp_proc_init (void)
297 {
298 #if HAVE_PRCTL_SET_NAME
299 /* provide a more sensible "thread name" */
300 char name[16 + 1];
301 const int namelen = sizeof (name) - 1;
302 int len;
303
304 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
305 name [namelen] = 0;
306 len = strlen (name);
307 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
308 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
309 #endif
310 }
311
312 X_THREAD_PROC (etp_proc)
313 {
314 ETP_REQ *req;
315 struct timespec ts;
316 etp_worker *self = (etp_worker *)thr_arg;
317 etp_pool pool = self->pool;
318
319 etp_proc_init ();
320
321 /* try to distribute timeouts somewhat evenly */
322 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
323
324 for (;;)
325 {
326 ts.tv_sec = 0;
327
328 X_LOCK (reqlock);
329
330 for (;;)
331 {
332 req = reqq_shift (&pool->req_queue);
333
334 if (req)
335 break;
336
337 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
338 {
339 X_UNLOCK (reqlock);
340 X_LOCK (wrklock);
341 --started;
342 X_UNLOCK (wrklock);
343 goto quit;
344 }
345
346 ++idle;
347
348 if (idle <= max_idle)
349 /* we are allowed to idle, so do so without any timeout */
350 X_COND_WAIT (reqwait, reqlock);
351 else
352 {
353 /* initialise timeout once */
354 if (!ts.tv_sec)
355 ts.tv_sec = time (0) + idle_timeout;
356
357 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
358 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
359 }
360
361 --idle;
362 }
363
364 --nready;
365
366 X_UNLOCK (reqlock);
367
368 if (req->type == ETP_TYPE_QUIT)
369 goto quit;
370
371 ETP_EXECUTE (self, req);
372
373 X_LOCK (reslock);
374
375 ++npending;
376
377 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
378 want_poll_cb ();
379
380 etp_worker_clear (self);
381
382 X_UNLOCK (reslock);
383 }
384
385 quit:
386 free (req);
387
388 X_LOCK (wrklock);
389 etp_worker_free (self);
390 X_UNLOCK (wrklock);
391
392 return 0;
393 }
394
395 static void ecb_cold
396 etp_start_thread (etp_pool pool)
397 {
398 etp_worker *wrk = calloc (1, sizeof (etp_worker));
399
400 /*TODO*/
401 assert (("unable to allocate worker thread data", wrk));
402
403 wrk->pool = pool;
404
405 X_LOCK (wrklock);
406
407 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
408 {
409 wrk->prev = &wrk_first;
410 wrk->next = wrk_first.next;
411 wrk_first.next->prev = wrk;
412 wrk_first.next = wrk;
413 ++started;
414 }
415 else
416 free (wrk);
417
418 X_UNLOCK (wrklock);
419 }
420
421 static void
422 etp_maybe_start_thread (etp_pool pool)
423 {
424 if (ecb_expect_true (etp_nthreads (pool) >= wanted))
425 return;
426
427 /* todo: maybe use idle here, but might be less exact */
428 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
429 return;
430
431 etp_start_thread (pool);
432 }
433
434 static void ecb_cold
435 etp_end_thread (etp_pool pool)
436 {
437 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
438
439 req->type = ETP_TYPE_QUIT;
440 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
441
442 X_LOCK (reqlock);
443 reqq_push (&pool->req_queue, req);
444 X_COND_SIGNAL (reqwait);
445 X_UNLOCK (reqlock);
446
447 X_LOCK (wrklock);
448 --started;
449 X_UNLOCK (wrklock);
450 }
451
452 ETP_API_DECL int
453 etp_poll (etp_pool pool)
454 {
455 unsigned int maxreqs;
456 unsigned int maxtime;
457 struct timeval tv_start, tv_now;
458
459 X_LOCK (reslock);
460 maxreqs = max_poll_reqs;
461 maxtime = max_poll_time;
462 X_UNLOCK (reslock);
463
464 if (maxtime)
465 gettimeofday (&tv_start, 0);
466
467 for (;;)
468 {
469 ETP_REQ *req;
470
471 etp_maybe_start_thread (pool);
472
473 X_LOCK (reslock);
474 req = reqq_shift (&pool->res_queue);
475
476 if (req)
477 {
478 --npending;
479
480 if (!pool->res_queue.size && done_poll_cb)
481 done_poll_cb ();
482 }
483
484 X_UNLOCK (reslock);
485
486 if (!req)
487 return 0;
488
489 X_LOCK (reqlock);
490 --nreqs;
491 X_UNLOCK (reqlock);
492
493 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
494 {
495 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
496 continue;
497 }
498 else
499 {
500 int res = ETP_FINISH (req);
501 if (ecb_expect_false (res))
502 return res;
503 }
504
505 if (ecb_expect_false (maxreqs && !--maxreqs))
506 break;
507
508 if (maxtime)
509 {
510 gettimeofday (&tv_now, 0);
511
512 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
513 break;
514 }
515 }
516
517 errno = EAGAIN;
518 return -1;
519 }
520
521 ETP_API_DECL void
522 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
523
524 ETP_API_DECL void
525 etp_cancel (etp_pool pool, ETP_REQ *req)
526 {
527 req->cancelled = 1;
528
529 etp_grp_cancel (pool, req);
530 }
531
532 ETP_API_DECL void
533 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
534 {
535 for (grp = grp->grp_first; grp; grp = grp->grp_next)
536 etp_cancel (pool, grp);
537 }
538
539 ETP_API_DECL void
540 etp_submit (etp_pool pool, ETP_REQ *req)
541 {
542 req->pri -= ETP_PRI_MIN;
543
544 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
545 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
546
547 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
548 {
549 /* I hope this is worth it :/ */
550 X_LOCK (reqlock);
551 ++nreqs;
552 X_UNLOCK (reqlock);
553
554 X_LOCK (reslock);
555
556 ++npending;
557
558 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
559 want_poll_cb ();
560
561 X_UNLOCK (reslock);
562 }
563 else
564 {
565 X_LOCK (reqlock);
566 ++nreqs;
567 ++nready;
568 reqq_push (&pool->req_queue, req);
569 X_COND_SIGNAL (reqwait);
570 X_UNLOCK (reqlock);
571
572 etp_maybe_start_thread (pool);
573 }
574 }
575
576 ETP_API_DECL void ecb_cold
577 etp_set_max_poll_time (etp_pool pool, double nseconds)
578 {
579 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
580 max_poll_time = nseconds * ETP_TICKS;
581 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
582 }
583
584 ETP_API_DECL void ecb_cold
585 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
586 {
587 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
588 max_poll_reqs = maxreqs;
589 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
590 }
591
592 ETP_API_DECL void ecb_cold
593 etp_set_max_idle (etp_pool pool, unsigned int nthreads)
594 {
595 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
596 max_idle = nthreads;
597 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
598 }
599
600 ETP_API_DECL void ecb_cold
601 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
602 {
603 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
604 idle_timeout = seconds;
605 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
606 }
607
608 ETP_API_DECL void ecb_cold
609 etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
610 {
611 if (wanted < nthreads)
612 wanted = nthreads;
613 }
614
615 ETP_API_DECL void ecb_cold
616 etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
617 {
618 if (wanted > nthreads)
619 wanted = nthreads;
620
621 while (started > wanted)
622 etp_end_thread (pool);
623 }
624