refactored protocol to a separate lib
This commit is contained in:
parent
6a3d64e184
commit
ee798b1567
|
@ -0,0 +1,39 @@
|
|||
(defproject syng-im/protocol "0.1.0"
|
||||
:description "FIXME: write this!"
|
||||
:url "http://example.com/FIXME"
|
||||
:license {:name "Eclipse Public License"
|
||||
:url "http://www.eclipse.org/legal/epl-v10.html"}
|
||||
|
||||
:min-lein-version "2.5.3"
|
||||
|
||||
:dependencies [[org.clojure/clojure "1.7.0"]
|
||||
[org.clojure/clojurescript "1.7.170"]
|
||||
[org.clojure/core.async "0.2.374" :exclusions [org.clojure/tools.reader]]
|
||||
[cljsjs/chance "0.7.3-0"]
|
||||
[com.andrewmcveigh/cljs-time "0.4.0"]]
|
||||
|
||||
:plugins [[lein-cljsbuild "1.1.2" :exclusions [[org.clojure/clojure]]]]
|
||||
|
||||
:source-paths ["src"]
|
||||
|
||||
:clean-targets ^{:protect false} ["resources/public/js/compiled" "target"]
|
||||
|
||||
:cljsbuild {:builds
|
||||
[{:id "dev"
|
||||
:source-paths ["src"]
|
||||
:compiler {:asset-path "js/compiled/out"
|
||||
:output-to "resources/public/js/compiled/protocol.js"
|
||||
:output-dir "resources/public/js/compiled/out"
|
||||
:source-map-timestamp true
|
||||
:foreign-libs [{:file "resources/public/vendor/js/web3.js"
|
||||
:file-min "resources/public/vendor/js/web3.min.js"
|
||||
:provides ["cljsjs.web3"]}]}}
|
||||
;; This next build is an compressed minified build for
|
||||
;; production. You can build this with:
|
||||
;; lein cljsbuild once min
|
||||
{:id "min"
|
||||
:source-paths ["src"]
|
||||
:compiler {:output-to "resources/public/js/compiled/protocol.js"
|
||||
:optimizations :advanced
|
||||
:pretty-print false}}]}
|
||||
)
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,60 @@
|
|||
(ns syng-im.protocol.api
|
||||
(:require [cljs.core.async :refer [<! timeout]]
|
||||
[syng-im.protocol.state.state :as state]
|
||||
[syng-im.protocol.delivery :as delivery]
|
||||
[syng-im.protocol.state.delivery :as delivery-state]
|
||||
[syng-im.protocol.whisper :as whisper]
|
||||
[syng-im.protocol.handler :as h])
|
||||
(:require-macros [cljs.core.async.macros :refer [go]]))
|
||||
|
||||
(def default-content-type "text/plain")
|
||||
|
||||
(defn create-connection [ethereum-rpc-url]
|
||||
(whisper/make-web3 ethereum-rpc-url))
|
||||
|
||||
(defn create-identity [connection]
|
||||
(whisper/new-identity connection))
|
||||
|
||||
(defn init-protocol
|
||||
"Required [handler ethereum-rpc-url]
|
||||
Optional [whisper-identity] - if not passed a new identity is created automatically
|
||||
|
||||
(fn handler [{:keys [event-type...}])
|
||||
|
||||
:event-type can be:
|
||||
|
||||
:new-msg - [from payload]
|
||||
:error - [error-msg details]
|
||||
:msg-acked [msg-id]
|
||||
:delivery-failed [msg-id]
|
||||
:initialized [identity]
|
||||
|
||||
:new-msg, msg-acked should be handled idempotently (may be called multiple times for the same msg-id)
|
||||
"
|
||||
[{:keys [handler ethereum-rpc-url identity]}]
|
||||
(state/set-handler handler)
|
||||
(go
|
||||
(let [connection (create-connection ethereum-rpc-url)
|
||||
identity (or identity
|
||||
(<! (create-identity connection)))]
|
||||
(state/set-connection connection)
|
||||
(state/set-identity identity)
|
||||
(whisper/listen connection)
|
||||
(delivery/start-delivery-loop)
|
||||
(h/invoke-handler :initialized {:identity identity}))))
|
||||
|
||||
(defn send-user-msg [{:keys [to content]}]
|
||||
(let [{:keys [msg-id msg] :as new-msg} (whisper/make-msg {:from (state/my-identity)
|
||||
:to to
|
||||
:payload {:content content
|
||||
:content-type default-content-type
|
||||
:type :user-msg}})]
|
||||
(delivery-state/add-pending-message msg-id msg)
|
||||
(whisper/post-msg (state/connection) msg)
|
||||
new-msg))
|
||||
|
||||
(defn my-identity []
|
||||
(state/my-identity))
|
||||
|
||||
(defn current-connection []
|
||||
(state/connection))
|
|
@ -0,0 +1,55 @@
|
|||
(ns syng-im.protocol.delivery
|
||||
(:require [cljs.core.async :refer [<! timeout]]
|
||||
[cljs-time.core :as t]
|
||||
[syng-im.utils.logging :as log]
|
||||
[syng-im.protocol.state.delivery :as state]
|
||||
[syng-im.protocol.state.state :as s]
|
||||
[syng-im.protocol.whisper :as whisper]
|
||||
[syng-im.protocol.handler :as handler])
|
||||
(:require-macros [cljs.core.async.macros :refer [go]]))
|
||||
|
||||
(def max-retry-send-count 5)
|
||||
(def ack-wait-timeout-ms (t/millis 5000))
|
||||
(def check-delivery-interval-msg 100)
|
||||
|
||||
(defn expired? [timestamp]
|
||||
(t/before? (t/plus timestamp ack-wait-timeout-ms) (t/now)))
|
||||
|
||||
(defn delivery-expired-seq []
|
||||
(lazy-seq
|
||||
(let [{:keys [timestamp] :as item} (->> (state/delivery-queue)
|
||||
(peek))]
|
||||
(when timestamp
|
||||
(if (expired? timestamp)
|
||||
(do (state/pop-delivery-queue)
|
||||
(cons item (delivery-expired-seq)))
|
||||
nil)))))
|
||||
|
||||
(defn start-delivery-loop []
|
||||
(go (loop [_ (<! (timeout check-delivery-interval-msg))]
|
||||
(doseq [{:keys [msg-id]} (delivery-expired-seq)]
|
||||
(log/info "Delivery-loop:" "Checking delivery of msg-id" msg-id)
|
||||
(when-let [{:keys [retry-count msg]} (state/pending? msg-id)]
|
||||
(log/info "Delivery-loop: Message" msg-id "is pending, retry-count=" retry-count)
|
||||
(if (< retry-count max-retry-send-count)
|
||||
(do
|
||||
(log/info "Delivery-loop: Re-sending message" msg-id)
|
||||
(whisper/post-msg (s/connection) msg)
|
||||
(state/push-delivery-queue msg-id)
|
||||
(state/inc-retry-count msg-id))
|
||||
(do
|
||||
(log/info "Delivery-loop: Retry-count for message" msg-id "reached maximum")
|
||||
(state/remove-pending-message msg-id)
|
||||
(handler/invoke-handler :delivery-failed {:msg-id msg-id})))))
|
||||
(recur (<! (timeout check-delivery-interval-msg))))))
|
||||
|
||||
|
||||
(comment
|
||||
|
||||
|
||||
(take 30 (delivery-expired-seq))
|
||||
(state/add-pending-message 4 {:msg-id 4})
|
||||
|
||||
@state/state
|
||||
|
||||
)
|
|
@ -0,0 +1,5 @@
|
|||
(ns syng-im.protocol.handler
|
||||
(:require [syng-im.protocol.state.state :as state]))
|
||||
|
||||
(defn invoke-handler [event-type params]
|
||||
((state/handler) (assoc params :event-type event-type)))
|
|
@ -0,0 +1,36 @@
|
|||
(ns syng-im.protocol.state.delivery
|
||||
(:require [cljs-time.core :as t]
|
||||
[syng-im.protocol.state.state :refer [state]]
|
||||
[syng-im.utils.logging :as log]))
|
||||
|
||||
(defn inc-retry-count [msg-id]
|
||||
(swap! state (fn [state]
|
||||
(if (get-in state [:pending-messages msg-id])
|
||||
(update-in state [:pending-messages msg-id :retry-count] inc)
|
||||
state))))
|
||||
|
||||
(defn pending? [msg-id]
|
||||
(get-in @state [:pending-messages msg-id]))
|
||||
|
||||
(defn push-msg-to-delivery-queue [state msg-id]
|
||||
(update-in state [:delivery-queue] conj {:timestamp (t/now)
|
||||
:msg-id msg-id}))
|
||||
|
||||
(defn add-pending-message [msg-id msg]
|
||||
(swap! state (fn [state]
|
||||
(-> (assoc-in state [:pending-messages msg-id] {:msg msg
|
||||
:retry-count 0})
|
||||
(push-msg-to-delivery-queue msg-id)))))
|
||||
|
||||
(defn pop-delivery-queue []
|
||||
(swap! state update-in [:delivery-queue] pop))
|
||||
|
||||
(defn push-delivery-queue [msg-id]
|
||||
(swap! state push-msg-to-delivery-queue msg-id))
|
||||
|
||||
(defn remove-pending-message [msg-id]
|
||||
(log/info "Removing message" msg-id "from pending")
|
||||
(swap! state update-in [:pending-messages] dissoc msg-id))
|
||||
|
||||
(defn delivery-queue []
|
||||
(:delivery-queue @state))
|
|
@ -0,0 +1,30 @@
|
|||
(ns syng-im.protocol.state.state
|
||||
(:require [cljs-time.core :as t]))
|
||||
|
||||
(def state (atom {:pending-messages {}
|
||||
:filters {}
|
||||
:delivery-queue #queue []
|
||||
:handler nil
|
||||
:identity nil
|
||||
:connection nil}))
|
||||
|
||||
(defn add-filter [topics filter]
|
||||
(swap! state assoc-in [:filters topics] filter))
|
||||
|
||||
(defn set-handler [handler]
|
||||
(swap! state assoc :handler handler))
|
||||
|
||||
(defn set-identity [identity]
|
||||
(swap! state assoc :identity identity))
|
||||
|
||||
(defn set-connection [connection]
|
||||
(swap! state assoc :connection connection))
|
||||
|
||||
(defn connection []
|
||||
(:connection @state))
|
||||
|
||||
(defn my-identity []
|
||||
(:identity @state))
|
||||
|
||||
(defn handler []
|
||||
(:handler @state))
|
|
@ -0,0 +1,127 @@
|
|||
(ns syng-im.protocol.whisper
|
||||
(:require [cljs.core.async :refer [chan put! close! <!]]
|
||||
[cljsjs.web3]
|
||||
[cljsjs.chance]
|
||||
[syng-im.utils.logging :as log]
|
||||
[syng-im.protocol.state.state :as state]
|
||||
[syng-im.protocol.state.delivery :as delivery]
|
||||
[syng-im.protocol.handler :as handler]
|
||||
[cljs.reader :refer [read-string]])
|
||||
(:require-macros [cljs.core.async.macros :refer [go]]))
|
||||
|
||||
(def syng-app-topic "SYNG-APP-CHAT-TOPIC")
|
||||
(def syng-msg-ttl 100)
|
||||
|
||||
(defn from-ascii [s]
|
||||
(.fromAscii js/Web3.prototype s))
|
||||
|
||||
(defn to-ascii [s]
|
||||
(.toAscii js/Web3.prototype s))
|
||||
|
||||
(defn whisper [web3]
|
||||
(.-shh web3))
|
||||
|
||||
(defn make-topics [topics]
|
||||
(->> {:topics (mapv from-ascii topics)}
|
||||
(clj->js)))
|
||||
|
||||
(defn make-web3 [rpc-url]
|
||||
(->> (js/Web3.providers.HttpProvider. rpc-url)
|
||||
(js/Web3.)))
|
||||
|
||||
(defn make-callback [{:keys [error-msg result-channel]}]
|
||||
(fn [error result]
|
||||
(if error
|
||||
(do
|
||||
(log/error (str error-msg ":") error)
|
||||
(handler/invoke-handler :error {:error-msg error-msg
|
||||
:details error}))
|
||||
(put! result-channel result))
|
||||
(close! result-channel)))
|
||||
|
||||
(defn new-identity [web3]
|
||||
(let [result-channel (chan)
|
||||
callback (make-callback {:error-msg "Call to newIdentity failed"
|
||||
:result-channel result-channel})]
|
||||
(.newIdentity (.-shh web3) callback)
|
||||
result-channel))
|
||||
|
||||
(defn handle-ack [{:keys [ack-msg-id]}]
|
||||
(log/info "Got ack for message:" ack-msg-id)
|
||||
(delivery/remove-pending-message ack-msg-id)
|
||||
(handler/invoke-handler :msg-acked {:msg-id ack-msg-id}))
|
||||
|
||||
(defn post-msg [web3 msg]
|
||||
(let [js-msg (clj->js msg)]
|
||||
(log/info "Sending whisper message:" js-msg)
|
||||
(-> (whisper web3)
|
||||
(.post js-msg (fn [error result]
|
||||
(when error
|
||||
(let [error-msg "Call to shh.post() failed"]
|
||||
(log/error (str error-msg ":") error)
|
||||
(handler/invoke-handler :error {:error-msg error-msg
|
||||
:details error}))))))))
|
||||
|
||||
(defn make-msg
|
||||
"Returns [msg-id msg], `msg` is formed for Web3.shh.post()"
|
||||
[{:keys [from to ttl topics payload]
|
||||
:or {ttl syng-msg-ttl
|
||||
topics []}}]
|
||||
(let [msg-id (.guid js/chance)]
|
||||
{:msg-id msg-id
|
||||
:msg (cond-> {:ttl ttl
|
||||
:topics (->> (conj topics syng-app-topic)
|
||||
(mapv from-ascii))
|
||||
:payload (->> (merge payload {:msg-id msg-id})
|
||||
(str)
|
||||
(from-ascii))}
|
||||
from (assoc :from from)
|
||||
to (assoc :to to))}))
|
||||
|
||||
(defn send-ack [web3 to msg-id]
|
||||
(log/info "Acking message:" msg-id "To:" to)
|
||||
(let [{:keys [msg]} (make-msg {:from (state/my-identity)
|
||||
:to to
|
||||
:payload {:type :ack
|
||||
:ack-msg-id msg-id}})]
|
||||
(post-msg web3 msg)))
|
||||
|
||||
(defn handle-user-msg [web3 from {:keys [msg-id] :as payload}]
|
||||
(send-ack web3 from msg-id)
|
||||
(handler/invoke-handler :new-msg {:from from
|
||||
:payload payload}))
|
||||
|
||||
(defn handle-arriving-whisper-msg [web3 msg]
|
||||
(log/info "Got whisper message:" msg)
|
||||
(let [{from :from
|
||||
to :to
|
||||
topics :topics ;; always empty (bug in go-ethereum?)
|
||||
payload :payload
|
||||
:as msg} (js->clj msg :keywordize-keys true)]
|
||||
(if (= to (state/my-identity))
|
||||
(let [{msg-type :type
|
||||
msg-id :msg-id
|
||||
:as payload} (->> (to-ascii payload)
|
||||
(read-string))]
|
||||
(case msg-type
|
||||
:ack (handle-ack payload)
|
||||
:user-msg (handle-user-msg web3 from payload)))
|
||||
(log/warn "My identity:" (state/my-identity) "Message To:" to "Message is encrypted for someone else, ignoring"))))
|
||||
|
||||
(defn listen
|
||||
"Returns a filter which can be stopped with (stop-whisper-listener)"
|
||||
[web3]
|
||||
(let [topics [syng-app-topic]
|
||||
shh (whisper web3)
|
||||
filter (.filter shh (make-topics topics) (fn [error msg]
|
||||
(if error
|
||||
(handler/invoke-handler :error {:error-msg error})
|
||||
(handle-arriving-whisper-msg web3 msg))))]
|
||||
(state/add-filter topics filter)))
|
||||
|
||||
(defn stop-listener [filter]
|
||||
(.stopWatching filter))
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
(ns syng-im.utils.logging
|
||||
(:require [cljs-time.core :as t]
|
||||
[cljs-time.format :as tf]))
|
||||
|
||||
(defn timestamp []
|
||||
(tf/unparse (:hour-minute-second-fraction tf/formatters) (t/now)))
|
||||
|
||||
(defn info [& args]
|
||||
(let [args (cons (timestamp) args)]
|
||||
(.apply (.-log js/console) js/console (into-array args))))
|
||||
|
||||
(defn warn [& args]
|
||||
(let [args (cons (timestamp) args)]
|
||||
(.apply (.-warn js/console) js/console (into-array args))))
|
||||
|
||||
(defn error [& args]
|
||||
(let [args (cons (timestamp) args)]
|
||||
(.apply (.-error js/console) js/console (into-array args))))
|
||||
|
||||
|
||||
(comment
|
||||
|
||||
)
|
Loading…
Reference in New Issue