ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.15
Committed: Tue Jun 3 13:38:48 2014 UTC (9 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-1_27
Changes since 1.14: +0 -1 lines
Log Message:
1.27

File Contents

# Content
1 #include <errno.h>
2 #include <unistd.h>
3 #include <fcntl.h>
4
5 #include <mysql.h>
6
7 #include "EXTERN.h"
8 #include "perl.h"
9 #include "XSUB.h"
10
11 #if HAVE_EV
12 # include "EVAPI.h"
13 # include "CoroAPI.h"
14 #endif
15
16 #define IN_DESTRUCT PL_dirty
17
18 typedef U16 uint16;
19
20 /* cached function gv's */
21 static CV *readable, *writable;
22 static int use_ev;
23
24 #include "violite.h"
25
26 #define CoMy_MAGIC 0x436f4d79
27
28 typedef struct {
29 #if DESC_IS_PTR
30 char desc[30];
31 const char *old_desc;
32 #endif
33 int magic;
34 SV *corohandle_sv, *corohandle;
35 int bufofs, bufcnt;
36 #if HAVE_EV
37 ev_io rw, ww;
38 #endif
39 char buf[VIO_READ_BUFFER_SIZE];
40 size_t (*old_read)(Vio*, uchar *, size_t);
41 size_t (*old_write)(Vio*, const uchar *, size_t);
42 int (*old_vioclose)(Vio*);
43 } ourdata;
44
45 #if DESC_IS_PTR
46 # define OURDATAPTR (*(ourdata **)&((vio)->desc))
47 #else
48 # define DESC_OFFSET 22
49 # define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
50 #endif
51
52 static xlen
53 our_read (Vio *vio, xgptr p, xlen len)
54 {
55 ourdata *our = OURDATAPTR;
56
57 if (!our->bufcnt)
58 {
59 int rd;
60 my_bool dummy;
61
62 vio->vioblocking (vio, 0, &dummy);
63
64 for (;;)
65 {
66 rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
67
68 if (rd >= 0 || errno != EAGAIN)
69 break;
70
71 #if HAVE_EV
72 if (use_ev)
73 {
74 our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
75 ev_io_start (EV_DEFAULT_UC, &(our->rw));
76 CORO_SCHEDULE;
77 ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
78 }
79 else
80 #endif
81 {
82 dSP;
83 PUSHMARK (SP);
84 XPUSHs (our->corohandle);
85 PUTBACK;
86 call_sv ((SV *)readable, G_VOID | G_DISCARD);
87 }
88 }
89
90 if (rd <= 0)
91 return rd;
92
93 our->bufcnt = rd;
94 our->bufofs = 0;
95 }
96
97 if (our->bufcnt < len)
98 len = our->bufcnt;
99
100 memcpy (p, our->buf + our->bufofs, len);
101 our->bufofs += len;
102 our->bufcnt -= len;
103
104 return len;
105 }
106
107 static xlen
108 our_write (Vio *vio, cxgptr p, xlen len)
109 {
110 char *ptr = (char *)p;
111 my_bool dummy;
112
113 vio->vioblocking (vio, 0, &dummy);
114
115 while (len > 0)
116 {
117 int wr = send (vio->sd, ptr, len, 0);
118
119 if (wr > 0)
120 {
121 ptr += wr;
122 len -= wr;
123 }
124 else if (errno == EAGAIN)
125 {
126 ourdata *our = OURDATAPTR;
127
128 #if HAVE_EV
129 if (use_ev)
130 {
131 our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
132 ev_io_start (EV_DEFAULT_UC, &(our->ww));
133 CORO_SCHEDULE;
134 ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
135 }
136 else
137 #endif
138 {
139 dSP;
140 PUSHMARK (SP);
141 XPUSHs (our->corohandle);
142 PUTBACK;
143 call_sv ((SV *)writable, G_VOID | G_DISCARD);
144 }
145 }
146 else if (ptr == (char *)p)
147 return -1;
148 else
149 break;
150 }
151
152 return ptr - (char *)p;
153 }
154
155 static int
156 our_close (Vio *vio)
157 {
158 ourdata *our = OURDATAPTR;
159
160 if (vio->read != our_read)
161 croak ("vio.read has unexpected content during unpatch - wtf?");
162
163 if (vio->write != our_write)
164 croak ("vio.write has unexpected content during unpatch - wtf?");
165
166 if (vio->vioclose != our_close)
167 croak ("vio.vioclose has unexpected content during unpatch - wtf?");
168
169 #if HAVE_EV
170 if (use_ev)
171 {
172 ev_io_stop (EV_DEFAULT_UC, &(our->rw));
173 ev_io_stop (EV_DEFAULT_UC, &(our->ww));
174 }
175 #endif
176
177 SvREFCNT_dec (our->corohandle);
178 SvREFCNT_dec (our->corohandle_sv);
179
180 #if DESC_IS_PTR
181 vio->desc = our->old_desc;
182 #endif
183
184 vio->vioclose = our->old_vioclose;
185 vio->write = our->old_write;
186 vio->read = our->old_read;
187
188 Safefree (our);
189
190 vio->vioclose (vio);
191 }
192
193 #if HAVE_EV
194 static void
195 iocb (EV_P_ ev_io *w, int revents)
196 {
197 ev_io_stop (EV_A, w);
198 CORO_READY ((SV *)w->data);
199 }
200 #endif
201
202 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
203
204 BOOT:
205 {
206 readable = get_cv ("Coro::Mysql::readable", 0);
207 writable = get_cv ("Coro::Mysql::writable", 0);
208 }
209
210 PROTOTYPES: ENABLE
211
212 void
213 _use_ev ()
214 PPCODE:
215 {
216 static int onceonly;
217
218 if (!onceonly)
219 {
220 onceonly = 1;
221 #if HAVE_EV
222 I_EV_API ("Coro::Mysql");
223 I_CORO_API ("Coro::Mysql");
224 use_ev = 1;
225 #endif
226 }
227
228 XPUSHs (use_ev ? &PL_sv_yes : &PL_sv_no);
229 }
230
231 void
232 _patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
233 CODE:
234 {
235 MYSQL *my = (MYSQL *)sock;
236 Vio *vio = my->net.vio;
237 ourdata *our;
238
239 /* matching versions are required but not sufficient */
240 if (client_version != mysql_get_client_version ())
241 croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
242 client_version, mysql_get_client_version ());
243
244 if (fd != my->net.fd)
245 croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");
246
247 if (fd != vio->sd)
248 croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");
249 #if MYSQL_VERSION_ID < 100010 && !defined(MARIADB_BASE_VERSION)
250 if (vio->vioclose != vio_close)
251 croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");
252
253 if (vio->write != vio_write)
254 croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");
255
256 if (vio->read != vio_read
257 && vio->read != vio_read_buff)
258 croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");
259 #endif
260
261 Newz (0, our, 1, ourdata);
262 our->magic = CoMy_MAGIC;
263 our->corohandle_sv = newSVsv (corohandle_sv);
264 our->corohandle = newSVsv (corohandle);
265 #if HAVE_EV
266 if (use_ev)
267 {
268 ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
269 ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
270 }
271 #endif
272 #if DESC_IS_PTR
273 our->old_desc = vio->desc;
274 strncpy (our->desc, vio->desc, sizeof (our->desc));
275 our->desc [sizeof (our->desc) - 1] = 0;
276 #else
277 vio->desc [DESC_OFFSET - 1] = 0;
278 #endif
279 OURDATAPTR = our;
280
281 our->old_vioclose = vio->vioclose;
282 our->old_write = vio->write;
283 our->old_read = vio->read;
284
285 vio->vioclose = our_close;
286 vio->write = our_write;
287 vio->read = our_read;
288 }
289