ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.2
Committed: Sat Jun 27 18:13:34 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.1: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #define X_STACKSIZE -1
8    
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19     static pthread_key_t current_key;
20    
21     static s_epipe ep;
22     static void *perl_thx;
23     static sigset_t cursigset, fullsigset;
24    
/*
 * Per-release context. pmapi_release stores the current coro in it, poll
 * clears it again, and tctx_put reuses the same member as the free-list link.
 */
struct tctx
{
  void *coro; /* coro SV while in use, next free context while on tctx_free */
};
29    
30     static struct tctx *tctx_free;
31    
32     static int available;
33     static int max_idle = 8;
34    
35     static xmutex_t perl_m = X_MUTEX_INIT; static xcond_t perl_c = X_COND_INIT; static struct tctx *perl_f;
36     static xmutex_t wait_m = X_MUTEX_INIT; static xcond_t wait_c = X_COND_INIT; static int wait_f;
37    
38     static int wakeup_f;
39     static struct tctx **waiters;
40     static int waiters_count, waiters_max;
41    
42     static struct tctx *
43     tctx_get (void)
44     {
45     struct tctx *ctx;
46    
47     if (!tctx_free)
48     ctx = malloc (sizeof (*tctx_free));
49     else
50     {
51     ctx = tctx_free;
52     tctx_free = tctx_free->coro;
53     }
54    
55     return ctx;
56     }
57    
58     static void
59     tctx_put (struct tctx *ctx)
60     {
61     ctx->coro = tctx_free;
62     tctx_free = ctx;
63     }
64    
65     X_THREAD_PROC(thread_proc)
66     {
67     PERL_SET_CONTEXT (perl_thx);
68    
69     {
70     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
71     struct tctx *ctx;
72    
73     for (;;)
74     {
75 root 1.2 /* TODO: should really use some idle time and exit after that */
76 root 1.1 X_LOCK (perl_m);
77     while (!perl_f)
78     X_COND_WAIT (perl_c, perl_m);
79     ctx = perl_f;
80     perl_f = 0;
81     --available;
82     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
83     X_UNLOCK (perl_m);
84    
85     while (ctx->coro)
86     CORO_SCHEDULE;
87    
88     X_LOCK (wait_m);
89     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
90     wait_f = 1;
91     X_COND_SIGNAL (wait_c);
92    
93     if (available >= max_idle)
94     {
95     X_UNLOCK (wait_m);
96     break;
97     }
98    
99     ++available;
100     X_UNLOCK (wait_m);
101     }
102     }
103     }
104    
105     static void
106     start_thread (void)
107     {
108     xthread_t tid;
109    
110     ++available;
111     xthread_create (&tid, thread_proc, 0);
112     }
113    
114     static void
115     pmapi_release (void)
116     {
117     struct tctx *ctx = tctx_get ();
118     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
119     pthread_setspecific (current_key, ctx);
120     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
121    
122     if (!available)
123     start_thread ();
124    
125     X_LOCK (perl_m);
126     perl_f = ctx;
127     X_COND_SIGNAL (perl_c);
128     X_UNLOCK (perl_m);
129     }
130    
131     static void
132     pmapi_acquire (void)
133     {
134     struct tctx *ctx = pthread_getspecific (current_key);
135    
136     X_LOCK (wait_m);
137    
138     if (waiters_count >= waiters_max)
139     {
140     waiters_max = waiters_max ? waiters_max * 2 : 16;
141     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
142     }
143    
144     waiters [waiters_count++] = ctx;
145    
146     s_epipe_signal (&ep);
147     while (!wait_f)
148     X_COND_WAIT (wait_c, wait_m);
149     wait_f = 0;
150     X_UNLOCK (wait_m);
151    
152     tctx_put (ctx);
153     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
154     }
155    
156     MODULE = Coro::Multicore PACKAGE = Coro::Multicore
157    
158     PROTOTYPES: DISABLE
159    
BOOT:
{
        /* module initialisation: signal set, thread-specific key, event pipe, Coro API and multicore hooks */
#ifndef _WIN32
        sigfillset (&fullsigset);
#endif

        /* pmapi_release/pmapi_acquire rely on this key, so refuse to load without it */
        if (pthread_key_create (&current_key, 0))
          croak ("Coro::Multicore: unable to create thread-specific key.\n");

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so worker threads can install the same interpreter context */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* not perfectly efficient to do it this way, but it's simple */
        perl_multicore_init ();
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
180    
U32
max_idle_threads (U32 max = NO_INIT)
        CODE:
        /* returns the previous limit; max is only read when an argument was passed */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items > 0)
          max_idle = max;
        X_UNLOCK (wait_m);
        OUTPUT:
        RETVAL
191    
192    
int
fd ()
        CODE:
        /* descriptor of the event pipe that pmapi_acquire signals and poll drains */
        RETVAL = s_epipe_fd (&ep);
        OUTPUT:
        RETVAL
199    
void
poll (...)
        CODE:
        /* drain the event pipe, then ready every coro queued by pmapi_acquire */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        while (waiters_count > 0)
          {
            struct tctx *waiter = waiters [--waiters_count];
            SV *coro_sv = (SV *)waiter->coro;

            CORO_READY (coro_sv);
            /* drop the reference taken in pmapi_release */
            SvREFCNT_dec_NN (coro_sv);
            /* a cleared coro ends the CORO_SCHEDULE loop in thread_proc */
            waiter->coro = 0;
          }
        X_UNLOCK (wait_m);
213    
void
sleep (NV seconds)
        CODE:
        /* sleeps for the given number of seconds with the interpreter released */
        /* negative and NaN values are treated as zero rather than converted to an unsigned interval */
        if (!(seconds > 0.))
          seconds = 0.;

        perlinterp_release ();

        /* POSIX allows usleep to fail for intervals of one second or more, so sleep in half-second slices */
        while (seconds >= 0.5)
          {
            usleep (500000);
            seconds -= 0.5;
          }

        /* remainder is below half a second */
        usleep (seconds * 1e6);
        perlinterp_acquire ();
220 root 1.2