ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.3 by root, Thu Jun 25 15:59:57 2015 UTC vs.
Revision 1.5 by root, Thu Jun 25 17:40:24 2015 UTC

56 56
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 58
59#define ETP_TICKS ((1000000 + 1023) >> 10) 59#define ETP_TICKS ((1000000 + 1023) >> 10)
60 60
61enum {
62 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
63 ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
64};
65
61/* calculate time difference in ~1/ETP_TICKS of a second */ 66/* calculate time difference in ~1/ETP_TICKS of a second */
62ecb_inline int 67ecb_inline int
63etp_tvdiff (struct timeval *tv1, struct timeval *tv2) 68etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64{ 69{
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
101 } 106 }
102 107
103 return buf->ptr; 108 return buf->ptr;
104} 109}
105 110
111/*
112 * a somewhat faster data structure might be nice, but
113 * with 8 priorities this actually needs <20 insns
114 * per shift, the most expensive operation.
115 */
116typedef struct
117{
118 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
119 int size;
120} etp_reqq;
121
122struct etp_pool
123{
124 etp_reqq req_queue;
125 etp_reqq res_queue;
126};
127
128typedef struct etp_pool *etp_pool;
129
106typedef struct etp_worker 130typedef struct etp_worker
107{ 131{
132 etp_pool pool;
133
108 struct etp_tmpbuf tmpbuf; 134 struct etp_tmpbuf tmpbuf;
109 135
110 /* locked by wrklock */ 136 /* locked by wrklock */
111 struct etp_worker *prev, *next; 137 struct etp_worker *prev, *next;
112 138
139 165
140 free (wrk); 166 free (wrk);
141} 167}
142 168
143ETP_API_DECL unsigned int 169ETP_API_DECL unsigned int
144etp_nreqs (void) 170etp_nreqs (etp_pool pool)
145{ 171{
146 int retval; 172 int retval;
147 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 173 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
148 retval = nreqs; 174 retval = nreqs;
149 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 175 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
150 return retval; 176 return retval;
151} 177}
152 178
153ETP_API_DECL unsigned int 179ETP_API_DECL unsigned int
154etp_nready (void) 180etp_nready (etp_pool pool)
155{ 181{
156 unsigned int retval; 182 unsigned int retval;
157 183
158 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 184 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159 retval = nready; 185 retval = nready;
161 187
162 return retval; 188 return retval;
163} 189}
164 190
165ETP_API_DECL unsigned int 191ETP_API_DECL unsigned int
166etp_npending (void) 192etp_npending (etp_pool pool)
167{ 193{
168 unsigned int retval; 194 unsigned int retval;
169 195
170 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 196 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
171 retval = npending; 197 retval = npending;
173 199
174 return retval; 200 return retval;
175} 201}
176 202
177ETP_API_DECL unsigned int 203ETP_API_DECL unsigned int
178etp_nthreads (void) 204etp_nthreads (etp_pool pool)
179{ 205{
180 unsigned int retval; 206 unsigned int retval;
181 207
182 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 208 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
183 retval = started; 209 retval = started;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 210 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
185 211
186 return retval; 212 return retval;
187} 213}
188
189/*
190 * a somewhat faster data structure might be nice, but
191 * with 8 priorities this actually needs <20 insns
192 * per shift, the most expensive operation.
193 */
194typedef struct {
195 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
196 int size;
197} etp_reqq;
198
199static etp_reqq req_queue;
200static etp_reqq res_queue;
201 214
202static void ecb_noinline ecb_cold 215static void ecb_noinline ecb_cold
203reqq_init (etp_reqq *q) 216reqq_init (etp_reqq *q)
204{ 217{
205 int pri; 218 int pri;
252 265
253 abort (); 266 abort ();
254} 267}
255 268
256ETP_API_DECL int ecb_cold 269ETP_API_DECL int ecb_cold
257etp_init (void (*want_poll)(void), void (*done_poll)(void)) 270etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void))
258{ 271{
259 X_MUTEX_CREATE (wrklock); 272 X_MUTEX_CREATE (wrklock);
260 X_MUTEX_CREATE (reslock); 273 X_MUTEX_CREATE (reslock);
261 X_MUTEX_CREATE (reqlock); 274 X_MUTEX_CREATE (reqlock);
262 X_COND_CREATE (reqwait); 275 X_COND_CREATE (reqwait);
263 276
264 reqq_init (&req_queue); 277 reqq_init (&pool->req_queue);
265 reqq_init (&res_queue); 278 reqq_init (&pool->res_queue);
266 279
267 wrk_first.next = 280 wrk_first.next =
268 wrk_first.prev = &wrk_first; 281 wrk_first.prev = &wrk_first;
269 282
270 started = 0; 283 started = 0;
277 done_poll_cb = done_poll; 290 done_poll_cb = done_poll;
278 291
279 return 0; 292 return 0;
280} 293}
281 294
282/* not yet in etp.c */ 295static void ecb_noinline ecb_cold
296etp_proc_init (void)
297{
298#if HAVE_PRCTL_SET_NAME
299 /* provide a more sensible "thread name" */
300 char name[16 + 1];
301 const int namelen = sizeof (name) - 1;
302 int len;
303
304 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
305 name [namelen] = 0;
306 len = strlen (name);
307 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
308 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
309#endif
310}
311
283X_THREAD_PROC (etp_proc); 312X_THREAD_PROC (etp_proc)
313{
314 ETP_REQ *req;
315 struct timespec ts;
316 etp_worker *self = (etp_worker *)thr_arg;
317 etp_pool pool = self->pool;
318
319 etp_proc_init ();
320
321 /* try to distribute timeouts somewhat evenly */
322 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
323
324 for (;;)
325 {
326 ts.tv_sec = 0;
327
328 X_LOCK (reqlock);
329
330 for (;;)
331 {
332 req = reqq_shift (&pool->req_queue);
333
334 if (req)
335 break;
336
337 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
338 {
339 X_UNLOCK (reqlock);
340 X_LOCK (wrklock);
341 --started;
342 X_UNLOCK (wrklock);
343 goto quit;
344 }
345
346 ++idle;
347
348 if (idle <= max_idle)
349 /* we are allowed to idle, so do so without any timeout */
350 X_COND_WAIT (reqwait, reqlock);
351 else
352 {
353 /* initialise timeout once */
354 if (!ts.tv_sec)
355 ts.tv_sec = time (0) + idle_timeout;
356
357 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
358 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
359 }
360
361 --idle;
362 }
363
364 --nready;
365
366 X_UNLOCK (reqlock);
367
368 if (req->type == ETP_TYPE_QUIT)
369 goto quit;
370
371 ETP_EXECUTE (self, req);
372
373 X_LOCK (reslock);
374
375 ++npending;
376
377 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
378 want_poll_cb ();
379
380 etp_worker_clear (self);
381
382 X_UNLOCK (reslock);
383 }
384
385quit:
386 free (req);
387
388 X_LOCK (wrklock);
389 etp_worker_free (self);
390 X_UNLOCK (wrklock);
391
392 return 0;
393}
284 394
285static void ecb_cold 395static void ecb_cold
286etp_start_thread (void) 396etp_start_thread (etp_pool pool)
287{ 397{
288 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 398 etp_worker *wrk = calloc (1, sizeof (etp_worker));
289 399
290 /*TODO*/ 400 /*TODO*/
291 assert (("unable to allocate worker thread data", wrk)); 401 assert (("unable to allocate worker thread data", wrk));
402
403 wrk->pool = pool;
292 404
293 X_LOCK (wrklock); 405 X_LOCK (wrklock);
294 406
295 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 407 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
296 { 408 {
305 417
306 X_UNLOCK (wrklock); 418 X_UNLOCK (wrklock);
307} 419}
308 420
309static void 421static void
310etp_maybe_start_thread (void) 422etp_maybe_start_thread (etp_pool pool)
311{ 423{
312 if (ecb_expect_true (etp_nthreads () >= wanted)) 424 if (ecb_expect_true (etp_nthreads (pool) >= wanted))
313 return; 425 return;
314 426
315 /* todo: maybe use idle here, but might be less exact */ 427 /* todo: maybe use idle here, but might be less exact */
316 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 428 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
317 return; 429 return;
318 430
319 etp_start_thread (); 431 etp_start_thread (pool);
320} 432}
321 433
322static void ecb_cold 434static void ecb_cold
323etp_end_thread (void) 435etp_end_thread (etp_pool pool)
324{ 436{
325 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 437 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
326 438
327 req->type = ETP_TYPE_QUIT; 439 req->type = ETP_TYPE_QUIT;
328 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 440 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
329 441
330 X_LOCK (reqlock); 442 X_LOCK (reqlock);
331 reqq_push (&req_queue, req); 443 reqq_push (&pool->req_queue, req);
332 X_COND_SIGNAL (reqwait); 444 X_COND_SIGNAL (reqwait);
333 X_UNLOCK (reqlock); 445 X_UNLOCK (reqlock);
334 446
335 X_LOCK (wrklock); 447 X_LOCK (wrklock);
336 --started; 448 --started;
337 X_UNLOCK (wrklock); 449 X_UNLOCK (wrklock);
338} 450}
339 451
340ETP_API_DECL int 452ETP_API_DECL int
341etp_poll (void) 453etp_poll (etp_pool pool)
342{ 454{
343 unsigned int maxreqs; 455 unsigned int maxreqs;
344 unsigned int maxtime; 456 unsigned int maxtime;
345 struct timeval tv_start, tv_now; 457 struct timeval tv_start, tv_now;
346 458
354 466
355 for (;;) 467 for (;;)
356 { 468 {
357 ETP_REQ *req; 469 ETP_REQ *req;
358 470
359 etp_maybe_start_thread (); 471 etp_maybe_start_thread (pool);
360 472
361 X_LOCK (reslock); 473 X_LOCK (reslock);
362 req = reqq_shift (&res_queue); 474 req = reqq_shift (&pool->res_queue);
363 475
364 if (req) 476 if (req)
365 { 477 {
366 --npending; 478 --npending;
367 479
368 if (!res_queue.size && done_poll_cb) 480 if (!pool->res_queue.size && done_poll_cb)
369 done_poll_cb (); 481 done_poll_cb ();
370 } 482 }
371 483
372 X_UNLOCK (reslock); 484 X_UNLOCK (reslock);
373 485
378 --nreqs; 490 --nreqs;
379 X_UNLOCK (reqlock); 491 X_UNLOCK (reqlock);
380 492
381 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 493 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
382 { 494 {
383 req->int1 = 1; /* mark request as delayed */ 495 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
384 continue; 496 continue;
385 } 497 }
386 else 498 else
387 { 499 {
388 int res = ETP_FINISH (req); 500 int res = ETP_FINISH (req);
405 errno = EAGAIN; 517 errno = EAGAIN;
406 return -1; 518 return -1;
407} 519}
408 520
409ETP_API_DECL void 521ETP_API_DECL void
410etp_grp_cancel (ETP_REQ *grp); 522etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
411 523
412ETP_API_DECL void 524ETP_API_DECL void
413etp_cancel (ETP_REQ *req) 525etp_cancel (etp_pool pool, ETP_REQ *req)
414{ 526{
415 req->cancelled = 1; 527 req->cancelled = 1;
416 528
417 etp_grp_cancel (req); 529 etp_grp_cancel (pool, req);
418} 530}
419 531
420ETP_API_DECL void 532ETP_API_DECL void
421etp_grp_cancel (ETP_REQ *grp) 533etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
422{ 534{
423 for (grp = grp->grp_first; grp; grp = grp->grp_next) 535 for (grp = grp->grp_first; grp; grp = grp->grp_next)
424 etp_cancel (grp); 536 etp_cancel (pool, grp);
425} 537}
426 538
427ETP_API_DECL void 539ETP_API_DECL void
428etp_submit (ETP_REQ *req) 540etp_submit (etp_pool pool, ETP_REQ *req)
429{ 541{
430 req->pri -= ETP_PRI_MIN; 542 req->pri -= ETP_PRI_MIN;
431 543
432 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 544 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
433 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 545 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
441 553
442 X_LOCK (reslock); 554 X_LOCK (reslock);
443 555
444 ++npending; 556 ++npending;
445 557
446 if (!reqq_push (&res_queue, req) && want_poll_cb) 558 if (!reqq_push (&pool->res_queue, req) && want_poll_cb)
447 want_poll_cb (); 559 want_poll_cb ();
448 560
449 X_UNLOCK (reslock); 561 X_UNLOCK (reslock);
450 } 562 }
451 else 563 else
452 { 564 {
453 X_LOCK (reqlock); 565 X_LOCK (reqlock);
454 ++nreqs; 566 ++nreqs;
455 ++nready; 567 ++nready;
456 reqq_push (&req_queue, req); 568 reqq_push (&pool->req_queue, req);
457 X_COND_SIGNAL (reqwait); 569 X_COND_SIGNAL (reqwait);
458 X_UNLOCK (reqlock); 570 X_UNLOCK (reqlock);
459 571
460 etp_maybe_start_thread (); 572 etp_maybe_start_thread (pool);
461 } 573 }
462} 574}
463 575
464ETP_API_DECL void ecb_cold 576ETP_API_DECL void ecb_cold
465etp_set_max_poll_time (double nseconds) 577etp_set_max_poll_time (etp_pool pool, double nseconds)
466{ 578{
467 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 579 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
468 max_poll_time = nseconds * ETP_TICKS; 580 max_poll_time = nseconds * ETP_TICKS;
469 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 581 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
470} 582}
471 583
472ETP_API_DECL void ecb_cold 584ETP_API_DECL void ecb_cold
473etp_set_max_poll_reqs (unsigned int maxreqs) 585etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
474{ 586{
475 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 587 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
476 max_poll_reqs = maxreqs; 588 max_poll_reqs = maxreqs;
477 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 589 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
478} 590}
479 591
480ETP_API_DECL void ecb_cold 592ETP_API_DECL void ecb_cold
481etp_set_max_idle (unsigned int nthreads) 593etp_set_max_idle (etp_pool pool, unsigned int nthreads)
482{ 594{
483 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 595 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
484 max_idle = nthreads; 596 max_idle = nthreads;
485 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 597 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
486} 598}
487 599
488ETP_API_DECL void ecb_cold 600ETP_API_DECL void ecb_cold
489etp_set_idle_timeout (unsigned int seconds) 601etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
490{ 602{
491 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 603 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
492 idle_timeout = seconds; 604 idle_timeout = seconds;
493 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 605 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
494} 606}
495 607
496ETP_API_DECL void ecb_cold 608ETP_API_DECL void ecb_cold
497etp_set_min_parallel (unsigned int nthreads) 609etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
498{ 610{
499 if (wanted < nthreads) 611 if (wanted < nthreads)
500 wanted = nthreads; 612 wanted = nthreads;
501} 613}
502 614
503ETP_API_DECL void ecb_cold 615ETP_API_DECL void ecb_cold
504etp_set_max_parallel (unsigned int nthreads) 616etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
505{ 617{
506 if (wanted > nthreads) 618 if (wanted > nthreads)
507 wanted = nthreads; 619 wanted = nthreads;
508 620
509 while (started > wanted) 621 while (started > wanted)
510 etp_end_thread (); 622 etp_end_thread (pool);
511} 623}
512 624

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines