ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Mysql/Mysql.xs
Revision: 1.7
Committed: Thu Feb 17 02:05:15 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.6: +93 -15 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include <sys/errno.h>
2 #include <unistd.h>
3 #include <fcntl.h>
4
5 #include <mysql.h>
6
7 #include "EXTERN.h"
8 #include "perl.h"
9 #include "XSUB.h"
10
11 #if HAVE_EV
12 # include "EVAPI.h"
13 # include "CoroAPI.h"
14 #endif
15
16 #define IN_DESTRUCT PL_dirty
17
18 typedef U16 uint16;
19
20 /* cached function gv's */
21 static CV *readable, *writable;
22 static int use_ev;
23
24 #include "violite.h"
25
26 #define DESC_OFFSET 22
27
28 #define CoMy_MAGIC 0x436f4d79
29
30 typedef struct {
31 int magic;
32 SV *corohandle_sv, *corohandle;
33 int bufofs, bufcnt;
34 #if HAVE_EV
35 ev_io rw, ww;
36 #endif
37 char buf[VIO_READ_BUFFER_SIZE];
38 } ourdata;
39
40 #define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
41
42 static int
43 our_read (Vio *vio, xgptr p, int len)
44 {
45 ourdata *our = OURDATAPTR;
46
47 if (!our->bufcnt)
48 {
49 int rd;
50 my_bool dummy;
51
52 vio->vioblocking (vio, 0, &dummy);
53
54 for (;;)
55 {
56 rd = recv (vio->sd, our->buf, sizeof (our->buf), 0);
57
58 if (rd >= 0 || errno != EAGAIN)
59 break;
60
61 #if HAVE_EV
62 if (use_ev)
63 {
64 our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
65 ev_io_start (EV_DEFAULT_UC, &(our->rw));
66 CORO_SCHEDULE;
67 ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
68 }
69 else
70 #endif
71 {
72 dSP;
73 PUSHMARK (SP);
74 XPUSHs (our->corohandle);
75 PUTBACK;
76 call_sv ((SV *)readable, G_VOID | G_DISCARD);
77 }
78 }
79
80 if (rd <= 0)
81 return rd;
82
83 our->bufcnt = rd;
84 our->bufofs = 0;
85 }
86
87 if (our->bufcnt < len)
88 len = our->bufcnt;
89
90 memcpy (p, our->buf + our->bufofs, len);
91 our->bufofs += len;
92 our->bufcnt -= len;
93
94 return len;
95 }
96
97 static int
98 our_write (Vio *vio, const xgptr p, int len)
99 {
100 char *ptr = (char *)p;
101 my_bool dummy;
102
103 vio->vioblocking (vio, 0, &dummy);
104
105 while (len > 0)
106 {
107 int wr = send (vio->sd, ptr, len, 0);
108
109 if (wr > 0)
110 {
111 ptr += wr;
112 len -= wr;
113 }
114 else if (errno == EAGAIN)
115 {
116 ourdata *our = OURDATAPTR;
117
118 #if HAVE_EV
119 if (use_ev)
120 {
121 our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
122 ev_io_start (EV_DEFAULT_UC, &(our->ww));
123 CORO_SCHEDULE;
124 ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
125 }
126 else
127 #endif
128 {
129 dSP;
130 PUSHMARK (SP);
131 XPUSHs (our->corohandle);
132 PUTBACK;
133 call_sv ((SV *)writable, G_VOID | G_DISCARD);
134 }
135 }
136 else if (ptr == (char *)p)
137 return -1;
138 else
139 break;
140 }
141
142 return ptr - (char *)p;
143 }
144
145 static int
146 our_close (Vio *vio)
147 {
148 ourdata *our = OURDATAPTR;
149
150 if (vio->read != our_read)
151 croak ("vio.read has unexpected content during unpatch - wtf?");
152
153 if (vio->write != our_write)
154 croak ("vio.write has unexpected content during unpatch - wtf?");
155
156 if (vio->vioclose != our_close)
157 croak ("vio.vioclose has unexpected content during unpatch - wtf?");
158
159 #if HAVE_EV
160 if (use_ev)
161 {
162 ev_io_stop (EV_DEFAULT_UC, &(our->rw));
163 ev_io_stop (EV_DEFAULT_UC, &(our->ww));
164 }
165 #endif
166
167 SvREFCNT_dec (our->corohandle);
168 SvREFCNT_dec (our->corohandle_sv);
169
170 Safefree (our);
171
172 vio->read = vio_read;
173 vio->write = vio_write;
174 vio->vioclose = vio_close;
175
176 vio->vioclose (vio);
177 }
178
#if HAVE_EV
/*
 * Shared callback of the read and write watchers: stops the watcher that
 * fired and readies the coroutine that our_read / our_write stored in
 * the watcher's data field before scheduling away.
 */
