Additional trace logging and code cleanup (#61)
* Adds bunch of traces * Adds a lot more traces * more traces * adds prints to resolve tree * Cleanup of trace log messages * Restore some log messages
This commit is contained in:
parent
348cb0f1ad
commit
bd517f0e8d
|
@ -205,6 +205,8 @@ proc addNode*(d: Protocol, r: SignedPeerRecord): bool =
|
||||||
let node = newNode(r)
|
let node = newNode(r)
|
||||||
if node.isOk():
|
if node.isOk():
|
||||||
return d.addNode(node[])
|
return d.addNode(node[])
|
||||||
|
else:
|
||||||
|
return false
|
||||||
|
|
||||||
proc addNode*(d: Protocol, spr: SprUri): bool =
|
proc addNode*(d: Protocol, spr: SprUri): bool =
|
||||||
## Add `Node` from a SPR URI to discovery routing table.
|
## Add `Node` from a SPR URI to discovery routing table.
|
||||||
|
@ -367,7 +369,6 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
|
|
||||||
proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} =
|
proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} =
|
||||||
trace "adding provider to local db", n = p.localNode, cId, prov
|
trace "adding provider to local db", n = p.localNode, cId, prov
|
||||||
|
|
||||||
if (let res = (await p.providers.add(cid, prov)); res.isErr):
|
if (let res = (await p.providers.add(cid, prov)); res.isErr):
|
||||||
trace "Unable to add provider", cid, peerId = prov.data.peerId
|
trace "Unable to add provider", cid, peerId = prov.data.peerId
|
||||||
|
|
||||||
|
@ -553,6 +554,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
||||||
d.routingTable.setJustSeen(toNode)
|
d.routingTable.setJustSeen(toNode)
|
||||||
return ok(res)
|
return ok(res)
|
||||||
else:
|
else:
|
||||||
|
trace "findNode nodes not OK."
|
||||||
d.replaceNode(toNode)
|
d.replaceNode(toNode)
|
||||||
return err(nodes.error)
|
return err(nodes.error)
|
||||||
|
|
||||||
|
@ -691,6 +693,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]]
|
||||||
closestNodes.del(closestNodes.high())
|
closestNodes.del(closestNodes.high())
|
||||||
|
|
||||||
d.lastLookup = now(chronos.Moment)
|
d.lastLookup = now(chronos.Moment)
|
||||||
|
trace "Closest nodes", nodes = closestNodes.len
|
||||||
return closestNodes
|
return closestNodes
|
||||||
|
|
||||||
proc addProvider*(
|
proc addProvider*(
|
||||||
|
@ -699,7 +702,6 @@ proc addProvider*(
|
||||||
pr: SignedPeerRecord): Future[seq[Node]] {.async.} =
|
pr: SignedPeerRecord): Future[seq[Node]] {.async.} =
|
||||||
|
|
||||||
var res = await d.lookup(cId)
|
var res = await d.lookup(cId)
|
||||||
trace "lookup returned:", res
|
|
||||||
# TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
|
# TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
|
||||||
if res.len == 0:
|
if res.len == 0:
|
||||||
res.add(d.localNode)
|
res.add(d.localNode)
|
||||||
|
@ -778,7 +780,6 @@ proc getProviders*(
|
||||||
|
|
||||||
while providersFut.len > 0:
|
while providersFut.len > 0:
|
||||||
let providersMsg = await one(providersFut)
|
let providersMsg = await one(providersFut)
|
||||||
# trace "Got providers response", providersMsg
|
|
||||||
|
|
||||||
let index = providersFut.find(providersMsg)
|
let index = providersFut.find(providersMsg)
|
||||||
if index != -1:
|
if index != -1:
|
||||||
|
@ -880,14 +881,17 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} =
|
||||||
|
|
||||||
# TODO: Handle failures better. E.g. stop on different failures than timeout
|
# TODO: Handle failures better. E.g. stop on different failures than timeout
|
||||||
if request.isOk() and request[].len > 0:
|
if request.isOk() and request[].len > 0:
|
||||||
|
trace "resolve (with local node found) is returning:", r = (request[][0])
|
||||||
return some(request[][0])
|
return some(request[][0])
|
||||||
|
|
||||||
let discovered = await d.lookup(id)
|
let discovered = await d.lookup(id)
|
||||||
for n in discovered:
|
for n in discovered:
|
||||||
if n.id == id:
|
if n.id == id:
|
||||||
if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
|
if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
|
||||||
|
trace "resolve (lookup) found node, but local one has greater or equal seqNum"
|
||||||
return node
|
return node
|
||||||
else:
|
else:
|
||||||
|
trace "resolve (lookup) found new node with equal or greater seqNum", n = n
|
||||||
return some(n)
|
return some(n)
|
||||||
|
|
||||||
return node
|
return node
|
||||||
|
@ -945,6 +949,7 @@ proc revalidateLoop(d: Protocol) {.async.} =
|
||||||
traceAsyncErrors d.revalidateNode(n)
|
traceAsyncErrors d.revalidateNode(n)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "revalidateLoop canceled"
|
trace "revalidateLoop canceled"
|
||||||
|
trace "revalidator loop exited!"
|
||||||
|
|
||||||
proc refreshLoop(d: Protocol) {.async.} =
|
proc refreshLoop(d: Protocol) {.async.} =
|
||||||
## Loop that refreshes the routing table by starting a random query in case
|
## Loop that refreshes the routing table by starting a random query in case
|
||||||
|
@ -963,6 +968,7 @@ proc refreshLoop(d: Protocol) {.async.} =
|
||||||
await sleepAsync(RefreshInterval)
|
await sleepAsync(RefreshInterval)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "refreshLoop canceled"
|
trace "refreshLoop canceled"
|
||||||
|
trace "refreshloop exited!"
|
||||||
|
|
||||||
proc ipMajorityLoop(d: Protocol) {.async.} =
|
proc ipMajorityLoop(d: Protocol) {.async.} =
|
||||||
#TODO this should be handled by libp2p, not the DHT
|
#TODO this should be handled by libp2p, not the DHT
|
||||||
|
@ -1011,6 +1017,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
|
||||||
await sleepAsync(IpMajorityInterval)
|
await sleepAsync(IpMajorityInterval)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "ipMajorityLoop canceled"
|
trace "ipMajorityLoop canceled"
|
||||||
|
trace "ipMajorityLoop exited!"
|
||||||
|
|
||||||
func init*(
|
func init*(
|
||||||
T: type DiscoveryConfig,
|
T: type DiscoveryConfig,
|
||||||
|
@ -1025,76 +1032,6 @@ func init*(
|
||||||
bitsPerHop: bitsPerHop
|
bitsPerHop: bitsPerHop
|
||||||
)
|
)
|
||||||
|
|
||||||
proc newProtocol*(
|
|
||||||
privKey: PrivateKey,
|
|
||||||
enrIp: Option[ValidIpAddress],
|
|
||||||
enrTcpPort, enrUdpPort: Option[Port],
|
|
||||||
localEnrFields: openArray[(string, seq[byte])] = [],
|
|
||||||
bootstrapRecords: openArray[SignedPeerRecord] = [],
|
|
||||||
previousRecord = none[SignedPeerRecord](),
|
|
||||||
bindPort: Port,
|
|
||||||
bindIp = IPv4_any(),
|
|
||||||
enrAutoUpdate = false,
|
|
||||||
config = defaultDiscoveryConfig,
|
|
||||||
rng = newRng(),
|
|
||||||
providers = ProvidersManager.new(
|
|
||||||
SQLiteDatastore.new(Memory)
|
|
||||||
.expect("Should not fail!"))):
|
|
||||||
Protocol =
|
|
||||||
# TODO: Tried adding bindPort = udpPort as parameter but that gave
|
|
||||||
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
|
|
||||||
# Anyhow, nim-beacon-chain would also require some changes to support port
|
|
||||||
# remapping through NAT and this API is also subject to change once we
|
|
||||||
# introduce support for ipv4 + ipv6 binding/listening.
|
|
||||||
|
|
||||||
# TODO: Implement SignedPeerRecord custom fields?
|
|
||||||
# let extraFields = mapIt(localEnrFields, toFieldPair(it[0], it[1]))
|
|
||||||
|
|
||||||
# TODO:
|
|
||||||
# - Defect as is now or return a result for spr errors?
|
|
||||||
# - In case incorrect key, allow for new spr based on new key (new node id)?
|
|
||||||
var record: SignedPeerRecord
|
|
||||||
if previousRecord.isSome():
|
|
||||||
record = previousRecord.get()
|
|
||||||
record.update(privKey, enrIp, enrTcpPort, enrUdpPort)
|
|
||||||
.expect("SignedPeerRecord within size limits and correct key")
|
|
||||||
else:
|
|
||||||
record = SignedPeerRecord.init(1, privKey, enrIp, enrTcpPort, enrUdpPort)
|
|
||||||
.expect("SignedPeerRecord within size limits")
|
|
||||||
|
|
||||||
info "SPR initialized", ip = enrIp, tcp = enrTcpPort, udp = enrUdpPort,
|
|
||||||
seqNum = record.seqNum, uri = toURI(record)
|
|
||||||
if enrIp.isNone():
|
|
||||||
if enrAutoUpdate:
|
|
||||||
notice "No external IP provided for the SPR, this node will not be " &
|
|
||||||
"discoverable until the SPR is updated with the discovered external IP address"
|
|
||||||
else:
|
|
||||||
warn "No external IP provided for the SPR, this node will not be discoverable"
|
|
||||||
|
|
||||||
let node = newNode(record).expect("Properly initialized record")
|
|
||||||
|
|
||||||
# TODO Consider whether this should be a Defect
|
|
||||||
doAssert rng != nil, "RNG initialization failed"
|
|
||||||
|
|
||||||
let
|
|
||||||
routingTable = RoutingTable.init(
|
|
||||||
node,
|
|
||||||
config.bitsPerHop,
|
|
||||||
config.tableIpLimits,
|
|
||||||
rng)
|
|
||||||
|
|
||||||
result = Protocol(
|
|
||||||
privateKey: privKey,
|
|
||||||
localNode: node,
|
|
||||||
bootstrapRecords: @bootstrapRecords,
|
|
||||||
ipVote: IpVote.init(),
|
|
||||||
enrAutoUpdate: enrAutoUpdate,
|
|
||||||
routingTable: routingTable,
|
|
||||||
rng: rng,
|
|
||||||
providers: providers)
|
|
||||||
|
|
||||||
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
|
||||||
|
|
||||||
proc newProtocol*(
|
proc newProtocol*(
|
||||||
privKey: PrivateKey,
|
privKey: PrivateKey,
|
||||||
bindPort: Port,
|
bindPort: Port,
|
||||||
|
@ -1131,16 +1068,20 @@ proc newProtocol*(
|
||||||
rng: rng,
|
rng: rng,
|
||||||
providers: providers)
|
providers: providers)
|
||||||
|
|
||||||
|
trace "newProtocol initiated."
|
||||||
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
||||||
|
|
||||||
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
||||||
info "Starting discovery node", node = d.localNode
|
info "Starting discovery node", node = d.localNode
|
||||||
|
|
||||||
d.transport.open()
|
d.transport.open()
|
||||||
|
trace "Transport open."
|
||||||
|
|
||||||
d.seedTable()
|
d.seedTable()
|
||||||
|
trace "Routing table seeded."
|
||||||
|
|
||||||
proc start*(d: Protocol) {.async.} =
|
proc start*(d: Protocol) {.async.} =
|
||||||
|
trace "Protocol start..."
|
||||||
d.refreshLoop = refreshLoop(d)
|
d.refreshLoop = refreshLoop(d)
|
||||||
d.revalidateLoop = revalidateLoop(d)
|
d.revalidateLoop = revalidateLoop(d)
|
||||||
d.ipMajorityLoop = ipMajorityLoop(d)
|
d.ipMajorityLoop = ipMajorityLoop(d)
|
||||||
|
|
|
@ -214,6 +214,7 @@ proc remove(k: KBucket, n: Node): bool =
|
||||||
if k.nodes[i].seen:
|
if k.nodes[i].seen:
|
||||||
routing_table_nodes.dec(labelValues = ["seen"])
|
routing_table_nodes.dec(labelValues = ["seen"])
|
||||||
k.nodes.delete(i)
|
k.nodes.delete(i)
|
||||||
|
trace "removed node:", node = n
|
||||||
true
|
true
|
||||||
else:
|
else:
|
||||||
false
|
false
|
||||||
|
@ -316,12 +317,15 @@ proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus =
|
||||||
# gets moved to the tail.
|
# gets moved to the tail.
|
||||||
if k.replacementCache[nodeIdx].address.get().ip != n.address.get().ip:
|
if k.replacementCache[nodeIdx].address.get().ip != n.address.get().ip:
|
||||||
if not ipLimitInc(r, k, n):
|
if not ipLimitInc(r, k, n):
|
||||||
|
trace "replace: ip limit reached"
|
||||||
return IpLimitReached
|
return IpLimitReached
|
||||||
ipLimitDec(r, k, k.replacementCache[nodeIdx])
|
ipLimitDec(r, k, k.replacementCache[nodeIdx])
|
||||||
k.replacementCache.delete(nodeIdx)
|
k.replacementCache.delete(nodeIdx)
|
||||||
k.replacementCache.add(n)
|
k.replacementCache.add(n)
|
||||||
|
trace "replace: already existed"
|
||||||
return ReplacementExisting
|
return ReplacementExisting
|
||||||
elif not ipLimitInc(r, k, n):
|
elif not ipLimitInc(r, k, n):
|
||||||
|
trace "replace: ip limit reached (2)"
|
||||||
return IpLimitReached
|
return IpLimitReached
|
||||||
else:
|
else:
|
||||||
doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
|
doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
|
||||||
|
@ -332,6 +336,7 @@ proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus =
|
||||||
k.replacementCache.delete(0)
|
k.replacementCache.delete(0)
|
||||||
|
|
||||||
k.replacementCache.add(n)
|
k.replacementCache.add(n)
|
||||||
|
trace "replace: added"
|
||||||
return ReplacementAdded
|
return ReplacementAdded
|
||||||
|
|
||||||
proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
|
proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
|
||||||
|
@ -367,6 +372,7 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
|
||||||
# In case of a newer record, it gets replaced.
|
# In case of a newer record, it gets replaced.
|
||||||
if bucket.nodes[nodeIdx].address.get().ip != n.address.get().ip:
|
if bucket.nodes[nodeIdx].address.get().ip != n.address.get().ip:
|
||||||
if not ipLimitInc(r, bucket, n):
|
if not ipLimitInc(r, bucket, n):
|
||||||
|
trace "Cannot add node. IP limit reached. (1)"
|
||||||
return IpLimitReached
|
return IpLimitReached
|
||||||
ipLimitDec(r, bucket, bucket.nodes[nodeIdx])
|
ipLimitDec(r, bucket, bucket.nodes[nodeIdx])
|
||||||
# Copy over the seen status, we trust here that after the SPR update the
|
# Copy over the seen status, we trust here that after the SPR update the
|
||||||
|
@ -393,6 +399,7 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
|
||||||
# immediately add nodes to the most recently seen spot.
|
# immediately add nodes to the most recently seen spot.
|
||||||
if bucket.len < BUCKET_SIZE:
|
if bucket.len < BUCKET_SIZE:
|
||||||
if not ipLimitInc(r, bucket, n):
|
if not ipLimitInc(r, bucket, n):
|
||||||
|
trace "Cannot add node. IP limit reached. (2)"
|
||||||
return IpLimitReached
|
return IpLimitReached
|
||||||
|
|
||||||
bucket.add(n)
|
bucket.add(n)
|
||||||
|
@ -435,11 +442,12 @@ proc replaceNode*(r: var RoutingTable, n: Node) =
|
||||||
|
|
||||||
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
|
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
|
||||||
## Get the `Node` with `id` as `NodeId` from the routing table.
|
## Get the `Node` with `id` as `NodeId` from the routing table.
|
||||||
## If no node with provided node id can be found,`none` is returned .
|
## If no node with provided node id can be found,`none` is returned.
|
||||||
let b = r.bucketForNode(id)
|
let b = r.bucketForNode(id)
|
||||||
for n in b.nodes:
|
for n in b.nodes:
|
||||||
if n.id == id:
|
if n.id == id:
|
||||||
return some(n)
|
return some(n)
|
||||||
|
trace "routingTable.getNode failed to find"
|
||||||
|
|
||||||
proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
|
proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
|
||||||
# Check if the routing table contains node `n`.
|
# Check if the routing table contains node `n`.
|
||||||
|
|
Loading…
Reference in New Issue