2020-11-24 12:34:32 +08:00
|
|
|
## Waku Store protocol for historical messaging support.
|
|
|
|
## See spec for more details:
|
|
|
|
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
|
2022-11-04 10:52:27 +01:00
|
|
|
when (NimMajor, NimMinor) < (1, 4):
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
else:
|
|
|
|
{.push raises: [].}
|
2021-06-09 16:37:08 +02:00
|
|
|
|
2020-08-31 05:32:41 +02:00
|
|
|
import
|
2022-11-24 19:42:19 +01:00
|
|
|
std/options,
|
2022-08-01 18:21:11 +02:00
|
|
|
stew/results,
|
2021-07-16 15:28:35 -07:00
|
|
|
chronicles,
|
2022-11-21 09:36:41 +01:00
|
|
|
chronos,
|
2022-09-07 16:31:27 +01:00
|
|
|
bearssl/rand,
|
2020-09-28 23:44:14 +02:00
|
|
|
libp2p/crypto/crypto,
|
2020-08-31 05:32:41 +02:00
|
|
|
libp2p/protocols/protocol,
|
|
|
|
libp2p/protobuf/minprotobuf,
|
|
|
|
libp2p/stream/connection,
|
2022-07-25 13:01:37 +02:00
|
|
|
metrics
|
|
|
|
import
|
2023-04-19 16:39:52 +02:00
|
|
|
../waku_core,
|
2023-04-18 15:22:10 +02:00
|
|
|
../node/peer_manager,
|
2022-11-09 18:50:18 +01:00
|
|
|
./common,
|
2022-07-25 13:01:37 +02:00
|
|
|
./rpc,
|
2022-09-28 13:36:05 +02:00
|
|
|
./rpc_codec,
|
2022-10-20 18:09:40 +02:00
|
|
|
./protocol_metrics
|
2020-11-24 12:34:32 +08:00
|
|
|
|
2020-08-27 04:44:09 +02:00
|
|
|
|
2020-09-16 12:23:10 +08:00
|
|
|
logScope:
|
2022-11-03 16:36:24 +01:00
|
|
|
topics = "waku store"
|
2020-09-16 12:23:10 +08:00
|
|
|
|
2022-09-22 11:17:38 +02:00
|
|
|
|
2022-11-21 09:36:41 +01:00
|
|
|
const
|
2022-10-06 17:07:12 +02:00
|
|
|
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
|
2022-07-25 13:01:37 +02:00
|
|
|
|
2020-08-27 04:44:09 +02:00
|
|
|
|
2023-05-25 17:34:34 +02:00
|
|
|
type HistoryQueryHandler* = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}
|
2022-11-21 11:16:57 +01:00
|
|
|
|
2022-06-13 19:59:53 +02:00
|
|
|
type
|
|
|
|
WakuStore* = ref object of LPProtocol
|
2022-11-21 11:16:57 +01:00
|
|
|
peerManager: PeerManager
|
|
|
|
rng: ref rand.HmacDrbgContext
|
2024-01-31 17:43:59 +01:00
|
|
|
queryHandler*: HistoryQueryHandler
|
2022-11-09 18:50:18 +01:00
|
|
|
|
|
|
|
## Protocol
|
2020-11-08 20:48:09 -08:00
|
|
|
|
2022-11-21 09:36:41 +01:00
|
|
|
proc initProtocolHandler(ws: WakuStore) =
|
|
|
|
|
2021-06-09 16:37:08 +02:00
|
|
|
proc handler(conn: Connection, proto: string) {.async.} =
|
2022-09-02 10:14:58 +02:00
|
|
|
let buf = await conn.readLp(MaxRpcSize.int)
|
|
|
|
|
2022-11-09 18:50:18 +01:00
|
|
|
let decodeRes = HistoryRPC.decode(buf)
|
|
|
|
if decodeRes.isErr():
|
2022-12-07 12:30:32 +01:00
|
|
|
error "failed to decode rpc", peerId= $conn.peerId
|
2021-02-09 10:31:38 +02:00
|
|
|
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
2022-11-09 18:50:18 +01:00
|
|
|
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
let reqRpc = decodeRes.value
|
|
|
|
|
2022-11-17 20:40:08 +01:00
|
|
|
if reqRpc.query.isNone():
|
2022-12-07 12:30:32 +01:00
|
|
|
error "empty query rpc", peerId= $conn.peerId, requestId=reqRpc.requestId
|
2022-11-09 18:50:18 +01:00
|
|
|
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
|
|
|
|
# TODO: Return (BAD_REQUEST, cause: "empty query")
|
2020-08-27 04:44:09 +02:00
|
|
|
return
|
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
let
|
|
|
|
requestId = reqRpc.requestId
|
|
|
|
request = reqRpc.query.get().toAPI()
|
2022-09-02 10:14:58 +02:00
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
info "received history query", peerId=conn.peerId, requestId=requestId, query=request
|
2022-01-06 12:23:25 +01:00
|
|
|
waku_store_queries.inc()
|
2020-08-27 04:44:09 +02:00
|
|
|
|
2023-04-11 13:12:45 +02:00
|
|
|
var responseRes: HistoryResult
|
|
|
|
try:
|
2023-05-25 17:34:34 +02:00
|
|
|
responseRes = await ws.queryHandler(request)
|
2023-04-11 13:12:45 +02:00
|
|
|
except Exception:
|
|
|
|
error "history query failed", peerId= $conn.peerId, requestId=requestId, error=getCurrentExceptionMsg()
|
|
|
|
|
|
|
|
let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC()
|
|
|
|
let response = HistoryResponseRPC(error: error)
|
|
|
|
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
|
|
|
await conn.writeLp(rpc.encode().buffer)
|
|
|
|
return
|
2022-11-09 18:50:18 +01:00
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
if responseRes.isErr():
|
2022-12-07 12:30:32 +01:00
|
|
|
error "history query failed", peerId= $conn.peerId, requestId=requestId, error=responseRes.error
|
2022-11-09 18:50:18 +01:00
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
let response = responseRes.toRPC()
|
|
|
|
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
2022-11-09 18:50:18 +01:00
|
|
|
await conn.writeLp(rpc.encode().buffer)
|
|
|
|
return
|
|
|
|
|
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
let response = responseRes.toRPC()
|
2020-11-24 12:53:42 +08:00
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
info "sending history response", peerId=conn.peerId, requestId=requestId, messages=response.messages.len
|
2022-11-09 18:50:18 +01:00
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
2022-09-02 10:14:58 +02:00
|
|
|
await conn.writeLp(rpc.encode().buffer)
|
2020-08-27 04:44:09 +02:00
|
|
|
|
2021-06-09 16:37:08 +02:00
|
|
|
ws.handler = handler
|
2020-08-27 04:44:09 +02:00
|
|
|
ws.codec = WakuStoreCodec
|
2020-11-16 09:38:52 +01:00
|
|
|
|
2022-11-21 11:16:57 +01:00
|
|
|
|
|
|
|
proc new*(T: type WakuStore,
|
|
|
|
peerManager: PeerManager,
|
|
|
|
rng: ref rand.HmacDrbgContext,
|
|
|
|
queryHandler: HistoryQueryHandler): T =
|
|
|
|
|
|
|
|
# Raise a defect if history query handler is nil
|
|
|
|
if queryHandler.isNil():
|
|
|
|
raise newException(NilAccessDefect, "history query handler is nil")
|
|
|
|
|
|
|
|
let ws = WakuStore(
|
|
|
|
rng: rng,
|
|
|
|
peerManager: peerManager,
|
|
|
|
queryHandler: queryHandler
|
|
|
|
)
|
2022-09-20 11:39:52 +02:00
|
|
|
ws.initProtocolHandler()
|
2022-11-09 18:50:18 +01:00
|
|
|
ws
|