From e805515767e188336aa93cff9e4f4601fc503e83 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 7 Feb 2020 12:15:38 +0100 Subject: [PATCH] :recycle: Refactor websockets subsystem. --- backend/src/uxbox/http/ws.clj | 104 +++++++----------- .../vendor/vertx/src/vertx/web/websockets.clj | 103 +++++++++++++++++ 2 files changed, 140 insertions(+), 67 deletions(-) create mode 100644 backend/vendor/vertx/src/vertx/web/websockets.clj diff --git a/backend/src/uxbox/http/ws.clj b/backend/src/uxbox/http/ws.clj index 25ffd17673..64e17f43e1 100644 --- a/backend/src/uxbox/http/ws.clj +++ b/backend/src/uxbox/http/ws.clj @@ -9,32 +9,30 @@ (:require [clojure.tools.logging :as log] [promesa.core :as p] + [uxbox.common.exceptions :as ex] [uxbox.emails :as emails] [uxbox.http.session :as session] [uxbox.services.init] [uxbox.services.mutations :as sm] [uxbox.services.queries :as sq] - [uxbox.util.uuid :as uuid] - [uxbox.util.transit :as t] [uxbox.util.blob :as blob] + [uxbox.util.transit :as t] + [uxbox.util.uuid :as uuid] + [vertx.eventbus :as ve] [vertx.http :as vh] - [vertx.web :as vw] [vertx.impl :as vi] [vertx.util :as vu] - [vertx.eventbus :as ve]) + [vertx.web :as vw] + [vertx.web.websockets :as ws]) (:import - io.vertx.core.Future - io.vertx.core.Promise io.vertx.core.Handler + io.vertx.core.Promise io.vertx.core.Vertx io.vertx.core.buffer.Buffer io.vertx.core.http.HttpServerRequest io.vertx.core.http.HttpServerResponse io.vertx.core.http.ServerWebSocket)) -(declare ws-websocket) -(declare ws-send!) - ;; --- State Management (defonce state @@ -42,7 +40,7 @@ (defn send! [ws message] - (ws-send! ws (-> (t/encode message) + (ws/send! ws (-> (t/encode message) (t/bytes->str)))) (defmulti handle-message @@ -85,63 +83,35 @@ ;; --- Handler +(defn- on-init + [req ws] + (let [ctx (vu/current-context) + file-id (get-in req [:path-params :file-id]) + user-id (:user req) + ws (assoc ws + :user-id user-id + :file-id file-id) + sem (start-eventbus-consumer! ctx ws file-id)] + (handle-message ws {:type :connect}) + (assoc ws ::sem sem))) + +(defn- on-text-message + [ws message] + (->> (t/str->bytes message) + (t/decode) + (handle-message ws))) + +(defn- on-close + [ws] + (let [file-id (:file-id ws)] + (handle-message ws {:type :disconnect + :file-id file-id}) + (.unregister (::sem ws)))) + (defn handler [{:keys [user] :as req}] - (letfn [(on-init [ws] - (let [ctx (vu/current-context) - fid (get-in req [:path-params :file-id]) - ws (assoc ws - :user-id user - :file-id fid) - sem (start-eventbus-consumer! ctx ws fid)] - (handle-message ws {:type :connect}) - (assoc ws ::sem sem))) + (ws/websocket :on-init (partial on-init req) + :on-text-message on-text-message + ;; :on-error on-error + :on-close on-close)) - (on-message [ws message] - (try - (->> (t/str->bytes message) - (t/decode) - (handle-message ws)) - (catch Throwable err - (log/error "Unexpected exception:\n" - (with-out-str - (.printStackTrace err (java.io.PrintWriter. *out*))))))) - - (on-close [ws] - (let [fid (get-in req [:path-params :file-id])] - (handle-message ws {:type :disconnect :file-id fid}) - (.unregister (::sem ws))))] - - (-> (ws-websocket) - (assoc :on-init on-init - :on-message on-message - :on-close on-close)))) - -;; --- Internal (vertx api) (experimental) - -(defrecord WebSocket [on-init on-message on-close] - vh/IAsyncResponse - (-handle-response [this ctx] - (let [^HttpServerRequest req (::vh/request ctx) - ^ServerWebSocket ws (.upgrade req) - local (volatile! (assoc this :ws ws))] - (-> (p/do! (on-init @local)) - (p/then (fn [data] - (vreset! local data) - (.textMessageHandler ws (vi/fn->handler - (fn [msg] - (-> (p/do! (on-message @local msg)) - (p/then (fn [data] - (when (instance? WebSocket data) - (vreset! local data)) - (.fetch ws 1))))))) - (.closeHandler ws (vi/fn->handler (fn [& args] (on-close @local)))))))))) - -(defn ws-websocket - [] - (->WebSocket nil nil nil)) - -(defn ws-send! - [ws msg] - (.writeTextMessage ^ServerWebSocket (:ws ws) - ^String msg)) diff --git a/backend/vendor/vertx/src/vertx/web/websockets.clj b/backend/vendor/vertx/src/vertx/web/websockets.clj new file mode 100644 index 0000000000..8ef547e3d8 --- /dev/null +++ b/backend/vendor/vertx/src/vertx/web/websockets.clj @@ -0,0 +1,103 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) 2019-2020 Andrey Antukh + +(ns vertx.web.websockets + "Web Sockets." + (:require + [clojure.tools.logging :as log] + [promesa.core :as p] + [vertx.http :as vh] + [vertx.web :as vw] + [vertx.impl :as vi] + [vertx.util :as vu] + [vertx.eventbus :as ve]) + (:import + java.lang.AutoCloseable + io.vertx.core.Future + io.vertx.core.Promise + io.vertx.core.Handler + io.vertx.core.Vertx + io.vertx.core.buffer.Buffer + io.vertx.core.http.HttpServerRequest + io.vertx.core.http.HttpServerResponse + io.vertx.core.http.ServerWebSocket)) + +(defprotocol IWebSocket + (send! [it message])) + +(defrecord WebSocket [conn] + AutoCloseable + (close [it] + (.close ^ServerWebSocket conn)) + + IWebSocket + (send! [it message] + (let [d (p/deferred)] + (cond + (string? message) + (.writeTextMessage ^ServerWebSocket conn + ^String message + ^Handler (vi/deferred->handler d)) + + (instance? Buffer message) + (.writeBinaryMessage ^ServerWebSocket conn + ^Buffer message + ^Handler (vi/deferred->handler d)) + + :else + (p/reject! (ex-info "invalid message type" {:message message}))) + d))) + +(defn- default-on-error + [ws err] + (log/error "Unexpected exception on websocket handler:\n" + (with-out-str + (.printStackTrace err (java.io.PrintWriter. *out*)))) + (.close ^AutoCloseable ws)) + +(defrecord WebSocketResponse [on-init on-text-message on-error on-close] + vh/IAsyncResponse + (-handle-response [it ctx] + (let [^HttpServerRequest req (::vh/request ctx) + ^ServerWebSocket conn (.upgrade req) + + wsref (volatile! (->WebSocket conn)) + + impl-on-error (fn [e] (on-error @wsref e)) + impl-on-close (fn [_] (on-close @wsref)) + + impl-on-message + (fn [message] + (-> (p/do! (on-text-message @wsref message)) + (p/finally (fn [res err] + (if err + (impl-on-error err) + (do + (.fetch conn 1) + (when (instance? WebSocket res) + (vreset! wsref res))))))))] + + (-> (p/do! (on-init @wsref)) + (p/finally (fn [data error] + (cond + (not (nil? error)) + (impl-on-error error) + + (instance? WebSocket data) + (do + (vreset! wsref data) + (.exceptionHandler conn (vi/fn->handler impl-on-error)) + (.textMessageHandler conn (vi/fn->handler impl-on-message)) + (.closeHandler conn (vi/fn->handler impl-on-close))) + + :else + (.reject conn))))) + nil))) + +(defn websocket + [& {:keys [on-init on-text-message on-error on-close] + :or {on-error default-on-error}}] + (->WebSocketResponse on-init on-text-message on-error on-close))