ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.1 by root, Sun Apr 14 09:43:19 2013 UTC vs.
Revision 1.5 by root, Thu Jun 25 17:40:24 2015 UTC

56 56
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 58
59#define ETP_TICKS ((1000000 + 1023) >> 10) 59#define ETP_TICKS ((1000000 + 1023) >> 10)
60 60
61enum {
62 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
 63 ETP_FLAG_DELAYED = 0x08, /* group request has been delayed */
64};
65
61/* calculate time difference in ~1/ETP_TICKS of a second */ 66/* calculate time difference in ~1/ETP_TICKS of a second */
62ecb_inline int 67ecb_inline int
63etp_tvdiff (struct timeval *tv1, struct timeval *tv2) 68etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64{ 69{
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
83static xmutex_t wrklock; 88static xmutex_t wrklock;
84static xmutex_t reslock; 89static xmutex_t reslock;
85static xmutex_t reqlock; 90static xmutex_t reqlock;
86static xcond_t reqwait; 91static xcond_t reqwait;
87 92
93struct etp_tmpbuf
94{
95 void *ptr;
96 int len;
97};
98
99static void *
100etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
101{
102 if (buf->len < len)
103 {
104 free (buf->ptr);
105 buf->ptr = malloc (buf->len = len);
106 }
107
108 return buf->ptr;
109}
110
111/*
112 * a somewhat faster data structure might be nice, but
113 * with 8 priorities this actually needs <20 insns
114 * per shift, the most expensive operation.
115 */
116typedef struct
117{
118 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
119 int size;
120} etp_reqq;
121
122struct etp_pool
123{
124 etp_reqq req_queue;
125 etp_reqq res_queue;
126};
127
128typedef struct etp_pool *etp_pool;
129
88typedef struct etp_worker 130typedef struct etp_worker
89{ 131{
132 etp_pool pool;
133
90 struct tmpbuf tmpbuf; 134 struct etp_tmpbuf tmpbuf;
91 135
92 /* locked by wrklock */ 136 /* locked by wrklock */
93 struct etp_worker *prev, *next; 137 struct etp_worker *prev, *next;
94 138
95 xthread_t tid; 139 xthread_t tid;
121 165
122 free (wrk); 166 free (wrk);
123} 167}
124 168
125ETP_API_DECL unsigned int 169ETP_API_DECL unsigned int
126etp_nreqs (void) 170etp_nreqs (etp_pool pool)
127{ 171{
128 int retval; 172 int retval;
129 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 173 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
130 retval = nreqs; 174 retval = nreqs;
131 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 175 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
132 return retval; 176 return retval;
133} 177}
134 178
135ETP_API_DECL unsigned int 179ETP_API_DECL unsigned int
136etp_nready (void) 180etp_nready (etp_pool pool)
137{ 181{
138 unsigned int retval; 182 unsigned int retval;
139 183
140 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 184 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
141 retval = nready; 185 retval = nready;
143 187
144 return retval; 188 return retval;
145} 189}
146 190
147ETP_API_DECL unsigned int 191ETP_API_DECL unsigned int
148etp_npending (void) 192etp_npending (etp_pool pool)
149{ 193{
150 unsigned int retval; 194 unsigned int retval;
151 195
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 196 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = npending; 197 retval = npending;
155 199
156 return retval; 200 return retval;
157} 201}
158 202
159ETP_API_DECL unsigned int 203ETP_API_DECL unsigned int
160etp_nthreads (void) 204etp_nthreads (etp_pool pool)
161{ 205{
162 unsigned int retval; 206 unsigned int retval;
163 207
164 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 208 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
165 retval = started; 209 retval = started;
166 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 210 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
167 211
168 return retval; 212 return retval;
169} 213}
170 214
171/*
172 * a somewhat faster data structure might be nice, but
173 * with 8 priorities this actually needs <20 insns
174 * per shift, the most expensive operation.
175 */
176typedef struct {
177 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
178 int size;
179} etp_reqq;
180
181static etp_reqq req_queue;
182static etp_reqq res_queue;
183
184static void ecb_noinline ecb_cold 215static void ecb_noinline ecb_cold
185reqq_init (etp_reqq *q) 216reqq_init (etp_reqq *q)
186{ 217{
187 int pri; 218 int pri;
188 219
219 250
220 --q->size; 251 --q->size;
221 252
222 for (pri = ETP_NUM_PRI; pri--; ) 253 for (pri = ETP_NUM_PRI; pri--; )
223 { 254 {
224 eio_req *req = q->qs[pri]; 255 ETP_REQ *req = q->qs[pri];
225 256
226 if (req) 257 if (req)
227 { 258 {
228 if (!(q->qs[pri] = (eio_req *)req->next)) 259 if (!(q->qs[pri] = (ETP_REQ *)req->next))
229 q->qe[pri] = 0; 260 q->qe[pri] = 0;
230 261
231 return req; 262 return req;
232 } 263 }
233 } 264 }
234 265
235 abort (); 266 abort ();
236} 267}
237 268
238ETP_API_DECL int ecb_cold 269ETP_API_DECL int ecb_cold
239etp_init (void (*want_poll)(void), void (*done_poll)(void)) 270etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void))
240{ 271{
241 X_MUTEX_CREATE (wrklock); 272 X_MUTEX_CREATE (wrklock);
242 X_MUTEX_CREATE (reslock); 273 X_MUTEX_CREATE (reslock);
243 X_MUTEX_CREATE (reqlock); 274 X_MUTEX_CREATE (reqlock);
244 X_COND_CREATE (reqwait); 275 X_COND_CREATE (reqwait);
245 276
246 reqq_init (&req_queue); 277 reqq_init (&pool->req_queue);
247 reqq_init (&res_queue); 278 reqq_init (&pool->res_queue);
248 279
249 wrk_first.next = 280 wrk_first.next =
250 wrk_first.prev = &wrk_first; 281 wrk_first.prev = &wrk_first;
251 282
252 started = 0; 283 started = 0;
259 done_poll_cb = done_poll; 290 done_poll_cb = done_poll;
260 291
261 return 0; 292 return 0;
262} 293}
263 294
264/* not yet in etp.c */ 295static void ecb_noinline ecb_cold
296etp_proc_init (void)
297{
298#if HAVE_PRCTL_SET_NAME
299 /* provide a more sensible "thread name" */
300 char name[16 + 1];
301 const int namelen = sizeof (name) - 1;
302 int len;
303
304 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
305 name [namelen] = 0;
306 len = strlen (name);
307 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
308 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
309#endif
310}
311
265X_THREAD_PROC (etp_proc); 312X_THREAD_PROC (etp_proc)
313{
314 ETP_REQ *req;
315 struct timespec ts;
316 etp_worker *self = (etp_worker *)thr_arg;
317 etp_pool pool = self->pool;
318
319 etp_proc_init ();
320
321 /* try to distribute timeouts somewhat evenly */
322 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
323
324 for (;;)
325 {
326 ts.tv_sec = 0;
327
328 X_LOCK (reqlock);
329
330 for (;;)
331 {
332 req = reqq_shift (&pool->req_queue);
333
334 if (req)
335 break;
336
337 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
338 {
339 X_UNLOCK (reqlock);
340 X_LOCK (wrklock);
341 --started;
342 X_UNLOCK (wrklock);
343 goto quit;
344 }
345
346 ++idle;
347
348 if (idle <= max_idle)
349 /* we are allowed to idle, so do so without any timeout */
350 X_COND_WAIT (reqwait, reqlock);
351 else
352 {
353 /* initialise timeout once */
354 if (!ts.tv_sec)
355 ts.tv_sec = time (0) + idle_timeout;
356
357 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
358 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
359 }
360
361 --idle;
362 }
363
364 --nready;
365
366 X_UNLOCK (reqlock);
367
368 if (req->type == ETP_TYPE_QUIT)
369 goto quit;
370
371 ETP_EXECUTE (self, req);
372
373 X_LOCK (reslock);
374
375 ++npending;
376
377 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
378 want_poll_cb ();
379
380 etp_worker_clear (self);
381
382 X_UNLOCK (reslock);
383 }
384
385quit:
386 free (req);
387
388 X_LOCK (wrklock);
389 etp_worker_free (self);
390 X_UNLOCK (wrklock);
391
392 return 0;
393}
266 394
267static void ecb_cold 395static void ecb_cold
268etp_start_thread (void) 396etp_start_thread (etp_pool pool)
269{ 397{
270 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 398 etp_worker *wrk = calloc (1, sizeof (etp_worker));
271 399
272 /*TODO*/ 400 /*TODO*/
273 assert (("unable to allocate worker thread data", wrk)); 401 assert (("unable to allocate worker thread data", wrk));
402
403 wrk->pool = pool;
274 404
275 X_LOCK (wrklock); 405 X_LOCK (wrklock);
276 406
277 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 407 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
278 { 408 {
287 417
288 X_UNLOCK (wrklock); 418 X_UNLOCK (wrklock);
289} 419}
290 420
291static void 421static void
292etp_maybe_start_thread (void) 422etp_maybe_start_thread (etp_pool pool)
293{ 423{
294 if (ecb_expect_true (etp_nthreads () >= wanted)) 424 if (ecb_expect_true (etp_nthreads (pool) >= wanted))
295 return; 425 return;
296 426
297 /* todo: maybe use idle here, but might be less exact */ 427 /* todo: maybe use idle here, but might be less exact */
298 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 428 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
299 return; 429 return;
300 430
301 etp_start_thread (); 431 etp_start_thread (pool);
302} 432}
303 433
304static void ecb_cold 434static void ecb_cold
305etp_end_thread (void) 435etp_end_thread (etp_pool pool)
306{ 436{
307 eio_req *req = calloc (1, sizeof (eio_req)); /* will be freed by worker */ 437 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
308 438
309 req->type = ETP_TYPE_QUIT; 439 req->type = ETP_TYPE_QUIT;
310 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 440 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
311 441
312 X_LOCK (reqlock); 442 X_LOCK (reqlock);
313 reqq_push (&req_queue, req); 443 reqq_push (&pool->req_queue, req);
314 X_COND_SIGNAL (reqwait); 444 X_COND_SIGNAL (reqwait);
315 X_UNLOCK (reqlock); 445 X_UNLOCK (reqlock);
316 446
317 X_LOCK (wrklock); 447 X_LOCK (wrklock);
318 --started; 448 --started;
319 X_UNLOCK (wrklock); 449 X_UNLOCK (wrklock);
320} 450}
321 451
322ETP_API_DECL int 452ETP_API_DECL int
323etp_poll (void) 453etp_poll (etp_pool pool)
324{ 454{
325 unsigned int maxreqs; 455 unsigned int maxreqs;
326 unsigned int maxtime; 456 unsigned int maxtime;
327 struct timeval tv_start, tv_now; 457 struct timeval tv_start, tv_now;
328 458
336 466
337 for (;;) 467 for (;;)
338 { 468 {
339 ETP_REQ *req; 469 ETP_REQ *req;
340 470
341 etp_maybe_start_thread (); 471 etp_maybe_start_thread (pool);
342 472
343 X_LOCK (reslock); 473 X_LOCK (reslock);
344 req = reqq_shift (&res_queue); 474 req = reqq_shift (&pool->res_queue);
345 475
346 if (req) 476 if (req)
347 { 477 {
348 --npending; 478 --npending;
349 479
350 if (!res_queue.size && done_poll_cb) 480 if (!pool->res_queue.size && done_poll_cb)
351 done_poll_cb (); 481 done_poll_cb ();
352 } 482 }
353 483
354 X_UNLOCK (reslock); 484 X_UNLOCK (reslock);
355 485
360 --nreqs; 490 --nreqs;
361 X_UNLOCK (reqlock); 491 X_UNLOCK (reqlock);
362 492
363 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 493 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
364 { 494 {
365 req->int1 = 1; /* mark request as delayed */ 495 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
366 continue; 496 continue;
367 } 497 }
368 else 498 else
369 { 499 {
370 int res = ETP_FINISH (req); 500 int res = ETP_FINISH (req);
387 errno = EAGAIN; 517 errno = EAGAIN;
388 return -1; 518 return -1;
389} 519}
390 520
391ETP_API_DECL void 521ETP_API_DECL void
392etp_grp_cancel (ETP_REQ *grp); 522etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
393 523
394ETP_API_DECL void 524ETP_API_DECL void
395etp_cancel (ETP_REQ *req) 525etp_cancel (etp_pool pool, ETP_REQ *req)
396{ 526{
397 req->cancelled = 1; 527 req->cancelled = 1;
398 528
399 etp_grp_cancel (req); 529 etp_grp_cancel (pool, req);
400} 530}
401 531
402ETP_API_DECL void 532ETP_API_DECL void
403etp_grp_cancel (ETP_REQ *grp) 533etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
404{ 534{
405 for (grp = grp->grp_first; grp; grp = grp->grp_next) 535 for (grp = grp->grp_first; grp; grp = grp->grp_next)
406 etp_cancel (grp); 536 etp_cancel (pool, grp);
407} 537}
408 538
409ETP_API_DECL void 539ETP_API_DECL void
410etp_submit (ETP_REQ *req) 540etp_submit (etp_pool pool, ETP_REQ *req)
411{ 541{
412 req->pri -= ETP_PRI_MIN; 542 req->pri -= ETP_PRI_MIN;
413 543
414 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 544 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
415 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 545 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
423 553
424 X_LOCK (reslock); 554 X_LOCK (reslock);
425 555
426 ++npending; 556 ++npending;
427 557
428 if (!reqq_push (&res_queue, req) && want_poll_cb) 558 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
429 want_poll_cb (); 559 want_poll_cb ();
430 560
431 X_UNLOCK (reslock); 561 X_UNLOCK (reslock);
432 } 562 }
433 else 563 else
434 { 564 {
435 X_LOCK (reqlock); 565 X_LOCK (reqlock);
436 ++nreqs; 566 ++nreqs;
437 ++nready; 567 ++nready;
438 reqq_push (&req_queue, req); 568 reqq_push (&pool->req_queue, req);
439 X_COND_SIGNAL (reqwait); 569 X_COND_SIGNAL (reqwait);
440 X_UNLOCK (reqlock); 570 X_UNLOCK (reqlock);
441 571
442 etp_maybe_start_thread (); 572 etp_maybe_start_thread (pool);
443 } 573 }
444} 574}
445 575
446ETP_API_DECL void ecb_cold 576ETP_API_DECL void ecb_cold
447etp_set_max_poll_time (double nseconds) 577etp_set_max_poll_time (etp_pool pool, double nseconds)
448{ 578{
449 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 579 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
450 max_poll_time = nseconds * ETP_TICKS; 580 max_poll_time = nseconds * ETP_TICKS;
451 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 581 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
452} 582}
453 583
454ETP_API_DECL void ecb_cold 584ETP_API_DECL void ecb_cold
455etp_set_max_poll_reqs (unsigned int maxreqs) 585etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
456{ 586{
457 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 587 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
458 max_poll_reqs = maxreqs; 588 max_poll_reqs = maxreqs;
459 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 589 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
460} 590}
461 591
462ETP_API_DECL void ecb_cold 592ETP_API_DECL void ecb_cold
463etp_set_max_idle (unsigned int nthreads) 593etp_set_max_idle (etp_pool pool, unsigned int nthreads)
464{ 594{
465 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 595 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
466 max_idle = nthreads; 596 max_idle = nthreads;
467 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 597 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
468} 598}
469 599
470ETP_API_DECL void ecb_cold 600ETP_API_DECL void ecb_cold
471etp_set_idle_timeout (unsigned int seconds) 601etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
472{ 602{
473 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 603 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
474 idle_timeout = seconds; 604 idle_timeout = seconds;
475 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 605 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
476} 606}
477 607
478ETP_API_DECL void ecb_cold 608ETP_API_DECL void ecb_cold
479etp_set_min_parallel (unsigned int nthreads) 609etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
480{ 610{
481 if (wanted < nthreads) 611 if (wanted < nthreads)
482 wanted = nthreads; 612 wanted = nthreads;
483} 613}
484 614
485ETP_API_DECL void ecb_cold 615ETP_API_DECL void ecb_cold
486etp_set_max_parallel (unsigned int nthreads) 616etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
487{ 617{
488 if (wanted > nthreads) 618 if (wanted > nthreads)
489 wanted = nthreads; 619 wanted = nthreads;
490 620
491 while (started > wanted) 621 while (started > wanted)
492 etp_end_thread (); 622 etp_end_thread (pool);
493} 623}
494 624

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines