From 0cc5c7f7bb5916a544d4d66c5b782a2e30238172 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sat, 8 Feb 2020 15:39:26 +0100 Subject: [PATCH] :recycle: Reimplement websockets using streams. --- backend/src/uxbox/http/ws.clj | 59 +++++++--- backend/vendor/vertx/deps.edn | 1 + backend/vendor/vertx/src/vertx/stream.clj | 70 ++++++++++++ backend/vendor/vertx/src/vertx/util.clj | 1 + .../vendor/vertx/src/vertx/web/websockets.clj | 103 +++++++++--------- frontend/src/uxbox/main/data/workspace.cljs | 2 +- 6 files changed, 169 insertions(+), 67 deletions(-) create mode 100644 backend/vendor/vertx/src/vertx/stream.clj diff --git a/backend/src/uxbox/http/ws.clj b/backend/src/uxbox/http/ws.clj index 3577ea1925..29351d6ce0 100644 --- a/backend/src/uxbox/http/ws.clj +++ b/backend/src/uxbox/http/ws.clj @@ -23,6 +23,7 @@ [vertx.util :as vu] [vertx.timers :as vt] [vertx.web :as vw] + [vertx.stream :as vs] [vertx.web.websockets :as ws]) (:import java.lang.AutoCloseable @@ -40,9 +41,10 @@ (atom {})) (defn send! - [ws message] - (ws/send! ws (-> (t/encode message) - (t/bytes->str)))) + [{:keys [output] :as ws} message] + (let [msg (-> (t/encode message) + (t/bytes->str))] + (vs/put! output msg))) (defmulti handle-message (fn [ws message] (:type message))) @@ -52,14 +54,14 @@ (let [local (swap! state assoc-in [file-id user-id] ws) sessions (get local file-id) message {:type :who :users (set (keys sessions))}] - (run! #(send! % message) (vals sessions)))) + (p/run! #(send! % message) (vals sessions)))) (defmethod handle-message :disconnect [{:keys [user-id] :as ws} {:keys [file-id] :as message}] (let [local (swap! state update file-id dissoc user-id) sessions (get local file-id) message {:type :who :users (set (keys sessions))}] - (run! #(send! % message) (vals sessions)))) + (p/run! #(send! % message) (vals sessions)))) (defmethod handle-message :who [{:keys [file-id] :as ws} message] @@ -71,7 +73,7 @@ (let [sessions (->> (vals (get @state file-id)) (remove #(= user-id (:user-id %)))) message (assoc message :user-id user-id)] - (run! #(send! % message) sessions))) + (p/run! #(send! % message) sessions))) (defn- on-eventbus-message [{:keys [file-id user-id] :as ws} {:keys [body] :as message}] @@ -85,7 +87,7 @@ ;; --- Handler (defn- on-init - [req ws] + [ws req] (let [ctx (vu/current-context) file-id (get-in req [:path-params :file-id]) user-id (:user req) @@ -94,11 +96,11 @@ :file-id file-id) send-ping #(send! ws {:type :ping}) sem1 (start-eventbus-consumer! ctx ws file-id) - sem2 (vt/schedule-periodic! ctx 30000 send-ping)] + sem2 (vt/schedule-periodic! ctx 5000 send-ping)] (handle-message ws {:type :connect}) - (assoc ws ::sem1 sem1 ::sem2 sem2))) + (p/resolved (assoc ws ::sem1 sem1 ::sem2 sem2)))) -(defn- on-text-message +(defn- on-message [ws message] (->> (t/str->bytes message) (t/decode) @@ -109,13 +111,38 @@ (let [file-id (:file-id ws)] (handle-message ws {:type :disconnect :file-id file-id}) - (.close ^AutoCloseable (::sem1 ws)) - (.close ^AutoCloseable (::sem2 ws)))) + (when-let [sem1 (::sem1 ws)] + (.close ^AutoCloseable sem1)) + (when-let [sem2 (::sem2 ws)] + (.close ^AutoCloseable sem2)))) + +(defn- rcv-loop + [{:keys [input] :as ws}] + (vs/loop [] + (-> (vs/take! input) + (p/then (fn [message] + (when message + (p/do! (on-message ws message) + (p/recur)))))))) + +(defn- log-error + [err] + (log/error "Unexpected exception on websocket handler:\n" + (with-out-str + (.printStackTrace err (java.io.PrintWriter. *out*))))) + +(defn websocket-handler + [req ws] + (p/let [ws (on-init ws req)] + (-> (rcv-loop ws) + (p/finally (fn [_ error] + (.close ^AutoCloseable ws) + (on-close ws) + (when error + (log-error error))))))) (defn handler [{:keys [user] :as req}] - (ws/websocket :on-init (partial on-init req) - :on-text-message on-text-message + (ws/websocket :handler (partial websocket-handler req) ;; :on-error on-error - :on-close on-close)) - + )) diff --git a/backend/vendor/vertx/deps.edn b/backend/vendor/vertx/deps.edn index c5e529c5ea..5de7cdf303 100644 --- a/backend/vendor/vertx/deps.edn +++ b/backend/vendor/vertx/deps.edn @@ -3,6 +3,7 @@ funcool/promesa {:mvn/version "5.0.0"} metosin/reitit-core {:mvn/version "0.3.10"} metosin/sieppari {:mvn/version "0.0.0-alpha8"} + org.clojure/core.async {:mvn/version "0.7.559"} io.vertx/vertx-core {:mvn/version "4.0.0-milestone4"} io.vertx/vertx-web {:mvn/version "4.0.0-milestone4"} io.vertx/vertx-web-client {:mvn/version "4.0.0-milestone4"}} diff --git a/backend/vendor/vertx/src/vertx/stream.clj b/backend/vendor/vertx/src/vertx/stream.clj new file mode 100644 index 0000000000..2909ec91e6 --- /dev/null +++ b/backend/vendor/vertx/src/vertx/stream.clj @@ -0,0 +1,70 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) 2019-2020 Andrey Antukh + +(ns vertx.stream + "A stream abstraction on top of core.async with awareness of vertx + execution context." + (:refer-clojure :exclude [loop]) + (:require + [clojure.spec.alpha :as s] + [clojure.core.async :as a] + [clojure.core :as c] + [promesa.core :as p] + [vertx.impl :as impl] + [vertx.util :as vu])) + +;; --- Streams + +(defmacro loop + [& args] + `(let [ctx# (vu/current-context)] + (binding [p/*loop-run-fn* #(vu/run-on-context! ctx# %)] + (p/loop ~@args)))) + +(defn stream + ([] (a/chan)) + ([b] (a/chan b)) + ([b c] (a/chan b c)) + ([b c e] (a/chan b c e))) + +(defn take! + [c] + (let [d (p/deferred) + ctx (vu/current-context)] + (a/take! c (fn [res] + (vu/run-on-context! ctx #(p/resolve! d res)))) + d)) + +(defn poll! + [c] + (a/poll! c)) + +(defn put! + [c v] + (let [d (p/deferred) + ctx (vu/current-context)] + (a/put! c v (fn [res] + (vu/run-on-context! ctx #(p/resolve! d res)))) + d)) + +(defn offer! + [c v] + (a/offer! c v)) + +(defn alts! + ([ports] (alts! ports {})) + ([ports opts] + (let [d (p/deferred) + ctx (vu/current-context) + deliver #(vu/run-on-context! ctx (fn [] (p/resolve! d %))) + ret (a/do-alts deliver ports opts)] + (if ret + (p/resolved @ret) + d)))) + +(defn close! + [c] + (a/close! c)) diff --git a/backend/vendor/vertx/src/vertx/util.clj b/backend/vendor/vertx/src/vertx/util.clj index 52fc79c145..68f76c3b8a 100644 --- a/backend/vendor/vertx/src/vertx/util.clj +++ b/backend/vendor/vertx/src/vertx/util.clj @@ -103,6 +103,7 @@ (handle [_ v'] (f))))) + (defmacro loop [& args] `(let [ctx# (current-context)] diff --git a/backend/vendor/vertx/src/vertx/web/websockets.clj b/backend/vendor/vertx/src/vertx/web/websockets.clj index 8ef547e3d8..6cb4ad198b 100644 --- a/backend/vendor/vertx/src/vertx/web/websockets.clj +++ b/backend/vendor/vertx/src/vertx/web/websockets.clj @@ -13,6 +13,7 @@ [vertx.web :as vw] [vertx.impl :as vi] [vertx.util :as vu] + [vertx.stream :as vs] [vertx.eventbus :as ve]) (:import java.lang.AutoCloseable @@ -25,31 +26,29 @@ io.vertx.core.http.HttpServerResponse io.vertx.core.http.ServerWebSocket)) -(defprotocol IWebSocket - (send! [it message])) - -(defrecord WebSocket [conn] +(defrecord WebSocket [conn input output] AutoCloseable (close [it] - (.close ^ServerWebSocket conn)) + (vs/close! input) + (vs/close! output))) - IWebSocket - (send! [it message] - (let [d (p/deferred)] - (cond - (string? message) - (.writeTextMessage ^ServerWebSocket conn - ^String message +(defn- write-to-websocket + [conn message] + (let [d (p/deferred)] + (cond + (string? message) + (.writeTextMessage ^ServerWebSocket conn + ^String message + ^Handler (vi/deferred->handler d)) + + (instance? Buffer message) + (.writeBinaryMessage ^ServerWebSocket conn + ^Buffer message ^Handler (vi/deferred->handler d)) - (instance? Buffer message) - (.writeBinaryMessage ^ServerWebSocket conn - ^Buffer message - ^Handler (vi/deferred->handler d)) - - :else - (p/reject! (ex-info "invalid message type" {:message message}))) - d))) + :else + (p/reject! (ex-info "invalid message type" {:message message}))) + d)) (defn- default-on-error [ws err] @@ -58,46 +57,50 @@ (.printStackTrace err (java.io.PrintWriter. *out*)))) (.close ^AutoCloseable ws)) -(defrecord WebSocketResponse [on-init on-text-message on-error on-close] +(defrecord WebSocketResponse [handler on-error] vh/IAsyncResponse - (-handle-response [it ctx] - (let [^HttpServerRequest req (::vh/request ctx) + (-handle-response [it request] + (let [^HttpServerRequest req (::vh/request request) ^ServerWebSocket conn (.upgrade req) - wsref (volatile! (->WebSocket conn)) + inp-s (vs/stream 64) + out-s (vs/stream 64) - impl-on-error (fn [e] (on-error @wsref e)) - impl-on-close (fn [_] (on-close @wsref)) + ctx (vu/current-context) + ws (->WebSocket conn inp-s out-s) + + impl-on-error + (fn [e] (on-error ws e)) + + impl-on-close + (fn [_] + (vs/close! inp-s) + (vs/close! out-s)) impl-on-message (fn [message] - (-> (p/do! (on-text-message @wsref message)) - (p/finally (fn [res err] - (if err - (impl-on-error err) - (do - (.fetch conn 1) - (when (instance? WebSocket res) - (vreset! wsref res))))))))] + (when-not (vs/offer! inp-s message) + (.pause conn) + (prn "BUFF") + (-> (vs/put! inp-s message) + (p/then' (fn [res] + (when-not (false? res) + (.resume conn)))))))] - (-> (p/do! (on-init @wsref)) - (p/finally (fn [data error] - (cond - (not (nil? error)) - (impl-on-error error) + (.exceptionHandler conn (vi/fn->handler impl-on-error)) + (.textMessageHandler conn (vi/fn->handler impl-on-message)) + (.closeHandler conn (vi/fn->handler impl-on-close)) - (instance? WebSocket data) - (do - (vreset! wsref data) - (.exceptionHandler conn (vi/fn->handler impl-on-error)) - (.textMessageHandler conn (vi/fn->handler impl-on-message)) - (.closeHandler conn (vi/fn->handler impl-on-close))) + (vs/loop [] + (p/let [msg (vs/take! out-s)] + (when-not (nil? msg) + (p/do! + (write-to-websocket conn msg) + (p/recur))))) - :else - (.reject conn))))) - nil))) + (vu/run-on-context! ctx #(handler ws))))) (defn websocket - [& {:keys [on-init on-text-message on-error on-close] + [& {:keys [handler on-error] :or {on-error default-on-error}}] - (->WebSocketResponse on-init on-text-message on-error on-close)) + (->WebSocketResponse handler on-error)) diff --git a/frontend/src/uxbox/main/data/workspace.cljs b/frontend/src/uxbox/main/data/workspace.cljs index a8224d7765..87febc1fe9 100644 --- a/frontend/src/uxbox/main/data/workspace.cljs +++ b/frontend/src/uxbox/main/data/workspace.cljs @@ -98,7 +98,7 @@ (->> stream (rx/filter ms/pointer-event?) - (rx/sample 150) + (rx/sample 50) (rx/map #(handle-pointer-send file-id (:pt %))))) (rx/take-until stoper))))))