ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/EV-Loop-Async/Async.xs
Revision: 1.8
Committed: Fri Jul 17 02:52:22 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.7: +67 -14 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "xthread.h"
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #include <stddef.h>
8
9 #include "EVAPI.h"
10
11 /* our userdata */
12 typedef struct {
13 mutex_t lock; /* global loop lock */
14 void (*signal_func) (void *signal_arg, int value);
15 void *signal_arg;
16 ev_async async_w;
17 thread_t tid;
18 unsigned int max_loops;
19 unsigned int count;
20
21 cond_t invoke_cv;
22
23 SV *interrupt;
24 } udat;
25
26 static void loop_set_cb (EV_P);
27
28 static void
29 fg_invoke_pending (EV_P)
30 {
31 udat *u = ev_userdata (EV_A);
32
33 u->count = ev_pending_count (EV_A);
34
35 if (u->count)
36 ev_invoke_pending (EV_A);
37 }
38
39 static void
40 c_func (pTHX_ void *loop_, int value)
41 {
42 struct ev_loop *loop = (struct ev_loop *)loop_;
43 udat *u = ev_userdata (EV_A);
44 int i;
45
46 X_LOCK (u->lock);
47 ev_invoke_pending (EV_A);
48
49 /* do any additional foreground loop runs */
50 for (i = u->max_loops; i--; )
51 {
52 /* this is a bit tricky, but we can manage... */
53 u->count = 0;
54
55 ev_set_invoke_pending_cb (EV_A, fg_invoke_pending);
56 ev_set_loop_release_cb (EV_A, 0, 0);
57 ev_loop (EV_A, EVLOOP_NONBLOCK);
58 loop_set_cb (EV_A);
59
60 if (!u->count)
61 break;
62 }
63
64 X_COND_SIGNAL (u->invoke_cv);
65 X_UNLOCK (u->lock);
66 }
67
68 static void
69 async_cb (EV_P_ ev_async *w, int revents)
70 {
71 /* just used for the side effects */
72 }
73
74 static void
75 l_release (EV_P)
76 {
77 udat *u = ev_userdata (EV_A);
78 X_UNLOCK (u->lock);
79 }
80
81 static void
82 l_acquire (EV_P)
83 {
84 udat *u = ev_userdata (EV_A);
85 X_LOCK (u->lock);
86 }
87
88 static void
89 l_invoke (EV_P)
90 {
91 udat *u = ev_userdata (EV_A);
92
93 while (ev_pending_count (EV_A))
94 {
95 u->signal_func (u->signal_arg, 1);
96 X_COND_WAIT (u->invoke_cv, u->lock);
97 }
98 }
99
100 static void
101 loop_set_cb (EV_P)
102 {
103 ev_set_invoke_pending_cb (EV_A, l_invoke);
104 ev_set_loop_release_cb (EV_A, l_release, l_acquire);
105 }
106
107 X_THREAD_PROC(l_run)
108 {
109 struct ev_loop *loop = (struct ev_loop *)thr_arg;
110
111 l_acquire (EV_A);
112
113 /* yeah */
114 pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, 0);
115
116 ev_ref (EV_A);
117 ev_loop (EV_A, 0);
118 ev_unref (EV_A);
119
120 l_release (EV_A);
121
122 return 0;
123 }
124
125 static void
126 scope_lock_cb (pTHX_ void *loop_)
127 {
128 struct ev_loop *loop = (struct ev_loop *)SvIVX ((SV *)loop_);
129 udat *u = ev_userdata (EV_A);
130
131 X_UNLOCK (u->lock);
132 SvREFCNT_dec ((SV *)loop_);
133 }
134
135 MODULE = EV::Loop::Async PACKAGE = EV::Loop::Async
136
137 PROTOTYPES: ENABLE
138
139 BOOT:
140 {
141 I_EV_API ("EV::Loop::Async");
142 CvNODEBUG_on (get_cv ("EV::Loop::Async::scope_lock", 0)); /* otherwise calling scope can be the debugger */
143 }
144
145 void
146 _c_func (SV *loop)
147 PPCODE:
148 EXTEND (SP, 2);
149 PUSHs (sv_2mortal (newSViv (PTR2IV (c_func))));
150 PUSHs (sv_2mortal (newSViv (SvIVX (SvRV (loop)))));
151
152 void
153 _attach (SV *loop_, SV *interrupt, IV sig_func, void *sig_arg)
154 CODE:
155 {
156 pthread_mutexattr_t ma;
157 struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_));
158 udat *u;
159
160 Newz (0, u, 1, udat);
161 u->interrupt = newSVsv (interrupt);
162 u->signal_func = (void (*)(void *, int))sig_func;
163 u->signal_arg = sig_arg;
164
165 ev_async_init (&u->async_w, async_cb);
166 ev_async_start (EV_A, &u->async_w);
167
168 pthread_mutexattr_init (&ma);
169 pthread_mutexattr_settype (&ma, PTHREAD_MUTEX_RECURSIVE);
170 pthread_mutex_init (&u->lock, &ma);
171 pthread_mutexattr_destroy (&ma);
172
173 pthread_cond_init (&u->invoke_cv, 0);
174
175 ev_set_userdata (EV_A, u);
176 loop_set_cb (EV_A);
177
178 thread_create (&u->tid, l_run, loop);
179 }
180
181 SV *
182 interrupt (SV *loop_)
183 CODE:
184 {
185 struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_));
186 udat *u = ev_userdata (EV_A);
187
188 RETVAL = newSVsv (u->interrupt);
189 }
190 OUTPUT:
191 RETVAL
192
193 void
194 set_max_foreground_loops (SV *loop_, UV max_loops)
195 CODE:
196 {
197 struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_));
198 udat *u = ev_userdata (EV_A);
199
200 u->max_loops = max_loops;
201 }
202
203 void
204 lock (SV *loop_)
205 ALIAS:
206 lock = 0
207 unlock = 1
208 notify = 2
209 CODE:
210 {
211 struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_));
212 udat *u = ev_userdata (EV_A);
213
214 switch (ix)
215 {
216 case 0: X_LOCK (u->lock); break;
217 case 1: X_UNLOCK (u->lock); break;
218 case 2: ev_async_send (EV_A, &u->async_w); break;
219 }
220 }
221
222 void
223 scope_lock (SV *loop_)
224 CODE:
225 {
226 struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_));
227 udat *u = ev_userdata (EV_A);
228
229 X_LOCK (u->lock);
230
231 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
232 SAVEDESTRUCTOR_X (scope_lock_cb, (void *)SvREFCNT_inc (SvRV (loop_)));
233 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
234 }
235
236 void
237 DESTROY (SV *loop_)
238 CODE:
239 {
240 struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_));
241 udat *u = ev_userdata (EV_A);
242
243 if (u)
244 {
245 X_LOCK (u->lock);
246 ev_async_stop (EV_A, &u->async_w);
247 /* now thread is around blocking call, or in pthread_cond_wait */
248 pthread_cancel (u->tid);
249 pthread_mutex_destroy (&u->lock);
250 pthread_cond_destroy (&u->invoke_cv);
251 SvREFCNT_dec (u->interrupt);
252 Safefree (u);
253 }
254 }
255
256
257