ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.17
Committed: Mon Mar 4 06:19:16 2019 UTC (5 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-2_1, HEAD
Changes since 1.16: +19 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.11 #include <errno.h>
2 root 1.1 #include <unistd.h>
3     #include <fcntl.h>
4    
5 root 1.16 /* mariadb/mysql uses all these reserved macro names, and probably more :( */
6     #undef read
7     #undef write
8     #undef close
9    
10 root 1.1 #include <mysql.h>
11    
12     #include "EXTERN.h"
13     #include "perl.h"
14     #include "XSUB.h"
15    
16 root 1.7 #if HAVE_EV
17     # include "EVAPI.h"
18     # include "CoroAPI.h"
19     #endif
20    
21 root 1.3 #define IN_DESTRUCT PL_dirty
22    
23 root 1.1 typedef U16 uint16;
24    
25     /* cached function gv's */
26     static CV *readable, *writable;
27 root 1.7 static int use_ev;
28 root 1.1
29 root 1.16 #if MARIADB_VERSION_ID >= 100300
30    
31     typedef unsigned char uchar; /* bug? */
32     #include <ma_pvio.h>
33    
34     #define PVIO 1
35     #define VIOPTR MARIADB_PVIO *
36     #define VIOM(vio) (vio)->methods
37     #define vioblocking blocking
38     #define vioclose close
39     #define VIODATA(vio) (vio)->data
40     /* ma_pvio_get_socket would be it, but it's only declared, not defined */
41     #define VIOSD(vio) mysql_get_socket ((vio)->mysql)
42     #define VIO_READ_BUFFER_SIZE PVIO_READ_AHEAD_CACHE_SIZE
43     #define my_to_vio(sock) (sock)->net.pvio
44    
45     #define OURDATAPTR ((ourdata *)vio->methods)
46    
47     typedef uchar *xgptr;
48     typedef const uchar *cxgptr;
49     typedef size_t xsize_t;
50     typedef ssize_t xssize_t;
51     typedef my_bool xmy_bool;
52    
53     #else
54    
55     #include "violite.h"
56    
57     #define PVIO 0
58     #define VIOPTR Vio *
59     #define VIOM(vio) vio
60     #define VIODATA(vio) (vio)->desc
61     #define VIOSD(vio) (vio)->sd
62     #define my_to_vio(sock) (sock)->net.vio
63    
64     typedef int xmy_bool;
65    
66     #endif
67 root 1.1
68     #define CoMy_MAGIC 0x436f4d79
69    
70     typedef struct {
71 root 1.16 #if PVIO
72     /* must be first member */
73     struct st_ma_pvio_methods methods;
74     #else
75 root 1.10 #if DESC_IS_PTR
76     char desc[30];
77     const char *old_desc;
78     #endif
79 root 1.16 #endif
80 root 1.1 int magic;
81 root 1.5 SV *corohandle_sv, *corohandle;
82 root 1.1 int bufofs, bufcnt;
83 root 1.7 #if HAVE_EV
84     ev_io rw, ww;
85     #endif
86 root 1.1 char buf[VIO_READ_BUFFER_SIZE];
87 root 1.16 #if PVIO
88     struct st_ma_pvio_methods *oldmethods;
89     #else
90     xssize_t (*old_read)(VIOPTR, uchar *, size_t);
91     xssize_t (*old_write)(VIOPTR, const uchar *, size_t);
92     xmy_bool (*old_close)(VIOPTR);
93     #endif
94 root 1.1 } ourdata;
95    
96 root 1.16 #ifndef OURDATAPTR
97 root 1.10 #if DESC_IS_PTR
98     # define OURDATAPTR (*(ourdata **)&((vio)->desc))
99     #else
100     # define DESC_OFFSET 22
101     # define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
102     #endif
103 root 1.16 #endif
104 root 1.1
105 root 1.16 static xssize_t
106     our_read (VIOPTR vio, xgptr p, xsize_t len)
107 root 1.1 {
108     ourdata *our = OURDATAPTR;
109    
110     if (!our->bufcnt)
111     {
112     int rd;
113     my_bool dummy;
114    
115 root 1.16 VIOM (vio)->vioblocking (vio, 0, &dummy);
116 root 1.1
117     for (;;)
118     {
119 root 1.16 rd = recv (VIOSD (vio), our->buf, sizeof (our->buf), 0);
120 root 1.1
121     if (rd >= 0 || errno != EAGAIN)
122     break;
123    
124 root 1.7 #if HAVE_EV
125     if (use_ev)
126     {
127     our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
128     ev_io_start (EV_DEFAULT_UC, &(our->rw));
129     CORO_SCHEDULE;
130     ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
131     }
132     else
133     #endif
134     {
135     dSP;
136     PUSHMARK (SP);
137     XPUSHs (our->corohandle);
138     PUTBACK;
139     call_sv ((SV *)readable, G_VOID | G_DISCARD);
140     }
141 root 1.1 }
142    
143     if (rd <= 0)
144     return rd;
145    
146     our->bufcnt = rd;
147     our->bufofs = 0;
148     }
149    
150     if (our->bufcnt < len)
151     len = our->bufcnt;
152    
153     memcpy (p, our->buf + our->bufofs, len);
154     our->bufofs += len;
155     our->bufcnt -= len;
156    
157     return len;
158     }
159    
160 root 1.16 static xssize_t
161     our_write (VIOPTR vio, cxgptr p, xsize_t len)
162 root 1.1 {
163     char *ptr = (char *)p;
164     my_bool dummy;
165    
166 root 1.16 VIOM (vio)->vioblocking (vio, 0, &dummy);
167 root 1.1
168     while (len > 0)
169     {
170 root 1.16 int wr = send (VIOSD (vio), ptr, len, 0);
171 root 1.1
172     if (wr > 0)
173     {
174     ptr += wr;
175     len -= wr;
176     }
177     else if (errno == EAGAIN)
178     {
179 root 1.7 ourdata *our = OURDATAPTR;
180    
181     #if HAVE_EV
182     if (use_ev)
183     {
184     our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
185     ev_io_start (EV_DEFAULT_UC, &(our->ww));
186     CORO_SCHEDULE;
187     ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
188     }
189     else
190     #endif
191     {
192     dSP;
193     PUSHMARK (SP);
194     XPUSHs (our->corohandle);
195     PUTBACK;
196     call_sv ((SV *)writable, G_VOID | G_DISCARD);
197     }
198 root 1.1 }
199     else if (ptr == (char *)p)
200     return -1;
201     else
202     break;
203     }
204    
205     return ptr - (char *)p;
206     }
207    
208 root 1.16 static xmy_bool
209     our_close (VIOPTR vio)
210 root 1.5 {
211 root 1.7 ourdata *our = OURDATAPTR;
212    
213 root 1.16 if (VIOM (vio)->read != our_read)
214 root 1.5 croak ("vio.read has unexpected content during unpatch - wtf?");
215    
216 root 1.16 if (VIOM (vio)->write != our_write)
217 root 1.5 croak ("vio.write has unexpected content during unpatch - wtf?");
218    
219 root 1.16 if (VIOM (vio)->vioclose != our_close)
220 root 1.5 croak ("vio.vioclose has unexpected content during unpatch - wtf?");
221    
222 root 1.7 #if HAVE_EV
223     if (use_ev)
224     {
225     ev_io_stop (EV_DEFAULT_UC, &(our->rw));
226     ev_io_stop (EV_DEFAULT_UC, &(our->ww));
227     }
228     #endif
229    
230     SvREFCNT_dec (our->corohandle);
231     SvREFCNT_dec (our->corohandle_sv);
232 root 1.5
233 root 1.10 #if DESC_IS_PTR
234     vio->desc = our->old_desc;
235     #endif
236    
237 root 1.16 #if PVIO
238     vio->methods = our->oldmethods;
239     #else
240     VIOM (vio)->vioclose = our->old_close;
241     VIOM (vio)->write = our->old_write;
242     VIOM (vio)->read = our->old_read;
243     #endif
244 root 1.5
245 root 1.14 Safefree (our);
246    
247 root 1.16 VIOM (vio)->vioclose (vio);
248 root 1.5 }
249    
250 root 1.7 #if HAVE_EV
251     static void
252     iocb (EV_P_ ev_io *w, int revents)
253     {
254     ev_io_stop (EV_A, w);
255     CORO_READY ((SV *)w->data);
256     }
257     #endif
258    
259 root 1.1 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
260    
261     BOOT:
262     {
263     readable = get_cv ("Coro::Mysql::readable", 0);
264     writable = get_cv ("Coro::Mysql::writable", 0);
265     }
266    
267     PROTOTYPES: ENABLE
268    
269     void
270 root 1.7 _use_ev ()
271     PPCODE:
272     {
273     static int onceonly;
274    
275     if (!onceonly)
276     {
277     onceonly = 1;
278     #if HAVE_EV
279     I_EV_API ("Coro::Mysql");
280     I_CORO_API ("Coro::Mysql");
281     use_ev = 1;
282     #endif
283     }
284    
285     XPUSHs (use_ev ? &PL_sv_yes : &PL_sv_no);
286     }
287    
288     void
289 root 1.8 _patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
290 root 1.1 CODE:
291     {
292     MYSQL *my = (MYSQL *)sock;
293 root 1.16 VIOPTR vio = my_to_vio (my);
294 root 1.1 ourdata *our;
295    
296 root 1.8 /* matching versions are required but not sufficient */
297     if (client_version != mysql_get_client_version ())
298     croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
299     client_version, mysql_get_client_version ());
300    
301 root 1.1 if (fd != my->net.fd)
302     croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");
303    
304 root 1.16 if (fd != VIOSD (vio))
305 root 1.1 croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");
306 root 1.13 #if MYSQL_VERSION_ID < 100010 && !defined(MARIADB_BASE_VERSION)
307 root 1.16 if (VIOM (vio)->vioclose != vio_close)
308 root 1.9 croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");
309 root 1.5
310 root 1.16 if (VIOM (vio)->write != vio_write)
311 root 1.1 croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");
312    
313 root 1.16 if (VIOM (vio)->read != vio_read
314     && VIOM (vio)->read != vio_read_buff)
315 root 1.1 croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");
316 root 1.12 #endif
317 root 1.16 #if PVIO
318     if (vio->type != PVIO_TYPE_UNIXSOCKET && vio->type != PVIO_TYPE_SOCKET)
319     croak ("connection type mismatch: Coro::Mysql only supports 'unixsocket' and 'socket' types at this time");
320     #endif
321 root 1.1
322     Newz (0, our, 1, ourdata);
323     our->magic = CoMy_MAGIC;
324 root 1.5 our->corohandle_sv = newSVsv (corohandle_sv);
325     our->corohandle = newSVsv (corohandle);
326 root 1.7 #if HAVE_EV
327     if (use_ev)
328     {
329 root 1.16 ev_io_init (&(our->rw), iocb, VIOSD (vio), EV_READ);
330     ev_io_init (&(our->ww), iocb, VIOSD (vio), EV_WRITE);
331 root 1.7 }
332     #endif
333 root 1.16 #if PVIO
334     /* with pvio, we replace methods by our own struct,
335     * both becauase the original might be read-only,
336     * and because we have no private data member, so the
337     * methods pointer includes our data as well
338     */
339     our->methods = *vio->methods;
340     our->oldmethods = vio->methods;
341     vio->methods = &our->methods;
342     #else
343     OURDATAPTR = our;
344 root 1.10 #if DESC_IS_PTR
345     our->old_desc = vio->desc;
346     strncpy (our->desc, vio->desc, sizeof (our->desc));
347     our->desc [sizeof (our->desc) - 1] = 0;
348     #else
349 root 1.1 vio->desc [DESC_OFFSET - 1] = 0;
350 root 1.10 #endif
351 root 1.16 our->old_close = VIOM (vio)->vioclose;
352     our->old_write = VIOM (vio)->write;
353     our->old_read = VIOM (vio)->read;
354     #endif
355 root 1.1
356 root 1.16 /* with pvio, this patches our own struct */
357     VIOM (vio)->vioclose = our_close;
358     VIOM (vio)->write = our_write;
359     VIOM (vio)->read = our_read;
360 root 1.1 }
361    
362 root 1.17 int
363     _is_patched (IV sock)
364     CODE:
365     {
366     MYSQL *my = (MYSQL *)sock;
367     VIOPTR vio = my_to_vio (my);
368     RETVAL = VIOM (vio)->write == our_write;
369     }
370     OUTPUT: RETVAL
371    
372     int
373     have_ev ()
374     CODE:
375     RETVAL = 0;
376     #if HAVE_EV
377     RETVAL = 1;
378     #endif
379     OUTPUT: RETVAL
380