static void
iocb (EV_P_ ev_io *w, int revents)
{
  SV *waiter = (SV *)w->data;

  ev_io_stop (EV_A, w);
  CORO_READY (waiter);
}
#endif
187
188 MODULE = Coro::Mysql PACKAGE = Coro::Mysql
189
BOOT:
{
        /* look up the Perl-level wait helpers once; our_read / our_write call them when EV is not used */
        writable = get_cv ("Coro::Mysql::writable", 0);
        readable = get_cv ("Coro::Mysql::readable", 0);
}
195
196 PROTOTYPES: ENABLE
197
void
_use_ev ()
	PPCODE:
{
        /* the EV and Coro C APIs are imported on the first call only */
        static int initialised;

        if (initialised == 0)
          {
            initialised = 1;
#if HAVE_EV
            I_EV_API ("Coro::Mysql");
            I_CORO_API ("Coro::Mysql");
            use_ev = 1;
#endif
          }

        /* push a true value when the EV code paths are active, a false one otherwise */
        if (use_ev)
          {
            XPUSHs (&PL_sv_yes);
          }
        else
          {
            XPUSHs (&PL_sv_no);
          }
}
216
void
_patch (IV sock, int fd, SV *corohandle_sv, SV *corohandle)
	CODE:
{
        /*
         * Hooks the read/write/close handlers of the vio behind a MYSQL
         * connection so that they go through our_read/our_write/our_close.
         *
         *   sock           address of the MYSQL structure, passed as an integer
         *   fd             file descriptor the caller believes the connection uses
         *   corohandle_sv  copied and kept alive until our_close
         *   corohandle     copied; handed to the readable/writable subs
         *
         * Croaks, without modifying anything, when the descriptor or any of
         * the vio function pointers is not what is expected.
         */
        MYSQL *my = (MYSQL *)sock;
        Vio *vio = my->net.vio;
        ourdata *our;

        if (fd != my->net.fd)
          croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");

        if (fd != vio->sd)
          croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");

        /* this message used to name vio.write, which hid the check that actually failed */
        if (vio->vioclose != vio_close)
          croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");

        if (vio->write != vio_write)
          croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");

        if (vio->read != vio_read
            && vio->read != vio_read_buff)
          croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");

        /* zero-filled, so bufofs/bufcnt start out as an empty buffer */
        Newz (0, our, 1, ourdata);
        our->magic         = CoMy_MAGIC;
        our->corohandle_sv = newSVsv (corohandle_sv);
        our->corohandle    = newSVsv (corohandle);
#if HAVE_EV
        if (use_ev)
          {
            ev_io_init (&(our->rw), iocb, vio->sd, EV_READ);
            ev_io_init (&(our->ww), iocb, vio->sd, EV_WRITE);
          }
#endif

        /* zero the desc byte just before the hidden pointer, then store the pointer */
        vio->desc [DESC_OFFSET - 1] = 0;
        OURDATAPTR = our;

        vio->vioclose = our_close;
        vio->write    = our_write;
        vio->read     = our_read;
}
260