enhancement/integrate-store-chat (#230)

* added store protocol

* setting peer

* fix

* line

* Update dingpu.md

* Update chat2.nim

* Update dingpu.md

* Update dingpu.md

* playing around

* fix

* fix

* fixes

* tested

* tested and finalized

* fix

* Update docs/tutorial/dingpu.md

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

* Update chat2.nim

* fix

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
Dean Eigenmann 2020-10-15 13:56:53 +02:00 committed by GitHub
parent e67301f1f6
commit 5c25ed131a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 9 deletions

View File

@ -2,11 +2,13 @@
## Basic chat usage ## Basic chat usage
> If historical messaging is desired, the chat app requires that the remote peer specified in `storenode` option supports the WakuStore protocol. For the current cluster node deployed as part of Dingpu this is already the case.
Start two chat apps: Start two chat apps:
``` ```
./build/chat2 --ports-shift=0 --staticnode:/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS ./build/chat2 --ports-shift:0 --storenode:/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS --staticnode:/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS
./build/chat2 --ports-shift=1 --staticnode:/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS ./build/chat2 --ports-shift:1 --storenode:/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS --staticnode:/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS
``` ```
By specifying `staticnode` it connects to that node subscribes to the `waku` topic. This ensures messages are relayed properly. By specifying `staticnode` it connects to that node subscribes to the `waku` topic. This ensures messages are relayed properly.

View File

@ -19,7 +19,7 @@ import libp2p/[switch, # manage transports, a single entry poi
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
muxers/mplex/mplex] # define some contants and message types for stream multiplexing muxers/mplex/mplex] # define some contants and message types for stream multiplexing
import ../../waku/node/v2/[config, wakunode2, waku_types], import ../../waku/node/v2/[config, wakunode2, waku_types],
../../waku/protocol/v2/waku_relay, ../../waku/protocol/v2/[waku_relay, waku_store],
../../waku/node/common ../../waku/node/common
const Help = """ const Help = """
@ -55,15 +55,17 @@ proc initAddress(T: type MultiAddress, str: string): T =
raise newException(ValueError, raise newException(ValueError,
"Invalid bootstrap node multi-address") "Invalid bootstrap node multi-address")
# NOTE Dialing on WakuRelay specifically proc parsePeer(address: string): PeerInfo =
proc dialPeer(c: Chat, address: string) {.async.} =
let multiAddr = MultiAddress.initAddress(address) let multiAddr = MultiAddress.initAddress(address)
let parts = address.split("/") let parts = address.split("/")
let remotePeer = PeerInfo.init(parts[^1], [multiAddr]) result = PeerInfo.init(parts[^1], [multiAddr])
echo &"dialing peer: {multiAddr}" # NOTE Dialing on WakuRelay specifically
proc dialPeer(c: Chat, address: string) {.async.} =
let peer = parsePeer(address)
echo &"dialing peer: {peer.peerId}"
# XXX Discarding conn, do we want to keep this here? # XXX Discarding conn, do we want to keep this here?
discard await c.node.switch.dial(remotePeer, WakuRelayCodec) discard await c.node.switch.dial(peer, WakuRelayCodec)
c.connected = true c.connected = true
proc connectToNodes(c: Chat, nodes: openArray[string]) = proc connectToNodes(c: Chat, nodes: openArray[string]) =
@ -172,10 +174,20 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
echo &"Listening on\n {listenStr}" echo &"Listening on\n {listenStr}"
if conf.storenode != "":
node.wakuStore.setPeer(parsePeer(conf.storenode))
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
let payload = cast[string](msg.payload)
echo &"{payload}"
info "Hit store handler"
await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler)
# Subscribe to a topic # Subscribe to a topic
# TODO To get end to end sender would require more information in payload # TODO To get end to end sender would require more information in payload
# We could possibly indicate the relayer point with connection somehow probably (?) # We could possibly indicate the relayer point with connection somehow probably (?)
let topic = cast[Topic](DefaultTopic)
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value let message = WakuMessage.init(data).value
let payload = cast[string](message.payload) let payload = cast[string](message.payload)
@ -184,6 +196,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
# XXX Timing issue with subscribe, need to wait a bit to ensure GRAFT message is sent # XXX Timing issue with subscribe, need to wait a bit to ensure GRAFT message is sent
await sleepAsync(5.seconds) await sleepAsync(5.seconds)
let topic = cast[Topic](DefaultTopic)
await node.subscribe(topic, handler) await node.subscribe(topic, handler)
await chat.readWriteLoop() await chat.readWriteLoop()

View File

@ -133,6 +133,7 @@ proc start*(node: WakuNode) {.async.} =
let msg = WakuMessage.init(data) let msg = WakuMessage.init(data)
if msg.isOk(): if msg.isOk():
node.filters.notify(msg.value(), "") node.filters.notify(msg.value(), "")
await node.subscriptions.notify(topic, msg.value())
await node.wakuRelay.subscribe("waku", relayHandler) await node.wakuRelay.subscribe("waku", relayHandler)