Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
* `timeout` - number of seconds to wait for the task processing
* returns task tuple or table (see retval) and boolean `was_processed` flag

* `space:touch(id, [attr])`
- `id`:
+ `string` | `number` - primary key
+ `tuple` - key will be extracted using index
- `attr`
+ `increment` - the value of ttr and ttl (.runat) increased by increment seconds

### Admin methods

* `space:dig(id, [attr])` - dig out task from buried state
Expand Down
93 changes: 93 additions & 0 deletions test/touch_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
---@diagnostic disable: inject-field
local fiber = require 'fiber'
local clock = require 'clock'
local xqueue = require 'xqueue'

require 'test.setup'

local t = require 'luatest' --[[@as luatest]]
local g = t.group('touch')

local queue
local F
local default_time_to = 0.25

g.before_each(function()
if box.space.queue then
box.space.queue:truncate()
for i = #box.space.queue.index, 0, -1 do
local ind = box.space.queue.index[i]
ind:drop()
end
box.space.queue:drop()
end

queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]]
---@class test.xqueue.delayed.tuple: box.tuple
---@field id number
---@field status string
---@field runat number
---@field payload any
queue:format({
{ name = 'id', type = 'unsigned' },
{ name = 'status', type = 'string' },
{ name = 'runat', type = 'number' },
{ name = 'payload', type = 'any' },
})

F = { id = 1, status = 2, runat = 3, payload = 4 }

queue:create_index('primary', { parts = {'id'} })
queue:create_index('status', { parts = {'status', 'id'} })
queue:create_index('runat', { parts = {'runat', 'id'} })

xqueue.upgrade(queue, {
debug = true,
fields = {
runat = 'runat',
status = 'status',
},
features = {
id = 'time64',
keep = true,
ttr = default_time_to,
ttl = default_time_to,
},
})
end)

function g.test_touch_ttr()
local puted_task = queue:put({payload = { 'x' }}) --[[@as test.xqueue.delayed.tuple]]
t.assert_equals(puted_task.status, 'R', 'queue:put(...) must insert task in R status')

local task = queue:take({timeout=0})
t.assert_equals(task.id, puted_task.id, 'queue not empty')
t.assert_le(task.runat, clock.realtime() + default_time_to, 'taked task must have runat le than now+ttr')

-- honest worker
local begin_at = clock.realtime()
while clock.realtime() <= begin_at + default_time_to*4 do
fiber.sleep(default_time_to - 0.01)
queue:touch(task, {increment=default_time_to})
end

local acked_task = queue:ack(task)
t.assert_equals(acked_task.status, 'D', 'task must have Done status')
end

function g.test_touch_ttl()
local puted_task = queue:put({payload = { 'x' }}) --[[@as test.xqueue.delayed.tuple]]
t.assert_equals(puted_task.status, 'R', 'queue:put(...) must insert task in R status')

local task
local begin_at = clock.realtime()
while clock.realtime() <= begin_at + default_time_to*4 do
fiber.sleep(default_time_to - 0.01)
task = queue:touch(puted_task, {increment=default_time_to})
end

t.assert_equals(task.status, 'R')
t.assert_gt(task.runat, puted_task.runat + default_time_to*4 - 0.01)
fiber.sleep(default_time_to*2)
t.assert_equals(queue:take(), nil, 'task must be removed by ttl')
end
57 changes: 57 additions & 0 deletions xqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,63 @@ function methods:bury(key, attr)
xq:putback(t)
end

--[[
* `space:touch(id, [attr])`
- `id`:
+ `string` | `number` - primary key
+ `tuple` - key will be extracted using index
- `attr`
+ `increment` - the value of ttr and ttl (.runat) increased by increment seconds
]]

---@param key table|scalar|box.tuple
---@param attr? { increment: number? }
---@return table|box.tuple
function methods:touch(key, attr)
local xq = self.xq
key = xq:getkey(key)

attr = attr or {}

local increment = 0
if type(attr.increment) ~= 'number' then
error("attr.increment must be number", 2)
end
if attr.increment < 0 then
error("attr.increment can't be negative", 2)
end

if attr.increment then
increment = attr.increment
end

local t = self:get(key)
if not t then
error(string.format( "Task {%s} was not found", key ),2)
end

local status = t[ xq.fields.status ]
if status == 'T' then
xq:check_owner(key)
end

-- delayed or ttl or default ttl
if xq.have_runat and (status == 'T') or (status == 'R') then
xq:atomic(key,function()
t = self:update({key}, {{ '+', xq.fields.runat, increment}})

---@cast t box.tuple
xq:wakeup(t)
if xq.have_runat then
xq.runat_chan:put(true,0)
end
log.info("Touch: {%s} run_at +%s seconds from %s/sid=%s/fid=%s", key, attr.increment, box.session.storage.peer, box.session.id(), fiber.id())
end)
end

return t
end

local function kick_task(self, key, attr)
local xq = self.xq
key = xq:getkey(key)
Expand Down