ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.12
Committed: Tue May 6 13:04:34 2014 UTC (10 years ago) by root
Branch: MAIN
Changes since 1.11: +12 -4 lines
Log Message:
rel-1_23

File Contents

# User Rev Content
1 root 1.11 #include <errno.h>
2 root 1.1 #include <unistd.h>
3     #include <fcntl.h>
4    
5     #include <mysql.h>
6    
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11 root 1.7 #if HAVE_EV
12     # include "EVAPI.h"
13     # include "CoroAPI.h"
14     #endif
15    
/* alias for PL_dirty; not referenced in the visible part of this file */
16 root 1.3 #define IN_DESTRUCT PL_dirty
17    
18 root 1.1 typedef U16 uint16;
19    
20     /* CVs of the Perl-level Coro::Mysql::readable / Coro::Mysql::writable subs, looked up once in BOOT */
21     static CV *readable, *writable;
/* set to 1 by _use_ev after loading the EV and Coro APIs (only when built with HAVE_EV) */
22 root 1.7 static int use_ev;
23 root 1.1
24     #include "violite.h"
25    
/* value _patch stores in ourdata.magic; the bytes 0x43 0x6f 0x4d 0x79 spell "CoMy" in ASCII */
26     #define CoMy_MAGIC 0x436f4d79
27    
28     typedef struct {
29 root 1.10 #if DESC_IS_PTR
30     char desc[30];
31     const char *old_desc;
32     #endif
33 root 1.1 int magic;
34 root 1.5 SV *corohandle_sv, *corohandle;
35 root 1.1 int bufofs, bufcnt;
36 root 1.7 #if HAVE_EV
37     ev_io rw, ww;
38     #endif
39 root 1.1 char buf[VIO_READ_BUFFER_SIZE];
40 root 1.12 size_t (*old_read)(Vio*, uchar *, size_t);
41     size_t (*old_write)(Vio*, const uchar *, size_t);
42     int (*old_vioclose)(Vio*);
43 root 1.1 } ourdata;
44    
/*
 * OURDATAPTR: lvalue holding the ourdata pointer of the Vio named "vio" in
 * the calling scope.  Depending on DESC_IS_PTR the pointer either lives in
 * the tail of the vio->desc character buffer (from byte DESC_OFFSET on) or
 * replaces the vio->desc pointer itself.
 */
#if !DESC_IS_PTR
# define DESC_OFFSET 22
# define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
#else
# define OURDATAPTR (*(ourdata **)&((vio)->desc))
#endif
51 root 1.1
52 root 1.8 static xlen
53     our_read (Vio *vio, xgptr p, xlen len)
54 root 1.1 {
55     ourdata *our = OURDATAPTR;
56    
57     if (!our->bufcnt)
58     {
59     int rd;
60     my_bool dummy;
61    
62     vio->vioblocking (vio, 0, &dummy);
63    
64     for (;;)
65     {
66     rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
67    
68     if (rd >= 0 || errno != EAGAIN)
69     break;
70    
71 root 1.7 #if HAVE_EV
72     if (use_ev)
73     {
74     our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
75     ev_io_start (EV_DEFAULT_UC, &(our->rw));
76     CORO_SCHEDULE;
77     ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
78     }
79     else
80     #endif
81     {
82     dSP;
83     PUSHMARK (SP);
84     XPUSHs (our->corohandle);
85     PUTBACK;
86     call_sv ((SV *)readable, G_VOID | G_DISCARD);
87     }
88 root 1.1 }
89    
90     if (rd <= 0)
91     return rd;
92    
93     our->bufcnt = rd;
94     our->bufofs = 0;
95     }
96    
97     if (our->bufcnt < len)
98     len = our->bufcnt;
99    
100     memcpy (p, our->buf + our->bufofs, len);
101     our->bufofs += len;
102     our->bufcnt -= len;
103    
104     return len;
105     }
106    
107 root 1.8 static xlen
108     our_write (Vio *vio, cxgptr p, xlen len)
109 root 1.1 {
110     char *ptr = (char *)p;
111     my_bool dummy;
112    
113     vio->vioblocking (vio, 0, &dummy);
114    
115     while (len > 0)
116     {
117     int wr = send (vio->sd, ptr, len, 0);
118    
119     if (wr > 0)
120     {
121     ptr += wr;
122     len -= wr;
123     }
124     else if (errno == EAGAIN)
125     {
126 root 1.7 ourdata *our = OURDATAPTR;
127    
128     #if HAVE_EV
129     if (use_ev)
130     {
131     our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
132     ev_io_start (EV_DEFAULT_UC, &(our->ww));
133     CORO_SCHEDULE;
134     ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
135     }
136     else
137     #endif
138     {
139     dSP;
140     PUSHMARK (SP);
141     XPUSHs (our->corohandle);
142     PUTBACK;
143     call_sv ((SV *)writable, G_VOID | G_DISCARD);
144     }
145 root 1.1 }
146     else if (ptr == (char *)p)
147     return -1;
148     else
149     break;
150     }
151    
152     return ptr - (char *)p;
153     }
154    
155 root 1.5 static int
156     our_close (Vio *vio)
157     {
158 root 1.7 ourdata *our = OURDATAPTR;
159    
160 root 1.5 if (vio->read != our_read)
161     croak ("vio.read has unexpected content during unpatch - wtf?");
162    
163     if (vio->write != our_write)
164     croak ("vio.write has unexpected content during unpatch - wtf?");
165    
166     if (vio->vioclose != our_close)
167     croak ("vio.vioclose has unexpected content during unpatch - wtf?");
168    
169 root 1.7 #if HAVE_EV
170     if (use_ev)
171     {
172     ev_io_stop (EV_DEFAULT_UC, &(our->rw));
173     ev_io_stop (EV_DEFAULT_UC, &(our->ww));
174     }
175     #endif
176    
177     SvREFCNT_dec (our->corohandle);
178     SvREFCNT_dec (our->corohandle_sv);
179 root 1.5
180 root 1.10 #if DESC_IS_PTR
181     vio->desc = our->old_desc;
182     #endif
183    
184 root 1.7 Safefree (our);
185 root 1.5
186 root 1.12 vio->vioclose = our->old_vioclose;
187     vio->write = our->old_write;
188     vio->read = our->old_read;
189 root 1.5
190     vio->vioclose (vio);
191     }
192    
#if HAVE_EV
/*
 * libev callback shared by the read and write watchers: stops the watcher
 * and marks the coroutine that our_read/our_write stored in w->data as
 * ready again.  revents is not inspected.
 */
static void
iocb (EV_P_ ev_io *w, int revents)
{
  SV *waiter = (SV *)w->data;

  ev_io_stop (EV_A, w);
  CORO_READY (waiter);
}
#endif
201    
202 root 1.1 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
203    
BOOT:
{
        /* cache the Perl-level wait callbacks used by our_read/our_write when
           EV is not in use; a flags value of 0 means they are not created here */
        writable = get_cv ("Coro::Mysql::writable", 0);
        readable = get_cv ("Coro::Mysql::readable", 0);
}
209    
210     PROTOTYPES: ENABLE
211    
void
_use_ev ()
        PPCODE:
{
        /* the API import is attempted only on the first call */
        static int attempted;

        if (!attempted)
          {
            attempted = 1;
#if HAVE_EV
            I_EV_API ("Coro::Mysql");
            I_CORO_API ("Coro::Mysql");
            use_ev = 1;
#endif
          }

        /* report whether the EV code path is active */
        if (use_ev)
          {
            XPUSHs (&PL_sv_yes);
          }
        else
          {
            XPUSHs (&PL_sv_no);
          }
}
230    
void
_patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
        CODE:
{
        /* sock is the MYSQL connection pointer passed in as an integer */
        MYSQL *conn = (MYSQL *)sock;
        Vio *vio = conn->net.vio;
        unsigned long lib_version = mysql_get_client_version ();
        ourdata *our;

        /* matching versions are required but not sufficient */
        if (client_version != lib_version)
          croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
                 client_version, lib_version);

        /* the caller's fd must be the one both libmysql and the vio use */
        if (conn->net.fd != fd)
          croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");

        if (vio->sd != fd)
          croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");
#if MYSQL_VERSION_ID < 100010
        /* only patch a vio that still carries the stock handlers */
        if (vio->vioclose != vio_close)
          croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");

        if (vio->write != vio_write)
          croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");

        if (!(vio->read == vio_read || vio->read == vio_read_buff))
          croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");
#endif

        /* zero-initialised per-connection state, freed again by our_close */
        Newz (0, our, 1, ourdata);
        our->magic = CoMy_MAGIC;
        our->corohandle_sv = newSVsv (corohandle_sv);
        our->corohandle = newSVsv (corohandle);
#if HAVE_EV
        if (use_ev)
          {
            ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
            ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
          }
#endif
#if DESC_IS_PTR
        /* keep the old description pointer and a bounded, terminated copy of its text */
        our->old_desc = vio->desc;
        strncpy (our->desc, vio->desc, sizeof (our->desc));
        our->desc [sizeof (our->desc) - 1] = 0;
#else
        /* terminate the description just before the bytes that will hold our pointer */
        vio->desc [DESC_OFFSET - 1] = 0;
#endif
        OURDATAPTR = our;

        /* remember each original handler, then install our replacement */
        our->old_vioclose = vio->vioclose;
        vio->vioclose = our_close;

        our->old_write = vio->write;
        vio->write = our_write;

        our->old_read = vio->read;
        vio->read = our_read;
}
289