diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index 9987c52..ef207a9 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -205,6 +205,8 @@ proc addNode*(d: Protocol, r: SignedPeerRecord): bool = let node = newNode(r) if node.isOk(): return d.addNode(node[]) + else: + return false proc addNode*(d: Protocol, spr: SprUri): bool = ## Add `Node` from a SPR URI to discovery routing table. @@ -367,7 +369,6 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} = trace "adding provider to local db", n = p.localNode, cId, prov - if (let res = (await p.providers.add(cid, prov)); res.isErr): trace "Unable to add provider", cid, peerId = prov.data.peerId @@ -553,6 +554,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): d.routingTable.setJustSeen(toNode) return ok(res) else: + trace "findNode nodes not OK." d.replaceNode(toNode) return err(nodes.error) @@ -691,6 +693,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] closestNodes.del(closestNodes.high()) d.lastLookup = now(chronos.Moment) + trace "Closest nodes", nodes = closestNodes.len return closestNodes proc addProvider*( @@ -699,7 +702,6 @@ proc addProvider*( pr: SignedPeerRecord): Future[seq[Node]] {.async.} = var res = await d.lookup(cId) - trace "lookup returned:", res # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK? if res.len == 0: res.add(d.localNode) @@ -778,7 +780,6 @@ proc getProviders*( while providersFut.len > 0: let providersMsg = await one(providersFut) - # trace "Got providers response", providersMsg let index = providersFut.find(providersMsg) if index != -1: @@ -880,14 +881,17 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} = # TODO: Handle failures better. E.g. stop on different failures than timeout if request.isOk() and request[].len > 0: + trace "resolve (with local node found) is returning:", r = (request[][0]) return some(request[][0]) let discovered = await d.lookup(id) for n in discovered: if n.id == id: if node.isSome() and node.get().record.seqNum >= n.record.seqNum: + trace "resolve (lookup) found node, but local one has greater or equal seqNum" return node else: + trace "resolve (lookup) found new node with equal or greater seqNum", n = n return some(n) return node @@ -945,6 +949,7 @@ proc revalidateLoop(d: Protocol) {.async.} = traceAsyncErrors d.revalidateNode(n) except CancelledError: trace "revalidateLoop canceled" + trace "revalidator loop exited!" proc refreshLoop(d: Protocol) {.async.} = ## Loop that refreshes the routing table by starting a random query in case @@ -963,6 +968,7 @@ proc refreshLoop(d: Protocol) {.async.} = await sleepAsync(RefreshInterval) except CancelledError: trace "refreshLoop canceled" + trace "refreshloop exited!" proc ipMajorityLoop(d: Protocol) {.async.} = #TODO this should be handled by libp2p, not the DHT @@ -1011,6 +1017,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} = await sleepAsync(IpMajorityInterval) except CancelledError: trace "ipMajorityLoop canceled" + trace "ipMajorityLoop exited!" func init*( T: type DiscoveryConfig, @@ -1025,76 +1032,6 @@ func init*( bitsPerHop: bitsPerHop ) -proc newProtocol*( - privKey: PrivateKey, - enrIp: Option[ValidIpAddress], - enrTcpPort, enrUdpPort: Option[Port], - localEnrFields: openArray[(string, seq[byte])] = [], - bootstrapRecords: openArray[SignedPeerRecord] = [], - previousRecord = none[SignedPeerRecord](), - bindPort: Port, - bindIp = IPv4_any(), - enrAutoUpdate = false, - config = defaultDiscoveryConfig, - rng = newRng(), - providers = ProvidersManager.new( - SQLiteDatastore.new(Memory) - .expect("Should not fail!"))): - Protocol = - # TODO: Tried adding bindPort = udpPort as parameter but that gave - # "Error: internal error: environment misses: udpPort" in nim-beacon-chain. - # Anyhow, nim-beacon-chain would also require some changes to support port - # remapping through NAT and this API is also subject to change once we - # introduce support for ipv4 + ipv6 binding/listening. - - # TODO: Implement SignedPeerRecord custom fields? - # let extraFields = mapIt(localEnrFields, toFieldPair(it[0], it[1])) - - # TODO: - # - Defect as is now or return a result for spr errors? - # - In case incorrect key, allow for new spr based on new key (new node id)? - var record: SignedPeerRecord - if previousRecord.isSome(): - record = previousRecord.get() - record.update(privKey, enrIp, enrTcpPort, enrUdpPort) - .expect("SignedPeerRecord within size limits and correct key") - else: - record = SignedPeerRecord.init(1, privKey, enrIp, enrTcpPort, enrUdpPort) - .expect("SignedPeerRecord within size limits") - - info "SPR initialized", ip = enrIp, tcp = enrTcpPort, udp = enrUdpPort, - seqNum = record.seqNum, uri = toURI(record) - if enrIp.isNone(): - if enrAutoUpdate: - notice "No external IP provided for the SPR, this node will not be " & - "discoverable until the SPR is updated with the discovered external IP address" - else: - warn "No external IP provided for the SPR, this node will not be discoverable" - - let node = newNode(record).expect("Properly initialized record") - - # TODO Consider whether this should be a Defect - doAssert rng != nil, "RNG initialization failed" - - let - routingTable = RoutingTable.init( - node, - config.bitsPerHop, - config.tableIpLimits, - rng) - - result = Protocol( - privateKey: privKey, - localNode: node, - bootstrapRecords: @bootstrapRecords, - ipVote: IpVote.init(), - enrAutoUpdate: enrAutoUpdate, - routingTable: routingTable, - rng: rng, - providers: providers) - - result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) - proc newProtocol*( privKey: PrivateKey, bindPort: Port, @@ -1131,16 +1068,20 @@ proc newProtocol*( rng: rng, providers: providers) + trace "newProtocol initiated." result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) proc open*(d: Protocol) {.raises: [Defect, CatchableError].} = info "Starting discovery node", node = d.localNode d.transport.open() + trace "Transport open." d.seedTable() + trace "Routing table seeded." proc start*(d: Protocol) {.async.} = + trace "Protocol start..." d.refreshLoop = refreshLoop(d) d.revalidateLoop = revalidateLoop(d) d.ipMajorityLoop = ipMajorityLoop(d) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim b/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim index 01c24a3..60ec69c 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -214,6 +214,7 @@ proc remove(k: KBucket, n: Node): bool = if k.nodes[i].seen: routing_table_nodes.dec(labelValues = ["seen"]) k.nodes.delete(i) + trace "removed node:", node = n true else: false @@ -316,12 +317,15 @@ proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus = # gets moved to the tail. if k.replacementCache[nodeIdx].address.get().ip != n.address.get().ip: if not ipLimitInc(r, k, n): + trace "replace: ip limit reached" return IpLimitReached ipLimitDec(r, k, k.replacementCache[nodeIdx]) k.replacementCache.delete(nodeIdx) k.replacementCache.add(n) + trace "replace: already existed" return ReplacementExisting elif not ipLimitInc(r, k, n): + trace "replace: ip limit reached (2)" return IpLimitReached else: doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE) @@ -332,6 +336,7 @@ proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus = k.replacementCache.delete(0) k.replacementCache.add(n) + trace "replace: added" return ReplacementAdded proc addNode*(r: var RoutingTable, n: Node): NodeStatus = @@ -367,6 +372,7 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus = # In case of a newer record, it gets replaced. if bucket.nodes[nodeIdx].address.get().ip != n.address.get().ip: if not ipLimitInc(r, bucket, n): + trace "Cannot add node. IP limit reached. (1)" return IpLimitReached ipLimitDec(r, bucket, bucket.nodes[nodeIdx]) # Copy over the seen status, we trust here that after the SPR update the @@ -393,6 +399,7 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus = # immediately add nodes to the most recently seen spot. if bucket.len < BUCKET_SIZE: if not ipLimitInc(r, bucket, n): + trace "Cannot add node. IP limit reached. (2)" return IpLimitReached bucket.add(n) @@ -435,11 +442,12 @@ proc replaceNode*(r: var RoutingTable, n: Node) = proc getNode*(r: RoutingTable, id: NodeId): Option[Node] = ## Get the `Node` with `id` as `NodeId` from the routing table. - ## If no node with provided node id can be found,`none` is returned . + ## If no node with provided node id can be found,`none` is returned. let b = r.bucketForNode(id) for n in b.nodes: if n.id == id: return some(n) + trace "routingTable.getNode failed to find" proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id) # Check if the routing table contains node `n`.