Skip to content

Commit 7d9d66a

Browse files
committed
Check timeout on acquire
* Avoid to touch created_at because it is misused and maybe used in different meanings in the future. From v0.8.48, PQ can use timeout to find living tasks. * Check timeout on heartbeat. v0.9's AcquiredTask#timeout saves the last heartbeated time. PQ can check whether the heartbeating task is still acquired by itself or not.
1 parent 1836d53 commit 7d9d66a

2 files changed

Lines changed: 14 additions & 5 deletions

File tree

lib/perfectqueue/backend/rdb.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module PerfectQueue::Backend
66
class RDBBackend
77
MAX_RETRY = ::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY
88
DELETE_OFFSET = ::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET
9+
EVENT_HORIZON = ::PerfectQueue::Backend::RDBCompatBackend::EVENT_HORIZON
910
class Token < Struct.new(:key)
1011
end
1112

@@ -48,7 +49,7 @@ def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second
4849

4950
def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second))
5051
connect {
51-
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update
52+
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout;", now+delete_timeout-DELETE_OFFSET, id].update
5253
return n > 0
5354
}
5455
end

lib/perfectqueue/backend/rdb_compat.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ def finish(task_token, retention_time, options)
296296
key = task_token.key
297297

298298
connect {
299-
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL", delete_timeout, key].update
299+
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout", delete_timeout, key].update
300300
if n <= 0
301301
raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished."
302302
end
@@ -317,8 +317,14 @@ def heartbeat(task_token, alive_time, options)
317317
sql << ", data=?"
318318
params << compress_data(data.to_json, options[:compression])
319319
end
320-
sql << " WHERE id=? AND created_at IS NOT NULL"
321-
params << key
320+
if last_timeout = options[:last_timeout]
321+
sql << " WHERE id=? AND timeout=?"
322+
params << key
323+
params << last_timeout
324+
else
325+
sql << " WHERE id=? AND #{EVENT_HORIZON} < timeout"
326+
params << key
327+
end
322328

323329
connect {
324330
n = @db[*params].update
@@ -327,7 +333,9 @@ def heartbeat(task_token, alive_time, options)
327333
if row == nil
328334
raise PreemptedError, "task key=#{key} does not exist or preempted."
329335
elsif row[:created_at] == nil
330-
raise PreemptedError, "task key=#{key} preempted."
336+
raise PreemptedError, "task key=#{key} is finished or canceled"
337+
elsif options[:last_timeout] && row[:timeout] != options[:last_timeout]
338+
raise PreemptedError, "task key=#{key} is preempted by another worker."
331339
else # row[:timeout] == next_timeout
332340
# ok
333341
end

0 commit comments

Comments
 (0)