ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.7
Committed: Thu Feb 17 02:05:15 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.6: +93 -15 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include <sys/errno.h>
2     #include <unistd.h>
3     #include <fcntl.h>
4    
5     #include <mysql.h>
6    
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11 root 1.7 #if HAVE_EV
12     # include "EVAPI.h"
13     # include "CoroAPI.h"
14     #endif
15    
16 root 1.3 #define IN_DESTRUCT PL_dirty
17    
18 root 1.1 typedef U16 uint16;
19    
20     /* cached function gv's */
21     static CV *readable, *writable;
22 root 1.7 static int use_ev;
23 root 1.1
24     #include "violite.h"
25    
26     #define DESC_OFFSET 22
27    
28     #define CoMy_MAGIC 0x436f4d79
29    
30     typedef struct {
31     int magic;
32 root 1.5 SV *corohandle_sv, *corohandle;
33 root 1.1 int bufofs, bufcnt;
34 root 1.7 #if HAVE_EV
35     ev_io rw, ww;
36     #endif
37 root 1.1 char buf[VIO_READ_BUFFER_SIZE];
38     } ourdata;
39    
40     #define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
41    
42     static int
43 root 1.6 our_read (Vio *vio, xgptr p, int len)
44 root 1.1 {
45     ourdata *our = OURDATAPTR;
46    
47     if (!our->bufcnt)
48     {
49     int rd;
50     my_bool dummy;
51    
52     vio->vioblocking (vio, 0, &dummy);
53    
54     for (;;)
55     {
56     rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
57    
58     if (rd >= 0 || errno != EAGAIN)
59     break;
60    
61 root 1.7 #if HAVE_EV
62     if (use_ev)
63     {
64     our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
65     ev_io_start (EV_DEFAULT_UC, &(our->rw));
66     CORO_SCHEDULE;
67     ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
68     }
69     else
70     #endif
71     {
72     dSP;
73     PUSHMARK (SP);
74     XPUSHs (our->corohandle);
75     PUTBACK;
76     call_sv ((SV *)readable, G_VOID | G_DISCARD);
77     }
78 root 1.1 }
79    
80     if (rd <= 0)
81     return rd;
82    
83     our->bufcnt = rd;
84     our->bufofs = 0;
85     }
86    
87     if (our->bufcnt < len)
88     len = our->bufcnt;
89    
90     memcpy (p, our->buf + our->bufofs, len);
91     our->bufofs += len;
92     our->bufcnt -= len;
93    
94     return len;
95     }
96    
97     static int
98 root 1.6 our_write (Vio *vio, const xgptr p, int len)
99 root 1.1 {
100     char *ptr = (char *)p;
101     my_bool dummy;
102    
103     vio->vioblocking (vio, 0, &dummy);
104    
105     while (len > 0)
106     {
107     int wr = send (vio->sd, ptr, len, 0);
108    
109     if (wr > 0)
110     {
111     ptr += wr;
112     len -= wr;
113     }
114     else if (errno == EAGAIN)
115     {
116 root 1.7 ourdata *our = OURDATAPTR;
117    
118     #if HAVE_EV
119     if (use_ev)
120     {
121     our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
122     ev_io_start (EV_DEFAULT_UC, &(our->ww));
123     CORO_SCHEDULE;
124     ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
125     }
126     else
127     #endif
128     {
129     dSP;
130     PUSHMARK (SP);
131     XPUSHs (our->corohandle);
132     PUTBACK;
133     call_sv ((SV *)writable, G_VOID | G_DISCARD);
134     }
135 root 1.1 }
136     else if (ptr == (char *)p)
137     return -1;
138     else
139     break;
140     }
141    
142     return ptr - (char *)p;
143     }
144    
145 root 1.5 static int
146     our_close (Vio *vio)
147     {
148 root 1.7 ourdata *our = OURDATAPTR;
149    
150 root 1.5 if (vio->read != our_read)
151     croak ("vio.read has unexpected content during unpatch - wtf?");
152    
153     if (vio->write != our_write)
154     croak ("vio.write has unexpected content during unpatch - wtf?");
155    
156     if (vio->vioclose != our_close)
157     croak ("vio.vioclose has unexpected content during unpatch - wtf?");
158    
159 root 1.7 #if HAVE_EV
160     if (use_ev)
161     {
162     ev_io_stop (EV_DEFAULT_UC, &(our->rw));
163     ev_io_stop (EV_DEFAULT_UC, &(our->ww));
164     }
165     #endif
166    
167     SvREFCNT_dec (our->corohandle);
168     SvREFCNT_dec (our->corohandle_sv);
169 root 1.5
170 root 1.7 Safefree (our);
171 root 1.5
172     vio->read = vio_read;
173     vio->write = vio_write;
174     vio->vioclose = vio_close;
175    
176     vio->vioclose (vio);
177     }
178    
#if HAVE_EV
/*
 * ev_io callback installed on both the read and the write watcher by _patch.
 * Stops the watcher that fired and readies the coroutine SV that our_read or
 * our_write stored in the watcher's data pointer.  revents is not inspected.
 */
static void
iocb (EV_P_ ev_io *w, int revents)
{
  SV *waiter = (SV *)w->data;

  ev_io_stop (EV_A, w);
  CORO_READY (waiter);
}
#endif
187    
188 root 1.1 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
189    
BOOT:
{
  /*
   * Look up the perl-level wait callbacks once at load time; our_read and
   * our_write call them whenever the EV code path is not in use.
   */
  writable = get_cv ("Coro::Mysql::writable", 0);
  readable = get_cv ("Coro::Mysql::readable", 0);
}
195    
196     PROTOTYPES: ENABLE
197    
void
_use_ev ()
	PPCODE:
{
  /*
   * Loads the EV and Coro C APIs on the first call (only when compiled with
   * HAVE_EV) and flags the EV code path as usable.  Every call pushes a true
   * value if use_ev is set and a false value otherwise.
   */
  static int initialised;

  if (!initialised)
    {
      initialised = 1;
#if HAVE_EV
      I_EV_API ("Coro::Mysql");
      I_CORO_API ("Coro::Mysql");
      use_ev = 1;
#endif
    }

  if (use_ev)
    XPUSHs (&PL_sv_yes);
  else
    XPUSHs (&PL_sv_no);
}
216    
void
_patch (IV sock, int fd, SV *corohandle_sv, SV *corohandle)
	CODE:
{
  /*
   * Hooks a libmysql connection so that its socket I/O goes through
   * our_read/our_write/our_close.
   *
   *   sock           address of the MYSQL handle, passed as an integer
   *   fd             file descriptor the caller believes the connection uses
   *   corohandle_sv  copied and kept until our_close
   *   corohandle     copied; handed to the readable/writable callbacks
   *
   * Croaks without changing anything when the fd does not match the one
   * stored in the handle and its vio, or when the vio's function pointers are
   * not the stock libmysql ones.
   */
  MYSQL *my = (MYSQL *)sock;
  Vio *vio = my->net.vio;
  ourdata *our;

  if (fd != my->net.fd)
    croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");

  if (fd != vio->sd)
    croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");

  /* this message used to say "vio.write", a copy of the check that follows */
  if (vio->vioclose != vio_close)
    croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");

  if (vio->write != vio_write)
    croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");

  if (vio->read != vio_read
      && vio->read != vio_read_buff)
    croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");

  /* zero-filled, so bufofs/bufcnt start out as "buffer empty" */
  Newz (0, our, 1, ourdata);
  our->magic         = CoMy_MAGIC;
  our->corohandle_sv = newSVsv (corohandle_sv);
  our->corohandle    = newSVsv (corohandle);
#if HAVE_EV
  if (use_ev)
    {
      ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
      ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
    }
#endif

  /* NOTE(review): presumably terminates the description string in front of the stored pointer - confirm */
  vio->desc [DESC_OFFSET - 1] = 0;
  OURDATAPTR = our;

  vio->vioclose = our_close;
  vio->write    = our_write;
  vio->read     = our_read;
}
260