From 065fc157bf69026f82c8f12139995b27a469de24 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 10 Sep 2020 13:48:46 +0200 Subject: [PATCH] :tada: Add team-id channel subscriptions. --- backend/src/app/http/ws.clj | 16 +- backend/src/app/redis.clj | 6 +- backend/src/app/services/notifications.clj | 303 ++++++++++----------- backend/src/app/util/redis.clj | 57 ++-- 4 files changed, 188 insertions(+), 194 deletions(-) diff --git a/backend/src/app/http/ws.clj b/backend/src/app/http/ws.clj index 259782a85c..c470776943 100644 --- a/backend/src/app/http/ws.clj +++ b/backend/src/app/http/ws.clj @@ -27,14 +27,24 @@ (s/def ::websocket-params (s/keys :req-un [::file-id ::session-id])) +(def sql:retrieve-file + "select f.id as id, + p.team_id as team_id + from file as f + join project as p on (p.id = f.project_id) + where f.id = ?") + +(defn retrieve-file + [conn id] + (db/exec-one! conn [sql:retrieve-file id])) + (defn websocket [{:keys [profile-id] :as req}] (let [params (us/conform ::websocket-params (:params req)) - file (db/get-by-id db/pool :file (:file-id params)) + file (retrieve-file db/pool (:file-id params)) params (assoc params :profile-id profile-id - :file file)] - + :team-id (:team-id file))] (cond (not profile-id) {:error {:code 403 :message "Authentication required"}} diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index 1c0eec8e64..39345e5a19 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -35,10 +35,8 @@ ;; --- API FORWARD (defn subscribe - ([topic] - (redis/subscribe client topic)) - ([topic xf] - (redis/subscribe client topic xf))) + [opts] + (redis/subscribe client opts)) (defn run! [cmd params] diff --git a/backend/src/app/services/notifications.clj b/backend/src/app/services/notifications.clj index e1b87e6a01..3f837c9828 100644 --- a/backend/src/app/services/notifications.clj +++ b/backend/src/app/services/notifications.clj @@ -15,173 +15,19 @@ [app.db :as db] [app.metrics :as mtx] [app.redis :as redis] + [app.util.async :as aa] [app.util.time :as dt] [app.util.transit :as t] - [clojure.core.async :as a :refer [>! ! out message)))) - -(defn start-loop! - [{:keys [in out sub] :as ws}] - (go-try - (loop [] - (let [timeout (a/timeout 30000) - [val port] (a/alts! [in sub timeout])] - ;; (prn "alts" val "from" (cond (= port in) "input" - ;; (= port sub) "redis" - ;; :else "timeout")) - - (cond - ;; Process message coming from connected client - (and (= port in) (not (nil? val))) - (do - (! out {:type :ping}) - (recur)) - - :else - nil))))) - -(defn disconnect! - [conn] - (let [session (.getSession conn)] - (when session - (.disconnect session)))) - -(defn- on-subscribed - [{:keys [conn] :as ws}] - (a/go - (try - (! out val)) + (recur)) + + ;; Timeout channel signaling + (= port timeout) + (do + (a/>! out {:type :ping}) + (recur)) + + :else + nil))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Incoming Messages Handling +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; --- Impl + +(defn- publish + [channel message] + (aa/go-try + (let [message (t/encode-str message)] + (aa/Client (RedisClient/create) (RedisURI/create uri))) + (->Client (RedisClient/create) + (RedisURI/create uri))) (defn connect - [client] - (let [^RedisURI uri (:uri client) - ^RedisClient client (:client client) - ^StatefulRedisConnection conn (.connect client StringCodec/UTF8 uri)] - (->Connection (.async conn)))) + [{:keys [uri] :as client}] + (let [conn (.connect ^RedisClient @client StringCodec/UTF8 ^RedisURI uri)] + (->Connection conn (.async ^StatefulRedisConnection conn)))) (defn- impl-subscribe - [^String topic xf ^StatefulRedisPubSubConnection conn] + [topics xform ^StatefulRedisPubSubConnection conn] (let [cmd (.sync conn) - output (a/chan 1 (comp (filter string?) xf)) + output (a/chan 1 (comp (filter string?) xform)) buffer (a/chan (a/sliding-buffer 64)) sub (reify RedisPubSubListener (message [it pattern channel message]) @@ -60,8 +66,8 @@ (punsubscribed [it pattern count]) (subscribed [it channel count]) (unsubscribed [it channel count]))] - (.addListener conn sub) + ;; Start message event-loop (with keepalive mechanism) (a/go-loop [] (let [[val port] (a/alts! [buffer (a/timeout 5000)]) message (if (= port buffer) val ::keepalive)] @@ -73,17 +79,20 @@ (when (.isOpen conn) (.close conn)))))) - (.subscribe ^RedisPubSubCommands cmd (into-array String [topic])) + ;; Synchronously subscribe to topics + (.addListener conn sub) + (.subscribe ^RedisPubSubCommands cmd topics) + + ;; Return the output channel output)) (defn subscribe - ([client topic] - (subscribe client topic (map identity))) - ([client topic xf] - (let [^RedisURI uri (:uri client) - ^RedisClient client (:client client)] - (->> (.connectPubSub client StringCodec/UTF8 uri) - (impl-subscribe topic xf))))) + [{:keys [uri] :as client} {:keys [topic topics xform]}] + (let [topics (if (vector? topics) + (into-array String (map str topics)) + (into-array String [(str topics)]))] + (->> (.connectPubSub ^RedisClient @client StringCodec/UTF8 ^RedisURI uri) + (impl-subscribe topics xform)))) (defn- resolve-to-bool [v]