ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.11
Committed: Sat Aug 3 02:11:27 2013 UTC (10 years, 9 months ago) by root
Branch: MAIN
Changes since 1.10: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include <errno.h>
2 #include <unistd.h>
3 #include <fcntl.h>
4
5 #include <mysql.h>
6
7 #include "EXTERN.h"
8 #include "perl.h"
9 #include "XSUB.h"
10
11 #if HAVE_EV
12 # include "EVAPI.h"
13 # include "CoroAPI.h"
14 #endif
15
16 #define IN_DESTRUCT PL_dirty
17
18 typedef U16 uint16;
19
20 /* cached function gv's */
21 static CV *readable, *writable;
22 static int use_ev;
23
24 #include "violite.h"
25
26 #define CoMy_MAGIC 0x436f4d79
27
28 typedef struct {
29 #if DESC_IS_PTR
30 char desc[30];
31 const char *old_desc;
32 #endif
33 int magic;
34 SV *corohandle_sv, *corohandle;
35 int bufofs, bufcnt;
36 #if HAVE_EV
37 ev_io rw, ww;
38 #endif
39 char buf[VIO_READ_BUFFER_SIZE];
40 } ourdata;
41
42 #if DESC_IS_PTR
43 # define OURDATAPTR (*(ourdata **)&((vio)->desc))
44 #else
45 # define DESC_OFFSET 22
46 # define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
47 #endif
48
49 static xlen
50 our_read (Vio *vio, xgptr p, xlen len)
51 {
52 ourdata *our = OURDATAPTR;
53
54 if (!our->bufcnt)
55 {
56 int rd;
57 my_bool dummy;
58
59 vio->vioblocking (vio, 0, &dummy);
60
61 for (;;)
62 {
63 rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
64
65 if (rd >= 0 || errno != EAGAIN)
66 break;
67
68 #if HAVE_EV
69 if (use_ev)
70 {
71 our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
72 ev_io_start (EV_DEFAULT_UC, &(our->rw));
73 CORO_SCHEDULE;
74 ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
75 }
76 else
77 #endif
78 {
79 dSP;
80 PUSHMARK (SP);
81 XPUSHs (our->corohandle);
82 PUTBACK;
83 call_sv ((SV *)readable, G_VOID | G_DISCARD);
84 }
85 }
86
87 if (rd <= 0)
88 return rd;
89
90 our->bufcnt = rd;
91 our->bufofs = 0;
92 }
93
94 if (our->bufcnt < len)
95 len = our->bufcnt;
96
97 memcpy (p, our->buf + our->bufofs, len);
98 our->bufofs += len;
99 our->bufcnt -= len;
100
101 return len;
102 }
103
104 static xlen
105 our_write (Vio *vio, cxgptr p, xlen len)
106 {
107 char *ptr = (char *)p;
108 my_bool dummy;
109
110 vio->vioblocking (vio, 0, &dummy);
111
112 while (len > 0)
113 {
114 int wr = send (vio->sd, ptr, len, 0);
115
116 if (wr > 0)
117 {
118 ptr += wr;
119 len -= wr;
120 }
121 else if (errno == EAGAIN)
122 {
123 ourdata *our = OURDATAPTR;
124
125 #if HAVE_EV
126 if (use_ev)
127 {
128 our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
129 ev_io_start (EV_DEFAULT_UC, &(our->ww));
130 CORO_SCHEDULE;
131 ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
132 }
133 else
134 #endif
135 {
136 dSP;
137 PUSHMARK (SP);
138 XPUSHs (our->corohandle);
139 PUTBACK;
140 call_sv ((SV *)writable, G_VOID | G_DISCARD);
141 }
142 }
143 else if (ptr == (char *)p)
144 return -1;
145 else
146 break;
147 }
148
149 return ptr - (char *)p;
150 }
151
152 static int
153 our_close (Vio *vio)
154 {
155 ourdata *our = OURDATAPTR;
156
157 if (vio->read != our_read)
158 croak ("vio.read has unexpected content during unpatch - wtf?");
159
160 if (vio->write != our_write)
161 croak ("vio.write has unexpected content during unpatch - wtf?");
162
163 if (vio->vioclose != our_close)
164 croak ("vio.vioclose has unexpected content during unpatch - wtf?");
165
166 #if HAVE_EV
167 if (use_ev)
168 {
169 ev_io_stop (EV_DEFAULT_UC, &(our->rw));
170 ev_io_stop (EV_DEFAULT_UC, &(our->ww));
171 }
172 #endif
173
174 SvREFCNT_dec (our->corohandle);
175 SvREFCNT_dec (our->corohandle_sv);
176
177 #if DESC_IS_PTR
178 vio->desc = our->old_desc;
179 #endif
180
181 Safefree (our);
182
183 vio->read = vio_read;
184 vio->write = vio_write;
185 vio->vioclose = vio_close;
186
187 vio->vioclose (vio);
188 }
189
#if HAVE_EV
/*
 * Shared callback of the rw and ww watchers: stops the watcher that fired
 * and readies the coroutine SV that our_read/our_write stored in w->data.
 */
