simple chat app used to test protocol lib

This commit is contained in:
michaelr 2016-03-06 21:31:24 +02:00
parent e0e60592f0
commit 6a3d64e184
17 changed files with 40 additions and 364 deletions

View File

@ -1,59 +0,0 @@
(ns cljs-tests.protocol.api
(:require [cljs.core.async :refer [<! timeout]]
[cljs-tests.protocol.state.state :as state]
[cljs-tests.protocol.delivery :as delivery]
[cljs-tests.protocol.state.delivery :as delivery-state]
[cljs-tests.protocol.whisper :as whisper]
[cljs-tests.protocol.handler :as h])
(:require-macros [cljs.core.async.macros :refer [go]]))
(def default-content-type "text/plain")
(defn create-connection [ethereum-rpc-url]
(whisper/make-web3 ethereum-rpc-url))
(defn create-identity [connection]
(whisper/new-identity connection))
(defn init-protocol
"Required [handler ethereum-rpc-url]
Optional [whisper-identity] - if not passed a new identity is created automatically
(fn handler [{:keys [event-type...}])
:event-type can be:
:new-msg - [from payload]
:error - [error-msg details]
:msg-acked [msg-id]
:delivery-failed [msg-id]
:new-msg, msg-acked should be handled idempotently (may be called multiple times for the same msg-id)
"
[{:keys [handler ethereum-rpc-url identity]}]
(state/set-handler handler)
(go
(let [connection (create-connection ethereum-rpc-url)
identity (or identity
(<! (create-identity connection)))]
(state/set-connection connection)
(state/set-identity identity)
(whisper/listen connection)
(delivery/start-delivery-loop)
(h/invoke-handler :initialized {:identity identity}))))
(defn send-user-msg [{:keys [to content]}]
(let [{:keys [msg-id msg] :as new-msg} (whisper/make-msg {:from (state/my-identity)
:to to
:payload {:content content
:content-type default-content-type
:type :user-msg}})]
(delivery-state/add-pending-message msg-id msg)
(whisper/post-msg (state/connection) msg)
new-msg))
(defn my-identity []
(state/my-identity))
(defn current-connection []
(state/connection))

View File

@ -1,55 +0,0 @@
(ns cljs-tests.protocol.delivery
(:require [cljs.core.async :refer [<! timeout]]
[cljs-time.core :as t]
[cljs-tests.utils.logging :as log]
[cljs-tests.protocol.state.delivery :as state]
[cljs-tests.protocol.state.state :as s]
[cljs-tests.protocol.whisper :as whisper]
[cljs-tests.protocol.handler :as handler])
(:require-macros [cljs.core.async.macros :refer [go]]))
(def max-retry-send-count 5)
(def ack-wait-timeout-ms (t/millis 5000))
(def check-delivery-interval-msg 100)
(defn expired? [timestamp]
(t/before? (t/plus timestamp ack-wait-timeout-ms) (t/now)))
(defn delivery-expired-seq []
(lazy-seq
(let [{:keys [timestamp] :as item} (->> (state/delivery-queue)
(peek))]
(when timestamp
(if (expired? timestamp)
(do (state/pop-delivery-queue)
(cons item (delivery-expired-seq)))
nil)))))
(defn start-delivery-loop []
(go (loop [_ (<! (timeout check-delivery-interval-msg))]
(doseq [{:keys [msg-id]} (delivery-expired-seq)]
(log/info "Delivery-loop:" "Checking delivery of msg-id" msg-id)
(when-let [{:keys [retry-count msg]} (state/pending? msg-id)]
(log/info "Delivery-loop: Message" msg-id "is pending, retry-count=" retry-count)
(if (< retry-count max-retry-send-count)
(do
(log/info "Delivery-loop: Re-sending message" msg-id)
(whisper/post-msg (s/connection) msg)
(state/push-delivery-queue msg-id)
(state/inc-retry-count msg-id))
(do
(log/info "Delivery-loop: Retry-count for message" msg-id "reached maximum")
(state/remove-pending-message msg-id)
(handler/invoke-handler :delivery-failed {:msg-id msg-id})))))
(recur (<! (timeout check-delivery-interval-msg))))))
(comment
(take 30 (delivery-expired-seq))
(state/add-pending-message 4 {:msg-id 4})
@state/state
)

View File

@ -1,5 +0,0 @@
(ns cljs-tests.protocol.handler
(:require [cljs-tests.protocol.state.state :as state]))
(defn invoke-handler [event-type params]
((state/handler) (assoc params :event-type event-type)))

View File

@ -1,36 +0,0 @@
(ns cljs-tests.protocol.state.delivery
(:require [cljs-time.core :as t]
[cljs-tests.protocol.state.state :refer [state]]
[cljs-tests.utils.logging :as log]))
(defn inc-retry-count [msg-id]
(swap! state (fn [state]
(if (get-in state [:pending-messages msg-id])
(update-in state [:pending-messages msg-id :retry-count] inc)
state))))
(defn pending? [msg-id]
(get-in @state [:pending-messages msg-id]))
(defn push-msg-to-delivery-queue [state msg-id]
(update-in state [:delivery-queue] conj {:timestamp (t/now)
:msg-id msg-id}))
(defn add-pending-message [msg-id msg]
(swap! state (fn [state]
(-> (assoc-in state [:pending-messages msg-id] {:msg msg
:retry-count 0})
(push-msg-to-delivery-queue msg-id)))))
(defn pop-delivery-queue []
(swap! state update-in [:delivery-queue] pop))
(defn push-delivery-queue [msg-id]
(swap! state push-msg-to-delivery-queue msg-id))
(defn remove-pending-message [msg-id]
(log/info "Removing message" msg-id "from pending")
(swap! state update-in [:pending-messages] dissoc msg-id))
(defn delivery-queue []
(:delivery-queue @state))

View File

@ -1,30 +0,0 @@
(ns cljs-tests.protocol.state.state
(:require [cljs-time.core :as t]))
(def state (atom {:pending-messages {}
:filters {}
:delivery-queue #queue []
:handler nil
:identity nil
:connection nil}))
(defn add-filter [topics filter]
(swap! state assoc-in [:filters topics] filter))
(defn set-handler [handler]
(swap! state assoc :handler handler))
(defn set-identity [identity]
(swap! state assoc :identity identity))
(defn set-connection [connection]
(swap! state assoc :connection connection))
(defn connection []
(:connection @state))
(defn my-identity []
(:identity @state))
(defn handler []
(:handler @state))

View File

@ -1,127 +0,0 @@
(ns cljs-tests.protocol.whisper
(:require [cljs.core.async :refer [chan put! close! <!]]
[cljsjs.web3]
[cljsjs.chance]
[cljs-tests.utils.logging :as log]
[cljs-tests.protocol.state.state :as state]
[cljs-tests.protocol.state.delivery :as delivery]
[cljs-tests.protocol.handler :as handler]
[cljs.reader :refer [read-string]])
(:require-macros [cljs.core.async.macros :refer [go]]))
(def syng-app-topic "SYNG-APP-CHAT-TOPIC")
(def syng-msg-ttl 100)
(defn from-ascii [s]
(.fromAscii js/Web3.prototype s))
(defn to-ascii [s]
(.toAscii js/Web3.prototype s))
(defn whisper [web3]
(.-shh web3))
(defn make-topics [topics]
(->> {:topics (mapv from-ascii topics)}
(clj->js)))
(defn make-web3 [rpc-url]
(->> (js/Web3.providers.HttpProvider. rpc-url)
(js/Web3.)))
(defn make-callback [{:keys [error-msg result-channel]}]
(fn [error result]
(if error
(do
(log/error (str error-msg ":") error)
(handler/invoke-handler :error {:error-msg error-msg
:details error}))
(put! result-channel result))
(close! result-channel)))
(defn new-identity [web3]
(let [result-channel (chan)
callback (make-callback {:error-msg "Call to newIdentity failed"
:result-channel result-channel})]
(.newIdentity (.-shh web3) callback)
result-channel))
(defn handle-ack [{:keys [ack-msg-id]}]
(log/info "Got ack for message:" ack-msg-id)
(delivery/remove-pending-message ack-msg-id)
(handler/invoke-handler :msg-acked {:msg-id ack-msg-id}))
(defn post-msg [web3 msg]
(let [js-msg (clj->js msg)]
(log/info "Sending whisper message:" js-msg)
(-> (whisper web3)
(.post js-msg (fn [error result]
(when error
(let [error-msg "Call to shh.post() failed"]
(log/error (str error-msg ":") error)
(handler/invoke-handler :error {:error-msg error-msg
:details error}))))))))
(defn make-msg
"Returns [msg-id msg], `msg` is formed for Web3.shh.post()"
[{:keys [from to ttl topics payload]
:or {ttl syng-msg-ttl
topics []}}]
(let [msg-id (.guid js/chance)]
{:msg-id msg-id
:msg (cond-> {:ttl ttl
:topics (->> (conj topics syng-app-topic)
(mapv from-ascii))
:payload (->> (merge payload {:msg-id msg-id})
(str)
(from-ascii))}
from (assoc :from from)
to (assoc :to to))}))
(defn send-ack [web3 to msg-id]
(log/info "Acking message:" msg-id "To:" to)
(let [{:keys [msg]} (make-msg {:from (state/my-identity)
:to to
:payload {:type :ack
:ack-msg-id msg-id}})]
(post-msg web3 msg)))
(defn handle-user-msg [web3 from {:keys [msg-id] :as payload}]
(send-ack web3 from msg-id)
(handler/invoke-handler :new-msg {:from from
:payload payload}))
(defn handle-arriving-whisper-msg [web3 msg]
(log/info "Got whisper message:" msg)
(let [{from :from
to :to
topics :topics ;; always empty (bug in go-ethereum?)
payload :payload
:as msg} (js->clj msg :keywordize-keys true)]
(if (= to (state/my-identity))
(let [{msg-type :type
msg-id :msg-id
:as payload} (->> (to-ascii payload)
(read-string))]
(case msg-type
:ack (handle-ack payload)
:user-msg (handle-user-msg web3 from payload)))
(log/warn "My identity:" (state/my-identity) "Message To:" to "Message is encrypted for someone else, ignoring"))))
(defn listen
"Returns a filter which can be stopped with (stop-whisper-listener)"
[web3]
(let [topics [syng-app-topic]
shh (whisper web3)
filter (.filter shh (make-topics topics) (fn [error msg]
(if error
(handler/invoke-handler :error {:error-msg error})
(handle-arriving-whisper-msg web3 msg))))]
(state/add-filter topics filter)))
(defn stop-listener [filter]
(.stopWatching filter))

View File

@ -1,23 +0,0 @@
(ns cljs-tests.utils.logging
(:require [cljs-time.core :as t]
[cljs-time.format :as tf]))
(defn timestamp []
(tf/unparse (:hour-minute-second-fraction tf/formatters) (t/now)))
(defn info [& args]
(let [args (cons (timestamp) args)]
(.apply (.-log js/console) js/console (into-array args))))
(defn warn [& args]
(let [args (cons (timestamp) args)]
(.apply (.-warn js/console) js/console (into-array args))))
(defn error [& args]
(let [args (cons (timestamp) args)]
(.apply (.-error js/console) js/console (into-array args))))
(comment
)

13
simple-whisper-chat/.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
/resources/public/js/compiled/**
figwheel_server.log
pom.xml
*jar
/lib/
/classes/
/out/
/target/
.lein-deps-sum
.lein-repl-history
.lein-plugins/
.repl
.nrepl-port

View File

@ -1,8 +1,8 @@
(defproject cljs-tests "0.1.0-SNAPSHOT"
(defproject syng-im/simple-whisper-chat "0.1.0-SNAPSHOT"
:description "FIXME: write this!"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:url "http://www.eclipse.org/legal/epl-v10.html"}
:min-lein-version "2.5.3"
@ -14,7 +14,7 @@
;; cljs deps
[cljsjs/chance "0.7.3-0"]
[com.andrewmcveigh/cljs-time "0.4.0"]
]
[syng-im/protocol "0.1.0"]]
:plugins [[lein-figwheel "0.5.0-6"]
[lein-cljsbuild "1.1.2" :exclusions [[org.clojure/clojure]]]]
@ -24,40 +24,38 @@
:clean-targets ^{:protect false} ["resources/public/js/compiled" "target"]
:cljsbuild {:builds
[{:id "dev"
:source-paths ["src"]
[{:id "dev"
:source-paths ["src/cljs"]
;; If no code is to be run, set :figwheel true for continued automagical reloading
:figwheel {:on-jsload "cljs-tests.core/on-js-reload"}
:figwheel {:on-jsload "syng-im.core/on-js-reload"}
:compiler {:main cljs-tests.core
:asset-path "js/compiled/out"
:output-to "resources/public/js/compiled/cljs_tests.js"
:output-dir "resources/public/js/compiled/out"
:source-map-timestamp true
:foreign-libs [{:file "resources/public/vendor/js/web3.js"
:file-min "resources/public/vendor/js/web3.min.js"
:provides ["cljsjs.web3"]
;:requires ["cljsjs.jquery"]
}]}}
:compiler {:main syng-im.core
:asset-path "js/compiled/out"
:output-to "resources/public/js/compiled/syng_im.js"
:output-dir "resources/public/js/compiled/out"
:source-map-timestamp true
:foreign-libs [{:file "resources/public/vendor/js/web3.js"
:file-min "resources/public/vendor/js/web3.min.js"
:provides ["cljsjs.web3"]}]}}
;; This next build is an compressed minified build for
;; production. You can build this with:
;; lein cljsbuild once min
{:id "min"
:source-paths ["src"]
:compiler {:output-to "resources/public/js/compiled/cljs_tests.js"
:main cljs-tests.core
:optimizations :advanced
:pretty-print false}}]}
{:id "min"
:source-paths ["src/cljs"]
:compiler {:output-to "resources/public/js/compiled/syng_im.js"
:main syng-im.core
:optimizations :advanced
:pretty-print false}}]}
:figwheel {;; :http-server-root "public" ;; default and assumes "resources"
;; :server-port 3449 ;; default
;; :server-ip "127.0.0.1"
:css-dirs ["resources/public/css"] ;; watch and update CSS
:css-dirs ["resources/public/css"] ;; watch and update CSS
;; Start an nREPL server into the running figwheel process
:nrepl-port 7888
:nrepl-port 7888
;; Server Ring Handler (optional)
;; if you want to embed a ring handler into the figwheel http-kit

View File

@ -43,6 +43,6 @@
<label for="to-identity">To Identity</label>
<input id="to-identity" type="text" />
</div>
<script src="js/compiled/cljs_tests.js" type="text/javascript"></script>
<script src="js/compiled/syng_im.js" type="text/javascript"></script>
</body>
</html>

View File

@ -1,6 +1,6 @@
(ns cljs-tests.core
(:require [cljs-tests.protocol.api :as p]
[cljs-tests.utils.logging :as log]
(ns syng-im.core
(:require [syng-im.protocol.api :as p]
[syng-im.utils.logging :as log]
[goog.dom :as g]
[goog.dom.forms :as f]
[goog.events :as e]
@ -95,7 +95,7 @@
;(p/make-whisper-msg web3-2 user2-ident user1-ident "Hello World!")
(require '[cljs-tests.protocol.whisper :as w])
(require '[syng-im.protocol.whisper :as w])
(def web3 (w/make-web3 "http://localhost:4546"))
(.newIdentity (w/whisper web3) (fn [error result]
(println error result)))