Merge pull request #7682 from penpot/niwinz-staging-worker-runner-exceptions

🐛 Fix incorrect status return on worker runner
This commit is contained in:
Alejandro Alonso 2025-11-05 07:44:28 +01:00 committed by GitHub
commit 7c529eedd4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 21 deletions

View File

@ -319,5 +319,9 @@
([key default]
(c/get config key default)))
(defn logging-context
[]
{:version/backend (:full version)})
;; Set value for all new threads bindings.
(alter-var-root #'*assert* (constantly (contains? flags :backend-asserts)))

View File

@ -25,15 +25,14 @@
(let [claims (-> {}
(into (::session/token-claims request))
(into (::actoken/token-claims request)))]
{:request/path (:path request)
:request/method (:method request)
:request/params (:params request)
:request/user-agent (yreq/get-header request "user-agent")
:request/ip-addr (inet/parse-request request)
:request/profile-id (:uid claims)
:version/frontend (or (yreq/get-header request "x-frontend-version") "unknown")
:version/backend (:full cf/version)}))
(-> (cf/logging-context)
(assoc :request/path (:path request))
(assoc :request/method (:method request))
(assoc :request/params (:params request))
(assoc :request/user-agent (yreq/get-header request "user-agent"))
(assoc :request/ip-addr (inet/parse-request request))
(assoc :request/profile-id (:uid claims))
(assoc :version/frontend (or (yreq/get-header request "x-frontend-version") "unknown")))))
(defmulti handle-error
(fn [cause _ _]

View File

@ -13,6 +13,7 @@
[app.common.schema :as sm]
[app.common.time :as ct]
[app.common.transit :as t]
[app.config :as cf]
[app.db :as db]
[app.metrics :as mtx]
[app.redis :as rds]
@ -60,7 +61,8 @@
(defn get-error-context
[_ item]
{:params item})
(-> (cf/logging-context)
(assoc :params item)))
(defn- get-task
[{:keys [::db/pool]} task-id]
@ -131,6 +133,11 @@
[{:keys [::id ::timeout] :as cfg} task-id scheduled-at]
(loop [task (get-task cfg task-id)]
(cond
(nil? task)
(l/wrn :hint "no task found on the database"
:runner-id id
:task-id task-id)
(ex/exception? task)
(if (or (db/connection-error? task)
(db/serialization-error? task))
@ -153,11 +160,6 @@
:task-id task-id
:runner-id id)
(nil? task)
(l/wrn :hint "no task found on the database"
:runner-id id
:task-id task-id)
:else
(let [result (run-task cfg task)]
(with-meta result
@ -213,6 +215,7 @@
:payload payload)))
(catch Throwable cause
(l/err :hint "unable to decode payload"
::l/context (cf/logging-context)
:payload payload
:length (alength ^String/1 payload)
:cause cause))))
@ -224,11 +227,11 @@
"failed" (handle-task-failure result)
"completed" (handle-task-completion result)
(throw (IllegalArgumentException.
(str "invalid status received: " status))))))
(str "invalid status received: '" status "'"))))))
(run-task-loop [[task-id scheduled-at]]
(loop [result (run-task! cfg task-id scheduled-at)]
(when-let [cause (process-result result)]
(when-let [cause (some-> result process-result)]
(if (or (db/connection-error? cause)
(db/serialization-error? cause))
(do
@ -236,9 +239,9 @@
:cause cause)
(px/sleep timeout)
(recur result))
(do
(l/err :hint "unhandled exception on processing task result"
:cause cause))))))]
(l/err :hint "unhandled exception on processing task result"
::l/context (cf/logging-context)
:cause cause)))))]
(try
(let [key (str/ffmt "penpot.worker.queue:%" queue)
@ -254,11 +257,14 @@
(if (rds/timeout-exception? cause)
(do
(l/err :hint "redis pop operation timeout, consider increasing redis timeout (will retry in some instants)"
::l/context (cf/logging-context)
:timeout timeout
:cause cause)
(px/sleep timeout))
(l/err :hint "unhandled exception" :cause cause))))))
(l/err :hint "unhandled exception"
::l/context (cf/logging-context)
:cause cause))))))
(defn- start-thread!
[{:keys [::id ::queue ::wrk/tenant] :as cfg}]
@ -284,6 +290,7 @@
:queue queue))
(catch Throwable cause
(l/err :hint "unexpected exception"
::l/context (cf/logging-context)
:id id
:queue queue
:cause cause))