diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 3e0aac35f..5702f40bb 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -16,22 +16,6 @@ import procSuite "Waku Filter": - test "encoding and decoding FilterRPC": - let - peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - rpc = FilterRPC( - filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @["foo", "bar"])], topic: "foo")], - messagePush: @[MessagePush(message: @[Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)])] - ) - - let buf = rpc.encode() - - let decode = FilterRPC.init(buf.buffer) - - check: - decode.isErr == false - decode.value == rpc - asyncTest "handle filter": let proto = WakuFilter.init() @@ -67,7 +51,7 @@ procSuite "Waku Filter": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - var rpc = FilterRPC(filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic")]) + var rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic") discard await msDial.select(conn, WakuFilterCodec) await conn.writeLP(rpc.encode().buffer) @@ -78,9 +62,8 @@ procSuite "Waku Filter": var message = await conn.readLp(64*1024) - let response = FilterRPC.init(message) + let response = MessagePush.init(message) let res = response.value check: - res.messagePush.len() == 1 - res.messagePush[0].message.len() == 1 - res.messagePush[0].message[0] == msg + res.messages.len() == 1 + res.messages[0] == msg diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index ba7a0b1aa..375baeb83 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -15,22 +15,6 @@ import ../test_helpers, ./utils procSuite "Waku Store": - - test "encoding and decoding StoreRPC": - let - peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) - msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) - - rpc = StoreRPC(query: @[HistoryQuery(uuid: "1", topics: @["foo"])], response: @[HistoryResponse(uuid: "1", messages: @[msg])]) - - let buf = rpc.encode() - - let decode = StoreRPC.init(buf.buffer) - - check: - decode.isErr == false - decode.value == rpc - asyncTest "handle query": let proto = WakuStore.init() @@ -69,15 +53,15 @@ procSuite "Waku Store": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - var rpc = StoreRPC(query: @[HistoryQuery(uuid: "1234", topics: @["topic"])]) + var rpc = HistoryQuery(uuid: "1234", topics: @["topic"]) discard await msDial.select(conn, WakuStoreCodec) await conn.writeLP(rpc.encode().buffer) var message = await conn.readLp(64*1024) - let response = StoreRPC.init(message) + let response = HistoryResponse.init(message) check: response.isErr == false - response.value.response[0].uuid == rpc.query[0].uuid - response.value.response[0].messages.len() == 1 - response.value.response[0].messages[0] == msg + response.value.uuid == rpc.uuid + response.value.messages.len() == 1 + response.value.messages[0] == msg diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index a4af5c0ce..5cfc818ab 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -15,7 +15,7 @@ import # relay protocol. const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha3" + WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha4" type ContentFilter* = object @@ -26,15 +26,11 @@ type topic*: string MessagePush* = object - message*: seq[Message] - - FilterRPC* = object - filterRequest*: seq[FilterRequest] - messagePush*: seq[MessagePush] + messages*: seq[Message] Subscriber = object connection: Connection - filter: seq[FilterRequest] + filter: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? WakuFilter* = ref object of LPProtocol subscribers*: seq[Subscriber] @@ -78,18 +74,9 @@ proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = proc encode*(push: MessagePush): ProtoBuffer = result = initProtoBuffer() - for push in push.message: + for push in push.messages: result.write(1, push.encodeMessage()) -proc encode*(rpc: FilterRPC): ProtoBuffer = - result = initProtoBuffer() - - for request in rpc.filterRequest: - result.write(1, request.encode()) - - for push in rpc.messagePush: - result.write(2, push.encode()) - proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = var push = MessagePush() let pb = initProtoBuffer(buffer) @@ -98,28 +85,10 @@ proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] = discard ? pb.getRepeatedField(1, messages) for buf in messages: - push.message.add(? protobuf.decodeMessage(initProtoBuffer(buf))) + push.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf))) ok(push) -proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRPC() - let pb = initProtoBuffer(buffer) - - var requests: seq[seq[byte]] - discard ? pb.getRepeatedField(1, requests) - - for buffer in requests: - rpc.filterRequest.add(? FilterRequest.init(buffer)) - - var pushes: seq[seq[byte]] - discard ? pb.getRepeatedField(2, pushes) - - for buffer in pushes: - rpc.messagePush.add(? MessagePush.init(buffer)) - - ok(rpc) - proc init*(T: type WakuFilter): T = var ws = WakuFilter(subscribers: newSeq[Subscriber](0)) @@ -129,11 +98,11 @@ proc init*(T: type WakuFilter): T = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = var message = await conn.readLp(64*1024) - var res = FilterRPC.init(message) + var res = FilterRequest.init(message) if res.isErr: return - ws.subscribers.add(Subscriber(connection: conn, filter: res.value.filterRequest)) + ws.subscribers.add(Subscriber(connection: conn, filter: res.value)) # @TODO THIS IS A VERY ROUGH EXPERIMENT ws.handler = handle @@ -145,10 +114,9 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## This filter can then be used to send messages to subscribers that match conditions. proc handle(msg: Message) = for subscriber in proto.subscribers: - for filter in subscriber.filter: - if filter.topic in msg.topicIDs: + if subscriber.filter.topic in msg.topicIDs: # @TODO PROBABLY WANT TO BATCH MESSAGES - discard subscriber.connection.writeLp(FilterRPC(messagePush: @[MessagePush(message: @[msg])]).encode().buffer) + discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer) break MessageNotificationSubscription.init(@[], handle) diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index 19287a29e..2845f832d 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -8,13 +8,9 @@ import ./message_notifier const - WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2" + WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha4" type - StoreRPC* = object - query*: seq[HistoryQuery] - response*: seq[HistoryResponse] - HistoryQuery* = object uuid*: string topics*: seq[string] @@ -52,24 +48,6 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = ok(msg) -proc init*(T: type StoreRPC, buffer: seq[byte]): ProtoResult[T] = - var rpc = StoreRPC() - let pb = initProtoBuffer(buffer) - - var queries: seq[seq[byte]] - discard ? pb.getRepeatedField(1, queries) - - for buffer in queries: - rpc.query.add(? HistoryQuery.init(buffer)) - - var responses: seq[seq[byte]] - discard ? pb.getRepeatedField(2, responses) - - for buffer in responses: - rpc.response.add(? HistoryResponse.init(buffer)) - - ok(rpc) - proc encode*(query: HistoryQuery): ProtoBuffer = result = initProtoBuffer() @@ -86,15 +64,6 @@ proc encode*(response: HistoryResponse): ProtoBuffer = for msg in response.messages: result.write(2, msg.encodeMessage()) -proc encode*(response: StoreRPC): ProtoBuffer = - result = initProtoBuffer() - - for query in response.query: - result.write(1, query.encode().buffer) - - for response in response.response: - result.write(2, response.encode().buffer) - proc query(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(uuid: query.uuid, messages: newSeq[Message]()) for msg in w.messages: @@ -108,19 +77,15 @@ proc init*(T: type WakuStore): T = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = var message = await conn.readLp(64*1024) - var rpc = StoreRPC.init(message) + var rpc = HistoryQuery.init(message) if rpc.isErr: return info "received query" - var response = StoreRPC(query: newSeq[HistoryQuery](0), response: newSeq[HistoryResponse](0)) + let res = ws.query(rpc.value) - for query in rpc.value.query: - let res = ws.query(query) - response.response.add(res) - - await conn.writeLp(response.encode().buffer) + await conn.writeLp(res.encode().buffer) ws.handler = handle ws.codec = WakuStoreCodec