ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.2 by root, Mon Aug 18 04:26:03 2014 UTC vs.
Revision 1.4 by root, Thu Jun 25 17:05:07 2015 UTC

56 56
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 58
59#define ETP_TICKS ((1000000 + 1023) >> 10) 59#define ETP_TICKS ((1000000 + 1023) >> 10)
60 60
61enum {
62 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
63 ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
64};
65
61/* calculate time difference in ~1/ETP_TICKS of a second */ 66/* calculate time difference in ~1/ETP_TICKS of a second */
62ecb_inline int 67ecb_inline int
63etp_tvdiff (struct timeval *tv1, struct timeval *tv2) 68etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64{ 69{
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
83static xmutex_t wrklock; 88static xmutex_t wrklock;
84static xmutex_t reslock; 89static xmutex_t reslock;
85static xmutex_t reqlock; 90static xmutex_t reqlock;
86static xcond_t reqwait; 91static xcond_t reqwait;
87 92
93struct etp_tmpbuf
94{
95 void *ptr;
96 int len;
97};
98
99static void *
100etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
101{
102 if (buf->len < len)
103 {
104 free (buf->ptr);
105 buf->ptr = malloc (buf->len = len);
106 }
107
108 return buf->ptr;
109}
110
88typedef struct etp_worker 111typedef struct etp_worker
89{ 112{
90 struct tmpbuf tmpbuf; 113 struct etp_tmpbuf tmpbuf;
91 114
92 /* locked by wrklock */ 115 /* locked by wrklock */
93 struct etp_worker *prev, *next; 116 struct etp_worker *prev, *next;
94 117
95 xthread_t tid; 118 xthread_t tid;
259 done_poll_cb = done_poll; 282 done_poll_cb = done_poll;
260 283
261 return 0; 284 return 0;
262} 285}
263 286
264/* not yet in etp.c */ 287static void ecb_noinline ecb_cold
288etp_proc_init (void)
289{
290#if HAVE_PRCTL_SET_NAME
291 /* provide a more sensible "thread name" */
292 char name[16 + 1];
293 const int namelen = sizeof (name) - 1;
294 int len;
295
296 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
297 name [namelen] = 0;
298 len = strlen (name);
299 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
300 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
301#endif
302}
303
265X_THREAD_PROC (etp_proc); 304X_THREAD_PROC (etp_proc)
305{
306 ETP_REQ *req;
307 struct timespec ts;
308 etp_worker *self = (etp_worker *)thr_arg;
309
310 etp_proc_init ();
311
312 /* try to distribute timeouts somewhat evenly */
313 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
314
315 for (;;)
316 {
317 ts.tv_sec = 0;
318
319 X_LOCK (reqlock);
320
321 for (;;)
322 {
323 req = reqq_shift (&req_queue);
324
325 if (req)
326 break;
327
328 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
329 {
330 X_UNLOCK (reqlock);
331 X_LOCK (wrklock);
332 --started;
333 X_UNLOCK (wrklock);
334 goto quit;
335 }
336
337 ++idle;
338
339 if (idle <= max_idle)
340 /* we are allowed to idle, so do so without any timeout */
341 X_COND_WAIT (reqwait, reqlock);
342 else
343 {
344 /* initialise timeout once */
345 if (!ts.tv_sec)
346 ts.tv_sec = time (0) + idle_timeout;
347
348 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
349 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
350 }
351
352 --idle;
353 }
354
355 --nready;
356
357 X_UNLOCK (reqlock);
358
359 if (req->type == ETP_TYPE_QUIT)
360 goto quit;
361
362 ETP_EXECUTE (self, req);
363
364 X_LOCK (reslock);
365
366 ++npending;
367
368 if (!reqq_push (&res_queue, req) && want_poll_cb)
369 want_poll_cb ();
370
371 etp_worker_clear (self);
372
373 X_UNLOCK (reslock);
374 }
375
376quit:
377 free (req);
378
379 X_LOCK (wrklock);
380 etp_worker_free (self);
381 X_UNLOCK (wrklock);
382
383 return 0;
384}
266 385
267static void ecb_cold 386static void ecb_cold
268etp_start_thread (void) 387etp_start_thread (void)
269{ 388{
270 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 389 etp_worker *wrk = calloc (1, sizeof (etp_worker));
360 --nreqs; 479 --nreqs;
361 X_UNLOCK (reqlock); 480 X_UNLOCK (reqlock);
362 481
363 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 482 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
364 { 483 {
365 req->int1 = 1; /* mark request as delayed */ 484 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
366 continue; 485 continue;
367 } 486 }
368 else 487 else
369 { 488 {
370 int res = ETP_FINISH (req); 489 int res = ETP_FINISH (req);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines