Last active
November 23, 2023 04:53
-
-
Save AMD-NICK/77c2252c9800acd83a19a277adca1b75 to your computer and use it in GitHub Desktop.
[LUA] redis query tests (нигде не публиковалось, просто публичная заметка)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Uses LuaSocket only.
-- Experiment: check whether several write operations can be issued back to
-- back, with the corresponding reads performed only afterwards.
local socket = require("socket")

-- REDIS_HOST is expected as "host" or "host:port"; fail with a clear message
-- instead of an "attempt to index a nil value" error when it is missing.
local redis_host = assert(os.getenv("REDIS_HOST"), "REDIS_HOST is not set (expected host[:port])")
local host, port = redis_host:match("([^:]+):?(%d*)")
assert(host, "REDIS_HOST must look like host[:port]")
-- The port capture is an empty string when no port was given; fall back to
-- the default Redis port rather than passing nil to connect().
port = tonumber(port) or 6379

-- Raw RESP encoding of: AUTH qwertyuiopasdf
local auth_buff = {
  "*2\r\n",
  "$4\r\nAUTH\r\n",
  "$14\r\nqwertyuiopasdf\r\n",
}
local auth_request = table.concat(auth_buff)

local skt = assert(socket.tcp())
print("settimeout", skt:settimeout(3))
print("connect", skt:connect(host, port))
-- print("setoption", skt:setoption('tcp-nodelay', true)) -- never noticed it affecting anything

-- Two writes first ...
print("auth send", skt:send(auth_request))
print("auth send", skt:send(auth_request))
-- ... then two line reads, one per command sent above.
print("auth recv", skt:receive("*l"))
print("auth recv", skt:receive("*l"))
-- print("auth recv", skt:receive("*l"))

skt:close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Checked how fast 100k write operations can be performed with plain luasocket.
-- It did not work out: sock:write does not block the code, so without a timer at the end not all commands had time to execute.
local REDIS = require("misc.redis_query") | |
--- Open a Redis connection configured from the environment.
-- Reads REDIS_HOST ("host" or "host:port") and REDIS_PASS and forwards them,
-- together with fixed timeout/retry settings, to REDIS.create.
-- @return whatever REDIS.create returns for these options
-- @raise if REDIS_HOST is unset or does not contain a host part
function REDIS.getcon()
  -- Fail with a readable message instead of "attempt to index a nil value".
  local redis_host = assert(os.getenv("REDIS_HOST"), "REDIS_HOST is not set (expected host[:port])")
  local host, port = redis_host:match("([^:]+):?(%d*)")
  assert(host, "REDIS_HOST must look like host[:port]")
  return REDIS.create({
    host = host,
    -- tonumber("") is nil when no port was given, which leaves this key unset.
    port = tonumber(port),
    password = os.getenv("REDIS_PASS"),
    -- copas_wrap = true,
    timeout = 5, -- connect timeout
    -- tcp_nodelay = false,
    max_retries_per_request = 10,
    retry_delay = 5,
  })
end
-- Benchmark body: authenticate, read the starting value, then fire 100000
-- INCR commands without reading their replies and report the elapsed time.
-- NOTE(review): this reads `.client` from the value returned by
-- REDIS.getcon(); confirm the wrapper in use actually exposes that field.
local connection = REDIS.getcon()
local client = connection.client

-- fp performs partial application:
-- https://github.com/FPtje/GModFunctional/blob/de2047455b898c821a912eef31c8307878c4f089/fn.lua#L18
local fp = require("fn").fp
local send_command = fp{client.requests.multibulk, client}
-- response.read is exposed, see
-- https://github.com/nrk/redis-lua/blob/880dda904909adfed0ad79cdd80317c6dd1a005a/src/redis.lua#L314
local read_respons = fp{client.response.read, client}

send_command("AUTH", "qwertyuiopasdf")
print("auth ok?", read_respons())

send_command("GET", "fuck3")
print("initial_value", read_respons())

local socket = require("socket")
local now = socket.gettime
local time_start = now()

-- Writes only: none of the INCR replies are consumed here.
for _ = 1, 100000 do
  send_command("INCR", "fuck3")
end

print("time", now() - time_start)
-- print("read", read_respons())

-- Without this pause not everything manages to get sent.
socket.sleep(3)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Redis wrapper that allows running queries asynchronously.
local socket = require("socket") | |
local redis = require("redis") -- https://github.com/nrk/redis-lua/blob/version-2.0/src/redis.lua | |
local redq = {} | |
local MT = {} | |
--- Metatable lookup for wrapped clients.
-- Keys that name a command in `redis.commands` resolve to a function that
-- routes the call through redq.run_method_safe. Any other key is logged and
-- read straight from the underlying redis client.
-- @param wrapped_client table created by redq.wrap
-- @param method_name key that was not found on the wrapper itself
-- @return a command function, or the raw field of the underlying client
function MT.__index(wrapped_client, method_name)
  local inner_client = wrapped_client.redis_client

  if not redis.commands[method_name] then
    print("Attempting to access a field instead of the method", method_name)
    return inner_client[method_name]
  end

  -- The first argument is the receiver from `client:cmd(...)`; it is dropped
  -- because the wrapper captured above is used instead.
  local function guarded_command(_, ...)
    return redq.run_method_safe(wrapped_client, method_name, ...)
  end
  return guarded_command
end
--- Run a redis client method, reconnecting and retrying when it raises.
--
-- The method is called under pcall. On success the retry counter is reset and
-- every value the method returned is passed back to the caller. On failure:
--   * if `opts.reconnect_condition` is set and returns a falsy value for the
--     error, the error is re-raised immediately;
--   * if `opts.max_retries_per_request` (default 3) attempts were already
--     made, the counter is reset and the error is re-raised;
--   * otherwise the call waits `opts.retry_delay` seconds, tries to reconnect
--     (a reconnect failure is only logged) and runs the method again.
--
-- `opts.retry_delay` may be a number or a function receiving the number of
-- retries made so far; it defaults to 1, and to 3 when the function returns
-- nil.
--
-- @param wrapped_client table created by redq.wrap
-- @param method_name name of the method to call on the underlying client
-- @param ... arguments forwarded to the method
-- @return all values returned by the method
-- @raise the original error value, unmodified, once retries are exhausted or
--        the reconnect condition rejects it
function redq.run_method_safe(wrapped_client, method_name, ...)
  local unpack = table.unpack or unpack -- Lua 5.2+ vs Lua 5.1 / LuaJIT
  -- Keeps the exact result count so nil results are not lost.
  local function capture(...)
    return select("#", ...), { ... }
  end

  local redis_client = wrapped_client.redis_client
  local method_func = redis_client[method_name]
  -- wrapped_client.opts.socket:settimeout(1)

  local count, results = capture(pcall(method_func, redis_client, ...))
  if results[1] then
    wrapped_client._retry_attemps = 0
    return unpack(results, 2, count)
  end

  local err = results[2]
  local opts = wrapped_client.opts
  -- print("COMMAND ERROR _retry_attemps, err", wrapped_client._retry_attemps, err)

  local want_reconnect = opts.reconnect_condition == nil
    or opts.reconnect_condition(err)
  -- Level 0 re-raises the error as-is instead of prefixing this file's
  -- position to string messages.
  if not want_reconnect then error(err, 0) end

  local max_retries = opts.max_retries_per_request or 3
  local retry_attempt = wrapped_client._retry_attemps
  if retry_attempt >= max_retries then
    wrapped_client._retry_attemps = 0
    error(err, 0)
  end
  wrapped_client._retry_attemps = retry_attempt + 1

  local retry_delay = opts.retry_delay
  if not retry_delay then
    retry_delay = 1
  elseif type(retry_delay) == "function" then
    retry_delay = retry_delay(retry_attempt)
  end
  retry_delay = retry_delay or 3

  if opts.copas_wrap then
    require("copas").pause(retry_delay) -- pauses only the current coroutine
  else
    socket.sleep(retry_delay) -- blocks the whole process
  end

  local reconnect_ok, reconnect_err = pcall(redq.reconnect, redis_client, opts)
  -- tostring() keeps the log line from raising when the error is not a string.
  print("Redis Wrapper: " .. (reconnect_ok and "reconnected" or "reconnect failed: " .. tostring(reconnect_err)))

  return redq.run_method_safe(wrapped_client, method_name, ...)
end
--- Wrap a redis client so that field lookups go through the MT metatable.
-- @param redis_client the underlying client to delegate to
-- @param opts options table kept on the wrapper for later retries/reconnects
-- @return a new table holding the client, the options and a zeroed retry
--         counter, with MT as its metatable
function redq.wrap(redis_client, opts)
  local state = {
    _retry_attemps = 0,
    opts = opts,
    redis_client = redis_client,
  }
  return setmetatable(state, MT)
end
--- Create a Copas-wrapped TCP socket connected to host:port.
-- @param host address to connect to
-- @param port port to connect to
-- @param timeout connect timeout passed to `settimeouts`; the send and
--        receive timeouts are passed as nil
-- @param tcp_nodelay value for the "tcp-nodelay" socket option
-- @return the connected, Copas-wrapped socket
-- @raise if the socket cannot be created or the connection fails
function redq.copas_socket(host, port, timeout, tcp_nodelay)
  local sock = require("copas").wrap(assert(socket.tcp()))

  -- Set the timeouts before connecting; set afterwards, the connect timeout
  -- can no longer apply to this connection attempt.
  sock:settimeouts(timeout, nil, nil) -- connect, send, receive

  local ok, err = sock:connect(host, port)
  if not ok then
    sock:close()
    error(string.format("failed to connect to %s:%s: %s",
      tostring(host), tostring(port), tostring(err)), 0)
  end

  sock:setoption("tcp-nodelay", tcp_nodelay)
  return sock
end
--- Replace the socket of an existing redis client with a fresh connection.
-- The current socket is closed first. With `opts.copas_wrap` a new Copas
-- socket is created; otherwise a temporary client is created via
-- `redis.connect(opts)` and its socket is moved onto `redis_client`.
-- Afterwards AUTH and SELECT are re-issued when `opts.password` / `opts.db`
-- are set.
-- @param redis_client client whose `network.socket` is replaced in place
-- @param opts connection options (host, port, timeout, tcp_nodelay, ...)
-- @raise when `opts.socket` is set in the non-Copas case
function redq.reconnect(redis_client, opts)
  local network = redis_client.network
  network.socket:close()

  local fresh_socket
  if opts.copas_wrap then
    fresh_socket = redq.copas_socket(opts.host, opts.port, opts.timeout, opts.tcp_nodelay)
  else
    if opts.socket then
      error("we can't recreate custom socket")
    end
    local temp_client = redis.connect(opts)
    fresh_socket = temp_client.network.socket
  end
  redis_client.network.socket = fresh_socket

  if opts.password then
    redis_client:auth(opts.password)
  end
  if opts.db then
    redis_client:select(opts.db)
  end
end
--- Create a redis client from an options table and prepare the session.
-- With `opts.copas_wrap` a Copas socket is created and handed to
-- `redis.connect` through `opts.socket`. AUTH and SELECT are issued when
-- `opts.password` / `opts.db` are set.
-- Note that `opts.socket` is cleared before returning in every case, so the
-- options table is modified in place.
-- @param opts connection options
-- @return the redis client (it has a `.network.socket` field)
function redq.connect(opts)
  if opts.copas_wrap then
    local copas_sock = redq.copas_socket(opts.host, opts.port, opts.timeout, opts.tcp_nodelay)
    opts.socket = copas_sock
  end

  local client = redis.connect(opts)

  if opts.password then
    client:auth(opts.password)
  end
  if opts.db then
    client:select(opts.db)
  end

  -- The socket reference is not needed in the options any more.
  opts.socket = nil
  return client
