Make file-gc-scheduler task compatible with virtual clock

And simplify implementation
This commit is contained in:
Andrey Antukh 2025-11-05 10:18:09 +01:00
parent cd53d3659c
commit 7d5c1c9b5f
2 changed files with 16 additions and 13 deletions

View File

@ -218,6 +218,9 @@
(when (or (nil? revn) (= revn (:revn file))) (when (or (nil? revn) (= revn (:revn file)))
file))) file)))
;; FIXME: we should skip files that does not match the revn on the
;; props and add proper schema for this task props
(defn- process-file! (defn- process-file!
[cfg {:keys [file-id] :as props}] [cfg {:keys [file-id] :as props}]
(if-let [file (get-file cfg props)] (if-let [file (get-file cfg props)]

View File

@ -8,6 +8,7 @@
"A maintenance task that is responsible of properly scheduling the "A maintenance task that is responsible of properly scheduling the
file-gc task for all files that matches the eligibility threshold." file-gc task for all files that matches the eligibility threshold."
(:require (:require
[app.common.logging :as l]
[app.common.time :as ct] [app.common.time :as ct]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
@ -21,25 +22,24 @@
f.modified_at f.modified_at
FROM file AS f FROM file AS f
WHERE f.has_media_trimmed IS false WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval AND f.modified_at < ?
AND f.deleted_at IS NULL AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC ORDER BY f.modified_at DESC
FOR UPDATE OF f FOR UPDATE OF f
SKIP LOCKED") SKIP LOCKED")
(defn- get-candidates
[{:keys [::db/conn ::min-age] :as cfg}]
(let [min-age (db/interval min-age)]
(db/plan conn [sql:get-candidates min-age] {:fetch-size 10})))
(defn- schedule! (defn- schedule!
[cfg] [{:keys [::db/conn] :as cfg} threshold]
(let [total (reduce (fn [total {:keys [id modified-at revn]}] (let [total (reduce (fn [total {:keys [id modified-at revn]}]
(let [params {:file-id id :modified-at modified-at :revn revn}] (let [params {:file-id id :revn revn}]
(l/trc :hint "schedule"
:file-id (str id)
:revn revn
:modified-at (ct/format-inst modified-at))
(wrk/submit! (assoc cfg ::wrk/params params)) (wrk/submit! (assoc cfg ::wrk/params params))
(inc total))) (inc total)))
0 0
(get-candidates cfg))] (db/plan conn [sql:get-candidates threshold] {:fetch-size 10}))]
{:processed total})) {:processed total}))
(defmethod ig/assert-key ::handler (defmethod ig/assert-key ::handler
@ -53,12 +53,12 @@
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ cfg] [_ cfg]
(fn [{:keys [props] :as task}] (fn [{:keys [props] :as task}]
(let [min-age (ct/duration (or (:min-age props) (::min-age cfg)))] (let [threshold (-> (ct/duration (or (:min-age props) (::min-age cfg)))
(ct/in-past))]
(-> cfg (-> cfg
(assoc ::db/rollback (:rollback? props)) (assoc ::db/rollback (:rollback? props))
(assoc ::min-age min-age)
(assoc ::wrk/task :file-gc) (assoc ::wrk/task :file-gc)
(assoc ::wrk/priority 10) (assoc ::wrk/priority 10)
(assoc ::wrk/mark-retries 0) (assoc ::wrk/mark-retries 0)
(assoc ::wrk/delay 1000) (assoc ::wrk/delay 10000)
(db/tx-run! schedule!))))) (db/tx-run! schedule! threshold)))))