diff --git a/lib/resty/mysql.lua b/lib/resty/mysql.lua index 7ac5b7b..d3c81a7 100644 --- a/lib/resty/mysql.lua +++ b/lib/resty/mysql.lua @@ -10,6 +10,7 @@ local strbyte = string.byte local strchar = string.char local strfind = string.find local strrep = string.rep +local strsub = string.sub local null = ngx.null local band = bit.band local bxor = bit.bxor @@ -39,6 +40,7 @@ local COM_QUERY = 0x03 local SERVER_MORE_RESULTS_EXISTS = 8 +local FULL_PACKET_SIZE = 16777215 local mt = { __index = _M } @@ -76,8 +78,9 @@ end local function _get_byte8(data, i) local a, b, c, d, e, f, g, h = strbyte(data, i, i + 7) - return bor(a, lshift(b, 8), lshift(c, 16), lshift(d, 24), lshift(e, 32), - lshift(f, 40), lshift(g, 48), lshift(h, 56)), i + 8 + local low = bor(a, lshift(b, 8), lshift(c, 16), lshift(d, 24)), i + 4 + local high = bor(e, lshift(f, 8), lshift(g, 16), lshift(h, 24)), i + 4 + return low + high * 4294967296, i + 8 end @@ -172,6 +175,30 @@ function _send_packet(self, req, size) return sock:send(packet) end +function _recv_more_packet(self) + local sock = self.sock + + local data, err = sock:receive(4) -- packet header + if not data then + return nil, nil, "failed to receive packet header: " .. err + end + + local len, pos = _get_byte3(data, 1) + + if len == 0 then + return nil, nil, "empty packet" + end + + local num = strbyte(data, pos) + + data, err = sock:receive(len) + + if not data then + return nil, nil, "failed to read packet content: " .. err + end + + return len, data, num +end local function _recv_packet(self) local sock = self.sock @@ -225,6 +252,24 @@ local function _recv_packet(self) typ = "DATA" end + if len == FULL_PACKET_SIZE then + -- read more data + local datas = {} + insert(datas, data) + while true do + num = num + 1 + local len, data, packet_num = _recv_more_packet(self) + if packet_num ~= num then + return nil, "receive packet number ERR" + end + insert(datas, data) + if len < FULL_PACKET_SIZE then + break + end + end + return concat(datas), typ + end + return data, typ end @@ -690,8 +735,47 @@ local function send_query(self, query) return nil, "not initialized" end + local query_len = strlen(query) + local max_packet = FULL_PACKET_SIZE - 1 + self.packet_no = -1 + if query_len >= max_packet then + -- send big query + local bytes_all = 0 + local pos = 1 + local remain = query_len + -- first packet + local query_part = strsub(query, pos, pos + max_packet - 1) + pos = pos + max_packet + remain = remain - max_packet + local bytes, err = _send_packet(self, {strchar(COM_QUERY), query_part}, FULL_PACKET_SIZE) + if not bytes then + return nil, err + end + bytes_all = bytes_all + bytes + -- other full packet + while remain >= FULL_PACKET_SIZE do + query_part = strsub(query, pos, pos + FULL_PACKET_SIZE - 1) + pos = pos + FULL_PACKET_SIZE + remain = remain - FULL_PACKET_SIZE + local bytes, err = _send_packet(self, query_part, FULL_PACKET_SIZE) + if not bytes then + return nil, err + end + bytes_all = bytes_all + bytes + end + -- small packet + query_part = strsub(query, pos) + local bytes, err = _send_packet(self, query_part, remain) + if not bytes then + return nil, err + end + bytes_all = bytes_all + bytes + self.state = STATE_COMMAND_SENT + return bytes_all + end + local cmd_packet = {strchar(COM_QUERY), query} local packet_len = 1 + strlen(query) diff --git a/t/large.t b/t/large.t new file mode 100644 index 0000000..a46affe --- /dev/null +++ b/t/large.t @@ -0,0 +1,209 @@ +# vim:set ft= ts=4 sw=4 et: +# set mysqld: max-allowed-packet=200m for Test2 + +my @skip; +BEGIN { + if ($ENV{LD_PRELOAD} =~ /\bmockeagain\.so\b/) { + @skip = (skip_all => 'too slow in mockeagain mode') + } +} + +use Test::Nginx::Socket @skip; +use Cwd qw(cwd); + +#repeat_each(50); +#repeat_each(10); + +plan tests => repeat_each() * (3 * blocks()); + +my $pwd = cwd(); + +our $HttpConfig = qq{ + resolver \$TEST_NGINX_RESOLVER; + lua_package_path "$pwd/lib/?.lua;;"; +}; + +$ENV{TEST_NGINX_RESOLVER} = '8.8.8.8'; +$ENV{TEST_NGINX_MYSQL_PORT} ||= 3306; +$ENV{TEST_NGINX_MYSQL_HOST} ||= '127.0.0.1'; +$ENV{TEST_NGINX_MYSQL_PATH} ||= '/var/run/mysql/mysql.sock'; + +#log_level 'warn'; + +#no_long_string(); +#no_diff(); +no_shuffle(); + +run_tests(); + +__DATA__ + +=== TEST 1: large insert_id +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local mysql = require("resty.mysql") + local create_sql = [[ + CREATE TABLE `large_t` ( + `id` bigint(11) NOT NULL AUTO_INCREMENT, + PRIMARY KEY (`id`) + ) AUTO_INCREMENT=5000000000; + ]] + local drop_sql = [[ + DROP TABLE `large_t`; + ]] + local insert_sql = [[ + INSERT INTO `large_t` VALUES(NULL); + ]] + local db, err = mysql:new() + if not db then + ngx.say("failed to instantiate mysql: ", err) + return + end + db:set_timeout(1000) + local ok, err = db:connect{ + host = "$TEST_NGINX_MYSQL_HOST", + port = $TEST_NGINX_MYSQL_PORT, + database="test", + user="root", + password=""} + if not ok then + ngx.say("failed to connect: ", err, ": ", errno, " ", sqlstate) + return + end + local res, err = db:query(create_sql) + if not res then + ngx.say("create table error:" .. err) + return + end + local res, err = db:query(insert_sql) + if not res then + ngx.say("insert table error:" .. err) + return + else + ngx.say(res.insert_id) + end + local res, err = db:query(drop_sql) + if not res then + ngx.say("drop table error:" .. err) + return + end + '; + } +--- request +GET /t +--- response_body +5000000000 +--- no_error_log +[error] + +=== TEST 2: large query +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local mysql = require("resty.mysql") + local db, err = mysql:new() + if not db then + ngx.say("failed to instantiate mysql: ", err) + return + end + db:set_timeout(1000) + local ok, err = db:connect{ + host = "$TEST_NGINX_MYSQL_HOST", + port = $TEST_NGINX_MYSQL_PORT, + database="test", + user="root", + password=""} + if not ok then + ngx.say("failed to connect: ", err, ": ", errno, " ", sqlstate) + return + end + local fix_str = string.format("/* %s */",string.rep("aaaaaaaaaa",1677721 * 2 - 2)) + local query = fix_str .. "select 123 as ok" + local res, err = db:query(query) + if not res then + ngx.say("select string error:" .. err) + return + else + ngx.say(res[1].ok) + end + '; + } +--- request +GET /t +--- response_body +123 +--- timeout: 10 +--- no_error_log +[error] + +=== TEST 3: large row +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local mysql = require("resty.mysql") + local db, err = mysql:new() + if not db then + ngx.say("failed to instantiate mysql: ", err) + return + end + db:set_timeout(1000) + local ok, err = db:connect{ + host = "$TEST_NGINX_MYSQL_HOST", + port = $TEST_NGINX_MYSQL_PORT, + database="test", + user="root", + password="", + max_packet_size=16777215} + if not ok then + ngx.say("failed to connect: ", err, ": ", errno, " ", sqlstate) + return + end + local create_sql = [[ + CREATE TABLE `large_row_t`(id int, data1 longtext , data2 longtext, data3 longtext, data4 longtext, data5 longtext, data6 longtext, data7 longtext, data8 longtext, data9 longtext, data10 longtext, data11 longtext, data12 longtext,data13 longtext,data14 longtext,data15 longtext,data16 longtext,data17 longtext,data18 longtext) ENGINE=MYISAM; + ]] + local drop_sql = [[ + DROP TABLE `large_row_t` + ]] + local data = string.rep("a", 1024*1024 - 100) + local insert_sql = "INSERT INTO `large_row_t`(id, data1) VALUES(1, \'".. data .."\')" + local update_sql = "UPDATE `large_row_t` SET data2 = data1, data3=data1, data4=data1 , data5=data1, data6=data1, data7=data1, data8=data1, data9=data1, data10=data1, data11=data1, data12=data1, data13=data1, data14=data1, data15=data1, data16=data1, data17=data1, data18=data1" + local select_sql = "SELECT * FROM `large_row_t`" + local res, err = db:query(create_sql) + if not res then + ngx.say("create table error:" .. err) + return + end + local res, err = db:query(insert_sql) + if not res then + ngx.say("insert data error:" .. err) + return + end + local res, err = db:query(update_sql) + if not res then + ngx.say("update data error:" .. err) + return + end + local res, err = db:query(select_sql) + if not res then + ngx.say("select data error:" .. err) + return + else + ngx.say(#res) + end + local res, err = db:query(drop_sql) + if not res then + ngx.say("drop table error:" .. err) + return + end + '; + } +--- request +GET /t +--- response_body +1 +--- no_error_log +[error]