fix/content-type-int (#235)

* fixes

* fixes

* using a better topic

* fix

* fixed
This commit is contained in:
Dean Eigenmann 2020-10-21 10:55:06 +02:00 committed by GitHub
parent b2edfe8dda
commit 5ddd8701c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 37 additions and 25 deletions

View File

@ -37,7 +37,7 @@ proc runBackground() {.async.} =
# Publish to a topic # Publish to a topic
let payload = cast[seq[byte]]("hello world") let payload = cast[seq[byte]]("hello world")
let message = WakuMessage(payload: payload, contentTopic: "foo") let message = WakuMessage(payload: payload, contentTopic: ContentTopic(1))
node.publish(topic, message) node.publish(topic, message)
# TODO Await with try/except here # TODO Await with try/except here

View File

@ -3,7 +3,7 @@ when not(compileOption("threads")):
import std/[tables, strformat, strutils] import std/[tables, strformat, strutils]
import confutils, chronicles, chronos, stew/shims/net as stewNet, import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl eth/keys, bearssl, stew/[byteutils, endians2]
import libp2p/[switch, # manage transports, a single entry point for dialing and listening import libp2p/[switch, # manage transports, a single entry point for dialing and listening
multistream, # tag stream with short header to identify it multistream, # tag stream with short header to identify it
crypto/crypto, # cryptographic functions crypto/crypto, # cryptographic functions
@ -31,7 +31,9 @@ const Help = """
""" """
const DefaultTopic = "waku" const DefaultTopic = "waku"
const DefaultContentTopic = "dingpu"
const Dingpu = "dingpu".toBytes
const DefaultContentTopic = ContentTopic(uint32.fromBytes(Dingpu))
# XXX Connected is a bit annoying, because incoming connections don't trigger state change # XXX Connected is a bit annoying, because incoming connections don't trigger state change
# Could poll connection pool or something here, I suppose # Could poll connection pool or something here, I suppose

View File

@ -21,7 +21,8 @@ procSuite "Waku Filter":
let let
key = PrivateKey.random(ECDSA, rng[]).get() key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key) peer = PeerInfo.init(key)
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "pew2") contentTopic = ContentTopic(1)
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch() var dialSwitch = newStandardSwitch()
discard await dialSwitch.start() discard await dialSwitch.start()
@ -38,7 +39,7 @@ procSuite "Waku Filter":
let let
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @["pew", "pew2"])], topic: "topic") rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: "topic")
dialSwitch.mount(proto) dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)

View File

@ -20,8 +20,9 @@ procSuite "Waku Store":
let let
key = PrivateKey.random(ECDSA, rng[]).get() key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key) peer = PeerInfo.init(key)
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic") topic = ContentTopic(1)
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2") msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(2))
var dialSwitch = newStandardSwitch() var dialSwitch = newStandardSwitch()
discard await dialSwitch.start() discard await dialSwitch.start()
@ -32,7 +33,7 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(dialSwitch, crypto.newRng()) proto = WakuStore.init(dialSwitch, crypto.newRng())
subscription = proto.subscription() subscription = proto.subscription()
rpc = HistoryQuery(topics: @["topic"]) rpc = HistoryQuery(topics: @[topic])
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)

View File

@ -19,7 +19,7 @@ procSuite "WakuNode":
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000)) Port(60000))
pubSubTopic = "chat" pubSubTopic = "chat"
contentTopic = "foobar" contentTopic = ContentTopic(1)
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])])
message = WakuMessage(payload: "hello world".toBytes(), message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic) contentTopic: contentTopic)
@ -71,7 +71,7 @@ procSuite "WakuNode":
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002)) Port(60002))
pubSubTopic = "chat" pubSubTopic = "chat"
contentTopic = "foobar" contentTopic = ContentTopic(1)
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])])
message = WakuMessage(payload: "hello world".toBytes(), message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic) contentTopic: contentTopic)
@ -133,7 +133,7 @@ procSuite "WakuNode":
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002)) Port(60002))
contentTopic = "foobar" contentTopic = ContentTopic(1)
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -169,7 +169,7 @@ procSuite "WakuNode":
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002)) Port(60002))
contentTopic = "foobar" contentTopic = ContentTopic(1)
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()

View File

@ -57,17 +57,21 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
#if not result: #if not result:
# raise newException(ValueError, "Message could not be posted") # raise newException(ValueError, "Message could not be posted")
rpcsrv.rpc("waku_query") do(topics: seq[string]) -> bool: rpcsrv.rpc("waku_query") do(topics: seq[int]) -> bool:
debug "waku_query" debug "waku_query"
# XXX: Hacky in-line handler # XXX: Hacky in-line handler
proc handler(response: HistoryResponse) {.gcsafe.} = proc handler(response: HistoryResponse) {.gcsafe.} =
info "Hit response handler", messages=response.messages info "Hit response handler", messages=response.messages
await node.query(HistoryQuery(topics: topics), handler) var contentTopics = newSeq[ContentTopic]()
for topic in topics:
contentTopics.add(ContentTopic(topic))
await node.query(HistoryQuery(topics: contentTopics), handler)
return true return true
rpcsrv.rpc("waku_subscribe_filter") do(topic: string, contentFilters: seq[seq[string]]) -> bool: rpcsrv.rpc("waku_subscribe_filter") do(topic: string, contentFilters: seq[seq[int]]) -> bool:
debug "waku_subscribe_filter" debug "waku_subscribe_filter"
# XXX: Hacky in-line handler # XXX: Hacky in-line handler
@ -76,7 +80,10 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
var filters = newSeq[ContentFilter]() var filters = newSeq[ContentFilter]()
for topics in contentFilters: for topics in contentFilters:
filters.add(ContentFilter(topics: topics)) var contentTopics = newSeq[ContentTopic]()
for topic in topics:
contentTopics.add(ContentTopic(topic))
filters.add(ContentFilter(topics: contentTopics))
await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler) await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler)
return true return true

View File

@ -4,25 +4,26 @@
import import
std/[tables, times], std/[tables, times],
chronos, bearssl, stew/byteutils, chronos, bearssl, stew/[byteutils, endians2],
libp2p/[switch, peerinfo, multiaddress, crypto/crypto], libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/protocols/protocol, libp2p/protocols/protocol,
libp2p/switch, libp2p/switch,
libp2p/stream/connection, libp2p/stream/connection,
libp2p/protocols/pubsub/[pubsub, gossipsub], libp2p/protocols/pubsub/[pubsub, gossipsub],
nimcrypto/sha2, nimcrypto/sha2
stew/byteutils
# Common data types ----------------------------------------------------------- # Common data types -----------------------------------------------------------
type type
ContentTopic* = uint32
Topic* = string Topic* = string
Message* = seq[byte] Message* = seq[byte]
WakuMessage* = object WakuMessage* = object
payload*: seq[byte] payload*: seq[byte]
contentTopic*: string contentTopic*: ContentTopic
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.} MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.}
@ -51,7 +52,7 @@ type
direction*: bool direction*: bool
HistoryQuery* = object HistoryQuery* = object
topics*: seq[string] topics*: seq[ContentTopic]
pagingInfo*: PagingInfo pagingInfo*: PagingInfo
HistoryResponse* = object HistoryResponse* = object
@ -102,7 +103,7 @@ type
pushHandler*: MessagePushHandler pushHandler*: MessagePushHandler
ContentFilter* = object ContentFilter* = object
topics*: seq[string] topics*: seq[ContentTopic]
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
@ -179,7 +180,7 @@ proc computeIndex*(msg: WakuMessage): Index =
## Takes a WakuMessage and returns its index ## Takes a WakuMessage and returns its index
var ctx: sha256 var ctx: sha256
ctx.init() ctx.init()
if msg.contentTopic.len != 0: # checks for non-empty contentTopic if msg.contentTopic != 0: # checks for non-empty contentTopic
ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes ctx.update(msg.contentTopic.toBytes()) # converts the topic to bytes
ctx.update(msg.payload) ctx.update(msg.payload)
let digest = ctx.finish() # computes the hash let digest = ctx.finish() # computes the hash

View File

@ -40,7 +40,7 @@ proc encode*(rpc: FilterRequest): ProtoBuffer =
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var topics: seq[string] var topics: seq[ContentTopic]
discard ? pb.getRepeatedField(1, topics) discard ? pb.getRepeatedField(1, topics)
ok(ContentFilter(topics: topics)) ok(ContentFilter(topics: topics))

View File

@ -20,7 +20,7 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery() var msg = HistoryQuery()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var topics: seq[string] var topics: seq[ContentTopic]
discard ? pb.getRepeatedField(1, topics) discard ? pb.getRepeatedField(1, topics)