Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions xqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -821,8 +821,26 @@ function M.upgrade(space,opts,depth)

self.ready = fiber.channel(0)

local function was_reload_func()
local start_gen = 0
if package.reload then
start_gen = package.reload.count
end

return start_gen, function()
local curr_gen = 0
if package.reload then
curr_gen = package.reload.count
end
return curr_gen ~= start_gen
end
end

local function rw_fiber_f(func, ...)
local xq = self

local _, was_reload = was_reload_func()

repeat
if box.info.ro then
log.verbose("awaiting rw")
Expand All @@ -832,7 +850,12 @@ function M.upgrade(space,opts,depth)
else
fiber.sleep(0.001)
end
until not box.info.ro
until not box.info.ro or was_reload()
end

if was_reload() then
log.info('shutting down rw fiber')
return
end

local ok, err = pcall(func, ...)
Expand All @@ -849,21 +872,9 @@ function M.upgrade(space,opts,depth)
local worker = opts.worker
for i = 1,workers do
fiber.create(rw_fiber_f, function(space,xq)
local fname = space.name .. '.xq.wrk' .. tostring(i)
local start_gen, was_reload = was_reload_func()
---@diagnostic disable-next-line: undefined-field
local start_gen = 0
if package.reload then
start_gen = package.reload.count
end
fname = fname .. '.' .. start_gen

local was_reload = function()
local curr_gen = 0
if package.reload then
curr_gen = package.reload.count
end
return curr_gen ~= start_gen
end
local fname = space.name .. '.xq.wrk' .. tostring(i) .. '.' .. start_gen

fiber.name(string.sub(fname,1,32))
repeat fiber.sleep(0.001) until space.xq
Expand Down Expand Up @@ -907,21 +918,8 @@ function M.upgrade(space,opts,depth)
if have_runat then
self.runat_chan = fiber.channel(0)
self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index)
local fname = space.name .. '.xq'
local start_gen = 0

if package.reload then
start_gen = package.reload.count
end
fname = fname .. '.' .. start_gen

local was_reload = function()
local curr_gen = 0
if package.reload then
curr_gen = package.reload.count
end
return curr_gen ~= start_gen
end
local start_gen, was_reload = was_reload_func()
local fname = space.name .. '.xq.' .. start_gen

fiber.name(string.sub(fname,1,32))
repeat fiber.sleep(0.001) until space.xq
Expand Down Expand Up @@ -1755,7 +1753,7 @@ function methods:touch(key, attr)

local status = t[ xq.fields.status ]
if status == 'T' then
xq:check_owner(key)
xq:check_owner(key)
end

-- delayed or ttl or default ttl
Expand All @@ -1768,7 +1766,8 @@ function methods:touch(key, attr)
if xq.have_runat then
xq.runat_chan:put(true,0)
end
log.info("Touch: {%s} run_at +%s seconds from %s/sid=%s/fid=%s", key, attr.increment, box.session.storage.peer, box.session.id(), fiber.id())
log.info("Touch: {%s} run_at +%s seconds from %s/sid=%s/fid=%s",
key, attr.increment, box.session.storage.peer, box.session.id(), fiber.id())
end)
end

Expand Down