From 44f7c9fc5dbc8fa71172edbe60021bb0822bace9 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 6 Feb 2025 10:16:47 +0100 Subject: [PATCH] creates instance of codex dht --- codexcrawler/application.nim | 37 +++++++++++- codexcrawler/config.nim | 27 ++++++++- codexcrawler/dht.nim | 114 +++++++++++++++++++++++++++++++++++ codexcrawler/keyutils.nim | 68 +++++++++++++++++++++ codexcrawler/rng.nim | 49 +++++++++++++++ config.nims | 3 + 6 files changed, 292 insertions(+), 6 deletions(-) create mode 100644 codexcrawler/dht.nim create mode 100644 codexcrawler/keyutils.nim create mode 100644 codexcrawler/rng.nim diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index 1593a0c..ba3f3a2 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -12,6 +12,8 @@ import ./config import ./logging import ./metrics import ./list +import ./dht +import ./keyutils declareGauge(todoNodesGauge, "DHT nodes to be visited") declareGauge(okNodesGauge, "DHT nodes successfully contacted") @@ -29,15 +31,21 @@ type todoList*: List okNodes*: List nokNodes*: List + dht*: Dht -proc createDatastore(app: Application): ?!TypedDatastore = - without store =? LevelDbDatastore.new(app.config.dataDir), err: +proc createDatastore(app: Application, path: string): ?!Datastore = + without store =? LevelDbDatastore.new(path), err: error "Failed to create datastore" return failure(err) + return success(Datastore(store)) + +proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore = + without store =? app.createDatastore(path), err: + return failure(err) return success(TypedDatastore.init(store)) proc initializeLists(app: Application): Future[?!void] {.async.} = - without store =? app.createDatastore(), err: + without store =? app.createTypedDatastore(app.config.dataDir), err: return failure(err) # We can't extract this into a function because gauges cannot be passed as argument. @@ -64,10 +72,33 @@ proc initializeLists(app: Application): Future[?!void] {.async.} = return success() +proc initializeDht(app: Application): ?!void = + without dhtStore =? app.createDatastore("dht"), err: + return failure(err) + let keyPath = app.config.dataDir / "privatekey" + without privateKey =? setupKey(keyPath), err: + return failure(err) + + let announceAddresses = newSeq[MultiAddress]() + + app.dht = Dht.new( + privateKey, + bindPort = app.config.discPort, + announceAddrs = announceAddresses, + bootstrapNodes = app.config.bootNodes, + store = dhtStore + ) + return success() + proc initializeApp(app: Application): Future[?!void] {.async.} = if err =? (await app.initializeLists()).errorOption: error "Failed to initialize lists", err = err.msg return failure(err) + + if err =? app.initializeDht().errorOption: + error "Failed to initialize DHT", err = err.msg + return failure(err) + return success() proc stop*(app: Application) = diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 8f648f7..5185f4e 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -1,4 +1,8 @@ import std/net +import std/sequtils +import pkg/chronicles +import pkg/libp2p +import pkg/codexdht import ./version let doc = @@ -26,13 +30,13 @@ type CrawlerConfig* = ref object metricsPort*: Port dataDir*: string discPort*: Port - bootNodes*: seq[string] + bootNodes*: seq[SignedPeerRecord] proc `$`*(config: CrawlerConfig): string = "CrawlerConfig:" & " logLevel=" & config.logLevel & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & - config.bootNodes.join(";") + config.bootNodes.mapIt($it).join(";") proc getDefaultTestnetBootNodes(): seq[string] = @[ @@ -45,11 +49,28 @@ proc getDefaultTestnetBootNodes(): seq[string] = "spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9", ] -proc getBootNodes(input: string): seq[string] = +proc getBootNodeStrings(input: string): seq[string] = if input == "testnet_sprs": return getDefaultTestnetBootNodes() return input.split(";") +proc stringToSpr(uri: string): SignedPeerRecord = + var res: SignedPeerRecord + try: + if not res.fromURI(uri): + warn "Invalid SignedPeerRecord uri", uri = uri + quit QuitFailure + except LPError as exc: + warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg + quit QuitFailure + except CatchableError as exc: + warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg + quit QuitFailure + res + +proc getBootNodes(input: string): seq[SignedPeerRecord] = + getBootNodeStrings(input).mapIt(stringToSpr(it)) + proc parseConfig*(): CrawlerConfig = let args = docopt(doc, version = crawlerFullVersion) diff --git a/codexcrawler/dht.nim b/codexcrawler/dht.nim new file mode 100644 index 0000000..37f30d2 --- /dev/null +++ b/codexcrawler/dht.nim @@ -0,0 +1,114 @@ +import std/net +import pkg/chronicles +import pkg/chronos +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results +import pkg/codexdht/discv5/[routing_table, protocol as discv5] +from pkg/nimcrypto import keccak256 + +import ./rng + +export discv5 + +logScope: + topics = "dht" + +type Dht* = ref object + protocol*: discv5.Protocol + key: PrivateKey + peerId: PeerId + announceAddrs*: seq[MultiAddress] + providerRecord*: ?SignedPeerRecord + dhtRecord*: ?SignedPeerRecord + +# proc toNodeId*(cid: Cid): NodeId = +# ## Cid to discovery id +# ## + +# readUintBE[256](keccak256.digest(cid.data.buffer).data) + +# proc toNodeId*(host: ca.Address): NodeId = +# ## Eth address to discovery id +# ## + +# readUintBE[256](keccak256.digest(host.toArray).data) + +proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = + trace "protocol.resolve..." + ## Find peer using the given Discovery object + ## + let node = await d.protocol.resolve(toNodeId(peerId)) + + return + if node.isSome(): + node.get().record.data.some + else: + PeerRecord.none + +method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} = + trace "Removing provider", peerId + d.protocol.removeProvidersLocal(peerId) + +proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) = + d.announceAddrs = @addrs + + trace "Updating announce record", addrs = d.announceAddrs + d.providerRecord = SignedPeerRecord + .init(d.key, PeerRecord.init(d.peerId, d.announceAddrs)) + .expect("Should construct signed record").some + + if not d.protocol.isNil: + d.protocol.updateRecord(d.providerRecord).expect("Should update SPR") + +proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) = + trace "Updating Dht record", addrs = addrs + d.dhtRecord = SignedPeerRecord + .init(d.key, PeerRecord.init(d.peerId, @addrs)) + .expect("Should construct signed record").some + + if not d.protocol.isNil: + d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") + +proc start*(d: Dht) {.async.} = + d.protocol.open() + await d.protocol.start() + +proc stop*(d: Dht) {.async.} = + await d.protocol.closeWait() + +proc new*( + T: type Dht, + key: PrivateKey, + bindIp = IPv4_any(), + bindPort = 0.Port, + announceAddrs: openArray[MultiAddress], + bootstrapNodes: openArray[SignedPeerRecord] = [], + store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"), +): Dht = + var self = + Dht(key: key, peerId: PeerId.init(key).expect("Should construct PeerId")) + + self.updateAnnounceRecord(announceAddrs) + + # -------------------------------------------------------------------------- + # FIXME disable IP limits temporarily so we can run our workshop. Re-enable + # and figure out proper solution. + let discoveryConfig = DiscoveryConfig( + tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)), + bitsPerHop: DefaultBitsPerHop, + ) + # -------------------------------------------------------------------------- + + self.protocol = newProtocol( + key, + bindIp = bindIp, + bindPort = bindPort, + record = self.providerRecord.get, + bootstrapRecords = bootstrapNodes, + rng = Rng.instance(), + providers = ProvidersManager.new(store), + config = discoveryConfig, + ) + + self diff --git a/codexcrawler/keyutils.nim b/codexcrawler/keyutils.nim new file mode 100644 index 0000000..7030e5a --- /dev/null +++ b/codexcrawler/keyutils.nim @@ -0,0 +1,68 @@ +import pkg/chronicles +import pkg/questionable/results +import pkg/libp2p/crypto/crypto +import pkg/stew/io2 + +# This file is copied from nim-codex `codex/utils/keyutils.nim` + +import ./rng + +# from errors.nim: +template mapFailure*[T, V, E]( + exp: Result[T, V], exc: typedesc[E] +): Result[T, ref CatchableError] = + ## Convert `Result[T, E]` to `Result[E, ref CatchableError]` + ## + + exp.mapErr( + proc(e: V): ref CatchableError = + (ref exc)(msg: $e) + ) + +# from fileutils.nim +when defined(windows): + import stew/[windows/acl] + +proc secureWriteFile*[T: byte | char]( + path: string, data: openArray[T] +): IoResult[void] = + when defined(windows): + let sres = createFilesUserOnlySecurityDescriptor() + if sres.isErr(): + error "Could not allocate security descriptor", + path = path, errorMsg = ioErrorMsg(sres.error), errorCode = $sres.error + err(sres.error) + else: + var sd = sres.get() + writeFile(path, data, 0o600, secDescriptor = sd.getDescriptor()) + else: + writeFile(path, data, 0o600) + +proc checkSecureFile*(path: string): IoResult[bool] = + when defined(windows): + checkCurrentUserOnlyACL(path) + else: + ok (?getPermissionsSet(path) == {UserRead, UserWrite}) + +type + KeyError* = object of CatchableError + +proc setupKey*(path: string): ?!PrivateKey = + if not path.fileAccessible({AccessFlags.Find}): + info "Creating a private key and saving it" + let + res = ?PrivateKey.random(Rng.instance()[]).mapFailure(KeyError) + bytes = ?res.getBytes().mapFailure(KeyError) + + ?path.secureWriteFile(bytes).mapFailure(KeyError) + return PrivateKey.init(bytes).mapFailure(KeyError) + + info "Found a network private key" + if not ?checkSecureFile(path).mapFailure(KeyError): + warn "The network private key file is not safe, aborting" + return failure newException( + KeyError, "The network private key file is not safe" + ) + + let kb = ?path.readAllBytes().mapFailure(KeyError) + return PrivateKey.init(kb).mapFailure(KeyError) diff --git a/codexcrawler/rng.nim b/codexcrawler/rng.nim new file mode 100644 index 0000000..575a2b5 --- /dev/null +++ b/codexcrawler/rng.nim @@ -0,0 +1,49 @@ +import pkg/libp2p/crypto/crypto +import pkg/bearssl/rand + +# This file is copied from nim-codex `codex/rng.nim` + +type + RngSampleError = object of CatchableError + Rng* = ref HmacDrbgContext + +var rng {.threadvar.}: Rng + +proc instance*(t: type Rng): Rng = + if rng.isNil: + rng = newRng() + rng + +# Random helpers: similar as in stdlib, but with HmacDrbgContext rng +# TODO: Move these somewhere else? +const randMax = 18_446_744_073_709_551_615'u64 + +proc rand*(rng: Rng, max: Natural): int = + if max == 0: + return 0 + + while true: + let x = rng[].generate(uint64) + if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias + return int(x mod (uint64(max) + 1'u64)) + +proc sample*[T](rng: Rng, a: openArray[T]): T = + result = a[rng.rand(a.high)] + +proc sample*[T]( + rng: Rng, sample, exclude: openArray[T] +): T {.raises: [Defect, RngSampleError].} = + if sample == exclude: + raise newException(RngSampleError, "Sample and exclude arrays are the same!") + + while true: + result = rng.sample(sample) + if exclude.find(result) != -1: + continue + + break + +proc shuffle*[T](rng: Rng, a: var openArray[T]) = + for i in countdown(a.high, 1): + let j = rng.rand(i) + swap(a[i], a[j]) diff --git a/config.nims b/config.nims index 0b00ba9..1a79753 100644 --- a/config.nims +++ b/config.nims @@ -2,3 +2,6 @@ metrics # switch("define", "chronicles_runtime_filtering=true") switch("define", "chronicles_log_level=TRACE") + +when (NimMajor, NimMinor) >= (2, 0): + --mm:refc