2020-11-24 04:34:32 +00:00
|
|
|
## Waku Store protocol for historical messaging support.
|
|
|
|
## See spec for more details:
|
|
|
|
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
|
2022-11-04 09:52:27 +00:00
|
|
|
# Module-wide exception tracking: every routine declared below must not raise
# catchable errors. Compilers older than 1.4 need `Defect` listed explicitly.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
|
2021-06-09 14:37:08 +00:00
|
|
|
|
2020-08-31 03:32:41 +00:00
|
|
|
import
|
2022-10-03 15:36:17 +00:00
|
|
|
std/[tables, times, sequtils, options, algorithm],
|
2022-08-01 16:21:11 +00:00
|
|
|
stew/results,
|
2021-07-16 22:28:35 +00:00
|
|
|
chronicles,
|
|
|
|
chronos,
|
2022-09-07 15:31:27 +00:00
|
|
|
bearssl/rand,
|
2020-09-28 21:44:14 +00:00
|
|
|
libp2p/crypto/crypto,
|
2020-08-31 03:32:41 +00:00
|
|
|
libp2p/protocols/protocol,
|
|
|
|
libp2p/protobuf/minprotobuf,
|
|
|
|
libp2p/stream/connection,
|
2022-07-25 11:01:37 +00:00
|
|
|
metrics
|
|
|
|
import
|
2022-11-04 18:48:22 +00:00
|
|
|
../../node/message_store/message_retention_policy,
|
2021-07-16 22:28:35 +00:00
|
|
|
../../node/peer_manager/peer_manager,
|
2022-02-17 15:00:15 +00:00
|
|
|
../../utils/time,
|
2022-07-25 11:01:37 +00:00
|
|
|
../waku_message,
|
2021-07-16 22:28:35 +00:00
|
|
|
../waku_swap/waku_swap,
|
2022-11-09 17:50:18 +00:00
|
|
|
./common,
|
2022-07-25 11:01:37 +00:00
|
|
|
./rpc,
|
2022-09-28 11:36:05 +00:00
|
|
|
./rpc_codec,
|
2022-10-20 16:09:40 +00:00
|
|
|
./message_store,
|
|
|
|
./protocol_metrics
|
2020-11-24 04:34:32 +00:00
|
|
|
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2020-09-16 04:23:10 +00:00
|
|
|
# Every log statement emitted from this module is tagged with this topic.
logScope:
  topics = "waku store"


const
  # Maximum allowable sender timestamp "drift": a message whose timestamp is
  # further than this from the local clock (in either direction) is rejected
  # by `isValidMessage`. Built from 20 seconds via `getNanosecondTime`
  # (presumably a seconds -> nanoseconds conversion; confirm in utils/time).
  # The identifier spelling matches the other call sites in this module so
  # that `--styleCheck` stays quiet.
  MaxMessageTimestampVariance* = getNanosecondTime(20)
|
2022-07-25 11:01:37 +00:00
|
|
|
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2022-06-13 17:59:53 +00:00
|
|
|
type
  WakuStore* = ref object of LPProtocol
    ## State of the store protocol: a libp2p protocol that answers history
    ## queries from an optional backing message store.

    # Peer manager handed in at construction time (not used by the procs in
    # this part of the module).
    peerManager*: PeerManager

    # Random number generator context handed in at construction time.
    rng*: ref rand.HmacDrbgContext

    # Backing message store. May be nil: then incoming messages are not
    # persisted and history queries are answered with SERVICE_UNAVAILABLE.
    store*: MessageStore

    # Optional swap accounting. When not nil, the peer is credited with the
    # number of messages returned for each successful history query.
    wakuSwap*: WakuSwap

    # Optional retention policy, applied to `store` by
    # `executeMessageRetentionPolicy`. Private to this module.
    retentionPolicy: Option[MessageRetentionPolicy]
|
2022-06-13 17:59:53 +00:00
|
|
|
|
|
|
|
|
2022-11-09 17:50:18 +00:00
|
|
|
# TODO: Move to a message store wrapper
|
2022-09-21 09:32:59 +00:00
|
|
|
proc executeMessageRetentionPolicy*(w: WakuStore) =
  ## Applies the configured retention policy to the backing message store.
  ##
  ## Does nothing when no policy is configured or when there is no store.
  ## A failing policy run is counted in `waku_store_errors` (label
  ## `retPolicyFailure`) and logged at debug level; it is never raised.
  if w.retentionPolicy.isNone() or w.store.isNil():
    return

  let outcome = w.retentionPolicy.get().execute(w.store)
  if outcome.isOk():
    return

  waku_store_errors.inc(labelValues = [retPolicyFailure])
  debug "failed execution of retention policy", error=outcome.error
