ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-XSThreadPool/etp.c
Revision: 1.2
Committed: Thu Jun 25 21:24:18 2015 UTC (8 years, 10 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +10 -10 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
/* linkage of the etp_* API functions; the including file may override it */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range; defaults to a single priority level */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that makes the worker dequeuing it exit (see etp_proc) */
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type that etp_submit puts directly on the result queue */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/*
 * poll notification hooks. the argument is parenthesised so that any
 * expression yielding a pool pointer (e.g. *pp or a ?: expression) expands
 * correctly, not just a plain identifier.
 */
#ifndef ETP_WANT_POLL
# define ETP_WANT_POLL(pool) ((pool)->want_poll_cb ((pool)->userdata))
#endif
#ifndef ETP_DONE_POLL
# define ETP_DONE_POLL(pool) ((pool)->done_poll_cb ((pool)->userdata))
#endif

/* number of distinct priority levels, i.e. the size of the per-queue arrays */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* number of 1024-microsecond ticks per second, rounded up */
#define ETP_TICKS ((1000000 + 1023) >> 10)
67
/* bits stored in a request's flags member */
enum
{
  ETP_FLAG_GROUPADD = 1 << 2, /* 0x04: some request was added to the group */
  ETP_FLAG_DELAYED  = 1 << 3, /* 0x08: group request has been delayed */
};
72
73 /* calculate time difference in ~1/ETP_TICKS of a second */
74 ecb_inline int
75 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
76 {
77 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
78 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
79 }
80
/* a scratch buffer that is reused between calls and only ever grows */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, or 0 when there is none */
  int len;   /* number of bytes usable at ptr */
};

/*
 * returns a buffer of at least len bytes, reusing the existing allocation
 * when it is already large enough. when the buffer has to grow, the old
 * contents are discarded (free + malloc, not realloc).
 *
 * returns 0 if the allocation fails. in that case the recorded length is
 * reset to 0, so the buffer never claims a capacity it does not have and
 * a later call tries the allocation again.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
98
99 /*
100 * a somewhat faster data structure might be nice, but
101 * with 8 priorities this actually needs <20 insns
102 * per shift, the most expensive operation.
103 */
104 typedef struct
105 {
106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107 int size;
108 } etp_reqq;
109
110 typedef struct etp_pool *etp_pool;
111
112 typedef struct etp_worker
113 {
114 etp_pool pool;
115
116 struct etp_tmpbuf tmpbuf;
117
118 /* locked by pool->wrklock */
119 struct etp_worker *prev, *next;
120
121 xthread_t tid;
122
123 #ifdef ETP_WORKER_COMMON
124 ETP_WORKER_COMMON
125 #endif
126 } etp_worker;
127
128 struct etp_pool
129 {
130 void *userdata;
131
132 etp_reqq req_queue;
133 etp_reqq res_queue;
134
135 unsigned int started, idle, wanted;
136
137 unsigned int max_poll_time; /* pool->reslock */
138 unsigned int max_poll_reqs; /* pool->reslock */
139
140 unsigned int nreqs; /* pool->reqlock */
141 unsigned int nready; /* pool->reqlock */
142 unsigned int npending; /* pool->reqlock */
143 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
144 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
145
146 void (*want_poll_cb) (void *userdata);
147 void (*done_poll_cb) (void *userdata);
148
149 xmutex_t wrklock;
150 xmutex_t reslock;
151 xmutex_t reqlock;
152 xcond_t reqwait;
153
154 etp_worker wrk_first;
155 };
156
/*
 * lock/unlock the pool's worker lock.
 * note that the wrk argument is not used by the expansion: a variable
 * named "pool" has to be in scope wherever these macros are used.
 */
