diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index b1880ee92..2d4e4f911 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -32,7 +32,7 @@ procSuite "Waku Store": let proto = WakuStore.init(dialSwitch) subscription = proto.subscription() - rpc = HistoryQuery(uuid: "1234", topics: @["topic"]) + rpc = HistoryQuery(topics: @["topic"]) proto.setPeer(listenSwitch.peerInfo) @@ -48,7 +48,6 @@ procSuite "Waku Store": proc handler(response: HistoryResponse) {.gcsafe, closure.} = check: - response.uuid == rpc.uuid response.messages.len() == 1 response.messages[0] == msg completionFut.complete(true) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index c1a3d99aa..b53e0d5ec 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -121,7 +121,6 @@ procSuite "WakuNode": Port(60002)) contentTopic = "foobar" message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - uuid = "123" var completionFut = newFuture[bool]() @@ -136,11 +135,10 @@ procSuite "WakuNode": proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = check: - response.uuid == uuid response.messages[0] == message completionFut.complete(true) - await node1.query(HistoryQuery(uuid: uuid, topics: @[contentTopic]), storeHandler) + await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler) check: (await completionFut.withTimeout(5.seconds)) == true diff --git a/waku/node/v2/rpc/wakucallsigs.nim b/waku/node/v2/rpc/wakucallsigs.nim index 072f4a3c8..6ca19d38b 100644 --- a/waku/node/v2/rpc/wakucallsigs.nim +++ b/waku/node/v2/rpc/wakucallsigs.nim @@ -5,7 +5,7 @@ proc waku_publish(topic: string, message: seq[byte]): bool # TODO This should be properly done with rpc types, etc. proc waku_publish2(topic: string, message: seq[byte]): bool proc waku_subscribe(topic: string): bool -proc waku_query(uuid: string, topics: seq[string]): bool +proc waku_query(topics: seq[string]): bool #proc waku_subscribe(topic: string, handler: Topichandler): bool # NYI diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 6c862fed7..8fa410677 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -66,12 +66,12 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = #if not result: # raise newException(ValueError, "Message could not be posted") - rpcsrv.rpc("waku_query") do(uuid: string, topics: seq[string]) -> bool: - debug "waku_query", uuid=uuid + rpcsrv.rpc("waku_query") do(topics: seq[string]) -> bool: + debug "waku_query" # XXX: Hacky in-line handler proc handler(response: HistoryResponse) {.gcsafe.} = - info "Hit response handler", uuid=response.uuid, messages=response.messages + info "Hit response handler", messages=response.messages - await node.query(HistoryQuery(uuid: uuid, topics: topics), handler) + await node.query(HistoryQuery(topics: topics), handler) return true diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 2597223fb..ba42d05a8 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -48,13 +48,16 @@ type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} HistoryQuery* = object - uuid*: string topics*: seq[string] HistoryResponse* = object - uuid*: string messages*: seq[WakuMessage] + HistoryRPC* = object + requestId*: string + query*: HistoryQuery + response*: HistoryResponse + HistoryPeer* = object peerInfo*: PeerInfo diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index bb46c4e7f..c8989189f 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -12,7 +12,7 @@ logScope: topics = "wakustore" const - WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5" + WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha6" proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var msg = HistoryQuery() @@ -20,8 +20,7 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = var topics: seq[string] - discard ? pb.getField(1, msg.uuid) - discard ? pb.getRepeatedField(2, topics) + discard ? pb.getRepeatedField(1, topics) msg.topics = topics ok(msg) @@ -31,33 +30,52 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) var messages: seq[seq[byte]] - - discard ? pb.getField(1, msg.uuid) - discard ? pb.getRepeatedField(2, messages) + discard ? pb.getRepeatedField(1, messages) for buf in messages: msg.messages.add(? WakuMessage.init(buf)) ok(msg) +proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = HistoryRPC() + let pb = initProtoBuffer(buffer) + + discard ? pb.getField(1, rpc.requestId) + + var queryBuffer: seq[byte] + discard ? pb.getField(2, queryBuffer) + + rpc.query = ? HistoryQuery.init(queryBuffer) + + var responseBuffer: seq[byte] + discard ? pb.getField(3, responseBuffer) + + rpc.response = ? HistoryResponse.init(responseBuffer) + + ok(rpc) + proc encode*(query: HistoryQuery): ProtoBuffer = result = initProtoBuffer() - result.write(1, query.uuid) - for topic in query.topics: - result.write(2, topic) + result.write(1, topic) proc encode*(response: HistoryResponse): ProtoBuffer = result = initProtoBuffer() - result.write(1, response.uuid) - for msg in response.messages: - result.write(2, msg.encode()) + result.write(1, msg.encode()) + +proc encode*(rpc: HistoryRPC): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, rpc.requestId) + result.write(2, rpc.query.encode()) + result.write(3, rpc.response.encode()) proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = - result = HistoryResponse(uuid: query.uuid, messages: newSeq[WakuMessage]()) + result = HistoryResponse(messages: newSeq[WakuMessage]()) for msg in w.messages: if msg.contentTopic in query.topics: result.messages.insert(msg) @@ -65,15 +83,16 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = method init*(ws: WakuStore) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = var message = await conn.readLp(64*1024) - var rpc = HistoryQuery.init(message) - if rpc.isErr: + var res = HistoryRPC.init(message) + if res.isErr: + error "failed to decode rpc" return info "received query" - let res = ws.findMessages(rpc.value) - - await conn.writeLp(res.encode().buffer) + let value = res.value + let response = ws.findMessages(res.value.query) + await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer) ws.handler = handle ws.codec = WakuStoreCodec @@ -108,13 +127,13 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn let peer = w.peers[0] let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec) - await conn.writeLP(query.encode().buffer) + await conn.writeLP(HistoryRPC(requestId: "foo", query: query).encode().buffer) var message = await conn.readLp(64*1024) - let response = HistoryResponse.init(message) + let response = HistoryRPC.init(message) if response.isErr: error "failed to decode response" return - handler(response.value) + handler(response.value.response)