ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.9
Committed: Mon Sep 10 20:00:58 2012 UTC (11 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_21
Changes since 1.8: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include <sys/errno.h>
2     #include <unistd.h>
3     #include <fcntl.h>
4    
5     #include <mysql.h>
6    
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11 root 1.7 #if HAVE_EV
12     # include "EVAPI.h"
13     # include "CoroAPI.h"
14     #endif
15    
16 root 1.3 #define IN_DESTRUCT PL_dirty
17    
18 root 1.1 typedef U16 uint16;
19    
20     /* cached function gv's */
21     static CV *readable, *writable;
22 root 1.7 static int use_ev;
23 root 1.1
24     #include "violite.h"
25    
26     #define DESC_OFFSET 22
27    
28     #define CoMy_MAGIC 0x436f4d79
29    
30     typedef struct {
31     int magic;
32 root 1.5 SV *corohandle_sv, *corohandle;
33 root 1.1 int bufofs, bufcnt;
34 root 1.7 #if HAVE_EV
35     ev_io rw, ww;
36     #endif
37 root 1.1 char buf[VIO_READ_BUFFER_SIZE];
38     } ourdata;
39    
40     #define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
41    
42 root 1.8 static xlen
43     our_read (Vio *vio, xgptr p, xlen len)
44 root 1.1 {
45     ourdata *our = OURDATAPTR;
46    
47     if (!our->bufcnt)
48     {
49     int rd;
50     my_bool dummy;
51    
52     vio->vioblocking (vio, 0, &dummy);
53    
54     for (;;)
55     {
56     rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
57    
58     if (rd >= 0 || errno != EAGAIN)
59     break;
60    
61 root 1.7 #if HAVE_EV
62     if (use_ev)
63     {
64     our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
65     ev_io_start (EV_DEFAULT_UC, &(our->rw));
66     CORO_SCHEDULE;
67     ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
68     }
69     else
70     #endif
71     {
72     dSP;
73     PUSHMARK (SP);
74     XPUSHs (our->corohandle);
75     PUTBACK;
76     call_sv ((SV *)readable, G_VOID | G_DISCARD);
77     }
78 root 1.1 }
79    
80     if (rd <= 0)
81     return rd;
82    
83     our->bufcnt = rd;
84     our->bufofs = 0;
85     }
86    
87     if (our->bufcnt < len)
88     len = our->bufcnt;
89    
90     memcpy (p, our->buf + our->bufofs, len);
91     our->bufofs += len;
92     our->bufcnt -= len;
93    
94     return len;
95     }
96    
97 root 1.8 static xlen
98     our_write (Vio *vio, cxgptr p, xlen len)
99 root 1.1 {
100     char *ptr = (char *)p;
101     my_bool dummy;
102    
103     vio->vioblocking (vio, 0, &dummy);
104    
105     while (len > 0)
106     {
107     int wr = send (vio->sd, ptr, len, 0);
108    
109     if (wr > 0)
110     {
111     ptr += wr;
112     len -= wr;
113     }
114     else if (errno == EAGAIN)
115     {
116 root 1.7 ourdata *our = OURDATAPTR;
117    
118     #if HAVE_EV
119     if (use_ev)
120     {
121     our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
122     ev_io_start (EV_DEFAULT_UC, &(our->ww));
123     CORO_SCHEDULE;
124     ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
125     }
126     else
127     #endif
128     {
129     dSP;
130     PUSHMARK (SP);
131     XPUSHs (our->corohandle);
132     PUTBACK;
133     call_sv ((SV *)writable, G_VOID | G_DISCARD);
134     }
135 root 1.1 }
136     else if (ptr == (char *)p)
137     return -1;
138     else
139     break;
140     }
141    
142     return ptr - (char *)p;
143     }
144    
145 root 1.5 static int
146     our_close (Vio *vio)
147     {
148 root 1.7 ourdata *our = OURDATAPTR;
149    
150 root 1.5 if (vio->read != our_read)
151     croak ("vio.read has unexpected content during unpatch - wtf?");
152    
153     if (vio->write != our_write)
154     croak ("vio.write has unexpected content during unpatch - wtf?");
155    
156     if (vio->vioclose != our_close)
157     croak ("vio.vioclose has unexpected content during unpatch - wtf?");
158    
159 root 1.7 #if HAVE_EV
160     if (use_ev)
161     {
162     ev_io_stop (EV_DEFAULT_UC, &(our->rw));
163     ev_io_stop (EV_DEFAULT_UC, &(our->ww));
164     }
165     #endif
166    
167     SvREFCNT_dec (our->corohandle);
168     SvREFCNT_dec (our->corohandle_sv);
169 root 1.5
170 root 1.7 Safefree (our);
171 root 1.5
172     vio->read = vio_read;
173     vio->write = vio_write;
174     vio->vioclose = vio_close;
175    
176     vio->vioclose (vio);
177     }
178    
179 root 1.7 #if HAVE_EV
180     static void
181     iocb (EV_P_ ev_io *w, int revents)
182     {
183     ev_io_stop (EV_A, w);
184     CORO_READY ((SV *)w->data);
185     }
186     #endif
187    
188 root 1.1 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
189    
BOOT:
{
        /* cache the Perl-level wait callbacks used when EV is not in use */
        writable = get_cv ("Coro::Mysql::writable", 0);
        readable = get_cv ("Coro::Mysql::readable", 0);
}
195    
196     PROTOTYPES: ENABLE
197    
void
_use_ev ()
	PPCODE:
{
        /* The EV/Coro API import is attempted on the first call only.
           Pushes a true value when the EV wait path is enabled, false otherwise. */
        static int attempted;

        if (attempted == 0)
          {
            attempted = 1;
#if HAVE_EV
            I_EV_API ("Coro::Mysql");
            I_CORO_API ("Coro::Mysql");
            use_ev = 1;
#endif
          }

        if (use_ev)
          {
            XPUSHs (&PL_sv_yes);
          }
        else
          {
            XPUSHs (&PL_sv_no);
          }
}
216    
217     void
218 root 1.8 _patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
219 root 1.1 CODE:
220     {
221     MYSQL *my = (MYSQL *)sock;
222     Vio *vio = my->net.vio;
223     ourdata *our;
224    
225 root 1.8 /* matching versions are required but not sufficient */
226     if (client_version != mysql_get_client_version ())
227     croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
228     client_version, mysql_get_client_version ());
229    
230 root 1.1 if (fd != my->net.fd)
231     croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");
232    
233     if (fd != vio->sd)
234     croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");
235    
236 root 1.5 if (vio->vioclose != vio_close)
237 root 1.9 croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");
238 root 1.5
239 root 1.1 if (vio->write != vio_write)
240     croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");
241    
242 root 1.5 if (vio->read != vio_read
243     && vio->read != vio_read_buff)
244 root 1.1 croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");
245    
246     Newz (0, our, 1, ourdata);
247     our->magic = CoMy_MAGIC;
248 root 1.5 our->corohandle_sv = newSVsv (corohandle_sv);
249     our->corohandle = newSVsv (corohandle);
250 root 1.7 #if HAVE_EV
251     if (use_ev)
252     {
253     ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
254     ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
255     }
256     #endif
257 root 1.1
258     vio->desc [DESC_OFFSET - 1] = 0;
259     OURDATAPTR = our;
260    
261 root 1.5 vio->vioclose = our_close;
262     vio->write = our_write;
263     vio->read = our_read;
264 root 1.1 }
265