end
--- Build the options, connect and return a wrapped redis client.
--
-- Accepted call forms:
--   redq.create(path)         -- string: stored as `opts.sock`
--   redq.create(port [, host]) -- number: overrides port, and host if given
--   redq.create(options)      -- table: merged over the defaults
--
-- Defaults: host "127.0.0.1", port 6379, tcp_nodelay true, copas_wrap false.
-- Other options read elsewhere in this module:
--   socket                   -- custom socket, e.g. socket.tcp()
--   timeout                  -- connect timeout, e.g. 10
--   password                 -- e.g. "pass"
--   db                       -- e.g. 0
--   reconnect_condition      -- e.g. function(err) return err:match("timeout") end
--   max_retries_per_request  -- e.g. 3
--   retry_delay              -- seconds, or e.g. function(times) return times * 1 end
--
-- @return the wrapped client produced by redq.wrap
function redq.create(...)
  local opts = {
    host = "127.0.0.1",
    port = 6379,
    -- sock = "redis.sock",
    tcp_nodelay = true,
    copas_wrap = false,
  }

  local first, second = ...
  local first_type = type(first)
  if first_type == "string" then
    opts.sock = first
  elseif first_type == "number" then
    opts.port = first
    -- Keep the default host when only a port is given instead of
    -- overwriting it with nil.
    if second ~= nil then
      opts.host = second
    end
  elseif first_type == "table" then
    for key, value in pairs(first) do -- merge with overrides
      opts[key] = value
    end
  end

  local redis_client = redq.connect(opts)
  return redq.wrap(redis_client, opts)
end
return redq |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Experiment: read replies from the socket in a dedicated thread,
-- independently of the write operations.
local copas = require("copas")
local now = require("socket").gettime

-- REDIS_HOST is expected as "host" or "host:port"; fail with a clear message
-- instead of an "attempt to index a nil value" error when it is missing.
local redis_host = assert(os.getenv("REDIS_HOST"), "REDIS_HOST is not set (expected host[:port])")
local host, port = redis_host:match("([^:]+):?(%d*)")
assert(host, "REDIS_HOST must look like host[:port]")

copas.loop(function()
  local REDIS = require("misc.redis_query")
  local conn = REDIS.create({
    host = host,
    port = tonumber(port),
    password = os.getenv("REDIS_PASS"),
    copas_wrap = true,
    timeout = 5,
    -- tcp_nodelay = false,
  })
  -- NOTE(review): this reads `.client` from the value returned by
  -- REDIS.create; confirm the wrapper in use actually exposes that field.
  local client = conn.client
  local function send_command(...)
    return client.requests.multibulk(client, ...)
  end
  local function read_respons()
    return client.response.read(client)
  end
  -- send_command("AUTH", "qwertyuiopasdf")
  -- print("auth ok?", read_respons())

  local INCR_KEY = "test15"
  send_command("GET", INCR_KEY)
  print("initial_value", read_respons())

  -- FIFO of pending reply callbacks. Head/tail indices make popping O(1);
  -- table.remove(t, 1) shifts the whole array on every reply.
  local callbacks, head, tail = {}, 1, 0
  local need_incr = 100000
  local responses_received = 0
  local time_start = now()

  -- Shared reply handler: counts replies and reports progress and total time.
  local function on_response(val)
    responses_received = responses_received + 1
    if responses_received % 1000 == 0 then
      print("responses_received, val", responses_received, val)
    end
    if responses_received == need_incr then
      print("time", now() - time_start)
    end
  end

  for i = 1, need_incr do
    copas.addnamedthread("i_" .. i, function()
      -- Register the callback before sending, so the reader thread can never
      -- receive a reply for which no callback has been queued yet.
      tail = tail + 1
      callbacks[tail] = on_response
      send_command("INCR", INCR_KEY)
    end)
  end

  copas.addnamedthread("mainloop", function()
    -- Stop once every expected reply has arrived, so the loop can finish.
    while responses_received < need_incr do
      local ok, res = pcall(read_respons)
      if not ok then
        -- After a failed read the state of the reply stream is unknown;
        -- stop instead of spinning on the same error.
        print("res err", res)
        break
      end
      local cb = callbacks[head]
      if cb then
        callbacks[head] = nil
        head = head + 1
        cb(res)
      end
    end
  end)
end)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment