mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-28 06:50:56 +00:00
spec/alpha-v4 (#151)
This commit is contained in:
parent
e5f0f36f01
commit
4a5acd224a
@ -16,22 +16,6 @@ import
|
||||
|
||||
procSuite "Waku Filter":
|
||||
|
||||
test "encoding and decoding FilterRPC":
|
||||
let
|
||||
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
||||
rpc = FilterRPC(
|
||||
filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @["foo", "bar"])], topic: "foo")],
|
||||
messagePush: @[MessagePush(message: @[Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)])]
|
||||
)
|
||||
|
||||
let buf = rpc.encode()
|
||||
|
||||
let decode = FilterRPC.init(buf.buffer)
|
||||
|
||||
check:
|
||||
decode.isErr == false
|
||||
decode.value == rpc
|
||||
|
||||
asyncTest "handle filter":
|
||||
let
|
||||
proto = WakuFilter.init()
|
||||
@ -67,7 +51,7 @@ procSuite "Waku Filter":
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
var rpc = FilterRPC(filterRequest: @[FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic")])
|
||||
var rpc = FilterRequest(contentFilter: @[ContentFilter(topics: @[])], topic: "topic")
|
||||
discard await msDial.select(conn, WakuFilterCodec)
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
|
||||
@ -78,9 +62,8 @@ procSuite "Waku Filter":
|
||||
|
||||
var message = await conn.readLp(64*1024)
|
||||
|
||||
let response = FilterRPC.init(message)
|
||||
let response = MessagePush.init(message)
|
||||
let res = response.value
|
||||
check:
|
||||
res.messagePush.len() == 1
|
||||
res.messagePush[0].message.len() == 1
|
||||
res.messagePush[0].message[0] == msg
|
||||
res.messages.len() == 1
|
||||
res.messages[0] == msg
|
||||
|
@ -15,22 +15,6 @@ import
|
||||
../test_helpers, ./utils
|
||||
|
||||
procSuite "Waku Store":
|
||||
|
||||
test "encoding and decoding StoreRPC":
|
||||
let
|
||||
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
||||
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)
|
||||
|
||||
rpc = StoreRPC(query: @[HistoryQuery(uuid: "1", topics: @["foo"])], response: @[HistoryResponse(uuid: "1", messages: @[msg])])
|
||||
|
||||
let buf = rpc.encode()
|
||||
|
||||
let decode = StoreRPC.init(buf.buffer)
|
||||
|
||||
check:
|
||||
decode.isErr == false
|
||||
decode.value == rpc
|
||||
|
||||
asyncTest "handle query":
|
||||
let
|
||||
proto = WakuStore.init()
|
||||
@ -69,15 +53,15 @@ procSuite "Waku Store":
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
var rpc = StoreRPC(query: @[HistoryQuery(uuid: "1234", topics: @["topic"])])
|
||||
var rpc = HistoryQuery(uuid: "1234", topics: @["topic"])
|
||||
discard await msDial.select(conn, WakuStoreCodec)
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
|
||||
var message = await conn.readLp(64*1024)
|
||||
let response = StoreRPC.init(message)
|
||||
let response = HistoryResponse.init(message)
|
||||
|
||||
check:
|
||||
response.isErr == false
|
||||
response.value.response[0].uuid == rpc.query[0].uuid
|
||||
response.value.response[0].messages.len() == 1
|
||||
response.value.response[0].messages[0] == msg
|
||||
response.value.uuid == rpc.uuid
|
||||
response.value.messages.len() == 1
|
||||
response.value.messages[0] == msg
|
||||
|
@ -15,7 +15,7 @@ import
|
||||
# relay protocol.
|
||||
|
||||
const
|
||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha3"
|
||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha4"
|
||||
|
||||
type
|
||||
ContentFilter* = object
|
||||
@ -26,15 +26,11 @@ type
|
||||
topic*: string
|
||||
|
||||
MessagePush* = object
|
||||
message*: seq[Message]
|
||||
|
||||
FilterRPC* = object
|
||||
filterRequest*: seq[FilterRequest]
|
||||
messagePush*: seq[MessagePush]
|
||||
messages*: seq[Message]
|
||||
|
||||
Subscriber = object
|
||||
connection: Connection
|
||||
filter: seq[FilterRequest]
|
||||
filter: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
subscribers*: seq[Subscriber]
|
||||
@ -78,18 +74,9 @@ proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
proc encode*(push: MessagePush): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for push in push.message:
|
||||
for push in push.messages:
|
||||
result.write(1, push.encodeMessage())
|
||||
|
||||
proc encode*(rpc: FilterRPC): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for request in rpc.filterRequest:
|
||||
result.write(1, request.encode())
|
||||
|
||||
for push in rpc.messagePush:
|
||||
result.write(2, push.encode())
|
||||
|
||||
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
|
||||
var push = MessagePush()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
@ -98,28 +85,10 @@ proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
|
||||
discard ? pb.getRepeatedField(1, messages)
|
||||
|
||||
for buf in messages:
|
||||
push.message.add(? protobuf.decodeMessage(initProtoBuffer(buf)))
|
||||
push.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf)))
|
||||
|
||||
ok(push)
|
||||
|
||||
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = FilterRPC()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var requests: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(1, requests)
|
||||
|
||||
for buffer in requests:
|
||||
rpc.filterRequest.add(? FilterRequest.init(buffer))
|
||||
|
||||
var pushes: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(2, pushes)
|
||||
|
||||
for buffer in pushes:
|
||||
rpc.messagePush.add(? MessagePush.init(buffer))
|
||||
|
||||
ok(rpc)
|
||||
|
||||
proc init*(T: type WakuFilter): T =
|
||||
var ws = WakuFilter(subscribers: newSeq[Subscriber](0))
|
||||
|
||||
@ -129,11 +98,11 @@ proc init*(T: type WakuFilter): T =
|
||||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var message = await conn.readLp(64*1024)
|
||||
var res = FilterRPC.init(message)
|
||||
var res = FilterRequest.init(message)
|
||||
if res.isErr:
|
||||
return
|
||||
|
||||
ws.subscribers.add(Subscriber(connection: conn, filter: res.value.filterRequest))
|
||||
ws.subscribers.add(Subscriber(connection: conn, filter: res.value))
|
||||
# @TODO THIS IS A VERY ROUGH EXPERIMENT
|
||||
|
||||
ws.handler = handle
|
||||
@ -145,10 +114,9 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||
## This filter can then be used to send messages to subscribers that match conditions.
|
||||
proc handle(msg: Message) =
|
||||
for subscriber in proto.subscribers:
|
||||
for filter in subscriber.filter:
|
||||
if filter.topic in msg.topicIDs:
|
||||
if subscriber.filter.topic in msg.topicIDs:
|
||||
# @TODO PROBABLY WANT TO BATCH MESSAGES
|
||||
discard subscriber.connection.writeLp(FilterRPC(messagePush: @[MessagePush(message: @[msg])]).encode().buffer)
|
||||
discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer)
|
||||
break
|
||||
|
||||
MessageNotificationSubscription.init(@[], handle)
|
||||
|
@ -8,13 +8,9 @@ import
|
||||
./message_notifier
|
||||
|
||||
const
|
||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2"
|
||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha4"
|
||||
|
||||
type
|
||||
StoreRPC* = object
|
||||
query*: seq[HistoryQuery]
|
||||
response*: seq[HistoryResponse]
|
||||
|
||||
HistoryQuery* = object
|
||||
uuid*: string
|
||||
topics*: seq[string]
|
||||
@ -52,24 +48,6 @@ proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
|
||||
|
||||
ok(msg)
|
||||
|
||||
proc init*(T: type StoreRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = StoreRPC()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var queries: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(1, queries)
|
||||
|
||||
for buffer in queries:
|
||||
rpc.query.add(? HistoryQuery.init(buffer))
|
||||
|
||||
var responses: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(2, responses)
|
||||
|
||||
for buffer in responses:
|
||||
rpc.response.add(? HistoryResponse.init(buffer))
|
||||
|
||||
ok(rpc)
|
||||
|
||||
proc encode*(query: HistoryQuery): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
@ -86,15 +64,6 @@ proc encode*(response: HistoryResponse): ProtoBuffer =
|
||||
for msg in response.messages:
|
||||
result.write(2, msg.encodeMessage())
|
||||
|
||||
proc encode*(response: StoreRPC): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for query in response.query:
|
||||
result.write(1, query.encode().buffer)
|
||||
|
||||
for response in response.response:
|
||||
result.write(2, response.encode().buffer)
|
||||
|
||||
proc query(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
||||
result = HistoryResponse(uuid: query.uuid, messages: newSeq[Message]())
|
||||
for msg in w.messages:
|
||||
@ -108,19 +77,15 @@ proc init*(T: type WakuStore): T =
|
||||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var message = await conn.readLp(64*1024)
|
||||
var rpc = StoreRPC.init(message)
|
||||
var rpc = HistoryQuery.init(message)
|
||||
if rpc.isErr:
|
||||
return
|
||||
|
||||
info "received query"
|
||||
|
||||
var response = StoreRPC(query: newSeq[HistoryQuery](0), response: newSeq[HistoryResponse](0))
|
||||
let res = ws.query(rpc.value)
|
||||
|
||||
for query in rpc.value.query:
|
||||
let res = ws.query(query)
|
||||
response.response.add(res)
|
||||
|
||||
await conn.writeLp(response.encode().buffer)
|
||||
await conn.writeLp(res.encode().buffer)
|
||||
|
||||
ws.handler = handle
|
||||
ws.codec = WakuStoreCodec
|
||||
|
Loading…
x
Reference in New Issue
Block a user