feature/store-protocol (#102)

* started working on store protocol

* eol

* update

* fix

* encoding

* started

* started working on decoding

* fixes

* started fleshing out tests

* testing

* encode / decode test

* eol

* fix test

* fmt

* errors

* testing entire rpc now

* fix

* format

* added comment

* removed StoreRPC

* added comment

* readded the store rpc

* updated

* fix tests
This commit is contained in:
Dean Eigenmann 2020-08-27 04:44:09 +02:00 committed by GitHub
parent 4314dcf6e9
commit 0e62e8ffa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 3 deletions

View File

@ -1,4 +1,5 @@
import import
# Waku v2 tests # Waku v2 tests
./v2/test_waku, ./v2/test_waku,
./v2/test_wakunode ./v2/test_wakunode,
./v2/test_waku_store

View File

@ -1,7 +1,9 @@
import import
unittest, chronos, unittest, chronos, bearssl,
eth/[keys, p2p] eth/[keys, p2p]
import libp2p/crypto/crypto
var nextPort = 30303 var nextPort = 30303
proc localAddress*(port: int): Address = proc localAddress*(port: int): Address =
@ -12,7 +14,7 @@ proc localAddress*(port: int): Address =
proc setupTestNode*( proc setupTestNode*(
rng: ref BrHmacDrbgContext, rng: ref BrHmacDrbgContext,
capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode = capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode =
let keys1 = KeyPair.random(rng[]) let keys1 = keys.KeyPair.random(rng[])
result = newEthereumNode(keys1, localAddress(nextPort), 1, nil, result = newEthereumNode(keys1, localAddress(nextPort), 1, nil,
addAllCapabilities = false, rng = rng) addAllCapabilities = false, rng = rng)
nextPort.inc nextPort.inc
@ -30,3 +32,21 @@ template procSuite*(name, body: untyped) =
body body
suitePayload() suitePayload()
# Copied from here: https://github.com/status-im/nim-libp2p/blob/d522537b19a532bc4af94fcd146f779c1f23bad0/tests/helpers.nim#L28
type RngWrap = object
rng: ref BrHmacDrbgContext
var rngVar: RngWrap
proc getRng(): ref BrHmacDrbgContext =
# TODO if `rngVar` is a threadvar like it should be, there are random and
# spurious compile failures on mac - this is not gcsafe but for the
# purpose of the tests, it's ok as long as we only use a single thread
{.gcsafe.}:
if rngVar.rng.isNil:
rngVar.rng = crypto.newRng()
rngVar.rng
template rng*(): ref BrHmacDrbgContext =
getRng()

View File

@ -0,0 +1,84 @@
import unittest, options, tables, sets, sequtils
import chronos, chronicles
import utils,
libp2p/errors,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/rpc/message,
libp2p/multistream,
libp2p/transports/transport,
libp2p/transports/tcptransport
import ../../waku/protocol/v2/[waku_relay, waku_store, filter]
import ../test_helpers
procSuite "Waku Store":
test "encoding and decoding StoreRPC":
let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)
rpc = StoreRPC(query: @[HistoryQuery(requestID: 1, topics: @["foo"])], response: @[HistoryResponse(requestID: 1, messages: @[msg])])
let buf = rpc.encode()
let decode = StoreRPC.init(buf.buffer)
check:
decode.isErr == false
decode.value == rpc
asyncTest "handle query":
let
proto = WakuStore.init()
filter = proto.filter()
var filters = initTable[string, Filter]()
filters["test"] = filter
let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[byte 1, 2, 3], "topic", 3, false)
msg2 = Message.init(peer, @[byte 1, 2, 3], "topic2", 4, false)
filters.notify(msg)
filters.notify(msg2)
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
let remotePeerInfo = PeerInfo.init(
remoteSecKey,
[ma],
["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
)
var serverFut: Future[void]
let msListen = newMultistream()
msListen.addHandler(WakuStoreCodec, proto)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn)
var transport1 = TcpTransport.init()
serverFut = await transport1.listen(ma, connHandler)
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
var rpc = StoreRPC(query: @[HistoryQuery(requestID: 1, topics: @["topic"])])
discard await msDial.select(conn, WakuStoreCodec)
await conn.writeLP(rpc.encode().buffer)
var message = await conn.readLp(64*1024)
let response = StoreRPC.init(message)
check:
response.isErr == false
response.value.response[0].requestID == rpc.query[0].requestID
response.value.response[0].messages.len() == 1
response.value.response[0].messages[0] == msg

View File

@ -0,0 +1,144 @@
import chronos, chronicles
import ./filter
import tables
import libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/[messages, protobuf],
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection
import metrics
import stew/results
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2"
type
StoreRPC* = object
query*: seq[HistoryQuery]
response*: seq[HistoryResponse]
HistoryQuery* = object
requestID*: uint64
topics*: seq[string]
HistoryResponse* = object
requestID*: uint64
messages*: seq[Message]
WakuStore* = ref object of LPProtocol
messages*: seq[Message]
method init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery()
let pb = initProtoBuffer(buffer)
var topics: seq[string]
discard ? pb.getField(1, msg.requestID)
discard ? pb.getRepeatedField(2, topics)
msg.topics = topics
ok(msg)
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryResponse()
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ? pb.getField(1, msg.requestID)
discard ? pb.getRepeatedField(2, messages)
for buf in messages:
msg.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf)))
ok(msg)
proc init*(T: type StoreRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = StoreRPC()
let pb = initProtoBuffer(buffer)
var queries: seq[seq[byte]]
discard ? pb.getRepeatedField(1, queries)
for buffer in queries:
rpc.query.add(? HistoryQuery.init(buffer))
var responses: seq[seq[byte]]
discard ? pb.getRepeatedField(2, responses)
for buffer in responses:
rpc.response.add(? HistoryResponse.init(buffer))
ok(rpc)
method encode*(query: HistoryQuery): ProtoBuffer =
result = initProtoBuffer()
result.write(1, query.requestID)
for topic in query.topics:
result.write(2, topic)
method encode*(response: HistoryResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, response.requestID)
for msg in response.messages:
result.write(2, msg.encodeMessage())
method encode*(response: StoreRPC): ProtoBuffer =
result = initProtoBuffer()
for query in response.query:
result.write(1, query.encode().buffer)
for response in response.response:
result.write(2, response.encode().buffer)
proc query(w: WakuStore, query: HistoryQuery): HistoryResponse =
result = HistoryResponse(requestID: query.requestID, messages: newSeq[Message]())
for msg in w.messages:
for topic in query.topics:
if topic in msg.topicIDs:
result.messages.insert(msg)
break
method init*(T: type WakuStore): T =
var ws = WakuStore()
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var rpc = StoreRPC.init(message)
if rpc.isErr:
return
info "received query"
var response = StoreRPC(query: newSeq[HistoryQuery](0), response: newSeq[HistoryResponse](0))
for query in rpc.value.query:
let res = ws.query(query)
response.response.add(res)
await conn.writeLp(response.encode().buffer)
ws.handler = handle
ws.codec = WakuStoreCodec
result = ws
proc filter*(proto: WakuStore): Filter =
## The filter function returns the pubsub filter for the node.
## This is used to pipe messages into the storage, therefore
## the filter should be used by the component that receives
## new messages.
proc handle(msg: Message) =
proto.messages.add(msg)
Filter.init(@[], handle)