From a13526fe8756bc107a3a2d6cafb72baca569378d Mon Sep 17 00:00:00 2001 From: Dean Eigenmann <7621705+decanus@users.noreply.github.com> Date: Thu, 27 Aug 2020 04:44:09 +0200 Subject: [PATCH] feature/store-protocol (#102) * started working on store protocol * eol * update * fix * encoding * started * started working on decoding * fixes * started fleshing out tests * testing * encode / decode test * eol * fix test * fmt * errors * testing entire rpc now * fix * format * added comment * removed StoreRPC * added comment * readded the store rpc * updated * fix tests --- tests/all_tests_v2.nim | 3 +- tests/test_helpers.nim | 24 +++++- tests/v2/test_waku_store.nim | 84 +++++++++++++++++++ waku/protocol/v2/waku_store.nim | 144 ++++++++++++++++++++++++++++++++ 4 files changed, 252 insertions(+), 3 deletions(-) create mode 100644 tests/v2/test_waku_store.nim create mode 100644 waku/protocol/v2/waku_store.nim diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index e24ff1ad5..4e8688534 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -1,4 +1,5 @@ import # Waku v2 tests ./v2/test_waku, - ./v2/test_wakunode + ./v2/test_wakunode, + ./v2/test_waku_store diff --git a/tests/test_helpers.nim b/tests/test_helpers.nim index 6c910463b..23cc9e950 100644 --- a/tests/test_helpers.nim +++ b/tests/test_helpers.nim @@ -1,7 +1,9 @@ import - unittest, chronos, + unittest, chronos, bearssl, eth/[keys, p2p] +import libp2p/crypto/crypto + var nextPort = 30303 proc localAddress*(port: int): Address = @@ -12,7 +14,7 @@ proc localAddress*(port: int): Address = proc setupTestNode*( rng: ref BrHmacDrbgContext, capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode = - let keys1 = KeyPair.random(rng[]) + let keys1 = keys.KeyPair.random(rng[]) result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, addAllCapabilities = false, rng = rng) nextPort.inc @@ -30,3 +32,21 @@ template procSuite*(name, body: untyped) = body suitePayload() + +# Copied from here: https://github.com/status-im/nim-libp2p/blob/d522537b19a532bc4af94fcd146f779c1f23bad0/tests/helpers.nim#L28 +type RngWrap = object + rng: ref BrHmacDrbgContext + +var rngVar: RngWrap + +proc getRng(): ref BrHmacDrbgContext = + # TODO if `rngVar` is a threadvar like it should be, there are random and + # spurious compile failures on mac - this is not gcsafe but for the + # purpose of the tests, it's ok as long as we only use a single thread + {.gcsafe.}: + if rngVar.rng.isNil: + rngVar.rng = crypto.newRng() + rngVar.rng + +template rng*(): ref BrHmacDrbgContext = + getRng() diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim new file mode 100644 index 000000000..f09d621e3 --- /dev/null +++ b/tests/v2/test_waku_store.nim @@ -0,0 +1,84 @@ +import unittest, options, tables, sets, sequtils +import chronos, chronicles +import utils, + libp2p/errors, + libp2p/switch, + libp2p/protobuf/minprotobuf, + libp2p/stream/[bufferstream, connection], + libp2p/crypto/crypto, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/rpc/message, + libp2p/multistream, + libp2p/transports/transport, + libp2p/transports/tcptransport +import ../../waku/protocol/v2/[waku_relay, waku_store, filter] + +import ../test_helpers + +procSuite "Waku Store": + + test "encoding and decoding StoreRPC": + let + peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) + + rpc = StoreRPC(query: @[HistoryQuery(requestID: 1, topics: @["foo"])], response: @[HistoryResponse(requestID: 1, messages: @[msg])]) + + let buf = rpc.encode() + + let decode = StoreRPC.init(buf.buffer) + + check: + decode.isErr == false + decode.value == rpc + + asyncTest "handle query": + let + proto = WakuStore.init() + filter = proto.filter() + + var filters = initTable[string, Filter]() + filters["test"] = filter + + let + peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false) + msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false) + + filters.notify(msg) + filters.notify(msg2) + + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() + let remotePeerInfo = PeerInfo.init( + remoteSecKey, + [ma], + ["/test/proto1/1.0.0", "/test/proto2/1.0.0"] + ) + + var serverFut: Future[void] + let msListen = newMultistream() + + msListen.addHandler(WakuStoreCodec, proto) + proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = + await msListen.handle(conn) + + var transport1 = TcpTransport.init() + serverFut = await transport1.listen(ma, connHandler) + + let msDial = newMultistream() + let transport2: TcpTransport = TcpTransport.init() + let conn = await transport2.dial(transport1.ma) + + var rpc = StoreRPC(query: @[HistoryQuery(requestID: 1, topics: @["topic"])]) + discard await msDial.select(conn, WakuStoreCodec) + await conn.writeLP(rpc.encode().buffer) + + var message = await conn.readLp(64*1024) + let response = StoreRPC.init(message) + + check: + response.isErr == false + response.value.response[0].requestID == rpc.query[0].requestID + response.value.response[0].messages.len() == 1 + response.value.response[0].messages[0] == msg diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim new file mode 100644 index 000000000..7c09186c3 --- /dev/null +++ b/waku/protocol/v2/waku_store.nim @@ -0,0 +1,144 @@ +import chronos, chronicles +import ./filter +import tables +import libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/pubsubpeer, + libp2p/protocols/pubsub/floodsub, + libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/pubsub/rpc/[messages, protobuf], + libp2p/protocols/protocol, + libp2p/protobuf/minprotobuf, + libp2p/stream/connection + +import metrics + +import stew/results + +const + WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2" + +type + StoreRPC* = object + query*: seq[HistoryQuery] + response*: seq[HistoryResponse] + + HistoryQuery* = object + requestID*: uint64 + topics*: seq[string] + + HistoryResponse* = object + requestID*: uint64 + messages*: seq[Message] + + WakuStore* = ref object of LPProtocol + messages*: seq[Message] + +method init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = + var msg = HistoryQuery() + let pb = initProtoBuffer(buffer) + + var topics: seq[string] + + discard ? pb.getField(1, msg.requestID) + discard ? pb.getRepeatedField(2, topics) + + msg.topics = topics + ok(msg) + +proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = + var msg = HistoryResponse() + let pb = initProtoBuffer(buffer) + + var messages: seq[seq[byte]] + + discard ? pb.getField(1, msg.requestID) + discard ? pb.getRepeatedField(2, messages) + + for buf in messages: + msg.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf))) + + ok(msg) + +proc init*(T: type StoreRPC, buffer: seq[byte]): ProtoResult[T] = + var rpc = StoreRPC() + let pb = initProtoBuffer(buffer) + + var queries: seq[seq[byte]] + discard ? pb.getRepeatedField(1, queries) + + for buffer in queries: + rpc.query.add(? HistoryQuery.init(buffer)) + + var responses: seq[seq[byte]] + discard ? pb.getRepeatedField(2, responses) + + for buffer in responses: + rpc.response.add(? HistoryResponse.init(buffer)) + + ok(rpc) + +method encode*(query: HistoryQuery): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, query.requestID) + + for topic in query.topics: + result.write(2, topic) + +method encode*(response: HistoryResponse): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, response.requestID) + + for msg in response.messages: + result.write(2, msg.encodeMessage()) + +method encode*(response: StoreRPC): ProtoBuffer = + result = initProtoBuffer() + + for query in response.query: + result.write(1, query.encode().buffer) + + for response in response.response: + result.write(2, response.encode().buffer) + +proc query(w: WakuStore, query: HistoryQuery): HistoryResponse = + result = HistoryResponse(requestID: query.requestID, messages: newSeq[Message]()) + for msg in w.messages: + for topic in query.topics: + if topic in msg.topicIDs: + result.messages.insert(msg) + break + +method init*(T: type WakuStore): T = + var ws = WakuStore() + + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + var message = await conn.readLp(64*1024) + var rpc = StoreRPC.init(message) + if rpc.isErr: + return + + info "received query" + + var response = StoreRPC(query: newSeq[HistoryQuery](0), response: newSeq[HistoryResponse](0)) + + for query in rpc.value.query: + let res = ws.query(query) + response.response.add(res) + + await conn.writeLp(response.encode().buffer) + + ws.handler = handle + ws.codec = WakuStoreCodec + result = ws + +proc filter*(proto: WakuStore): Filter = + ## The filter function returns the pubsub filter for the node. + ## This is used to pipe messages into the storage, therefore + ## the filter should be used by the component that receives + ## new messages. + proc handle(msg: Message) = + proto.messages.add(msg) + + Filter.init(@[], handle)