hacky-crawl wip

This commit is contained in:
Ben 2025-02-06 15:32:39 +01:00
parent cc36252a45
commit 4d5f204f60
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
6 changed files with 88 additions and 16 deletions

1
.gitignore vendored
View File

@ -15,4 +15,3 @@ NimBinaries
.vscode/*
*.exe
crawler_data
dht

View File

@ -1,4 +1,5 @@
import std/os
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/questionable
@ -45,7 +46,7 @@ proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore =
return success(TypedDatastore.init(store))
proc initializeLists(app: Application): Future[?!void] {.async.} =
without store =? app.createTypedDatastore(app.config.dataDir), err:
without store =? app.createTypedDatastore(app.config.dataDir / "lists"), err:
return failure(err)
# We can't extract this into a function because gauges cannot be passed as argument.
@ -73,13 +74,17 @@ proc initializeLists(app: Application): Future[?!void] {.async.} =
return success()
proc initializeDht(app: Application): Future[?!void] {.async.} =
without dhtStore =? app.createDatastore("dht"), err:
without dhtStore =? app.createDatastore(app.config.dataDir / "dht"), err:
return failure(err)
let keyPath = app.config.dataDir / "privatekey"
without privateKey =? setupKey(keyPath), err:
return failure(err)
let announceAddresses = newSeq[MultiAddress]()
var announceAddresses = newSeq[MultiAddress]()
let aaa = MultiAddress.init("/ip4/172.21.64.1/udp/8090").expect("Should init multiaddress")
# /ip4/45.82.185.194/udp/8090
# /ip4/172.21.64.1/udp/8090
announceAddresses.add(aaa)
app.dht = Dht.new(
privateKey,
@ -103,6 +108,26 @@ proc initializeApp(app: Application): Future[?!void] {.async.} =
return success()
proc hackyCrawl(app: Application) {.async.} =
info "starting hacky crawl..."
await sleepAsync(3000)
var nodeIds = await app.dht.getRoutingTableNodeIds()
trace "starting with routing table nodes", nodes = nodeIds.len
while app.status == ApplicationStatus.Running:
let nodeId = nodeIds[0]
nodeIds.delete(0)
without newNodes =? (await app.dht.getNeighbors(nodeId)), err:
error "getneighbors failed", err = err.msg
trace "adding new nodes", len = newNodes.len
for id in newNodes.mapIt(it.id):
nodeIds.add(id)
await sleepAsync(1000)
proc stop*(app: Application) =
app.status = ApplicationStatus.Stopping
waitFor app.dht.stop()
@ -128,6 +153,8 @@ proc run*(app: Application) =
error "Failed to start application", err = err.msg
return
asyncSpawn app.hackyCrawl()
while app.status == ApplicationStatus.Running:
try:
chronos.poll()

View File

@ -47,6 +47,9 @@ proc getDefaultTestnetBootNodes(): seq[string] =
"spr:CiUIAhIhAzZn3JmJab46BNjadVnLNQKbhnN3eYxwqpteKYY32SbOEgIDARo8CicAJQgCEiEDNmfcmYlpvjoE2Np1Wcs1ApuGc3d5jHCqm14phjfZJs4QrvWesAYaCwoJBKpA-TaRAnViKkcwRQIhANuMmZDD2c25xzTbKSirEpkZYoxbq-FU_lpI0K0e4mIVAiBfQX4yR47h1LCnHznXgDs6xx5DLO5q3lUcicqUeaqGeg",
"spr:CiUIAhIhAgybmRwboqDdUJjeZrzh43sn5mp8jt6ENIb08tLn4x01EgIDARo8CicAJQgCEiECDJuZHBuioN1QmN5mvOHjeyfmanyO3oQ0hvTy0ufjHTUQh4ifsAYaCwoJBI_0zSiRAnVsKkcwRQIhAJCb_z0E3RsnQrEePdJzMSQrmn_ooHv6mbw1DOh5IbVNAiBbBJrWR8eBV6ftzMd6ofa5khNA2h88OBhMqHCIzSjCeA",
"spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9",
"spr:CiUIAhIhA2AEPzVj1Z_pshWAwvTp0xvRZTigIkYphXGZdiYGmYRwEgIDARo8CicAJQgCEiEDYAQ_NWPVn-myFYDC9OnTG9FlOKAiRimFcZl2JgaZhHAQvKCXugYaCwoJBES3CuORAnd-KkYwRAIgNwrc7n8A107pYUoWfJxL8X0f-flfUKeA6bFrjVKzEo0CID_0q-KO5ZAGf65VsK-d9rV3S0PbFg7Hj3Cv4aVX2Lnn",
"spr:CiUIAhIhAuhggJhkjeRoR7MHjZ_L_naZKnjF541X0GXTI7LEwXi_EgIDARo8CicAJQgCEiEC6GCAmGSN5GhHsweNn8v-dpkqeMXnjVfQZdMjssTBeL8Qop2quwYaCwoJBJK-4V-RAncuKkYwRAIgaXWoxvKkzrjUZ5K_ayQHKNlYhUEzBXhGviujxfJiGXkCICbsYFivi6Ny1FT6tbofVBRj7lnaR3K9_3j5pUT4862k",
"spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw"
]
proc getBootNodeStrings(input: string): seq[string] =

View File

@ -34,10 +34,53 @@ type Dht* = ref object
# readUintBE[256](keccak256.digest(host.toArray).data)
proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
let node = d.protocol.getNode(nodeId)
if node.isSome():
return success(node.get())
return failure("Node not found for id: " & $nodeId)
proc hacky*(d: Dht, nodeId: NodeId) {.async.} =
let node = await d.protocol.resolve(nodeId)
if node.isSome():
info "that worked"
else:
info "that didn't work"
proc getRoutingTableNodeIds*(d: Dht): Future[seq[NodeId]] {.async.} =
var ids = newSeq[NodeId]()
for bucket in d.protocol.routingTable.buckets:
for node in bucket.nodes:
warn "node seen", node = $node.id, seen = $node.seen
ids.add(node.id)
# await d.hacky(node.id)
await sleepAsync(1)
return ids
proc getDistances(): seq[uint16] =
var d = newSeq[uint16]()
for i in 0..10:
d.add(i.uint16)
return d
proc getNeighbors*(d: Dht, target: NodeId): Future[?!seq[Node]] {.async.} =
without node =? d.getNode(target), err:
return failure(err)
let distances = getDistances()
let response = await d.protocol.findNode(node, distances)
if response.isOk():
let nodes = response.get()
if nodes.len > 0:
return success(nodes)
# Both returning 0 nodes and a failure result are treated as failure of getNeighbors
return failure("No nodes returned")
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
trace "protocol.resolve..."
## Find peer using the given Discovery object
##
let node = await d.protocol.resolve(toNodeId(peerId))
return
@ -90,15 +133,13 @@ proc new*(
self.updateAnnounceRecord(announceAddrs)
# --------------------------------------------------------------------------
# FIXME disable IP limits temporarily so we can run our workshop. Re-enable
# and figure out proper solution.
# This disables IP limits:
let discoveryConfig = DiscoveryConfig(
tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)),
bitsPerHop: DefaultBitsPerHop,
)
# --------------------------------------------------------------------------
trace "Creating DHT protocol", ip = $bindIp, port = $bindPort
self.protocol = newProtocol(
key,
bindIp = bindIp,

View File

@ -8,6 +8,7 @@ import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import std/sets
import std/strutils
import std/os
@ -22,7 +23,7 @@ type
List* = ref object
name: string
store: TypedDatastore
items: seq[NodeEntry]
items: HashSet[NodeEntry]
onMetric: OnUpdateMetric
proc encode(s: NodeEntry): seq[byte] =
@ -57,18 +58,19 @@ proc load*(this: List): Future[?!void] {.async.} =
without value =? item.value, err:
return failure(err)
if value.id.len > 0:
this.items.add(value)
this.items.incl(value)
this.onMetric(this.items.len.int64)
info "Loaded list", name = this.name, items = this.items.len
return success()
proc new*(
_: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric
): List =
List(name: name, store: store, items: newSeq[NodeEntry](), onMetric: onMetric)
List(name: name, store: store, onMetric: onMetric)
proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
this.items.add(item)
this.items.incl(item)
this.onMetric(this.items.len.int64)
if err =? (await this.saveItem(item)).errorOption:

View File

@ -3,8 +3,8 @@
# switch("define", "chronicles_runtime_filtering=true")
# Sets TRACE logging for everything except DHT
switch("define", "chronicles_log_level=TRACE")
switch("define", "chronicles_disabled_topics:discv5")
switch("define", "chronicles_log_level=INFO")
# switch("define", "chronicles_disabled_topics:discv5")
when (NimMajor, NimMinor) >= (2, 0):
--mm: