nwaku/waku/protocol/v2/waku_store.nim

143 lines
4.0 KiB
Nim

import
std/tables,
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/switch,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
./message_notifier,
./../../node/v2/waku_types
logScope:
topics = "wakustore"
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta1"
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery()
let pb = initProtoBuffer(buffer)
var topics: seq[string]
discard ? pb.getRepeatedField(1, topics)
msg.topics = topics
ok(msg)
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryResponse()
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ? pb.getRepeatedField(1, messages)
for buf in messages:
msg.messages.add(? WakuMessage.init(buf))
ok(msg)
proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = HistoryRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var queryBuffer: seq[byte]
discard ? pb.getField(2, queryBuffer)
rpc.query = ? HistoryQuery.init(queryBuffer)
var responseBuffer: seq[byte]
discard ? pb.getField(3, responseBuffer)
rpc.response = ? HistoryResponse.init(responseBuffer)
ok(rpc)
proc encode*(query: HistoryQuery): ProtoBuffer =
result = initProtoBuffer()
for topic in query.topics:
result.write(1, topic)
proc encode*(response: HistoryResponse): ProtoBuffer =
result = initProtoBuffer()
for msg in response.messages:
result.write(1, msg.encode())
proc encode*(rpc: HistoryRPC): ProtoBuffer =
result = initProtoBuffer()
result.write(1, rpc.requestId)
result.write(2, rpc.query.encode())
result.write(3, rpc.response.encode())
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
result = HistoryResponse(messages: newSeq[WakuMessage]())
for msg in w.messages:
if msg.contentTopic in query.topics:
result.messages.insert(msg)
method init*(ws: WakuStore) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(64*1024)
var res = HistoryRPC.init(message)
if res.isErr:
error "failed to decode rpc"
return
info "received query"
let value = res.value
let response = ws.findMessages(res.value.query)
await conn.writeLp(HistoryRPC(requestId: value.requestId, response: response).encode().buffer)
ws.handler = handle
ws.codec = WakuStoreCodec
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext): T =
new result
result.rng = rng
result.switch = switch
result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
ws.peers.add(HistoryPeer(peerInfo: peer))
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
## The filter function returns the pubsub filter for the node.
## This is used to pipe messages into the storage, therefore
## the filter should be used by the component that receives
## new messages.
proc handle(topic: string, msg: WakuMessage) {.async.} =
proto.messages.add(msg)
MessageNotificationSubscription.init(@[], handle)
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as:
# - which topics they track
# - latency?
# - default store peer?
let peer = w.peers[0]
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer)
var message = await conn.readLp(64*1024)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
return
handler(response.value.response)