|
2022-09-20 09:39:52 +00:00
|
|
|
|
2022-11-09 17:50:18 +00:00
|
|
|
# TODO: Move to a message store wrapper
|
2022-09-20 09:39:52 +00:00
|
|
|
proc reportStoredMessagesMetric*(w: WakuStore) =
  ## Publishes the number of messages held by the backing store to the
  ## `waku_store_messages` gauge under the "stored" label.
  ##
  ## Does nothing when there is no store. When the store cannot report its
  ## message count the gauge is left untouched and the failure is logged, so
  ## that a stale metric can be traced back to its cause.
  if w.store.isNil():
    return

  let resCount = w.store.getMessagesCount()
  if resCount.isErr():
    # Previously swallowed silently; the other store operations in this
    # module log their failures at debug level as well.
    debug "failed to retrieve the stored messages count", error=resCount.error
    return

  waku_store_messages.set(resCount.value, labelValues = ["stored"])
|
|
|
|
|
2022-11-09 17:50:18 +00:00
|
|
|
# TODO: Move to a message store wrapper
|
|
|
|
proc isValidMessage(msg: WakuMessage): bool =
  ## Tells whether `msg` carries an acceptable sender timestamp.
  ##
  ## A zero timestamp is always accepted. Any other timestamp must lie within
  ## `MaxMessageTimestampVariance` of the local clock, both bounds inclusive.
  let senderTime = msg.timestamp
  if senderTime == 0:
    return true

  let
    current = getNanosecondTime(getTime().toUnixFloat())
    earliest = current - MaxMessageTimestampVariance
    latest = current + MaxMessageTimestampVariance

  if senderTime < earliest:
    return false

  senderTime <= latest
|
|
|
|
|
|
|
|
# TODO: Move to a message store wrapper
|
|
|
|
proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
  ## Persists `msg`, received on `pubsubTopic`, into the backing store.
  ##
  ## The message is skipped when there is no store or when it is flagged as
  ## ephemeral. A message failing `isValidMessage` is dropped and counted in
  ## `waku_store_errors` (label `invalidMessage`). A failed insertion is
  ## logged and counted (label `insertFailure`). The elapsed time is observed
  ## in `waku_store_insert_duration_seconds` only after a successful insert.

  # No store configured, or an ephemeral message: nothing must be persisted.
  if w.store.isNil() or msg.ephemeral:
    return

  if not isValidMessage(msg):
    waku_store_errors.inc(labelValues = [invalidMessage])
    return

  let startedAt = getTime().toUnixFloat()

  let digest = computeDigest(msg)

  # Prefer the sender timestamp; fall back to the local clock when the
  # message carries none.
  let receivedTime =
    if msg.timestamp > 0:
      msg.timestamp
    else:
      getNanosecondTime(getTime().toUnixFloat())

  trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=digest

  let stored = w.store.put(pubsubTopic, msg, digest, receivedTime)
  if stored.isErr():
    debug "failed to insert message into the store", err=stored.error
    waku_store_errors.inc(labelValues = [insertFailure])
    return

  waku_store_insert_duration_seconds.observe(getTime().toUnixFloat() - startedAt)
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: Move to a message store wrapper
|
|
|
|
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
  ## Runs `query` against the backing store and returns a single page of
  ## matching messages.
  ##
  ## All query criteria are optional. The page size defaults to
  ## `DefaultPageSize` and is capped at `MaxPageSize`. One row more than the
  ## page size is requested from the store: if it comes back, more results
  ## exist and the response carries a cursor pointing at the last message
  ## included in the page. A store failure yields an `UNKNOWN` history error.
  ## The query duration is always observed in
  ## `waku_store_query_duration_seconds`.
  let
    # An empty content topic list means "do not filter by content topic".
    contentTopicFilter =
      if query.contentTopics.len > 0: some(query.contentTopics)
      else: none(seq[ContentTopic])
    pageLimit =
      if query.pageSize <= 0: DefaultPageSize
      else: min(query.pageSize, MaxPageSize)
    ascending = query.ascending

  let startedAt = getTime().toUnixFloat()

  let fetched = w.store.getMessagesByHistoryQuery(
    contentTopic = contentTopicFilter,
    pubsubTopic = query.pubsubTopic,
    cursor = query.cursor,
    startTime = query.startTime,
    endTime = query.endTime,
    maxPageSize = pageLimit + 1,
    ascendingOrder = ascending
  )

  waku_store_query_duration_seconds.observe(getTime().toUnixFloat() - startedAt)

  if fetched.isErr():
    # TODO: Improve error reporting
    return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))

  let rows = fetched.get()

  if rows.len <= 0:
    return ok(HistoryResponse(
      messages: @[],
      pageSize: 0,
      ascending: ascending,
      cursor: none(HistoryCursor)
    ))

  # The extra row, when present, only signals that another page exists; it
  # is never part of the response.
  let hasMore = rows.len > int(pageLimit)

  var page =
    if hasMore: rows[0..^2].mapIt(it[1])
    else: rows.mapIt(it[1])

  # The retrieved messages list should always be in chronological order
  if not ascending:
    page.reverse()

  var nextCursor = none(HistoryCursor)
  if hasMore:
    # The cursor is built from the last message INCLUDED in the response,
    # i.e. the second last entry of the rows list.
    let (rowTopic, rowMessage, rowDigest, rowStoreTime) = rows[^2]

    # TODO: Improve coherence of MessageDigest type
    var fixedDigest: array[32, byte]
    for i in 0..<min(rowDigest.len, fixedDigest.len):
      fixedDigest[i] = rowDigest[i]

    nextCursor = some(HistoryCursor(
      pubsubTopic: rowTopic,
      senderTime: rowMessage.timestamp,
      storeTime: rowStoreTime,
      digest: MessageDigest(data: fixedDigest)
    ))

  ok(HistoryResponse(
    messages: page,
    pageSize: uint64(page.len),
    ascending: ascending,
    cursor: nextCursor
  ))
|
|
|
|
|
|
|
|
|
|
|
|
## Protocol
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-09-20 09:39:52 +00:00
|
|
|
proc initProtocolHandler*(ws: WakuStore) =
  ## Installs the stream handler and codec (`WakuStoreCodec`) of the store
  ## protocol on `ws`.
  ##
  ## For each incoming stream the handler reads one length-prefixed
  ## `HistoryRPC` (at most `MaxRpcSize` bytes) and then:
  ## - drops the request without a reply when it cannot be decoded or when
  ##   its query is empty (both cases are counted in `waku_store_errors`);
  ## - replies with SERVICE_UNAVAILABLE when there is no backing store;
  ## - otherwise runs the query and replies with its result or its error.
  ## When swap accounting is enabled, the peer is credited with the number of
  ## messages of every successful response.

  proc handler(conn: Connection, proto: string) {.async.} =
    let payload = await conn.readLp(MaxRpcSize.int)

    let decoded = HistoryRPC.decode(payload)
    if decoded.isErr():
      error "failed to decode rpc", peerId=conn.peerId
      waku_store_errors.inc(labelValues = [decodeRpcFailure])
      # TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
      return

    let request = decoded.value

    if request.query == default(HistoryQueryRPC):
      error "empty query rpc", peerId=conn.peerId, requestId=request.requestId
      waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
      # TODO: Return (BAD_REQUEST, cause: "empty query")
      return

    info "received history query", peerId=conn.peerId, requestId=request.requestId, query=request.query
    waku_store_queries.inc()

    # Without a backing store the query cannot be served at all.
    if ws.store.isNil():
      let unavailable = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)

      error "history query failed", peerId=conn.peerId, requestId=request.requestId, error= $unavailable

      let reply = HistoryRPC(
        requestId: request.requestId,
        response: HistoryResponseRPC(error: unavailable.toRPC())
      )
      await conn.writeLp(reply.encode().buffer)
      return

    let outcome = ws.findMessages(request.query.toApi())

    if outcome.isErr():
      error "history query failed", peerId=conn.peerId, requestId=request.requestId, error=outcome.error

      let reply = HistoryRPC(
        requestId: request.requestId,
        response: outcome.toRPC()
      )
      await conn.writeLp(reply.encode().buffer)
      return

    let response = outcome.toRPC()

    if not ws.wakuSwap.isNil():
      info "handle store swap", peerId=conn.peerId, requestId=request.requestId, text=ws.wakuSwap.text

      # Perform accounting operation
      # TODO: Do accounting here, response is HistoryResponseRPC. How do we get node or swap context?
      ws.wakuSwap.credit(conn.peerId, response.messages.len)

    info "sending history response", peerId=conn.peerId, requestId=request.requestId, messages=response.messages.len

    let reply = HistoryRPC(requestId: request.requestId, response: response)
    await conn.writeLp(reply.encode().buffer)

  ws.handler = handler
  ws.codec = WakuStoreCodec
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-10-28 18:11:28 +00:00
|
|
|
proc new*(T: type WakuStore,
          peerManager: PeerManager,
          rng: ref rand.HmacDrbgContext,
          store: MessageStore,
          wakuSwap: WakuSwap = nil,
          retentionPolicy=none(MessageRetentionPolicy)): T =
  ## Creates a store protocol instance from the given collaborators and
  ## installs its protocol handler before returning it.
  ##
  ## `wakuSwap` defaults to nil (no swap accounting) and `retentionPolicy`
  ## defaults to none (no retention policy).
  result = WakuStore(
    peerManager: peerManager,
    rng: rng,
    store: store,
    wakuSwap: wakuSwap,
    retentionPolicy: retentionPolicy
  )
  result.initProtocolHandler()
|