From 4d5f204f607181b7230b9351914e43b2bf97429e Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 6 Feb 2025 15:32:39 +0100 Subject: [PATCH] hacky-crawl wip --- .gitignore | 1 - codexcrawler/application.nim | 33 ++++++++++++++++++++-- codexcrawler/config.nim | 3 ++ codexcrawler/dht.nim | 53 ++++++++++++++++++++++++++++++++---- codexcrawler/list.nim | 10 ++++--- config.nims | 4 +-- 6 files changed, 88 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 6f79490..ca02be0 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,3 @@ NimBinaries .vscode/* *.exe crawler_data -dht diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index c4bb91a..238aa21 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -1,4 +1,5 @@ import std/os +import std/sequtils import pkg/chronicles import pkg/chronos import pkg/questionable @@ -45,7 +46,7 @@ proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore = return success(TypedDatastore.init(store)) proc initializeLists(app: Application): Future[?!void] {.async.} = - without store =? app.createTypedDatastore(app.config.dataDir), err: + without store =? app.createTypedDatastore(app.config.dataDir / "lists"), err: return failure(err) # We can't extract this into a function because gauges cannot be passed as argument. @@ -73,13 +74,17 @@ proc initializeLists(app: Application): Future[?!void] {.async.} = return success() proc initializeDht(app: Application): Future[?!void] {.async.} = - without dhtStore =? app.createDatastore("dht"), err: + without dhtStore =? app.createDatastore(app.config.dataDir / "dht"), err: return failure(err) let keyPath = app.config.dataDir / "privatekey" without privateKey =? setupKey(keyPath), err: return failure(err) - let announceAddresses = newSeq[MultiAddress]() + var announceAddresses = newSeq[MultiAddress]() + let aaa = MultiAddress.init("/ip4/172.21.64.1/udp/8090").expect("Should init multiaddress") + # /ip4/45.82.185.194/udp/8090 + # /ip4/172.21.64.1/udp/8090 + announceAddresses.add(aaa) app.dht = Dht.new( privateKey, @@ -103,6 +108,26 @@ proc initializeApp(app: Application): Future[?!void] {.async.} = return success() +proc hackyCrawl(app: Application) {.async.} = + info "starting hacky crawl..." + await sleepAsync(3000) + + var nodeIds = await app.dht.getRoutingTableNodeIds() + trace "starting with routing table nodes", nodes = nodeIds.len + + while app.status == ApplicationStatus.Running: + let nodeId = nodeIds[0] + nodeIds.delete(0) + + without newNodes =? (await app.dht.getNeighbors(nodeId)), err: + error "getneighbors failed", err = err.msg + + trace "adding new nodes", len = newNodes.len + for id in newNodes.mapIt(it.id): + nodeIds.add(id) + await sleepAsync(1000) + + proc stop*(app: Application) = app.status = ApplicationStatus.Stopping waitFor app.dht.stop() @@ -128,6 +153,8 @@ proc run*(app: Application) = error "Failed to start application", err = err.msg return + asyncSpawn app.hackyCrawl() + while app.status == ApplicationStatus.Running: try: chronos.poll() diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index c1ad7f2..1298051 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -47,6 +47,9 @@ proc getDefaultTestnetBootNodes(): seq[string] = "spr:CiUIAhIhAzZn3JmJab46BNjadVnLNQKbhnN3eYxwqpteKYY32SbOEgIDARo8CicAJQgCEiEDNmfcmYlpvjoE2Np1Wcs1ApuGc3d5jHCqm14phjfZJs4QrvWesAYaCwoJBKpA-TaRAnViKkcwRQIhANuMmZDD2c25xzTbKSirEpkZYoxbq-FU_lpI0K0e4mIVAiBfQX4yR47h1LCnHznXgDs6xx5DLO5q3lUcicqUeaqGeg", "spr:CiUIAhIhAgybmRwboqDdUJjeZrzh43sn5mp8jt6ENIb08tLn4x01EgIDARo8CicAJQgCEiECDJuZHBuioN1QmN5mvOHjeyfmanyO3oQ0hvTy0ufjHTUQh4ifsAYaCwoJBI_0zSiRAnVsKkcwRQIhAJCb_z0E3RsnQrEePdJzMSQrmn_ooHv6mbw1DOh5IbVNAiBbBJrWR8eBV6ftzMd6ofa5khNA2h88OBhMqHCIzSjCeA", "spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9", + "spr:CiUIAhIhA2AEPzVj1Z_pshWAwvTp0xvRZTigIkYphXGZdiYGmYRwEgIDARo8CicAJQgCEiEDYAQ_NWPVn-myFYDC9OnTG9FlOKAiRimFcZl2JgaZhHAQvKCXugYaCwoJBES3CuORAnd-KkYwRAIgNwrc7n8A107pYUoWfJxL8X0f-flfUKeA6bFrjVKzEo0CID_0q-KO5ZAGf65VsK-d9rV3S0PbFg7Hj3Cv4aVX2Lnn", + "spr:CiUIAhIhAuhggJhkjeRoR7MHjZ_L_naZKnjF541X0GXTI7LEwXi_EgIDARo8CicAJQgCEiEC6GCAmGSN5GhHsweNn8v-dpkqeMXnjVfQZdMjssTBeL8Qop2quwYaCwoJBJK-4V-RAncuKkYwRAIgaXWoxvKkzrjUZ5K_ayQHKNlYhUEzBXhGviujxfJiGXkCICbsYFivi6Ny1FT6tbofVBRj7lnaR3K9_3j5pUT4862k", + "spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw" ] proc getBootNodeStrings(input: string): seq[string] = diff --git a/codexcrawler/dht.nim b/codexcrawler/dht.nim index ceb3753..b93e175 100644 --- a/codexcrawler/dht.nim +++ b/codexcrawler/dht.nim @@ -34,10 +34,53 @@ type Dht* = ref object # readUintBE[256](keccak256.digest(host.toArray).data) +proc getNode*(d: Dht, nodeId: NodeId): ?!Node = + let node = d.protocol.getNode(nodeId) + if node.isSome(): + return success(node.get()) + return failure("Node not found for id: " & $nodeId) + +proc hacky*(d: Dht, nodeId: NodeId) {.async.} = + let node = await d.protocol.resolve(nodeId) + if node.isSome(): + info "that worked" + else: + info "that didn't work" + +proc getRoutingTableNodeIds*(d: Dht): Future[seq[NodeId]] {.async.} = + var ids = newSeq[NodeId]() + for bucket in d.protocol.routingTable.buckets: + for node in bucket.nodes: + warn "node seen", node = $node.id, seen = $node.seen + ids.add(node.id) + + # await d.hacky(node.id) + await sleepAsync(1) + return ids + +proc getDistances(): seq[uint16] = + var d = newSeq[uint16]() + for i in 0..10: + d.add(i.uint16) + return d + +proc getNeighbors*(d: Dht, target: NodeId): Future[?!seq[Node]] {.async.} = + without node =? d.getNode(target), err: + return failure(err) + + let distances = getDistances() + let response = await d.protocol.findNode(node, distances) + + if response.isOk(): + let nodes = response.get() + if nodes.len > 0: + return success(nodes) + + # Both returning 0 nodes and a failure result are treated as failure of getNeighbors + return failure("No nodes returned") + proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = trace "protocol.resolve..." - ## Find peer using the given Discovery object - ## let node = await d.protocol.resolve(toNodeId(peerId)) return @@ -90,15 +133,13 @@ proc new*( self.updateAnnounceRecord(announceAddrs) - # -------------------------------------------------------------------------- - # FIXME disable IP limits temporarily so we can run our workshop. Re-enable - # and figure out proper solution. + # This disables IP limits: let discoveryConfig = DiscoveryConfig( tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)), bitsPerHop: DefaultBitsPerHop, ) - # -------------------------------------------------------------------------- + trace "Creating DHT protocol", ip = $bindIp, port = $bindPort self.protocol = newProtocol( key, bindIp = bindIp, diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 25ddfb8..16ff445 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -8,6 +8,7 @@ import pkg/stew/endians2 import pkg/questionable import pkg/questionable/results +import std/sets import std/strutils import std/os @@ -22,7 +23,7 @@ type List* = ref object name: string store: TypedDatastore - items: seq[NodeEntry] + items: HashSet[NodeEntry] onMetric: OnUpdateMetric proc encode(s: NodeEntry): seq[byte] = @@ -57,18 +58,19 @@ proc load*(this: List): Future[?!void] {.async.} = without value =? item.value, err: return failure(err) if value.id.len > 0: - this.items.add(value) + this.items.incl(value) + this.onMetric(this.items.len.int64) info "Loaded list", name = this.name, items = this.items.len return success() proc new*( _: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric ): List = - List(name: name, store: store, items: newSeq[NodeEntry](), onMetric: onMetric) + List(name: name, store: store, onMetric: onMetric) proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} = - this.items.add(item) + this.items.incl(item) this.onMetric(this.items.len.int64) if err =? (await this.saveItem(item)).errorOption: diff --git a/config.nims b/config.nims index d47ab5b..78b62c3 100644 --- a/config.nims +++ b/config.nims @@ -3,8 +3,8 @@ # switch("define", "chronicles_runtime_filtering=true") # Sets TRACE logging for everything except DHT -switch("define", "chronicles_log_level=TRACE") -switch("define", "chronicles_disabled_topics:discv5") +switch("define", "chronicles_log_level=INFO") +# switch("define", "chronicles_disabled_topics:discv5") when (NimMajor, NimMinor) >= (2, 0): --mm: