ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.11
Committed: Sat Aug 3 02:11:27 2013 UTC (10 years, 10 months ago) by root
Branch: MAIN
Changes since 1.10: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.11 #include <errno.h>
2 root 1.1 #include <unistd.h>
3     #include <fcntl.h>
4    
5     #include <mysql.h>
6    
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11 root 1.7 #if HAVE_EV
12     # include "EVAPI.h"
13     # include "CoroAPI.h"
14     #endif
15    
16 root 1.3 #define IN_DESTRUCT PL_dirty
17    
18 root 1.1 typedef U16 uint16;
19    
20     /* cached function gv's */
21     static CV *readable, *writable;
22 root 1.7 static int use_ev;
23 root 1.1
24     #include "violite.h"
25    
26     #define CoMy_MAGIC 0x436f4d79
27    
28     typedef struct {
29 root 1.10 #if DESC_IS_PTR
30     char desc[30];
31     const char *old_desc;
32     #endif
33 root 1.1 int magic;
34 root 1.5 SV *corohandle_sv, *corohandle;
35 root 1.1 int bufofs, bufcnt;
36 root 1.7 #if HAVE_EV
37     ev_io rw, ww;
38     #endif
39 root 1.1 char buf[VIO_READ_BUFFER_SIZE];
40     } ourdata;
41    
42 root 1.10 #if DESC_IS_PTR
43     # define OURDATAPTR (*(ourdata **)&((vio)->desc))
44     #else
45     # define DESC_OFFSET 22
46     # define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
47     #endif
48 root 1.1
49 root 1.8 static xlen
50     our_read (Vio *vio, xgptr p, xlen len)
51 root 1.1 {
52     ourdata *our = OURDATAPTR;
53    
54     if (!our->bufcnt)
55     {
56     int rd;
57     my_bool dummy;
58    
59     vio->vioblocking (vio, 0, &dummy);
60    
61     for (;;)
62     {
63     rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
64    
65     if (rd >= 0 || errno != EAGAIN)
66     break;
67    
68 root 1.7 #if HAVE_EV
69     if (use_ev)
70     {
71     our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
72     ev_io_start (EV_DEFAULT_UC, &(our->rw));
73     CORO_SCHEDULE;
74     ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
75     }
76     else
77     #endif
78     {
79     dSP;
80     PUSHMARK (SP);
81     XPUSHs (our->corohandle);
82     PUTBACK;
83     call_sv ((SV *)readable, G_VOID | G_DISCARD);
84     }
85 root 1.1 }
86    
87     if (rd <= 0)
88     return rd;
89    
90     our->bufcnt = rd;
91     our->bufofs = 0;
92     }
93    
94     if (our->bufcnt < len)
95     len = our->bufcnt;
96    
97     memcpy (p, our->buf + our->bufofs, len);
98     our->bufofs += len;
99     our->bufcnt -= len;
100    
101     return len;
102     }
103    
104 root 1.8 static xlen
105     our_write (Vio *vio, cxgptr p, xlen len)
106 root 1.1 {
107     char *ptr = (char *)p;
108     my_bool dummy;
109    
110     vio->vioblocking (vio, 0, &dummy);
111    
112     while (len > 0)
113     {
114     int wr = send (vio->sd, ptr, len, 0);
115    
116     if (wr > 0)
117     {
118     ptr += wr;
119     len -= wr;
120     }
121     else if (errno == EAGAIN)
122     {
123 root 1.7 ourdata *our = OURDATAPTR;
124    
125     #if HAVE_EV
126     if (use_ev)
127     {
128     our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
129     ev_io_start (EV_DEFAULT_UC, &(our->ww));
130     CORO_SCHEDULE;
131     ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
132     }
133     else
134     #endif
135     {
136     dSP;
137     PUSHMARK (SP);
138     XPUSHs (our->corohandle);
139     PUTBACK;
140     call_sv ((SV *)writable, G_VOID | G_DISCARD);
141     }
142 root 1.1 }
143     else if (ptr == (char *)p)
144     return -1;
145     else
146     break;
147     }
148    
149     return ptr - (char *)p;
150     }
151    
152 root 1.5 static int
153     our_close (Vio *vio)
154     {
155 root 1.7 ourdata *our = OURDATAPTR;
156    
157 root 1.5 if (vio->read != our_read)
158     croak ("vio.read has unexpected content during unpatch - wtf?");
159    
160     if (vio->write != our_write)
161     croak ("vio.write has unexpected content during unpatch - wtf?");
162    
163     if (vio->vioclose != our_close)
164     croak ("vio.vioclose has unexpected content during unpatch - wtf?");
165    
166 root 1.7 #if HAVE_EV
167     if (use_ev)
168     {
169     ev_io_stop (EV_DEFAULT_UC, &(our->rw));
170     ev_io_stop (EV_DEFAULT_UC, &(our->ww));
171     }
172     #endif
173    
174     SvREFCNT_dec (our->corohandle);
175     SvREFCNT_dec (our->corohandle_sv);
176 root 1.5
177 root 1.10 #if DESC_IS_PTR
178     vio->desc = our->old_desc;
179     #endif
180    
181 root 1.7 Safefree (our);
182 root 1.5
183     vio->read = vio_read;
184     vio->write = vio_write;
185     vio->vioclose = vio_close;
186    
187     vio->vioclose (vio);
188     }
189    
#if HAVE_EV
/*
 * libev callback shared by the read and write watchers (see ev_io_init in
 * _patch): stops the watcher that fired and readies the coroutine whose SV
 * was stored in its data member by our_read/our_write.
 */
static void
iocb (EV_P_ ev_io *w, int revents)
{
  SV *waiter = (SV *)w->data;

  ev_io_stop (EV_A, w);
  CORO_READY (waiter);
}
#endif
198    
199 root 1.1 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
200    
201     BOOT:
202     {
203     readable = get_cv ("Coro::Mysql::readable", 0);
204     writable = get_cv ("Coro::Mysql::writable", 0);
205     }
206    
207     PROTOTYPES: ENABLE
208    
209     void
210 root 1.7 _use_ev ()
211     PPCODE:
212     {
213     static int onceonly;
214    
215     if (!onceonly)
216     {
217     onceonly = 1;
218     #if HAVE_EV
219     I_EV_API ("Coro::Mysql");
220     I_CORO_API ("Coro::Mysql");
221     use_ev = 1;
222     #endif
223     }
224    
225     XPUSHs (use_ev ? &PL_sv_yes : &PL_sv_no);
226     }
227    
228     void
229 root 1.8 _patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
230 root 1.1 CODE:
231     {
232     MYSQL *my = (MYSQL *)sock;
233     Vio *vio = my->net.vio;
234     ourdata *our;
235    
236 root 1.8 /* matching versions are required but not sufficient */
237     if (client_version != mysql_get_client_version ())
238     croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
239     client_version, mysql_get_client_version ());
240    
241 root 1.1 if (fd != my->net.fd)
242     croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");
243    
244     if (fd != vio->sd)
245     croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");
246    
247 root 1.5 if (vio->vioclose != vio_close)
248 root 1.9 croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");
249 root 1.5
250 root 1.1 if (vio->write != vio_write)
251     croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");
252    
253 root 1.5 if (vio->read != vio_read
254     && vio->read != vio_read_buff)
255 root 1.1 croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");
256    
257     Newz (0, our, 1, ourdata);
258     our->magic = CoMy_MAGIC;
259 root 1.5 our->corohandle_sv = newSVsv (corohandle_sv);
260     our->corohandle = newSVsv (corohandle);
261 root 1.7 #if HAVE_EV
262     if (use_ev)
263     {
264     ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
265     ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
266     }
267     #endif
268 root 1.10 #if DESC_IS_PTR
269     our->old_desc = vio->desc;
270     strncpy (our->desc, vio->desc, sizeof (our->desc));
271     our->desc [sizeof (our->desc) - 1] = 0;
272     #else
273 root 1.1 vio->desc [DESC_OFFSET - 1] = 0;
274 root 1.10 #endif
275 root 1.1 OURDATAPTR = our;
276    
277 root 1.5 vio->vioclose = our_close;
278     vio->write = our_write;
279     vio->read = our_read;
280 root 1.1 }
281