Ivan FB c01a21e01f chore: bump dependencies for v0.35 (#3255)
Changes:
	modified:   .gitmodules
	modified:   tests/waku_discv5/utils.nim
	modified:   tests/waku_enr/utils.nim
	modified:   tests/waku_rln_relay/test_rln_group_manager_onchain.nim
	modified:   tests/waku_rln_relay/utils.nim
	modified:   tests/waku_rln_relay/utils_onchain.nim

	modified:   vendor/nim-chronicles
	modified:   vendor/nim-eth
	modified:   vendor/nim-http-utils
	modified:   vendor/nim-json-rpc
	modified:   vendor/nim-json-serialization
	modified:   vendor/nim-libp2p - 1.8.0!
	modified:   vendor/nim-metrics
	new file:   vendor/nim-minilru
	modified:   vendor/nim-nat-traversal
	modified:   vendor/nim-presto
	modified:   vendor/nim-secp256k1
	modified:   vendor/nim-serialization
	modified:   vendor/nim-stew
	modified:   vendor/nim-taskpools
	modified:   vendor/nim-testutils
	modified:   vendor/nim-toml-serialization
	modified:   vendor/nim-unicodedb
	modified:   vendor/nim-unittest2
	modified:   vendor/nim-web3 - from a distinct branch that solves an Ethereum ABI issue.
	modified:   vendor/nim-websock
	modified:   vendor/nim-zlib
	modified:   vendor/nimcrypto
	modified:   waku.nimble

	modified:   waku/common/enr/builder.nim
	modified:   waku/common/enr/typed_record.nim
	modified:   waku/common/utils/nat.nim
	modified:   waku/discovery/waku_discv5.nim
	modified:   waku/waku_rln_relay/conversion_utils.nim
	modified:   waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
	modified:   waku/waku_rln_relay/rln/wrappers.nim
	modified:   waku/waku_rln_relay/rln_relay.nim

* Eliminate a C compilation issue in chat2bridge caused by an overcomplicated import from json_rpc; use std/json instead
* Adapt ENR record handling to the new nim-eth interface
* Fix crash in on_chain group_manager
* Fix signature of register and MemberRegister to UInt256; check transaction success in register
* Upgrade json-rpc and serialization
* Update to match the latest enr and nat interfaces
* Use the extracted result of the contract macro, with the necessary adaptations
* Bump nim-chronicles, nim-libp2p, nimcrypto
* Bump nim-web3, nim-eth and deps; adapt on_chain/group_manager.nim
* Add the status-im/nim-minilru submodule required by the latest nim-eth; fix tests
* group_manager: adapt smart contract param types
* update web3 vendor
* bump vendors for v0.35.0
* protobuf.nim: fix compilation error after nim-libp2p bump
* changes to make it compile after rebase from master
---------

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
2025-01-28 10:04:34 +01:00


{.push raises: [].}

import
  std/[sequtils, options],
  stew/byteutils,
  results,
  chronicles,
  chronos,
  metrics,
  libp2p/utility,
  libp2p/protocols/protocol,
  libp2p/stream/connection,
  libp2p/crypto/crypto,
  eth/p2p/discoveryv5/enr

import
  ../common/nimchronos,
  ../common/protobuf,
  ../common/paging,
  ../waku_enr,
  ../waku_core/codecs,
  ../waku_core/time,
  ../waku_core/topics/pubsub_topic,
  ../waku_core/message/digest,
  ../waku_core/message/message,
  ../node/peer_manager/peer_manager,
  ../waku_archive,
  ./common,
  ./codec,
  ./storage/storage,
  ./storage/seq_storage,
  ./storage/range_processing,
  ./protocols_metrics
logScope:
  topics = "waku reconciliation"

const DefaultStorageCap = 50_000
type SyncReconciliation* = ref object of LPProtocol
  peerManager: PeerManager
  wakuArchive: WakuArchive
  storage: SyncStorage

  # Receive IDs from transfer protocol for storage
  idsRx: AsyncQueue[SyncID]

  # Send hashes to transfer protocol for reception
  localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)]

  # Send hashes to transfer protocol for transmission
  remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)]

  # params
  syncInterval: timer.Duration # Time between each synchronization attempt
  syncRange: timer.Duration # Amount of time in the past to sync
  relayJitter: Duration # Amount of time since the present to ignore when syncing

  # futures
  periodicSyncFut: Future[void]
  periodicPruneFut: Future[void]
  idsReceiverFut: Future[void]
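
# messageIngress: insert a message's (timestamp, hash) SyncID into the
# in-memory sync storage so it is accounted for in future reconciliations.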
proc messageIngress*(
    self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage
) =
  let msgHash = computeMessageHash(pubsubTopic, msg)

  let id = SyncID(time: msg.timestamp, hash: msgHash)

  self.storage.insert(id).isOkOr:
    error "failed to insert new message", msg_hash = msgHash.toHex(), err = error

proc messageIngress*(
    self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage
) =
  let id = SyncID(time: msg.timestamp, hash: msgHash)

  self.storage.insert(id).isOkOr:
    error "failed to insert new message", msg_hash = msgHash.toHex(), err = error

proc messageIngress*(self: SyncReconciliation, id: SyncID) =
  self.storage.insert(id).isOkOr:
    error "failed to insert new message", msg_hash = id.hash.toHex(), err = error
proc processRequest(
    self: SyncReconciliation, conn: Connection
): Future[Result[void, string]] {.async.} =
  var roundTrips = 0

  while true:
    let readRes = catch:
      await conn.readLp(int.high)

    let buffer: seq[byte] = readRes.valueOr:
      return err("connection read error: " & error.msg)

    total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving])

    let recvPayload = RangesData.deltaDecode(buffer).valueOr:
      return err("payload decoding error: " & error)

    roundTrips.inc()

    trace "sync payload received",
      local = self.peerManager.switch.peerInfo.peerId,
      remote = conn.peerId,
      payload = recvPayload

    if recvPayload.ranges.len == 0 or recvPayload.ranges.allIt(it[1] == RangeType.Skip):
      break

    var
      hashToRecv: seq[WakuMessageHash]
      hashToSend: seq[WakuMessageHash]

    let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)

    for hash in hashToSend:
      await self.remoteNeedsTx.addLast((conn.peerId, hash))

    for hash in hashToRecv:
      await self.localWantsTx.addLast((conn.peerId, hash))

    let rawPayload = sendPayload.deltaEncode()

    total_bytes_exchanged.observe(
      rawPayload.len, labelValues = [Reconciliation, Sending]
    )

    let writeRes = catch:
      await conn.writeLP(rawPayload)

    if writeRes.isErr():
      return err("connection write error: " & writeRes.error.msg)

    trace "sync payload sent",
      local = self.peerManager.switch.peerInfo.peerId,
      remote = conn.peerId,
      payload = sendPayload

    if sendPayload.ranges.len == 0 or sendPayload.ranges.allIt(it[1] == RangeType.Skip):
      break

    continue

  reconciliation_roundtrips.observe(roundTrips)

  await conn.close()

  return ok()
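
# Start a reconciliation exchange as the initiator: fingerprint the locally
# stored IDs over the sync time range, send it as the first payload, then
# continue the exchange via processRequest.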
proc initiate(
    self: SyncReconciliation, connection: Connection
): Future[Result[void, string]] {.async.} =
  let
    timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
    lower = SyncID(time: timeRange.a, hash: EmptyFingerprint)
    upper = SyncID(time: timeRange.b, hash: FullFingerprint)
    bounds = lower .. upper

    fingerprint = self.storage.computeFingerprint(bounds)
    initPayload = RangesData(
      ranges: @[(bounds, RangeType.Fingerprint)],
      fingerprints: @[fingerprint],
      itemSets: @[],
    )

  let sendPayload = initPayload.deltaEncode()

  total_bytes_exchanged.observe(
    sendPayload.len, labelValues = [Reconciliation, Sending]
  )

  let writeRes = catch:
    await connection.writeLP(sendPayload)

  if writeRes.isErr():
    return err("connection write error: " & writeRes.error.msg)

  trace "sync payload sent",
    local = self.peerManager.switch.peerInfo.peerId,
    remote = connection.peerId,
    payload = sendPayload

  ?await self.processRequest(connection)

  return ok()
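
# Perform one synchronization round with the given peer, or with any peer
# supporting the reconciliation codec if none is provided.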
proc storeSynchronization*(
    self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[void, string]] {.async.} =
  let peer = peerInfo.valueOr:
    self.peerManager.selectPeer(WakuReconciliationCodec).valueOr:
      return err("no suitable peer found for sync")

  let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec)

  let conn: Connection = connOpt.valueOr:
    return err("cannot establish sync connection")

  debug "sync session initialized",
    local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId

  (await self.initiate(conn)).isOkOr:
    error "sync session failed",
      local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error

    return err("sync request error: " & error)

  debug "sync session ended gracefully",
    local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId

  return ok()
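
# Seed the sync storage from the archive: page through every message stored
# within the sync range and collect their (timestamp, hash) IDs.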
proc initFillStorage(
    syncRange: timer.Duration, wakuArchive: WakuArchive
): Future[Result[seq[SyncID], string]] {.async.} =
  if wakuArchive.isNil():
    return err("waku archive unavailable")

  let endTime = getNowInNanosecondTime()
  let startTime = endTime - syncRange.nanos

  #TODO special query for only timestamp and hash ???
  var query = ArchiveQuery(
    includeData: true,
    cursor: none(ArchiveCursor),
    startTime: some(startTime),
    endTime: some(endTime),
    pageSize: 100,
    direction: PagingDirection.FORWARD,
  )

  debug "initial storage filling started"

  # reserve capacity up-front; IDs are appended below
  var ids = newSeqOfCap[SyncID](DefaultStorageCap)

  # we assume IDs are in order
  while true:
    let response = (await wakuArchive.findMessages(query)).valueOr:
      return err("archive retrieval failed: " & $error)

    for i in 0 ..< response.hashes.len:
      let hash = response.hashes[i]
      let msg = response.messages[i]

      ids.add(SyncID(time: msg.timestamp, hash: hash))

    if response.cursor.isNone():
      break

    query.cursor = response.cursor

  debug "initial storage filling done", elements = ids.len

  return ok(ids)
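
# Create the reconciliation protocol instance. If the archive cannot be
# queried, the storage starts empty and only newly received messages are
# synced.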
proc new*(
    T: type SyncReconciliation,
    peerManager: PeerManager,
    wakuArchive: WakuArchive,
    syncRange: timer.Duration = DefaultSyncRange,
    syncInterval: timer.Duration = DefaultSyncInterval,
    relayJitter: timer.Duration = DefaultGossipSubJitter,
    idsRx: AsyncQueue[SyncID],
    localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)],
    remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)],
): Future[Result[T, string]] {.async.} =
  let res = await initFillStorage(syncRange, wakuArchive)

  let storage =
    if res.isErr():
      warn "will not sync messages before this point in time", error = res.error
      SeqStorage.new(DefaultStorageCap)
    else:
      SeqStorage.new(res.get())

  var sync = SyncReconciliation(
    peerManager: peerManager,
    storage: storage,
    syncRange: syncRange,
    syncInterval: syncInterval,
    relayJitter: relayJitter,
    idsRx: idsRx,
    localWantsTx: localWantsTx,
    remoteNeedsTx: remoteNeedsTx,
  )

  let handler = proc(conn: Connection, proto: string) {.async, closure.} =
    (await sync.processRequest(conn)).isOkOr:
      error "request processing error", error = error

    return

  sync.handler = handler
  sync.codec = WakuReconciliationCodec

  info "Store Reconciliation protocol initialized"

  return ok(sync)
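
# Periodically initiate a store synchronization; failures are logged and the
# loop continues with the next interval.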
proc periodicSync(self: SyncReconciliation) {.async.} =
  debug "periodic sync initialized", interval = $self.syncInterval

  while true: # infinite loop
    await sleepAsync(self.syncInterval)

    debug "periodic sync started"

    (await self.storeSynchronization()).isOkOr:
      error "periodic sync failed", err = error
      continue

    debug "periodic sync done"
proc periodicPrune(self: SyncReconciliation) {.async.} =
  debug "periodic prune initialized", interval = $self.syncInterval

  # prevent the sync and prune loops from running at the same time.
  await sleepAsync((self.syncInterval div 2))

  while true: # infinite loop
    await sleepAsync(self.syncInterval)

    debug "periodic prune started"

    let time = getNowInNanosecondTime() - self.syncRange.nanos

    let count = self.storage.prune(time)

    debug "periodic prune done", elements_pruned = count
proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
  while true: # infinite loop
    let id = await self.idsRx.popFirst()

    self.messageIngress(id)
proc start*(self: SyncReconciliation) =
  if self.started:
    return

  self.started = true

  if self.syncInterval > ZeroDuration:
    self.periodicSyncFut = self.periodicSync()

  if self.syncInterval > ZeroDuration:
    self.periodicPruneFut = self.periodicPrune()

  self.idsReceiverFut = self.idsReceiverLoop()

  info "Store Sync Reconciliation protocol started"
proc stop*(self: SyncReconciliation) =
  if self.syncInterval > ZeroDuration:
    self.periodicSyncFut.cancelSoon()

  if self.syncInterval > ZeroDuration:
    self.periodicPruneFut.cancelSoon()

  self.idsReceiverFut.cancelSoon()

  info "Store Sync Reconciliation protocol stopped"