static void
iocb (EV_P_ ev_io *w, int revents)
{
  SV *waiter = (SV *)w->data;

  ev_io_stop (EV_A, w);
  CORO_READY (waiter);
}
#endif
198
199 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
200
BOOT:
{
        /* cache the perl-level wait helpers that our_read/our_write call when EV is not in use */
        writable = get_cv ("Coro::Mysql::writable", 0);
        readable = get_cv ("Coro::Mysql::readable", 0);
}
206
207 PROTOTYPES: ENABLE
208
void
_use_ev ()
	PPCODE:
{
        /* the API imports below run on the first call only */
        static int initialised;

        if (!initialised)
          {
            initialised = 1;
#if HAVE_EV
            I_EV_API ("Coro::Mysql");
            I_CORO_API ("Coro::Mysql");
            use_ev = 1;
#endif
          }

        /* report whether the EV code paths are active */
        if (use_ev)
          XPUSHs (&PL_sv_yes);
        else
          XPUSHs (&PL_sv_no);
}
227
void
_patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
	CODE:
{
        /*
         * Hooks our_read/our_write/our_close into the Vio of the MYSQL
         * handle whose address is passed in sock.
         *
         * sock            address of the MYSQL struct, as an integer
         * fd              file descriptor the caller believes the connection uses
         * client_version  client library version the caller was built against
         * corohandle_sv,
         * corohandle      SVs that are copied into the per-connection state;
         *                 corohandle is what gets passed to the readable/writable
         *                 callbacks
         *
         * Croaks, without modifying anything, when a sanity check fails.
         */
        MYSQL *my = (MYSQL *)sock;
        Vio *vio;
        ourdata *our;

        /* matching versions are required but not sufficient */
        if (client_version != mysql_get_client_version ())
          croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
                 client_version, mysql_get_client_version ());

        /* only look inside the struct after the version check has passed */
        if (!my)
          croak ("DBD::mysql handle has no MYSQL structure - not connected?");

        vio = my->net.vio;

        if (!vio)
          croak ("DBD::mysql handle has no vio - not connected?");

        if (fd != my->net.fd)
          croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");

        if (fd != vio->sd)
          croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");

        if (vio->vioclose != vio_close)
          croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");

        if (vio->write != vio_write)
          croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");

        if (vio->read != vio_read
            && vio->read != vio_read_buff)
          croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");

        /* zero-filled, so bufofs/bufcnt start out as an empty buffer */
        Newz (0, our, 1, ourdata);
        our->magic         = CoMy_MAGIC;
        our->corohandle_sv = newSVsv (corohandle_sv);
        our->corohandle    = newSVsv (corohandle);
#if HAVE_EV
        if (use_ev)
          {
            ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
            ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
          }
#endif
#if DESC_IS_PTR
        /* keep the old pointer for our_close and a terminated copy of its text */
        our->old_desc = vio->desc;
        strncpy (our->desc, vio->desc, sizeof (our->desc));
        our->desc [sizeof (our->desc) - 1] = 0;
#else
        /* terminate the description text just before the slot used by OURDATAPTR */
        vio->desc [DESC_OFFSET - 1] = 0;
#endif
        OURDATAPTR = our;

        vio->vioclose = our_close;
        vio->write    = our_write;
        vio->read     = our_read;
}
281