mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-09 09:23:14 +00:00
enhancement/filter-rpc-tests (#130)
* added a test for rpc filter * rm * fix
This commit is contained in:
parent
6ae03e5bc7
commit
31bbdf1d99
@ -1,10 +1,18 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[unittest, options, sets],
|
||||
std/[unittest, options, tables, sets],
|
||||
chronos, chronicles,
|
||||
utils,
|
||||
../../waku/protocol/v2/waku_filter, ../test_helpers
|
||||
libp2p/switch,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/rpc/[message, messages, protobuf],
|
||||
libp2p/multistream,
|
||||
libp2p/transports/transport,
|
||||
libp2p/transports/tcptransport,
|
||||
../../waku/protocol/v2/[waku_filter, filter],
|
||||
../test_helpers, ./utils
|
||||
|
||||
procSuite "Waku Filter":
|
||||
|
||||
@ -18,3 +26,54 @@ procSuite "Waku Filter":
|
||||
check:
|
||||
decode.isErr == false
|
||||
decode.value == rpc
|
||||
|
||||
asyncTest "handle filter":
|
||||
let
|
||||
proto = WakuFilter.init()
|
||||
filter = proto.filter()
|
||||
|
||||
var filters = initTable[string, Filter]()
|
||||
filters["test"] = filter
|
||||
|
||||
let
|
||||
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
|
||||
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)
|
||||
msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false)
|
||||
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
||||
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
|
||||
let remotePeerInfo = PeerInfo.init(
|
||||
remoteSecKey,
|
||||
[ma],
|
||||
["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
|
||||
)
|
||||
|
||||
var serverFut: Future[void]
|
||||
let msListen = newMultistream()
|
||||
|
||||
msListen.addHandler(WakuFilterCodec, proto)
|
||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
||||
await msListen.handle(conn)
|
||||
|
||||
var transport1 = TcpTransport.init()
|
||||
serverFut = await transport1.listen(ma, connHandler)
|
||||
|
||||
let msDial = newMultistream()
|
||||
let transport2: TcpTransport = TcpTransport.init()
|
||||
let conn = await transport2.dial(transport1.ma)
|
||||
|
||||
var rpc = FilterRPC(filters: @[ContentFilter(topics: @["topic"])])
|
||||
discard await msDial.select(conn, WakuFilterCodec)
|
||||
await conn.writeLP(rpc.encode().buffer)
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
filters.notify(msg)
|
||||
filters.notify(msg2)
|
||||
|
||||
var message = await conn.readLp(64*1024)
|
||||
|
||||
let response = protobuf.decodeMessage(initProtoBuffer(message))
|
||||
|
||||
check:
|
||||
msg == response.value
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user