ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.6 by root, Thu Jun 25 18:08:47 2015 UTC vs.
Revision 1.9 by root, Thu Jun 25 20:41:03 2015 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
105{ 105{
106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107 int size; 107 int size;
108} etp_reqq; 108} etp_reqq;
109 109
110typedef struct etp_pool *etp_pool;
111
112typedef struct etp_worker
113{
114 etp_pool pool;
115
116 struct etp_tmpbuf tmpbuf;
117
118 /* locked by pool->wrklock */
119 struct etp_worker *prev, *next;
120
121 xthread_t tid;
122
123#ifdef ETP_WORKER_COMMON
124 ETP_WORKER_COMMON
125#endif
126} etp_worker;
127
110struct etp_pool 128struct etp_pool
111{ 129{
112 void *userdata; 130 void *userdata;
113 131
114 etp_reqq req_queue; 132 etp_reqq req_queue;
130 148
131 xmutex_t wrklock; 149 xmutex_t wrklock;
132 xmutex_t reslock; 150 xmutex_t reslock;
133 xmutex_t reqlock; 151 xmutex_t reqlock;
134 xcond_t reqwait; 152 xcond_t reqwait;
153
154 etp_worker wrk_first;
135}; 155};
136
137typedef struct etp_pool *etp_pool;
138
139typedef struct etp_worker
140{
141 etp_pool pool;
142
143 struct etp_tmpbuf tmpbuf;
144
145 /* locked by pool->wrklock */
146 struct etp_worker *prev, *next;
147
148 xthread_t tid;
149
150#ifdef ETP_WORKER_COMMON
151 ETP_WORKER_COMMON
152#endif
153} etp_worker;
154
155static etp_worker wrk_first; /* NOT etp */
156 156
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) 157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) 158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 159
160/* worker threads management */ 160/* worker threads management */
274 274
275 abort (); 275 abort ();
276} 276}
277 277
278ETP_API_DECL int ecb_cold 278ETP_API_DECL int ecb_cold
279etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) 279etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
280{ 280{
281 X_MUTEX_CREATE (pool->wrklock); 281 X_MUTEX_CREATE (pool->wrklock);
282 X_MUTEX_CREATE (pool->reslock); 282 X_MUTEX_CREATE (pool->reslock);
283 X_MUTEX_CREATE (pool->reqlock); 283 X_MUTEX_CREATE (pool->reqlock);
284 X_COND_CREATE (pool->reqwait); 284 X_COND_CREATE (pool->reqwait);
285 285
286 reqq_init (&pool->req_queue); 286 reqq_init (&pool->req_queue);
287 reqq_init (&pool->res_queue); 287 reqq_init (&pool->res_queue);
288 288
289 wrk_first.next = 289 pool->wrk_first.next =
290 wrk_first.prev = &wrk_first; 290 pool->wrk_first.prev = &pool->wrk_first;
291 291
292 pool->started = 0; 292 pool->started = 0;
293 pool->idle = 0; 293 pool->idle = 0;
294 pool->nreqs = 0; 294 pool->nreqs = 0;
295 pool->nready = 0; 295 pool->nready = 0;
297 pool->wanted = 4; 297 pool->wanted = 4;
298 298
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ 299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ 300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301 301
302 pool->userdata = userdata;
302 pool->want_poll_cb = want_poll; 303 pool->want_poll_cb = want_poll;
303 pool->done_poll_cb = done_poll; 304 pool->done_poll_cb = done_poll;
304 305
305 return 0; 306 return 0;
306} 307}
342 343
343 for (;;) 344 for (;;)
344 { 345 {
345 req = reqq_shift (&pool->req_queue); 346 req = reqq_shift (&pool->req_queue);
346 347
347 if (req) 348 if (ecb_expect_true (req))
348 break; 349 break;
349 350
350 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 351 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
351 { 352 {
352 X_UNLOCK (pool->reqlock); 353 X_UNLOCK (pool->reqlock);
376 377
377 --pool->nready; 378 --pool->nready;
378 379
379 X_UNLOCK (pool->reqlock); 380 X_UNLOCK (pool->reqlock);
380 381
381 if (req->type == ETP_TYPE_QUIT) 382 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
382 goto quit; 383 goto quit;
383 384
384 ETP_EXECUTE (self, req); 385 ETP_EXECUTE (self, req);
385 386
386 X_LOCK (pool->reslock); 387 X_LOCK (pool->reslock);
387 388
388 ++pool->npending; 389 ++pool->npending;
389 390
390 if (!reqq_push (&pool->res_queue, req)) 391 if (!reqq_push (&pool->res_queue, req))
391 ETP_WANT_POLL (poll); 392 ETP_WANT_POLL (pool);
392 393
393 etp_worker_clear (self); 394 etp_worker_clear (self);
394 395
395 X_UNLOCK (pool->reslock); 396 X_UNLOCK (pool->reslock);
396 } 397 }
417 418
418 X_LOCK (pool->wrklock); 419 X_LOCK (pool->wrklock);
419 420
420 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 421 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
421 { 422 {
422 wrk->prev = &wrk_first; 423 wrk->prev = &pool->wrk_first;
423 wrk->next = wrk_first.next; 424 wrk->next = pool->wrk_first.next;
424 wrk_first.next->prev = wrk; 425 pool->wrk_first.next->prev = wrk;
425 wrk_first.next = wrk; 426 pool->wrk_first.next = wrk;
426 ++pool->started; 427 ++pool->started;
427 } 428 }
428 else 429 else
429 free (wrk); 430 free (wrk);
430 431
484 etp_maybe_start_thread (pool); 485 etp_maybe_start_thread (pool);
485 486
486 X_LOCK (pool->reslock); 487 X_LOCK (pool->reslock);
487 req = reqq_shift (&pool->res_queue); 488 req = reqq_shift (&pool->res_queue);
488 489
489 if (req) 490 if (ecb_expect_true (req))
490 { 491 {
491 --pool->npending; 492 --pool->npending;
492 493
493 if (!pool->res_queue.size) 494 if (!pool->res_queue.size)
494 ETP_DONE_POLL (pool->userdata); 495 ETP_DONE_POLL (pool);
495 } 496 }
496 497
497 X_UNLOCK (pool->reslock); 498 X_UNLOCK (pool->reslock);
498 499
499 if (!req) 500 if (ecb_expect_false (!req))
500 return 0; 501 return 0;
501 502
502 X_LOCK (pool->reqlock); 503 X_LOCK (pool->reqlock);
503 --pool->nreqs; 504 --pool->nreqs;
504 X_UNLOCK (pool->reqlock); 505 X_UNLOCK (pool->reqlock);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines