enhancement/protobuf (#47)

* changed to data

* fix

* updated

* checkpoint

* sending protobuf

* working on tests

* bump

* fix

* fix

* fix

* testing

* Update test_waku.nim
This commit is contained in:
Dean Eigenmann 2020-07-23 04:53:29 +02:00 committed by GitHub
parent 0799ef9c16
commit fb45502ba7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 13 deletions

View File

@ -99,5 +99,4 @@ proc waku_unsubscribe(id: Identifier): bool
proc waku_setLightNode(isLightNode: bool)
proc waku_setTopicInterest(topics: seq[string])
proc waku_setBloomFilter(topics: seq[string])
```

View File

@ -12,6 +12,7 @@ import chronos, chronicles
import utils,
libp2p/errors,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/floodsub
@ -35,6 +36,19 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
dec ceil
doAssert(ceil > 0, "waitSub timeout!")
proc message(): seq[byte] =
var pb = initProtoBuffer()
pb.write(1, "hello")
pb.finish()
pb.buffer
proc decodeMessage(data: seq[byte]): string =
var pb = initProtoBuffer(data)
result = ""
let res = pb.getField(1, result)
suite "FloodSub":
teardown:
let
@ -55,7 +69,9 @@ suite "FloodSub":
var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic
let msg = decodeMessage(data)
check topic == "foobar"
check msg == "hello"
completionFut.complete(true)
let
@ -71,7 +87,8 @@ suite "FloodSub":
await waitSub(nodes[0], nodes[1], "foobar")
# TODO: you might want to check the value here
discard await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
let msg = message()
discard await nodes[0].publish("foobar", msg)
result = await completionFut.wait(5.seconds)

View File

@ -1,5 +1,6 @@
import
os, strutils, strformat, chronicles, json_rpc/[rpcclient, rpcserver], nimcrypto/sysrand,
libp2p/protobuf/minprotobuf,
eth/common as eth_common, eth/keys,
options
#options as what # TODO: Huh? Redefinition?
@ -12,6 +13,13 @@ createRpcSigs(RpcHttpClient, sigWakuPath)
const topicAmount = 10 #100
proc message(i: int): ProtoBuffer =
let value = "hello " & $(i)
var result = initProtoBuffer()
result.write(initProtoField(1, value))
result.finish()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
@ -39,12 +47,11 @@ os.sleep(2000)
for i in 0..<topicAmount:
os.sleep(50)
# TODO: This would then publish on a subtopic here
var s = "hello " & $2
var res3 = waitFor nodes[0].wakuPublish("waku", s & "0")
res3 = waitFor nodes[1].wakuPublish("waku", s & "1")
res3 = waitFor nodes[2].wakuPublish("waku", s & "2")
res3 = waitFor nodes[3].wakuPublish("waku", s & "3")
res3 = waitFor nodes[4].wakuPublish("waku", s & "4")
var res3 = waitFor nodes[0].wakuPublish("waku", message(0).buffer)
res3 = waitFor nodes[1].wakuPublish("waku", message(1).buffer)
res3 = waitFor nodes[2].wakuPublish("waku", message(2).buffer)
res3 = waitFor nodes[3].wakuPublish("waku", message(3).buffer)
res3 = waitFor nodes[4].wakuPublish("waku", message(4).buffer)
# Scenario xx2 - 14 full nodes, two edge nodes
# Assume one full topic

View File

@ -1,6 +1,6 @@
# Alpha - Currently implemented in v2
proc waku_version(): string
proc waku_publish(topic: string, message: string): bool
proc waku_publish(topic: string, message: seq[byte]): bool
proc waku_subscribe(topic: string): bool
#proc waku_subscribe(topic: string, handler: Topichandler): bool

View File

@ -21,13 +21,12 @@ proc setupWakuRPC*(wakuProto: WakuProto, rpcsrv: RpcServer) =
result = WakuSubCodec
# TODO: Implement symkey etc logic
rpcsrv.rpc("waku_publish") do(topic: string, message: string) -> bool:
let data = cast[seq[byte]](message)
rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool:
# Assumes someone subscribing on this topic
#let wakuSub = wakuProto.switch.pubsub
let wakuSub = cast[WakuSub](wakuProto.switch.pubSub.get())
# XXX also future return type
discard wakuSub.publish(topic, data)
discard wakuSub.publish(topic, message)
return true
#if not result:
# raise newException(ValueError, "Message could not be posted")
@ -45,4 +44,3 @@ proc setupWakuRPC*(wakuProto: WakuProto, rpcsrv: RpcServer) =
#if not result:
# raise newException(ValueError, "Message could not be posted")