## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

import std/sequtils

import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics
import pkg/questionable
import pkg/questionable/results

import ./pendingblocks

import ../protobuf/presence
import ../network
import ../peers

import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../../logutils
import ../../manifest

logScope:
  topics = "codex discoveryengine"

declareGauge(codexInflightDiscovery, "inflight discovery requests")

const
  DefaultConcurrentDiscRequests = 10
  DefaultConcurrentAdvertRequests = 10
  DefaultDiscoveryTimeout = 1.minutes
  DefaultMinPeersPerBlock = 3
  DefaultDiscoveryLoopSleep = 3.seconds
  DefaultAdvertiseLoopSleep = 30.minutes

type
  DiscoveryEngine* = ref object of RootObj
    localStore*: BlockStore # Local block store for this instance
    peers*: PeerCtxStore # Peer context store
    network*: BlockExcNetwork # Network interface
    discovery*: Discovery # Discovery interface
    pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
    discEngineRunning*: bool # Indicates if discovery is running
    concurrentAdvReqs: int # Concurrent advertise requests
    concurrentDiscReqs: int # Concurrent discovery requests
    advertiseLoop*: Future[void] # Advertise loop task handle
    advertiseQueue*: AsyncQueue[Cid] # Advertise queue
    advertiseTasks*: seq[Future[void]] # Advertise tasks
    discoveryLoop*: Future[void] # Discovery loop task handle
    discoveryQueue*: AsyncQueue[Cid] # Discovery queue
    discoveryTasks*: seq[Future[void]] # Discovery tasks
    minPeersPerBlock*: int # Min number of known peers per block; fewer triggers discovery
    discoveryLoopSleep: Duration # Discovery loop sleep
    advertiseLoopSleep: Duration # Advertise loop sleep
    inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
    inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
    advertiseType*: BlockType # Advertise blocks, manifests or both

proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
  while b.discEngineRunning:
    for cid in toSeq(b.pendingBlocks.wantListBlockCids):
      try:
        await b.discoveryQueue.put(cid)
      except CatchableError as exc:
        warn "Exception in discovery loop", exc = exc.msg

    logScope:
      sleep = b.discoveryLoopSleep
      wanted = b.pendingBlocks.len

    await sleepAsync(b.discoveryLoopSleep)

proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
  without isM =? cid.isManifest, err:
    warn "Unable to determine if cid is manifest", cid, err = err.msg
    return

  if isM:
    without blk =? await b.localStore.getBlock(cid), err:
      error "Error retrieving manifest block", cid, err = err.msg
      return

    without manifest =? Manifest.decode(blk), err:
      error "Unable to decode as manifest", err = err.msg
      return

    # announce manifest cid and tree cid
    await b.advertiseQueue.put(cid)
    await b.advertiseQueue.put(manifest.treeCid)

proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
  while b.discEngineRunning:
    if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
      trace "Begin iterating blocks..."
      for c in cids:
        if cid =? await c:
          # advertiseBlock is async; await it so its future is not dropped
          await b.advertiseBlock(cid)
          await sleepAsync(100.millis)
      trace "Iterating blocks finished."

    await sleepAsync(b.advertiseLoopSleep)

  info "Exiting advertise task loop"

proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
  ## Run advertise tasks
  ##

  while b.discEngineRunning:
    try:
      let
        cid = await b.advertiseQueue.get()

      if cid in b.inFlightAdvReqs:
        continue

      try:
        let
          request = b.discovery.provide(cid)

        b.inFlightAdvReqs[cid] = request
        codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
        await request

      finally:
        b.inFlightAdvReqs.del(cid)
        codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
    except CatchableError as exc:
      warn "Exception in advertise task runner", exc = exc.msg

  info "Exiting advertise task runner"

proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
  ## Run discovery tasks
  ##

  while b.discEngineRunning:
    try:
      let
        cid = await b.discoveryQueue.get()

      if cid in b.inFlightDiscReqs:
        trace "Discovery request already in progress", cid
        continue

      let
        haves = b.peers.peersHave(cid)

      trace "Current number of peers for block", cid, peers = haves.len
      if haves.len < b.minPeersPerBlock:
        trace "Discovering block", cid
        try:
          let
            request = b.discovery
              .find(cid)
              .wait(DefaultDiscoveryTimeout)

          b.inFlightDiscReqs[cid] = request
          # Report in-flight discovery (not advertise) requests
          codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
          let
            peers = await request

          trace "Discovered peers for block", peers = peers.len, cid
          let
            dialed = await allFinished(
              peers.mapIt( b.network.dialPeer(it.data) ))

          # Drop providers that could not be dialed
          for i, f in dialed:
            if f.failed:
              await b.discovery.removeProvider(peers[i].data.peerId)

        finally:
          b.inFlightDiscReqs.del(cid)
          codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
    except CatchableError as exc:
      warn "Exception in discovery task runner", exc = exc.msg

  info "Exiting discovery task runner"

proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
  for cid in cids:
    if cid notin b.discoveryQueue:
      try:
        trace "Queueing find block", cid, queue = b.discoveryQueue.len
        b.discoveryQueue.putNoWait(cid)
      except CatchableError as exc:
        trace "Exception queueing discovery request", exc = exc.msg

proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
  for cid in cids:
    if cid notin b.advertiseQueue:
      try:
        b.advertiseQueue.putNoWait(cid)
      except CatchableError as exc:
        warn "Exception queueing advertise request", exc = exc.msg

proc start*(b: DiscoveryEngine) {.async.} =
  ## Start the discovery engine tasks
  ##

  trace "Discovery engine start"

  if b.discEngineRunning:
    warn "Starting discovery engine twice"
    return

  b.discEngineRunning = true
  for i in 0..<b.concurrentAdvReqs:
    b.advertiseTasks.add(advertiseTaskLoop(b))

  for i in 0..<b.concurrentDiscReqs:
    b.discoveryTasks.add(discoveryTaskLoop(b))

  b.advertiseLoop = advertiseQueueLoop(b)
  b.discoveryLoop = discoveryQueueLoop(b)

proc stop*(b: DiscoveryEngine) {.async.} =
  ## Stop the discovery engine
  ##

  trace "Discovery engine stop"
  if not b.discEngineRunning:
    warn "Stopping discovery engine without starting it"
    return

  b.discEngineRunning = false
  for task in b.advertiseTasks:
    if not task.finished:
      trace "Awaiting advertise task to stop"
      await task.cancelAndWait()
      trace "Advertise task stopped"

  for task in b.discoveryTasks:
    if not task.finished:
      trace "Awaiting discovery task to stop"
      await task.cancelAndWait()
      trace "Discovery task stopped"

  if not b.advertiseLoop.isNil and not b.advertiseLoop.finished:
    trace "Awaiting advertise loop to stop"
    await b.advertiseLoop.cancelAndWait()
    trace "Advertise loop stopped"

  if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
    trace "Awaiting discovery loop to stop"
    await b.discoveryLoop.cancelAndWait()
    trace "Discovery loop stopped"

  trace "Discovery engine stopped"

proc new*(
    T: type DiscoveryEngine,
    localStore: BlockStore,
    peers: PeerCtxStore,
    network: BlockExcNetwork,
    discovery: Discovery,
    pendingBlocks: PendingBlocksManager,
    concurrentAdvReqs = DefaultConcurrentAdvertRequests,
    concurrentDiscReqs = DefaultConcurrentDiscRequests,
    discoveryLoopSleep = DefaultDiscoveryLoopSleep,
    advertiseLoopSleep = DefaultAdvertiseLoopSleep,
    minPeersPerBlock = DefaultMinPeersPerBlock,
    advertiseType = BlockType.Manifest
): DiscoveryEngine =
  ## Create a discovery engine instance for advertising services
  ##
  DiscoveryEngine(
    localStore: localStore,
    peers: peers,
    network: network,
    discovery: discovery,
    pendingBlocks: pendingBlocks,
    concurrentAdvReqs: concurrentAdvReqs,
    concurrentDiscReqs: concurrentDiscReqs,
    advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
    discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
    inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
    inFlightAdvReqs: initTable[Cid, Future[void]](),
    discoveryLoopSleep: discoveryLoopSleep,
    advertiseLoopSleep: advertiseLoopSleep,
    minPeersPerBlock: minPeersPerBlock,
    advertiseType: advertiseType)
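
# Usage sketch, kept as a comment so it does not affect the module. It only
# uses procs defined in this file and is meant to be read, not compiled as-is.
# Assumptions: the caller has already constructed `localStore`, `peers`,
# `network`, `discovery` and `pendingBlocks`, the code runs inside an async
# proc, and `cid` is a hypothetical `Cid` the node wants providers for.
#
#   let engine = DiscoveryEngine.new(
#     localStore, peers, network, discovery, pendingBlocks)
#
#   await engine.start()               # spawns the task runners and queue loops
#   engine.queueFindBlocksReq(@[cid])  # enqueue a provider lookup for `cid`
#   await engine.stop()                # cancels the task runners and both loops
#
# Both queues are bounded by `concurrentAdvReqs` / `concurrentDiscReqs`, so
# `putNoWait` can raise when a queue is full; the queue* procs log the error
# instead of propagating it.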