ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.1
Committed: Sat Jun 27 17:59:10 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #define X_STACKSIZE -1
8    
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19     static pthread_key_t current_key;
20    
21     static s_epipe ep;
22     static void *perl_thx;
23     static sigset_t cursigset, fullsigset;
24    
25     struct tctx
26     {
27     void *coro;
28     };
29    
30     static struct tctx *tctx_free;
31    
32     static int available;
33     static int max_idle = 8;
34    
35     static xmutex_t perl_m = X_MUTEX_INIT; static xcond_t perl_c = X_COND_INIT; static struct tctx *perl_f;
36     static xmutex_t wait_m = X_MUTEX_INIT; static xcond_t wait_c = X_COND_INIT; static int wait_f;
37    
38     static int wakeup_f;
39     static struct tctx **waiters;
40     static int waiters_count, waiters_max;
41    
42     static struct tctx *
43     tctx_get (void)
44     {
45     struct tctx *ctx;
46    
47     if (!tctx_free)
48     ctx = malloc (sizeof (*tctx_free));
49     else
50     {
51     ctx = tctx_free;
52     tctx_free = tctx_free->coro;
53     }
54    
55     return ctx;
56     }
57    
58     static void
59     tctx_put (struct tctx *ctx)
60     {
61     ctx->coro = tctx_free;
62     tctx_free = ctx;
63     }
64    
65     X_THREAD_PROC(thread_proc)
66     {
67     PERL_SET_CONTEXT (perl_thx);
68    
69     {
70     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
71     struct tctx *ctx;
72    
73     for (;;)
74     {
75     X_LOCK (perl_m);
76     while (!perl_f)
77     X_COND_WAIT (perl_c, perl_m);
78     ctx = perl_f;
79     perl_f = 0;
80     --available;
81     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
82     X_UNLOCK (perl_m);
83    
84     while (ctx->coro)
85     CORO_SCHEDULE;
86    
87     X_LOCK (wait_m);
88     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
89     wait_f = 1;
90     X_COND_SIGNAL (wait_c);
91    
92     if (available >= max_idle)
93     {
94     X_UNLOCK (wait_m);
95     break;
96     }
97    
98     ++available;
99     X_UNLOCK (wait_m);
100     }
101     }
102     }
103    
104     static void
105     start_thread (void)
106     {
107     xthread_t tid;
108    
109     ++available;
110     xthread_create (&tid, thread_proc, 0);
111     }
112    
113     static void
114     pmapi_release (void)
115     {
116     struct tctx *ctx = tctx_get ();
117     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
118     pthread_setspecific (current_key, ctx);
119     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
120    
121     if (!available)
122     start_thread ();
123    
124     X_LOCK (perl_m);
125     perl_f = ctx;
126     X_COND_SIGNAL (perl_c);
127     X_UNLOCK (perl_m);
128     }
129    
130     static void
131     pmapi_acquire (void)
132     {
133     struct tctx *ctx = pthread_getspecific (current_key);
134    
135     X_LOCK (wait_m);
136    
137     if (waiters_count >= waiters_max)
138     {
139     waiters_max = waiters_max ? waiters_max * 2 : 16;
140     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
141     }
142    
143     waiters [waiters_count++] = ctx;
144    
145     s_epipe_signal (&ep);
146     while (!wait_f)
147     X_COND_WAIT (wait_c, wait_m);
148     wait_f = 0;
149     X_UNLOCK (wait_m);
150    
151     tctx_put (ctx);
152     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
153     }
154    
155     MODULE = Coro::Multicore PACKAGE = Coro::Multicore
156    
157     PROTOTYPES: DISABLE
158    
159     BOOT:
160     {
161     #ifndef _WIN32
162     sigfillset (&fullsigset);
163     #endif
164    
165     pthread_key_create (&current_key, 0);
166    
167     if (s_epipe_new (&ep))
168     croak ("Coro::Multicore: unable to initialise event pipe.\n");
169    
170     perl_thx = PERL_GET_CONTEXT;
171    
172     I_CORO_API ("Coro::Multicore");
173    
174     /* not perfectly efficient to do it this way, but it's simple */
175     perl_multicore_init ();
176     perl_multicore_api->pmapi_release = pmapi_release;
177     perl_multicore_api->pmapi_acquire = pmapi_acquire;
178     }
179    
180     U32
181     max_idle_threads (U32 max = NO_INIT)
182     CODE:
183     X_LOCK (wait_m);
184     RETVAL = max_idle;
185     if (items)
186     max_idle = max;
187     X_UNLOCK (wait_m);
188     OUTPUT:
189     RETVAL
190    
191    
192     int
193     fd ()
194     CODE:
195     RETVAL = s_epipe_fd (&ep);
196     OUTPUT:
197     RETVAL
198    
199     void
200     poll (...)
201     CODE:
202     s_epipe_drain (&ep);
203     X_LOCK (wait_m);
204     while (waiters_count)
205     {
206     struct tctx *ctx = waiters [--waiters_count];
207     CORO_READY ((SV *)ctx->coro);
208     SvREFCNT_dec_NN ((SV *)ctx->coro);
209     ctx->coro = 0;
210     }
211     X_UNLOCK (wait_m);
212    
213     void
214     tst (int i)
215     CODE:
216     perlinterp_release ();
217     usleep (100000 * i);
218     perlinterp_acquire ();