import
  std/tables,
  chronos, chronicles, metrics, stew/results,
  libp2p/protocols/pubsub/rpc/[messages, protobuf],
  libp2p/protocols/protocol,
  libp2p/protobuf/minprotobuf,
  libp2p/stream/connection,
  ./message_notifier

const
  WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha2"

type
  StoreRPC* = object
    query*: seq[HistoryQuery]
    response*: seq[HistoryResponse]

  HistoryQuery* = object
    uuid*: string
    topics*: seq[string]

  HistoryResponse* = object
    uuid*: string
    messages*: seq[Message]

  WakuStore* = ref object of LPProtocol
    messages*: seq[Message]
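
# Wire format implied by the init/encode procs below (informal sketch, not an
# authoritative schema definition):
#
#   HistoryQuery     field 1: uuid (string)     field 2: repeated topic (string)
#   HistoryResponse  field 1: uuid (string)     field 2: repeated message (pubsub Message)
#   StoreRPC         field 1: repeated query    field 2: repeated response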

proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
  ## Decodes a HistoryQuery from its protobuf-encoded representation.
  var msg = HistoryQuery()
  let pb = initProtoBuffer(buffer)

  var topics: seq[string]

  discard ? pb.getField(1, msg.uuid)
  discard ? pb.getRepeatedField(2, topics)

  msg.topics = topics
  ok(msg)

proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
  ## Decodes a HistoryResponse, including the contained pubsub messages.
  var msg = HistoryResponse()
  let pb = initProtoBuffer(buffer)

  var messages: seq[seq[byte]]

  discard ? pb.getField(1, msg.uuid)
  discard ? pb.getRepeatedField(2, messages)

  for buf in messages:
    msg.messages.add(? protobuf.decodeMessage(initProtoBuffer(buf)))

  ok(msg)

proc init*(T: type StoreRPC, buffer: seq[byte]): ProtoResult[T] =
  ## Decodes a StoreRPC envelope carrying queries and/or responses.
  var rpc = StoreRPC()
  let pb = initProtoBuffer(buffer)

  var queries: seq[seq[byte]]
  discard ? pb.getRepeatedField(1, queries)

  for buffer in queries:
    rpc.query.add(? HistoryQuery.init(buffer))

  var responses: seq[seq[byte]]
  discard ? pb.getRepeatedField(2, responses)

  for buffer in responses:
    rpc.response.add(? HistoryResponse.init(buffer))

  ok(rpc)

proc encode*(query: HistoryQuery): ProtoBuffer =
  ## Encodes a HistoryQuery into a protobuf.
  result = initProtoBuffer()

  result.write(1, query.uuid)

  for topic in query.topics:
    result.write(2, topic)

proc encode*(response: HistoryResponse): ProtoBuffer =
  ## Encodes a HistoryResponse, serialising each contained message.
  result = initProtoBuffer()

  result.write(1, response.uuid)

  for msg in response.messages:
    result.write(2, msg.encodeMessage())

proc encode*(rpc: StoreRPC): ProtoBuffer =
  ## Encodes a StoreRPC envelope: queries go into field 1, responses into field 2.
  result = initProtoBuffer()

  for query in rpc.query:
    result.write(1, query.encode().buffer)

  for response in rpc.response:
    result.write(2, response.encode().buffer)
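
when isMainModule:
  # Illustrative round-trip sketch (not part of the protocol itself): encode a
  # HistoryQuery with the proc above and decode it again. Only names defined in
  # this module are used; the uuid and topic values are made up.
  let exampleQuery = HistoryQuery(uuid: "1", topics: @["example-topic"])
  let decoded = HistoryQuery.init(exampleQuery.encode().buffer)
  assert decoded.isOk
  assert decoded.value.uuid == "1"
  assert decoded.value.topics == @["example-topic"]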

proc query(w: WakuStore, query: HistoryQuery): HistoryResponse =
  ## Returns every stored message whose topic matches at least one of the
  ## topics in the query.
  result = HistoryResponse(uuid: query.uuid, messages: newSeq[Message]())
  for msg in w.messages:
    for topic in query.topics:
      if topic in msg.topicIDs:
        result.messages.insert(msg)
        break
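
when isMainModule:
  # Illustrative sketch of the in-memory matching done by `query` above
  # (assumes nothing beyond the types in this module; the topic name is made up).
  let store = WakuStore(messages: @[Message(topicIDs: @["example-topic"])])
  let matched = store.query(HistoryQuery(uuid: "2", topics: @["example-topic"]))
  assert matched.messages.len == 1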

proc init*(T: type WakuStore): T =
  ## Creates a WakuStore protocol instance with its connection handler and
  ## codec set. The handler reads a length-prefixed StoreRPC, answers every
  ## query it contains, and writes the responses back on the same stream.
  var ws = WakuStore()

  proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    var message = await conn.readLp(64*1024)
    var rpc = StoreRPC.init(message)
    if rpc.isErr:
      return

    info "received query"

    var response = StoreRPC(query: newSeq[HistoryQuery](0), response: newSeq[HistoryResponse](0))

    for query in rpc.value.query:
      let res = ws.query(query)
      response.response.add(res)

    await conn.writeLp(response.encode().buffer)

  ws.handler = handle
  ws.codec = WakuStoreCodec
  result = ws
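
# Client-side sketch (comment only; the exact Switch dialling API is assumed,
# not defined in this module): a peer that wants history would open a stream
# on WakuStoreCodec, write a StoreRPC containing queries, and read back a
# StoreRPC containing responses, mirroring `handle` above:
#
#   let conn = await switch.dial(remotePeerInfo, WakuStoreCodec)
#   await conn.writeLp(StoreRPC(query: @[HistoryQuery(uuid: "1", topics: @["t"])]).encode().buffer)
#   let reply = StoreRPC.init(await conn.readLp(64*1024))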

proc subscription*(proto: WakuStore): MessageNotificationSubscription =
  ## Returns a MessageNotificationSubscription that pipes incoming messages
  ## into the store. The component that receives new messages should register
  ## this subscription so that every message it sees is recorded here and
  ## becomes available to history queries.
  proc handle(msg: Message) =
    proto.messages.add(msg)

  MessageNotificationSubscription.init(@[], handle)
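
# Usage sketch (comment only): the node is expected to wire this subscription
# into its message pipeline so every delivered message is appended to
# `messages` and becomes queryable over WakuStoreCodec. The registration call
# below is hypothetical; the actual API lives in ./message_notifier:
#
#   node.subscriptions.subscribe(WakuStoreCodec, store.subscription())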