#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159
160 /* worker threads management */
161
162 static void
163 etp_worker_clear (etp_worker *wrk)
164 {
165 }
166
167 static void ecb_cold
168 etp_worker_free (etp_worker *wrk)
169 {
170 free (wrk->tmpbuf.ptr);
171
172 wrk->next->prev = wrk->prev;
173 wrk->prev->next = wrk->next;
174
175 free (wrk);
176 }
177
178 ETP_API_DECL unsigned int
179 etp_nreqs (etp_pool pool)
180 {
181 int retval;
182 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
183 retval = pool->nreqs;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
185 return retval;
186 }
187
188 ETP_API_DECL unsigned int
189 etp_nready (etp_pool pool)
190 {
191 unsigned int retval;
192
193 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
194 retval = pool->nready;
195 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
196
197 return retval;
198 }
199
200 ETP_API_DECL unsigned int
201 etp_npending (etp_pool pool)
202 {
203 unsigned int retval;
204
205 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
206 retval = pool->npending;
207 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
208
209 return retval;
210 }
211
212 ETP_API_DECL unsigned int
213 etp_nthreads (etp_pool pool)
214 {
215 unsigned int retval;
216
217 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
218 retval = pool->started;
219 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
220
221 return retval;
222 }
223
224 static void ecb_noinline ecb_cold
225 reqq_init (etp_reqq *q)
226 {
227 int pri;
228
229 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
230 q->qs[pri] = q->qe[pri] = 0;
231
232 q->size = 0;
233 }
234
235 static int ecb_noinline
236 reqq_push (etp_reqq *q, ETP_REQ *req)
237 {
238 int pri = req->pri;
239 req->next = 0;
240
241 if (q->qe[pri])
242 {
243 q->qe[pri]->next = req;
244 q->qe[pri] = req;
245 }
246 else
247 q->qe[pri] = q->qs[pri] = req;
248
249 return q->size++;
250 }
251
252 static ETP_REQ * ecb_noinline
253 reqq_shift (etp_reqq *q)
254 {
255 int pri;
256
257 if (!q->size)
258 return 0;
259
260 --q->size;
261
262 for (pri = ETP_NUM_PRI; pri--; )
263 {
264 ETP_REQ *req = q->qs[pri];
265
266 if (req)
267 {
268 if (!(q->qs[pri] = (ETP_REQ *)req->next))
269 q->qe[pri] = 0;
270
271 return req;
272 }
273 }
274
275 abort ();
276 }
277
278 ETP_API_DECL int ecb_cold
279 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
280 {
281 X_MUTEX_CREATE (pool->wrklock);
282 X_MUTEX_CREATE (pool->reslock);
283 X_MUTEX_CREATE (pool->reqlock);
284 X_COND_CREATE (pool->reqwait);
285
286 reqq_init (&pool->req_queue);
287 reqq_init (&pool->res_queue);
288
289 pool->wrk_first.next =
290 pool->wrk_first.prev = &pool->wrk_first;
291
292 pool->started = 0;
293 pool->idle = 0;
294 pool->nreqs = 0;
295 pool->nready = 0;
296 pool->npending = 0;
297 pool->wanted = 4;
298
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301
302 pool->userdata = userdata;
303 pool->want_poll_cb = want_poll;
304 pool->done_poll_cb = done_poll;
305
306 return 0;
307 }
308
309 static void ecb_noinline ecb_cold
310 etp_proc_init (void)
311 {
312 #if HAVE_PRCTL_SET_NAME
313 /* provide a more sensible "thread name" */
314 char name[16 + 1];
315 const int namelen = sizeof (name) - 1;
316 int len;
317
318 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
319 name [namelen] = 0;
320 len = strlen (name);
321 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
322 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
323 #endif
324 }
325
326 X_THREAD_PROC (etp_proc)
327 {
328 ETP_REQ *req;
329 struct timespec ts;
330 etp_worker *self = (etp_worker *)thr_arg;
331 etp_pool pool = self->pool;
332
333 etp_proc_init ();
334
335 /* try to distribute timeouts somewhat evenly */
336 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
337
338 for (;;)
339 {
340 ts.tv_sec = 0;
341
342 X_LOCK (pool->reqlock);
343
344 for (;;)
345 {
346 req = reqq_shift (&pool->req_queue);
347
348 if (ecb_expect_true (req))
349 break;
350
351 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
352 {
353 X_UNLOCK (pool->reqlock);
354 X_LOCK (pool->wrklock);
355 --pool->started;
356 X_UNLOCK (pool->wrklock);
357 goto quit;
358 }
359
360 ++pool->idle;
361
362 if (pool->idle <= pool->max_idle)
363 /* we are allowed to pool->idle, so do so without any timeout */
364 X_COND_WAIT (pool->reqwait, pool->reqlock);
365 else
366 {
367 /* initialise timeout once */
368 if (!ts.tv_sec)
369 ts.tv_sec = time (0) + pool->idle_timeout;
370
371 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
372 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
373 }
374
375 --pool->idle;
376 }
377
378 --pool->nready;
379
380 X_UNLOCK (pool->reqlock);
381
382 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
383 goto quit;
384
385 ETP_EXECUTE (self, req);
386
387 X_LOCK (pool->reslock);
388
389 ++pool->npending;
390
391 if (!reqq_push (&pool->res_queue, req))
392 ETP_WANT_POLL (pool);
393
394 etp_worker_clear (self);
395
396 X_UNLOCK (pool->reslock);
397 }
398
399 quit:
400 free (req);
401
402 X_LOCK (pool->wrklock);
403 etp_worker_free (self);
404 X_UNLOCK (pool->wrklock);
405
406 return 0;
407 }
408
409 static void ecb_cold
410 etp_start_thread (etp_pool pool)
411 {
412 etp_worker *wrk = calloc (1, sizeof (etp_worker));
413
414 /*TODO*/
415 assert (("unable to allocate worker thread data", wrk));
416
417 wrk->pool = pool;
418
419 X_LOCK (pool->wrklock);
420
421 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
422 {
423 wrk->prev = &pool->wrk_first;
424 wrk->next = pool->wrk_first.next;
425 pool->wrk_first.next->prev = wrk;
426 pool->wrk_first.next = wrk;
427 ++pool->started;
428 }
429 else
430 free (wrk);
431
432 X_UNLOCK (pool->wrklock);
433 }
434
435 static void
436 etp_maybe_start_thread (etp_pool pool)
437 {
438 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
439 return;
440
441 /* todo: maybe use pool->idle here, but might be less exact */
442 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
443 return;
444
445 etp_start_thread (pool);
446 }
447
448 static void ecb_cold
449 etp_end_thread (etp_pool pool)
450 {
451 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
452
453 req->type = ETP_TYPE_QUIT;
454 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
455
456 X_LOCK (pool->reqlock);
457 reqq_push (&pool->req_queue, req);
458 X_COND_SIGNAL (pool->reqwait);
459 X_UNLOCK (pool->reqlock);
460
461 X_LOCK (pool->wrklock);
462 --pool->started;
463 X_UNLOCK (pool->wrklock);
464 }
465
466 ETP_API_DECL int
467 etp_poll (etp_pool pool)
468 {
469 unsigned int maxreqs;
470 unsigned int maxtime;
471 struct timeval tv_start, tv_now;
472
473 X_LOCK (pool->reslock);
474 maxreqs = pool->max_poll_reqs;
475 maxtime = pool->max_poll_time;
476 X_UNLOCK (pool->reslock);
477
478 if (maxtime)
479 gettimeofday (&tv_start, 0);
480
481 for (;;)
482 {
483 ETP_REQ *req;
484
485 etp_maybe_start_thread (pool);
486
487 X_LOCK (pool->reslock);
488 req = reqq_shift (&pool->res_queue);
489
490 if (ecb_expect_true (req))
491 {
492 --pool->npending;
493
494 if (!pool->res_queue.size)
495 ETP_DONE_POLL (pool);
496 }
497
498 X_UNLOCK (pool->reslock);
499
500 if (ecb_expect_false (!req))
501 return 0;
502
503 X_LOCK (pool->reqlock);
504 --pool->nreqs;
505 X_UNLOCK (pool->reqlock);
506
507 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
508 {
509 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
510 continue;
511 }
512 else
513 {
514 int res = ETP_FINISH (req);
515 if (ecb_expect_false (res))
516 return res;
517 }
518
519 if (ecb_expect_false (maxreqs && !--maxreqs))
520 break;
521
522 if (maxtime)
523 {
524 gettimeofday (&tv_now, 0);
525
526 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
527 break;
528 }
529 }
530
531 errno = EAGAIN;
532 return -1;
533 }
534
535 ETP_API_DECL void
536 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
537
538 ETP_API_DECL void
539 etp_cancel (etp_pool pool, ETP_REQ *req)
540 {
541 req->cancelled = 1;
542
543 etp_grp_cancel (pool, req);
544 }
545
546 ETP_API_DECL void
547 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
548 {
549 for (grp = grp->grp_first; grp; grp = grp->grp_next)
550 etp_cancel (pool, grp);
551 }
552
553 ETP_API_DECL void
554 etp_submit (etp_pool pool, ETP_REQ *req)
555 {
556 req->pri -= ETP_PRI_MIN;
557
558 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
559 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
560
561 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
562 {
563 /* I hope this is worth it :/ */
564 X_LOCK (pool->reqlock);
565 ++pool->nreqs;
566 X_UNLOCK (pool->reqlock);
567
568 X_LOCK (pool->reslock);
569
570 ++pool->npending;
571
572 if (!reqq_push (&pool->res_queue, req))
573 ETP_WANT_POLL (pool);
574
575 X_UNLOCK (pool->reslock);
576 }
577 else
578 {
579 X_LOCK (pool->reqlock);
580 ++pool->nreqs;
581 ++pool->nready;
582 reqq_push (&pool->req_queue, req);
583 X_COND_SIGNAL (pool->reqwait);
584 X_UNLOCK (pool->reqlock);
585
586 etp_maybe_start_thread (pool);
587 }
588 }
589
590 ETP_API_DECL void ecb_cold
591 etp_set_max_poll_time (etp_pool pool, double seconds)
592 {
593 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
594 pool->max_poll_time = seconds * ETP_TICKS;
595 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
596 }
597
598 ETP_API_DECL void ecb_cold
599 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
600 {
601 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
602 pool->max_poll_reqs = maxreqs;
603 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
604 }
605
606 ETP_API_DECL void ecb_cold
607 etp_set_max_idle (etp_pool pool, unsigned int threads)
608 {
609 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
610 pool->max_idle = threads;
611 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
612 }
613
614 ETP_API_DECL void ecb_cold
615 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
616 {
617 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
618 pool->idle_timeout = seconds;
619 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
620 }
621
622 ETP_API_DECL void ecb_cold
623 etp_set_min_parallel (etp_pool pool, unsigned int threads)
624 {
625 if (pool->wanted < threads)
626 pool->wanted = threads;
627 }
628
629 ETP_API_DECL void ecb_cold
630 etp_set_max_parallel (etp_pool pool, unsigned int threads)
631 {
632 if (pool->wanted > threads)
633 pool->wanted = threads;
634
635 while (pool->started > pool->wanted)
636 etp_end_thread (pool);
637 }
638