Mirror of https://github.com/logos-storage/logos-storage-network-crawler.git

Commit 44f7c9fc5d (parent 82615952cb): creates instance of codex dht
@@ -12,6 +12,8 @@ import ./config
 import ./logging
 import ./metrics
 import ./list
+import ./dht
+import ./keyutils

 declareGauge(todoNodesGauge, "DHT nodes to be visited")
 declareGauge(okNodesGauge, "DHT nodes successfully contacted")
@@ -29,15 +31,21 @@ type
     todoList*: List
     okNodes*: List
     nokNodes*: List
+    dht*: Dht

-proc createDatastore(app: Application): ?!TypedDatastore =
-  without store =? LevelDbDatastore.new(app.config.dataDir), err:
+proc createDatastore(app: Application, path: string): ?!Datastore =
+  without store =? LevelDbDatastore.new(path), err:
     error "Failed to create datastore"
     return failure(err)
+  return success(Datastore(store))
+
+proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore =
+  without store =? app.createDatastore(path), err:
+    return failure(err)
   return success(TypedDatastore.init(store))

 proc initializeLists(app: Application): Future[?!void] {.async.} =
-  without store =? app.createDatastore(), err:
+  without store =? app.createTypedDatastore(app.config.dataDir), err:
     return failure(err)

   # We can't extract this into a function because gauges cannot be passed as argument.
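
For orientation, a minimal sketch (not part of the commit) of how the two store layers above compose: createTypedDatastore wraps a LevelDB-backed Datastore for the crawler's lists, while createDatastore with a separate path yields the raw store later handed to the DHT. The proc name openStores and the variable names here are hypothetical; the error handling mirrors the ?! pattern used above.

# Hypothetical illustration only.
proc openStores(app: Application): ?!void =
  without listStore =? app.createTypedDatastore(app.config.dataDir), err:
    return failure(err)   # typed store backing the todo/ok/nok lists
  without dhtStore =? app.createDatastore("dht"), err:
    return failure(err)   # raw Datastore for the DHT's provider records
  return success()
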
@@ -64,10 +72,33 @@ proc initializeLists(app: Application): Future[?!void] {.async.} =

   return success()

+proc initializeDht(app: Application): ?!void =
+  without dhtStore =? app.createDatastore("dht"), err:
+    return failure(err)
+  let keyPath = app.config.dataDir / "privatekey"
+  without privateKey =? setupKey(keyPath), err:
+    return failure(err)
+
+  let announceAddresses = newSeq[MultiAddress]()
+
+  app.dht = Dht.new(
+    privateKey,
+    bindPort = app.config.discPort,
+    announceAddrs = announceAddresses,
+    bootstrapNodes = app.config.bootNodes,
+    store = dhtStore
+  )
+  return success()
+
 proc initializeApp(app: Application): Future[?!void] {.async.} =
   if err =? (await app.initializeLists()).errorOption:
     error "Failed to initialize lists", err = err.msg
     return failure(err)

+  if err =? app.initializeDht().errorOption:
+    error "Failed to initialize DHT", err = err.msg
+    return failure(err)
+
   return success()

 proc stop*(app: Application) =
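
A hedged sketch of how this initialization chain could be driven from the crawler's entry point; the run proc is hypothetical, the rest follows the patterns in the hunk above.

# Hypothetical driver: run full initialization, then start the DHT.
proc run(app: Application) {.async.} =
  if err =? (await app.initializeApp()).errorOption:
    error "Initialization failed", err = err.msg
    quit QuitFailure
  await app.dht.start()   # opens the discv5 transport and begins bootstrapping
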
@@ -1,4 +1,8 @@
 import std/net
+import std/sequtils
+import pkg/chronicles
+import pkg/libp2p
+import pkg/codexdht
 import ./version

 let doc =
@@ -26,13 +30,13 @@ type CrawlerConfig* = ref object
   metricsPort*: Port
   dataDir*: string
   discPort*: Port
-  bootNodes*: seq[string]
+  bootNodes*: seq[SignedPeerRecord]

 proc `$`*(config: CrawlerConfig): string =
   "CrawlerConfig:" & " logLevel=" & config.logLevel & " metricsAddress=" &
     $config.metricsAddress & " metricsPort=" & $config.metricsPort & " dataDir=" &
     config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
-    config.bootNodes.join(";")
+    config.bootNodes.mapIt($it).join(";")

 proc getDefaultTestnetBootNodes(): seq[string] =
   @[
@@ -45,11 +49,28 @@ proc getDefaultTestnetBootNodes(): seq[string] =
     "spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9",
   ]

-proc getBootNodes(input: string): seq[string] =
+proc getBootNodeStrings(input: string): seq[string] =
   if input == "testnet_sprs":
     return getDefaultTestnetBootNodes()
   return input.split(";")

+proc stringToSpr(uri: string): SignedPeerRecord =
+  var res: SignedPeerRecord
+  try:
+    if not res.fromURI(uri):
+      warn "Invalid SignedPeerRecord uri", uri = uri
+      quit QuitFailure
+  except LPError as exc:
+    warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg
+    quit QuitFailure
+  except CatchableError as exc:
+    warn "Invalid SignedPeerRecord uri", uri = uri, error = exc.msg
+    quit QuitFailure
+  res
+
+proc getBootNodes(input: string): seq[SignedPeerRecord] =
+  getBootNodeStrings(input).mapIt(stringToSpr(it))
+
 proc parseConfig*(): CrawlerConfig =
   let args = docopt(doc, version = crawlerFullVersion)

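As a usage note (not part of the diff): getBootNodes accepts either the literal testnet_sprs shortcut or a semicolon-separated list of spr: URIs, and any entry that fails SignedPeerRecord parsing logs a warning and terminates the process. A sketch, where customSprInput is a hypothetical variable:

# "testnet_sprs" expands to the default records listed above.
let defaults: seq[SignedPeerRecord] = getBootNodes("testnet_sprs")
# Anything else is split on ';' and each piece parsed via stringToSpr.
let custom = getBootNodes(customSprInput)
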
codexcrawler/dht.nim (new file, 114 lines)
@@ -0,0 +1,114 @@
import std/net
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/codexdht/discv5/[routing_table, protocol as discv5]
from pkg/nimcrypto import keccak256

import ./rng

export discv5

logScope:
  topics = "dht"

type Dht* = ref object
  protocol*: discv5.Protocol
  key: PrivateKey
  peerId: PeerId
  announceAddrs*: seq[MultiAddress]
  providerRecord*: ?SignedPeerRecord
  dhtRecord*: ?SignedPeerRecord

# proc toNodeId*(cid: Cid): NodeId =
#   ## Cid to discovery id
#   ##

#   readUintBE[256](keccak256.digest(cid.data.buffer).data)

# proc toNodeId*(host: ca.Address): NodeId =
#   ## Eth address to discovery id
#   ##

#   readUintBE[256](keccak256.digest(host.toArray).data)

proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
  trace "protocol.resolve..."
  ## Find peer using the given Discovery object
  ##
  let node = await d.protocol.resolve(toNodeId(peerId))

  return
    if node.isSome():
      node.get().record.data.some
    else:
      PeerRecord.none

method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} =
  trace "Removing provider", peerId
  d.protocol.removeProvidersLocal(peerId)

proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) =
  d.announceAddrs = @addrs

  trace "Updating announce record", addrs = d.announceAddrs
  d.providerRecord = SignedPeerRecord
    .init(d.key, PeerRecord.init(d.peerId, d.announceAddrs))
    .expect("Should construct signed record").some

  if not d.protocol.isNil:
    d.protocol.updateRecord(d.providerRecord).expect("Should update SPR")

proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) =
  trace "Updating Dht record", addrs = addrs
  d.dhtRecord = SignedPeerRecord
    .init(d.key, PeerRecord.init(d.peerId, @addrs))
    .expect("Should construct signed record").some

  if not d.protocol.isNil:
    d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")

proc start*(d: Dht) {.async.} =
  d.protocol.open()
  await d.protocol.start()

proc stop*(d: Dht) {.async.} =
  await d.protocol.closeWait()

proc new*(
    T: type Dht,
    key: PrivateKey,
    bindIp = IPv4_any(),
    bindPort = 0.Port,
    announceAddrs: openArray[MultiAddress],
    bootstrapNodes: openArray[SignedPeerRecord] = [],
    store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"),
): Dht =
  var self =
    Dht(key: key, peerId: PeerId.init(key).expect("Should construct PeerId"))

  self.updateAnnounceRecord(announceAddrs)

  # --------------------------------------------------------------------------
  # FIXME disable IP limits temporarily so we can run our workshop. Re-enable
  # and figure out proper solution.
  let discoveryConfig = DiscoveryConfig(
    tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)),
    bitsPerHop: DefaultBitsPerHop,
  )
  # --------------------------------------------------------------------------

  self.protocol = newProtocol(
    key,
    bindIp = bindIp,
    bindPort = bindPort,
    record = self.providerRecord.get,
    bootstrapRecords = bootstrapNodes,
    rng = Rng.instance(),
    providers = ProvidersManager.new(store),
    config = discoveryConfig,
  )

  self
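
A hedged usage sketch of the wrapper above (not part of the file): build a Dht from a private key and bootstrap records, start it, and shut it down later. privateKey, bootRecords, and the port value are placeholders.

# Inside an async proc:
let dht = Dht.new(
  privateKey,                          # e.g. produced by setupKey (keyutils.nim below)
  bindPort = 8090.Port,                # placeholder port
  announceAddrs = newSeq[MultiAddress](),
  bootstrapNodes = bootRecords,        # seq[SignedPeerRecord], e.g. config.bootNodes
)                                      # store defaults to an in-memory SQLiteDatastore
await dht.start()
# ... crawl ...
await dht.stop()
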
codexcrawler/keyutils.nim (new file, 68 lines)
@@ -0,0 +1,68 @@
import pkg/chronicles
import pkg/questionable/results
import pkg/libp2p/crypto/crypto
import pkg/stew/io2

# This file is copied from nim-codex `codex/utils/keyutils.nim`

import ./rng

# from errors.nim:
template mapFailure*[T, V, E](
    exp: Result[T, V], exc: typedesc[E]
): Result[T, ref CatchableError] =
  ## Convert `Result[T, E]` to `Result[E, ref CatchableError]`
  ##
  exp.mapErr(
    proc(e: V): ref CatchableError =
      (ref exc)(msg: $e)
  )

# from fileutils.nim
when defined(windows):
  import stew/[windows/acl]

proc secureWriteFile*[T: byte | char](
    path: string, data: openArray[T]
): IoResult[void] =
  when defined(windows):
    let sres = createFilesUserOnlySecurityDescriptor()
    if sres.isErr():
      error "Could not allocate security descriptor",
        path = path, errorMsg = ioErrorMsg(sres.error), errorCode = $sres.error
      err(sres.error)
    else:
      var sd = sres.get()
      writeFile(path, data, 0o600, secDescriptor = sd.getDescriptor())
  else:
    writeFile(path, data, 0o600)

proc checkSecureFile*(path: string): IoResult[bool] =
  when defined(windows):
    checkCurrentUserOnlyACL(path)
  else:
    ok (?getPermissionsSet(path) == {UserRead, UserWrite})

type
  KeyError* = object of CatchableError

proc setupKey*(path: string): ?!PrivateKey =
  if not path.fileAccessible({AccessFlags.Find}):
    info "Creating a private key and saving it"
    let
      res = ?PrivateKey.random(Rng.instance()[]).mapFailure(KeyError)
      bytes = ?res.getBytes().mapFailure(KeyError)

    ?path.secureWriteFile(bytes).mapFailure(KeyError)
    return PrivateKey.init(bytes).mapFailure(KeyError)

  info "Found a network private key"
  if not ?checkSecureFile(path).mapFailure(KeyError):
    warn "The network private key file is not safe, aborting"
    return failure newException(
      KeyError, "The network private key file is not safe"
    )

  let kb = ?path.readAllBytes().mapFailure(KeyError)
  return PrivateKey.init(kb).mapFailure(KeyError)
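
Usage sketch for setupKey (not part of the file), matching how initializeDht consumes it above; config stands in for the parsed CrawlerConfig.

# Load the key if the file exists and has safe permissions, otherwise create and persist it.
without privateKey =? setupKey(config.dataDir / "privatekey"), err:   # `/` joins paths (std/os)
  error "Could not set up private key", err = err.msg
  quit QuitFailure
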
codexcrawler/rng.nim (new file, 49 lines)
@@ -0,0 +1,49 @@
import pkg/libp2p/crypto/crypto
import pkg/bearssl/rand

# This file is copied from nim-codex `codex/rng.nim`

type
  RngSampleError = object of CatchableError
  Rng* = ref HmacDrbgContext

var rng {.threadvar.}: Rng

proc instance*(t: type Rng): Rng =
  if rng.isNil:
    rng = newRng()
  rng

# Random helpers: similar as in stdlib, but with HmacDrbgContext rng
# TODO: Move these somewhere else?
const randMax = 18_446_744_073_709_551_615'u64

proc rand*(rng: Rng, max: Natural): int =
  if max == 0:
    return 0

  while true:
    let x = rng[].generate(uint64)
    if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias
      return int(x mod (uint64(max) + 1'u64))

proc sample*[T](rng: Rng, a: openArray[T]): T =
  result = a[rng.rand(a.high)]

proc sample*[T](
    rng: Rng, sample, exclude: openArray[T]
): T {.raises: [Defect, RngSampleError].} =
  if sample == exclude:
    raise newException(RngSampleError, "Sample and exclude arrays are the same!")

  while true:
    result = rng.sample(sample)
    if exclude.find(result) != -1:
      continue
    break

proc shuffle*[T](rng: Rng, a: var openArray[T]) =
  for i in countdown(a.high, 1):
    let j = rng.rand(i)
    swap(a[i], a[j])
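
A small usage sketch of the copied Rng helpers (illustrative values only):

let rng = Rng.instance()
let idx = rng.rand(9)            # uniform int in 0..9, rejection-sampled against modulo bias
var peers = @["a", "b", "c", "d"]
rng.shuffle(peers)               # in-place Fisher-Yates shuffle
let pick = rng.sample(peers)     # uniform element of the seq
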
@@ -2,3 +2,6 @@
 metrics
 # switch("define", "chronicles_runtime_filtering=true")
 switch("define", "chronicles_log_level=TRACE")
+
+when (NimMajor, NimMinor) >= (2, 0):
+  --mm:refc
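
Context for the last hunk (not part of the commit): Nim 2.0 switched the default memory manager from refc to ORC, so the when block above pins the older collector on newer compilers. It is the config-file equivalent of passing the flag on the command line, for example:

nim c --mm:refc <main module>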