diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml
index 31e1968294..3cf7ab00b7 100644
--- a/backend/resources/log4j2-devenv.xml
+++ b/backend/resources/log4j2-devenv.xml
@@ -40,7 +40,7 @@
-
+
diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj
index 056c99cc8a..3c61e6b35e 100644
--- a/backend/src/app/main.clj
+++ b/backend/src/app/main.clj
@@ -493,7 +493,7 @@
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
- [::default ::wrk/worker]
+ [::default ::wrk/runner]
{::wrk/parallelism (cf/get ::worker-default-parallelism 1)
::wrk/queue :default
::rds/redis (ig/ref ::rds/redis)
@@ -501,7 +501,7 @@
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
- [::webhook ::wrk/worker]
+ [::webhook ::wrk/runner]
{::wrk/parallelism (cf/get ::worker-webhook-parallelism 1)
::wrk/queue :webhooks
::rds/redis (ig/ref ::rds/redis)
diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj
index 9614a3fa4a..a648080f3b 100644
--- a/backend/src/app/worker.clj
+++ b/backend/src/app/worker.clj
@@ -8,21 +8,16 @@
"Async tasks abstraction (impl)."
(:require
[app.common.data :as d]
- [app.common.data.macros :as dm]
- [app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
- [app.common.transit :as t]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.metrics :as mtx]
- [app.redis :as rds]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
- [integrant.core :as ig]
- [promesa.exec :as px]))
+ [integrant.core :as ig]))
(set! *warn-on-reflection* true)
@@ -59,244 +54,6 @@
{}
tasks))
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-;; WORKER
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-
-(defn- decode-task-row
- [{:keys [props] :as row}]
- (cond-> row
- (db/pgobject? props)
- (assoc :props (db/decode-transit-pgobject props))))
-
-(declare ^:private run-worker-loop!)
-(declare ^:private start-worker!)
-(declare ^:private get-error-context)
-
-(defmethod ig/pre-init-spec ::worker [_]
- (s/keys :req [::parallelism
- ::mtx/metrics
- ::db/pool
- ::rds/redis
- ::queue
- ::registry]))
-
-(defmethod ig/prep-key ::worker
- [_ cfg]
- (merge {::parallelism 1}
- (d/without-nils cfg)))
-
-(defmethod ig/init-key ::worker
- [_ {:keys [::db/pool ::queue ::parallelism] :as cfg}]
- (let [queue (d/name queue)
- cfg (assoc cfg ::queue queue)]
- (if (db/read-only? pool)
- (l/wrn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism)
- (doall
- (->> (range parallelism)
- (map #(assoc cfg ::worker-id %))
- (map start-worker!))))))
-
-(defmethod ig/halt-key! ::worker
- [_ threads]
- (run! px/interrupt! threads))
-
-(defn- start-worker!
- [{:keys [::rds/redis ::worker-id ::queue] :as cfg}]
- (px/thread
- {:name (format "penpot/worker/runner:%s" worker-id)}
- (l/inf :hint "worker: started" :worker-id worker-id :queue queue)
- (try
- (dm/with-open [rconn (rds/connect redis)]
- (let [tenant (cf/get :tenant "main")
- cfg (-> cfg
- (assoc ::queue (str/ffmt "taskq:%:%" tenant queue))
- (assoc ::rds/rconn rconn)
- (assoc ::timeout (dt/duration "5s")))]
- (loop []
- (when (px/interrupted?)
- (throw (InterruptedException. "interrupted")))
-
- (run-worker-loop! cfg)
- (recur))))
-
- (catch InterruptedException _
- (l/debug :hint "worker: interrupted"
- :worker-id worker-id
- :queue queue))
- (catch Throwable cause
- (l/err :hint "worker: unexpected exception"
- :worker-id worker-id
- :queue queue
- :cause cause))
- (finally
- (l/inf :hint "worker: terminated"
- :worker-id worker-id
- :queue queue)))))
-
-(defn- run-worker-loop!
- [{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}]
- (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}]
- (let [explain (ex-message error)
- nretry (+ (:retry-num task) inc-by)
- now (dt/now)
- delay (->> (iterate #(* % 2) delay) (take nretry) (last))]
- (db/update! pool :task
- {:error explain
- :status "retry"
- :modified-at now
- :scheduled-at (dt/plus now delay)
- :retry-num nretry}
- {:id (:id task)})
- nil))
-
- (handle-task-failure [{:keys [task error]}]
- (let [explain (ex-message error)]
- (db/update! pool :task
- {:error explain
- :modified-at (dt/now)
- :status "failed"}
- {:id (:id task)})
- nil))
-
- (handle-task-completion [{:keys [task]}]
- (let [now (dt/now)]
- (db/update! pool :task
- {:completed-at now
- :modified-at now
- :status "completed"}
- {:id (:id task)})
- nil))
-
- (decode-payload [^bytes payload]
- (try
- (let [task-id (t/decode payload)]
- (if (uuid? task-id)
- task-id
- (l/err :hint "worker: received unexpected payload (uuid expected)"
- :payload task-id)))
- (catch Throwable cause
- (l/err :hint "worker: unable to decode payload"
- :payload payload
- :length (alength payload)
- :cause cause))))
-
- (handle-task [{:keys [name] :as task}]
- (let [task-fn (get registry name)]
- (if task-fn
- (task-fn task)
- (l/wrn :hint "no task handler found" :name name))
- {:status :completed :task task}))
-
- (handle-task-exception [cause task]
- (let [edata (ex-data cause)]
- (if (and (< (:retry-num task)
- (:max-retries task))
- (= ::retry (:type edata)))
- (cond-> {:status :retry :task task :error cause}
- (dt/duration? (:delay edata))
- (assoc :delay (:delay edata))
-
- (= ::noop (:strategy edata))
- (assoc :inc-by 0))
- (do
- (l/err :hint "worker: unhandled exception on task"
- ::l/context (get-error-context cause task)
- :cause cause)
- (if (>= (:retry-num task) (:max-retries task))
- {:status :failed :task task :error cause}
- {:status :retry :task task :error cause})))))
-
- (get-task [task-id]
- (ex/try!
- (some-> (db/get* pool :task {:id task-id})
- (decode-task-row))))
-
- (run-task [task-id]
- (loop [task (get-task task-id)]
- (cond
- (ex/exception? task)
- (if (or (db/connection-error? task)
- (db/serialization-error? task))
- (do
- (l/wrn :hint "worker: connection error on retrieving task from database (retrying in some instants)"
- :worker-id worker-id
- :cause task)
- (px/sleep (::rds/timeout rconn))
- (recur (get-task task-id)))
- (do
- (l/err :hint "worker: unhandled exception on retrieving task from database (retrying in some instants)"
- :worker-id worker-id
- :cause task)
- (px/sleep (::rds/timeout rconn))
- (recur (get-task task-id))))
-
- (nil? task)
- (l/wrn :hint "worker: no task found on the database"
- :worker-id worker-id
- :task-id task-id)
-
- :else
- (try
- (l/trc :hint "executing task"
- :name (:name task)
- :id (str (:id task))
- :queue queue
- :worker-id worker-id
- :retry (:retry-num task))
- (handle-task task)
- (catch InterruptedException cause
- (throw cause))
- (catch Throwable cause
- (handle-task-exception cause task))))))
-
- (process-result [{:keys [status] :as result}]
- (ex/try!
- (case status
- :retry (handle-task-retry result)
- :failed (handle-task-failure result)
- :completed (handle-task-completion result)
- nil)))
-
- (run-task-loop [task-id]
- (loop [result (run-task task-id)]
- (when-let [cause (process-result result)]
- (if (or (db/connection-error? cause)
- (db/serialization-error? cause))
- (do
- (l/wrn :hint "worker: database exeption on processing task result (retrying in some instants)"
- :cause cause)
- (px/sleep (::rds/timeout rconn))
- (recur result))
- (do
- (l/err :hint "worker: unhandled exception on processing task result (retrying in some instants)"
- :cause cause)
- (px/sleep (::rds/timeout rconn))
- (recur result))))))]
-
- (try
- (let [[_ payload] (rds/blpop! rconn timeout queue)]
- (some-> payload
- decode-payload
- run-task-loop))
-
- (catch InterruptedException cause
- (throw cause))
-
- (catch Exception cause
- (if (rds/timeout-exception? cause)
- (do
- (l/err :hint "worker: redis pop operation timeout, consider increasing redis timeout (will retry in some instants)"
- :timeout timeout
- :cause cause)
- (px/sleep timeout))
-
- (l/err :hint "worker: unhandled exception" :cause cause))))))
-
-(defn get-error-context
- [_ item]
- {:params item})
-
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SUBMIT API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@@ -348,6 +105,7 @@
[& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label]
:or {delay 0 queue :default priority 100 max-retries 3 label ""}
:as options}]
+
(us/verify! ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
diff --git a/backend/src/app/worker/cron.clj b/backend/src/app/worker/cron.clj
index 0f44dcaae3..689fcba90d 100644
--- a/backend/src/app/worker/cron.clj
+++ b/backend/src/app/worker/cron.clj
@@ -11,7 +11,8 @@
[app.common.logging :as l]
[app.db :as db]
[app.util.time :as dt]
- [app.worker :as wrk]
+ [app.worker :as-alias wrk]
+ [app.worker.runner :refer [get-error-context]]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
@@ -64,7 +65,7 @@
(catch Throwable cause
(let [elapsed (dt/format-duration (tpoint))]
- (binding [l/*context* (wrk/get-error-context cause task)]
+ (binding [l/*context* (get-error-context cause task)]
(l/err :hint "unhandled exception on running task"
:task-id id
:elapsed elapsed
@@ -98,11 +99,11 @@
(s/def ::props (s/nilable map?))
(s/def ::task keyword?)
-(s/def ::wrk/task
+(s/def ::task-item
(s/keys :req-un [::cron ::task]
:opt-un [::props ::id]))
-(s/def ::wrk/entries (s/coll-of (s/nilable ::wrk/task)))
+(s/def ::wrk/entries (s/coll-of (s/nilable ::task-item)))
(defmethod ig/pre-init-spec ::wrk/cron [_]
(s/keys :req [::db/pool ::wrk/entries ::wrk/registry]))
diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj
new file mode 100644
index 0000000000..40332ab235
--- /dev/null
+++ b/backend/src/app/worker/runner.clj
@@ -0,0 +1,272 @@
+;; This Source Code Form is subject to the terms of the Mozilla Public
+;; License, v. 2.0. If a copy of the MPL was not distributed with this
+;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
+;;
+;; Copyright (c) KALEIDOS INC
+
+(ns app.worker.runner
+ "Async tasks abstraction (impl)."
+ (:require
+ [app.common.data :as d]
+ [app.common.data.macros :as dm]
+ [app.common.exceptions :as ex]
+ [app.common.logging :as l]
+ [app.common.transit :as t]
+ [app.config :as cf]
+ [app.db :as db]
+ [app.metrics :as mtx]
+ [app.redis :as rds]
+ [app.util.time :as dt]
+ [app.worker :as-alias wrk]
+ [clojure.spec.alpha :as s]
+ [cuerdas.core :as str]
+ [integrant.core :as ig]
+ [promesa.exec :as px]))
+
+(set! *warn-on-reflection* true)
+
+(defn- decode-task-row
+ [{:keys [props] :as row}]
+ (cond-> row
+ (db/pgobject? props)
+ (assoc :props (db/decode-transit-pgobject props))))
+
+(defn get-error-context
+ [_ item]
+ {:params item})
+
+(defn- run-worker-loop!
+ [{:keys [::db/pool ::rds/rconn ::wrk/registry ::timeout ::queue ::id]}]
+ (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}]
+ (let [explain (ex-message error)
+ nretry (+ (:retry-num task) inc-by)
+ now (dt/now)
+ delay (->> (iterate #(* % 2) delay) (take nretry) (last))]
+ (db/update! pool :task
+ {:error explain
+ :status "retry"
+ :modified-at now
+ :scheduled-at (dt/plus now delay)
+ :retry-num nretry}
+ {:id (:id task)})
+ nil))
+
+ (handle-task-failure [{:keys [task error]}]
+ (let [explain (ex-message error)]
+ (db/update! pool :task
+ {:error explain
+ :modified-at (dt/now)
+ :status "failed"}
+ {:id (:id task)})
+ nil))
+
+ (handle-task-completion [{:keys [task]}]
+ (let [now (dt/now)]
+ (db/update! pool :task
+ {:completed-at now
+ :modified-at now
+ :status "completed"}
+ {:id (:id task)})
+ nil))
+
+ (decode-payload [^bytes payload]
+ (try
+ (let [task-id (t/decode payload)]
+ (if (uuid? task-id)
+ task-id
+ (l/err :hint "received unexpected payload (uuid expected)"
+ :payload task-id)))
+ (catch Throwable cause
+ (l/err :hint "unable to decode payload"
+ :payload payload
+ :length (alength payload)
+ :cause cause))))
+
+ (handle-task [{:keys [name] :as task}]
+ (let [task-fn (get registry name)]
+ (if task-fn
+ (task-fn task)
+ (l/wrn :hint "no task handler found" :name name))
+ {:status :completed :task task}))
+
+ (handle-task-exception [cause task]
+ (let [edata (ex-data cause)]
+ (if (and (< (:retry-num task)
+ (:max-retries task))
+ (= ::retry (:type edata)))
+ (cond-> {:status :retry :task task :error cause}
+ (dt/duration? (:delay edata))
+ (assoc :delay (:delay edata))
+
+ (= ::noop (:strategy edata))
+ (assoc :inc-by 0))
+ (do
+ (l/err :hint "unhandled exception on task"
+ ::l/context (get-error-context cause task)
+ :cause cause)
+ (if (>= (:retry-num task) (:max-retries task))
+ {:status :failed :task task :error cause}
+ {:status :retry :task task :error cause})))))
+
+ (get-task [task-id]
+ (ex/try!
+ (some-> (db/get* pool :task {:id task-id})
+ (decode-task-row))))
+
+ (run-task [task-id]
+ (loop [task (get-task task-id)]
+ (cond
+ (ex/exception? task)
+ (if (or (db/connection-error? task)
+ (db/serialization-error? task))
+ (do
+ (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)"
+ :id id
+ :cause task)
+ (px/sleep (::rds/timeout rconn))
+ (recur (get-task task-id)))
+ (do
+ (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)"
+ :id id
+ :cause task)
+ (px/sleep (::rds/timeout rconn))
+ (recur (get-task task-id))))
+
+ (nil? task)
+ (l/wrn :hint "no task found on the database"
+ :id id
+ :task-id task-id)
+
+ :else
+ (try
+ (l/trc :hint "start task"
+ :queue queue
+ :runner-id id
+ :name (:name task)
+ :task-id (str task-id)
+ :retry (:retry-num task))
+ (let [tpoint (dt/tpoint)
+ result (handle-task task)
+ elapsed (dt/format-duration (tpoint))]
+
+ (l/trc :hint "end task"
+ :queue queue
+ :runner-id id
+ :name (:name task)
+ :task-id (str task-id)
+ :retry (:retry-num task)
+ :elapsed elapsed)
+
+ result)
+
+ (catch InterruptedException cause
+ (throw cause))
+ (catch Throwable cause
+ (handle-task-exception cause task))))))
+
+ (process-result [{:keys [status] :as result}]
+ (ex/try!
+ (case status
+ :retry (handle-task-retry result)
+ :failed (handle-task-failure result)
+ :completed (handle-task-completion result)
+ nil)))
+
+ (run-task-loop [task-id]
+ (loop [result (run-task task-id)]
+ (when-let [cause (process-result result)]
+ (if (or (db/connection-error? cause)
+ (db/serialization-error? cause))
+ (do
+ (l/wrn :hint "database exeption on processing task result (retrying in some instants)"
+ :cause cause)
+ (px/sleep (::rds/timeout rconn))
+ (recur result))
+ (do
+ (l/err :hint "unhandled exception on processing task result (retrying in some instants)"
+ :cause cause)
+ (px/sleep (::rds/timeout rconn))
+ (recur result))))))]
+
+ (try
+ (let [queue (str/ffmt "taskq:%" queue)
+ [_ payload] (rds/blpop! rconn timeout queue)]
+ (some-> payload
+ decode-payload
+ run-task-loop))
+
+ (catch InterruptedException cause
+ (throw cause))
+
+ (catch Exception cause
+ (if (rds/timeout-exception? cause)
+ (do
+ (l/err :hint "redis pop operation timeout, consider increasing redis timeout (will retry in some instants)"
+ :timeout timeout
+ :cause cause)
+ (px/sleep timeout))
+
+ (l/err :hint "unhandled exception" :cause cause))))))
+
+(defn- start-thread!
+ [{:keys [::rds/redis ::id ::queue] :as cfg}]
+ (px/thread
+ {:name (format "penpot/worker/runner:%s" id)}
+ (l/inf :hint "started" :id id :queue queue)
+ (try
+ (dm/with-open [rconn (rds/connect redis)]
+ (let [tenant (cf/get :tenant "main")
+ cfg (-> cfg
+ (assoc ::queue (str/ffmt "%:%" tenant queue))
+ (assoc ::rds/rconn rconn)
+ (assoc ::timeout (dt/duration "5s")))]
+ (loop []
+ (when (px/interrupted?)
+ (throw (InterruptedException. "interrupted")))
+
+ (run-worker-loop! cfg)
+ (recur))))
+
+ (catch InterruptedException _
+ (l/debug :hint "interrupted"
+ :id id
+ :queue queue))
+ (catch Throwable cause
+ (l/err :hint "unexpected exception"
+ :id id
+ :queue queue
+ :cause cause))
+ (finally
+ (l/inf :hint "terminated"
+ :id id
+ :queue queue)))))
+
+(s/def ::wrk/queue keyword?)
+
+(defmethod ig/pre-init-spec ::runner [_]
+ (s/keys :req [::wrk/parallelism
+ ::mtx/metrics
+ ::db/pool
+ ::rds/redis
+ ::wrk/queue
+ ::wrk/registry]))
+
+(defmethod ig/prep-key ::wrk/runner
+ [_ cfg]
+ (merge {::wrk/parallelism 1}
+ (d/without-nils cfg)))
+
+(defmethod ig/init-key ::wrk/runner
+ [_ {:keys [::db/pool ::wrk/queue ::wrk/parallelism] :as cfg}]
+ (let [queue (d/name queue)
+ cfg (assoc cfg ::queue queue)]
+ (if (db/read-only? pool)
+ (l/wrn :hint "not started (db is read-only)" :queue queue :parallelism parallelism)
+ (doall
+ (->> (range parallelism)
+ (map #(assoc cfg ::id %))
+ (map start-thread!))))))
+
+(defmethod ig/halt-key! ::wrk/runner
+ [_ threads]
+ (run! px/interrupt! threads))
diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj
index 27544c4fae..61b5f42bf2 100644
--- a/backend/test/backend_tests/helpers.clj
+++ b/backend/test/backend_tests/helpers.clj
@@ -156,8 +156,8 @@
:app.loggers.database/reporter
:app.worker/cron
:app.worker/dispatcher
- [:app.main/default :app.worker/worker]
- [:app.main/webhook :app.worker/worker]))
+ [:app.main/default :app.worker/runner]
+ [:app.main/webhook :app.worker/runner]))
_ (ig/load-namespaces system)
system (-> (ig/prep system)
(ig/init))]