ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.4
Committed: Thu Jun 25 17:05:07 2015 UTC (8 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.3: +104 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /*
2 * libetp implementation
3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26 * OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 * Alternatively, the contents of this file may be used under the terms of
29 * the GNU General Public License ("GPL") version 2 or any later version,
30 * in which case the provisions of the GPL are applicable instead of
31 * the above. If you wish to allow the use of your version of this file
32 * only under the terms of the GPL and not to allow others to use your
33 * version of this file under the BSD license, indicate your decision
34 * by deleting the provisions above and replace them with the notice
35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL.
38 */
39
/* linkage of the etp_* API functions; static unless predefined by the includer */
#if !defined (ETP_API_DECL)
# define ETP_API_DECL static
#endif

/* priority range; defaults to the single priority 0 when ETP_PRI_MIN is not predefined */
#if !defined (ETP_PRI_MIN)
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that makes a worker thread exit (see etp_end_thread/etp_proc) */
#if !defined (ETP_TYPE_QUIT)
# define ETP_TYPE_QUIT 0
#endif

/* request type of group requests, which etp_submit puts directly on the result queue */
#if !defined (ETP_TYPE_GROUP)
# define ETP_TYPE_GROUP 1
#endif

/* number of priority levels, and thus of per-priority lists in each queue */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second, where one tick is 1024 microseconds (rounded up) */
#define ETP_TICKS ((1000000 + 1023) >> 10)
60
/* bits stored in the flags member of a request */
enum
{
  ETP_FLAG_GROUPADD = 1 << 2, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 1 << 3, /* group request has been delayed */
};
65
66 /* calculate time difference in ~1/ETP_TICKS of a second */
67 ecb_inline int
68 etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
69 {
70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72 }
73
74 static unsigned int started, idle, wanted = 4;
75
76 static void (*want_poll_cb) (void);
77 static void (*done_poll_cb) (void);
78
79 static unsigned int max_poll_time; /* reslock */
80 static unsigned int max_poll_reqs; /* reslock */
81
82 static unsigned int nreqs; /* reqlock */
83 static unsigned int nready; /* reqlock */
84 static unsigned int npending; /* reqlock */
85 static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86 static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87
88 static xmutex_t wrklock;
89 static xmutex_t reslock;
90 static xmutex_t reqlock;
91 static xcond_t reqwait;
92
/*
 * a grow-only scratch buffer.
 * an all-zero instance (ptr 0, len 0) is a valid, empty buffer.
 */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, or 0 */
  int len;   /* usable size of ptr in bytes, 0 when there is no allocation */
};

/*
 * return a buffer of at least len bytes.
 *
 * the existing allocation is reused when it is already large enough,
 * otherwise it is freed and replaced - old contents are NOT preserved.
 *
 * returns 0 when the allocation fails; in that case the recorded length is
 * reset to 0 so that a later call tries to allocate again instead of
 * handing out the null pointer for every smaller request.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
110
111 typedef struct etp_worker
112 {
113 struct etp_tmpbuf tmpbuf;
114
115 /* locked by wrklock */
116 struct etp_worker *prev, *next;
117
118 xthread_t tid;
119
120 #ifdef ETP_WORKER_COMMON
121 ETP_WORKER_COMMON
122 #endif
123 } etp_worker;
124
125 static etp_worker wrk_first; /* NOT etp */
126
127 #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
128 #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
129
130 /* worker threads management */
131
132 static void
133 etp_worker_clear (etp_worker *wrk)
134 {
135 }
136
137 static void ecb_cold
138 etp_worker_free (etp_worker *wrk)
139 {
140 free (wrk->tmpbuf.ptr);
141
142 wrk->next->prev = wrk->prev;
143 wrk->prev->next = wrk->next;
144
145 free (wrk);
146 }
147
148 ETP_API_DECL unsigned int
149 etp_nreqs (void)
150 {
151 int retval;
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = nreqs;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155 return retval;
156 }
157
158 ETP_API_DECL unsigned int
159 etp_nready (void)
160 {
161 unsigned int retval;
162
163 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
164 retval = nready;
165 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
166
167 return retval;
168 }
169
170 ETP_API_DECL unsigned int
171 etp_npending (void)
172 {
173 unsigned int retval;
174
175 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
176 retval = npending;
177 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
178
179 return retval;
180 }
181
182 ETP_API_DECL unsigned int
183 etp_nthreads (void)
184 {
185 unsigned int retval;
186
187 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
188 retval = started;
189 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
190
191 return retval;
192 }
193
194 /*
195 * a somewhat faster data structure might be nice, but
196 * with 8 priorities this actually needs <20 insns
197 * per shift, the most expensive operation.
198 */
199 typedef struct {
200 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
201 int size;
202 } etp_reqq;
203
204 static etp_reqq req_queue;
205 static etp_reqq res_queue;
206
207 static void ecb_noinline ecb_cold
208 reqq_init (etp_reqq *q)
209 {
210 int pri;
211
212 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
213 q->qs[pri] = q->qe[pri] = 0;
214
215 q->size = 0;
216 }
217
218 static int ecb_noinline
219 reqq_push (etp_reqq *q, ETP_REQ *req)
220 {
221 int pri = req->pri;
222 req->next = 0;
223
224 if (q->qe[pri])
225 {
226 q->qe[pri]->next = req;
227 q->qe[pri] = req;
228 }
229 else
230 q->qe[pri] = q->qs[pri] = req;
231
232 return q->size++;
233 }
234
235 static ETP_REQ * ecb_noinline
236 reqq_shift (etp_reqq *q)
237 {
238 int pri;
239
240 if (!q->size)
241 return 0;
242
243 --q->size;
244
245 for (pri = ETP_NUM_PRI; pri--; )
246 {
247 ETP_REQ *req = q->qs[pri];
248
249 if (req)
250 {
251 if (!(q->qs[pri] = (ETP_REQ *)req->next))
252 q->qe[pri] = 0;
253
254 return req;
255 }
256 }
257
258 abort ();
259 }
260
261 ETP_API_DECL int ecb_cold
262 etp_init (void (*want_poll)(void), void (*done_poll)(void))
263 {
264 X_MUTEX_CREATE (wrklock);
265 X_MUTEX_CREATE (reslock);
266 X_MUTEX_CREATE (reqlock);
267 X_COND_CREATE (reqwait);
268
269 reqq_init (&req_queue);
270 reqq_init (&res_queue);
271
272 wrk_first.next =
273 wrk_first.prev = &wrk_first;
274
275 started = 0;
276 idle = 0;
277 nreqs = 0;
278 nready = 0;
279 npending = 0;
280
281 want_poll_cb = want_poll;
282 done_poll_cb = done_poll;
283
284 return 0;
285 }
286
287 static void ecb_noinline ecb_cold
288 etp_proc_init (void)
289 {
290 #if HAVE_PRCTL_SET_NAME
291 /* provide a more sensible "thread name" */
292 char name[16 + 1];
293 const int namelen = sizeof (name) - 1;
294 int len;
295
296 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
297 name [namelen] = 0;
298 len = strlen (name);
299 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
300 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
301 #endif
302 }
303
304 X_THREAD_PROC (etp_proc)
305 {
306 ETP_REQ *req;
307 struct timespec ts;
308 etp_worker *self = (etp_worker *)thr_arg;
309
310 etp_proc_init ();
311
312 /* try to distribute timeouts somewhat evenly */
313 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
314
315 for (;;)
316 {
317 ts.tv_sec = 0;
318
319 X_LOCK (reqlock);
320
321 for (;;)
322 {
323 req = reqq_shift (&req_queue);
324
325 if (req)
326 break;
327
328 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
329 {
330 X_UNLOCK (reqlock);
331 X_LOCK (wrklock);
332 --started;
333 X_UNLOCK (wrklock);
334 goto quit;
335 }
336
337 ++idle;
338
339 if (idle <= max_idle)
340 /* we are allowed to idle, so do so without any timeout */
341 X_COND_WAIT (reqwait, reqlock);
342 else
343 {
344 /* initialise timeout once */
345 if (!ts.tv_sec)
346 ts.tv_sec = time (0) + idle_timeout;
347
348 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
349 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
350 }
351
352 --idle;
353 }
354
355 --nready;
356
357 X_UNLOCK (reqlock);
358
359 if (req->type == ETP_TYPE_QUIT)
360 goto quit;
361
362 ETP_EXECUTE (self, req);
363
364 X_LOCK (reslock);
365
366 ++npending;
367
368 if (!reqq_push (&res_queue, req) && want_poll_cb)
369 want_poll_cb ();
370
371 etp_worker_clear (self);
372
373 X_UNLOCK (reslock);
374 }
375
376 quit:
377 free (req);
378
379 X_LOCK (wrklock);
380 etp_worker_free (self);
381 X_UNLOCK (wrklock);
382
383 return 0;
384 }
385
386 static void ecb_cold
387 etp_start_thread (void)
388 {
389 etp_worker *wrk = calloc (1, sizeof (etp_worker));
390
391 /*TODO*/
392 assert (("unable to allocate worker thread data", wrk));
393
394 X_LOCK (wrklock);
395
396 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
397 {
398 wrk->prev = &wrk_first;
399 wrk->next = wrk_first.next;
400 wrk_first.next->prev = wrk;
401 wrk_first.next = wrk;
402 ++started;
403 }
404 else
405 free (wrk);
406
407 X_UNLOCK (wrklock);
408 }
409
410 static void
411 etp_maybe_start_thread (void)
412 {
413 if (ecb_expect_true (etp_nthreads () >= wanted))
414 return;
415
416 /* todo: maybe use idle here, but might be less exact */
417 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
418 return;
419
420 etp_start_thread ();
421 }
422
423 static void ecb_cold
424 etp_end_thread (void)
425 {
426 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
427
428 req->type = ETP_TYPE_QUIT;
429 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
430
431 X_LOCK (reqlock);
432 reqq_push (&req_queue, req);
433 X_COND_SIGNAL (reqwait);
434 X_UNLOCK (reqlock);
435
436 X_LOCK (wrklock);
437 --started;
438 X_UNLOCK (wrklock);
439 }
440
441 ETP_API_DECL int
442 etp_poll (void)
443 {
444 unsigned int maxreqs;
445 unsigned int maxtime;
446 struct timeval tv_start, tv_now;
447
448 X_LOCK (reslock);
449 maxreqs = max_poll_reqs;
450 maxtime = max_poll_time;
451 X_UNLOCK (reslock);
452
453 if (maxtime)
454 gettimeofday (&tv_start, 0);
455
456 for (;;)
457 {
458 ETP_REQ *req;
459
460 etp_maybe_start_thread ();
461
462 X_LOCK (reslock);
463 req = reqq_shift (&res_queue);
464
465 if (req)
466 {
467 --npending;
468
469 if (!res_queue.size && done_poll_cb)
470 done_poll_cb ();
471 }
472
473 X_UNLOCK (reslock);
474
475 if (!req)
476 return 0;
477
478 X_LOCK (reqlock);
479 --nreqs;
480 X_UNLOCK (reqlock);
481
482 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
483 {
484 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
485 continue;
486 }
487 else
488 {
489 int res = ETP_FINISH (req);
490 if (ecb_expect_false (res))
491 return res;
492 }
493
494 if (ecb_expect_false (maxreqs && !--maxreqs))
495 break;
496
497 if (maxtime)
498 {
499 gettimeofday (&tv_now, 0);
500
501 if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
502 break;
503 }
504 }
505
506 errno = EAGAIN;
507 return -1;
508 }
509
510 ETP_API_DECL void
511 etp_grp_cancel (ETP_REQ *grp);
512
513 ETP_API_DECL void
514 etp_cancel (ETP_REQ *req)
515 {
516 req->cancelled = 1;
517
518 etp_grp_cancel (req);
519 }
520
521 ETP_API_DECL void
522 etp_grp_cancel (ETP_REQ *grp)
523 {
524 for (grp = grp->grp_first; grp; grp = grp->grp_next)
525 etp_cancel (grp);
526 }
527
528 ETP_API_DECL void
529 etp_submit (ETP_REQ *req)
530 {
531 req->pri -= ETP_PRI_MIN;
532
533 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
534 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
535
536 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
537 {
538 /* I hope this is worth it :/ */
539 X_LOCK (reqlock);
540 ++nreqs;
541 X_UNLOCK (reqlock);
542
543 X_LOCK (reslock);
544
545 ++npending;
546
547 if (!reqq_push (&res_queue, req) && want_poll_cb)
548 want_poll_cb ();
549
550 X_UNLOCK (reslock);
551 }
552 else
553 {
554 X_LOCK (reqlock);
555 ++nreqs;
556 ++nready;
557 reqq_push (&req_queue, req);
558 X_COND_SIGNAL (reqwait);
559 X_UNLOCK (reqlock);
560
561 etp_maybe_start_thread ();
562 }
563 }
564
565 ETP_API_DECL void ecb_cold
566 etp_set_max_poll_time (double nseconds)
567 {
568 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
569 max_poll_time = nseconds * ETP_TICKS;
570 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
571 }
572
573 ETP_API_DECL void ecb_cold
574 etp_set_max_poll_reqs (unsigned int maxreqs)
575 {
576 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
577 max_poll_reqs = maxreqs;
578 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
579 }
580
581 ETP_API_DECL void ecb_cold
582 etp_set_max_idle (unsigned int nthreads)
583 {
584 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
585 max_idle = nthreads;
586 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
587 }
588
589 ETP_API_DECL void ecb_cold
590 etp_set_idle_timeout (unsigned int seconds)
591 {
592 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
593 idle_timeout = seconds;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
595 }
596
597 ETP_API_DECL void ecb_cold
598 etp_set_min_parallel (unsigned int nthreads)
599 {
600 if (wanted < nthreads)
601 wanted = nthreads;
602 }
603
604 ETP_API_DECL void ecb_cold
605 etp_set_max_parallel (unsigned int nthreads)
606 {
607 if (wanted > nthreads)
608 wanted = nthreads;
609
610 while (started > wanted)
611 etp_end_thread ();
612 }
613