--- libeio/etp.c 2014/08/18 04:26:03 1.2 +++ libeio/etp.c 2015/06/25 17:05:07 1.4 @@ -58,6 +58,11 @@ #define ETP_TICKS ((1000000 + 1023) >> 10) +enum { + ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */ + ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */ +}; + /* calculate time difference in ~1/ETP_TICKS of a second */ ecb_inline int etp_tvdiff (struct timeval *tv1, struct timeval *tv2) @@ -85,9 +90,27 @@ static xmutex_t reqlock; static xcond_t reqwait; +struct etp_tmpbuf +{ + void *ptr; + int len; +}; + +static void * +etp_tmpbuf_get (struct etp_tmpbuf *buf, int len) +{ + if (buf->len < len) + { + free (buf->ptr); + buf->ptr = malloc (buf->len = len); + } + + return buf->ptr; +} + typedef struct etp_worker { - struct tmpbuf tmpbuf; + struct etp_tmpbuf tmpbuf; /* locked by wrklock */ struct etp_worker *prev, *next; @@ -261,8 +284,104 @@ return 0; } -/* not yet in etp.c */ -X_THREAD_PROC (etp_proc); +static void ecb_noinline ecb_cold +etp_proc_init (void) +{ +#if HAVE_PRCTL_SET_NAME + /* provide a more sensible "thread name" */ + char name[16 + 1]; + const int namelen = sizeof (name) - 1; + int len; + + prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); + name [namelen] = 0; + len = strlen (name); + strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio"); + prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0); +#endif +} + +X_THREAD_PROC (etp_proc) +{ + ETP_REQ *req; + struct timespec ts; + etp_worker *self = (etp_worker *)thr_arg; + + etp_proc_init (); + + /* try to distribute timeouts somewhat evenly */ + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); + + for (;;) + { + ts.tv_sec = 0; + + X_LOCK (reqlock); + + for (;;) + { + req = reqq_shift (&req_queue); + + if (req) + break; + + if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ + { + X_UNLOCK (reqlock); + X_LOCK (wrklock); + --started; + X_UNLOCK (wrklock); + goto quit; + } + + ++idle; + + if (idle <= max_idle) + /* we are allowed to idle, so do so without any timeout */ + X_COND_WAIT (reqwait, reqlock); + else + { + /* initialise timeout once */ + if (!ts.tv_sec) + ts.tv_sec = time (0) + idle_timeout; + + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) + ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ + } + + --idle; + } + + --nready; + + X_UNLOCK (reqlock); + + if (req->type == ETP_TYPE_QUIT) + goto quit; + + ETP_EXECUTE (self, req); + + X_LOCK (reslock); + + ++npending; + + if (!reqq_push (&res_queue, req) && want_poll_cb) + want_poll_cb (); + + etp_worker_clear (self); + + X_UNLOCK (reslock); + } + +quit: + free (req); + + X_LOCK (wrklock); + etp_worker_free (self); + X_UNLOCK (wrklock); + + return 0; +} static void ecb_cold etp_start_thread (void) @@ -362,7 +481,7 @@ if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) { - req->int1 = 1; /* mark request as delayed */ + req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ continue; } else