enhancement/store-message-handler (#185)

* started working on wrapper

* fixes

* changes

* fixes

* Update waku_store.nim
This commit is contained in:
Dean Eigenmann 2020-09-25 16:02:13 +02:00 committed by GitHub
parent bcc931baf5
commit 25b48bb99e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 52 additions and 33 deletions

View File

@ -32,7 +32,7 @@ procSuite "Waku Store":
let
proto = WakuStore.init(dialSwitch)
subscription = proto.subscription()
rpc = HistoryQuery(uuid: "1234", topics: @["topic"])
rpc = HistoryQuery(topics: @["topic"])
proto.setPeer(listenSwitch.peerInfo)
@ -48,7 +48,6 @@ procSuite "Waku Store":
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.uuid == rpc.uuid
response.messages.len() == 1
response.messages[0] == msg
completionFut.complete(true)

View File

@ -121,7 +121,6 @@ procSuite "WakuNode":
Port(60002))
contentTopic = "foobar"
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
uuid = "123"
var completionFut = newFuture[bool]()
@ -136,11 +135,10 @@ procSuite "WakuNode":
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.uuid == uuid
response.messages[0] == message
completionFut.complete(true)
await node1.query(HistoryQuery(uuid: uuid, topics: @[contentTopic]), storeHandler)
await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler)
check:
(await completionFut.withTimeout(5.seconds)) == true

View File

@ -5,7 +5,7 @@ proc waku_publish(topic: string, message: seq[byte]): bool
# TODO This should be properly done with rpc types, etc.
proc waku_publish2(topic: string, message: seq[byte]): bool
proc waku_subscribe(topic: string): bool
proc waku_query(uuid: string, topics: seq[string]): bool
proc waku_query(topics: seq[string]): bool
#proc waku_subscribe(topic: string, handler: Topichandler): bool
# NYI

View File

@ -66,12 +66,12 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
#if not result:
# raise newException(ValueError, "Message could not be posted")
rpcsrv.rpc("waku_query") do(uuid: string, topics: seq[string]) -> bool:
debug "waku_query", uuid=uuid
rpcsrv.rpc("waku_query") do(topics: seq[string]) -> bool:
debug "waku_query"
# XXX: Hacky in-line handler
proc handler(response: HistoryResponse) {.gcsafe.} =
info "Hit response handler", uuid=response.uuid, messages=response.messages
info "Hit response handler", messages=response.messages
await node.query(HistoryQuery(uuid: uuid, topics: topics), handler)
await node.query(HistoryQuery(topics: topics), handler)
return true

View File

@ -48,13 +48,16 @@ type
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
HistoryQuery* = object
uuid*: string
topics*: seq[string]
HistoryResponse* = object
uuid*: string
messages*: seq[WakuMessage]
HistoryRPC* = object
requestId*: string
query*: HistoryQuery
response*: HistoryResponse
HistoryPeer* = object
peerInfo*: PeerInfo

View File

@ -12,7 +12,7 @@ logScope:
topics = "wakustore"
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5"
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha6"
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery()
@ -20,8 +20,7 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var topics: seq[string]
discard ? pb.getField(1, msg.uuid)
discard ? pb.getRepeatedField(2, topics)
discard ? pb.getRepeatedField(1, topics)
msg.topics = topics
ok(msg)
@ -31,33 +30,52 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ? pb.getField(1, msg.uuid)
discard ? pb.getRepeatedField(2, messages)
discard ? pb.getRepeatedField(1, messages)
for buf in messages:
msg.messages.add(? WakuMessage.init(buf))
ok(msg)
proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = HistoryRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var queryBuffer: seq[byte]
discard ? pb.getField(2, queryBuffer)
rpc.query = ? HistoryQuery.init(queryBuffer)
var responseBuffer: seq[byte]
discard ? pb.getField(3, responseBuffer)
rpc.response = ? HistoryResponse.init(responseBuffer)
ok(rpc)
proc encode*(query: HistoryQuery): ProtoBuffer =
result = initProtoBuffer()
result.write(1, query.uuid)
for topic in query.topics:
result.write(2, topic)
result.write(1, topic)
proc encode*(response: HistoryResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, response.uuid)
for msg in response.messages:
result.write(2, msg.encode())
result.write(1, msg.encode())
proc encode*(rpc: HistoryRPC): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rpc.requestId)
result.write(2, rpc.query.encode())
result.write(3, rpc.response.encode())
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
result = HistoryResponse(uuid: query.uuid, messages: newSeq[WakuMessage]())
result = HistoryResponse(messages: newSeq[WakuMessage]())
for msg in w.messages:
if msg.contentTopic in query.topics:
result.messages.insert(msg)
@ -65,15 +83,16 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
method init*(ws: WakuStore) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var rpc = HistoryQuery.init(message)
if rpc.isErr:
var res = HistoryRPC.init(message)
if res.isErr:
error "failed to decode rpc"
return
info "received query"
let res = ws.findMessages(rpc.value)
await conn.writeLp(res.encode().buffer)
let value = res.value
let response = ws.findMessages(res.value.query)
await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer)
ws.handler = handle
ws.codec = WakuStoreCodec
@ -108,13 +127,13 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
let peer = w.peers[0]
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
await conn.writeLP(query.encode().buffer)
await conn.writeLP(HistoryRPC(requestId: "foo", query: query).encode().buffer)
var message = await conn.readLp(64*1024)
let response = HistoryResponse.init(message)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
return
handler(response.value)
handler(response.value.response)