mirror of
https://github.com/logos-storage/logos-storage-nim-dht.git
synced 2026-01-08 00:13:07 +00:00
more trace and debug messages
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
84ffef9787
commit
622164ccf8
@ -302,7 +302,7 @@ proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
|
|||||||
nodes: openArray[Node]) =
|
nodes: openArray[Node]) =
|
||||||
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
|
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
|
||||||
message: NodesMessage, reqId: RequestId) {.nimcall.} =
|
message: NodesMessage, reqId: RequestId) {.nimcall.} =
|
||||||
trace "Respond message packet", dstId = toId, address = toAddr,
|
trace "Send response packet", myport = d.transport.bindAddress.port, dstId = toId, address = toAddr,
|
||||||
kind = MessageKind.nodes
|
kind = MessageKind.nodes
|
||||||
d.sendResponse(toId, toAddr, message, reqId)
|
d.sendResponse(toId, toAddr, message, reqId)
|
||||||
|
|
||||||
@ -420,6 +420,7 @@ proc handleAddValue(
|
|||||||
fromAddr: Address,
|
fromAddr: Address,
|
||||||
addValue: AddValueMessage,
|
addValue: AddValueMessage,
|
||||||
reqId: RequestId) =
|
reqId: RequestId) =
|
||||||
|
trace "handleAddValue", myid = d.localNode, dist = logDistance(d.localNode.id, addValue.cId), scr = fromId, key = addValue.cId
|
||||||
asyncSpawn d.addValueLocal(addValue.cId, addValue.value)
|
asyncSpawn d.addValueLocal(addValue.cId, addValue.value)
|
||||||
|
|
||||||
proc handleGetValue(
|
proc handleGetValue(
|
||||||
@ -443,6 +444,8 @@ proc handleGetValue(
|
|||||||
|
|
||||||
proc handleFindValue(d: Protocol, fromId: NodeId, fromAddr: Address,
|
proc handleFindValue(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
fv: FindValueMessage, reqId: RequestId) {.async.} =
|
fv: FindValueMessage, reqId: RequestId) {.async.} =
|
||||||
|
trace "got findValueMessage", myid = d.localNode, key = fv.cId, src = fromId
|
||||||
|
|
||||||
try:
|
try:
|
||||||
let value = d.valueStore[fv.cId]
|
let value = d.valueStore[fv.cId]
|
||||||
trace "retrieved value from local db", n = d.localNode, cID = fv.cId, value
|
trace "retrieved value from local db", n = d.localNode, cID = fv.cId, value
|
||||||
@ -525,7 +528,7 @@ proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T,
|
|||||||
let
|
let
|
||||||
message = encodeMessage(m, reqId)
|
message = encodeMessage(m, reqId)
|
||||||
|
|
||||||
trace "Send message packet", dstId = toNode.id,
|
trace "Send request packet", myport = d.transport.bindAddress.port, dstId = toNode.id,
|
||||||
address = toNode.address, kind = messageKind(T)
|
address = toNode.address, kind = messageKind(T)
|
||||||
discovery_message_requests_outgoing.inc()
|
discovery_message_requests_outgoing.inc()
|
||||||
|
|
||||||
@ -862,7 +865,7 @@ proc addValue*(
|
|||||||
value: seq[byte]): Future[seq[Node]] {.async.} =
|
value: seq[byte]): Future[seq[Node]] {.async.} =
|
||||||
|
|
||||||
var res = await d.lookup(cId, fast=true)
|
var res = await d.lookup(cId, fast=true)
|
||||||
trace "lookup returned:", res
|
trace "addValue lookup returned:", key=cId, res
|
||||||
# TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
|
# TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
|
||||||
if res.len == 0:
|
if res.len == 0:
|
||||||
res.add(d.localNode)
|
res.add(d.localNode)
|
||||||
@ -1004,6 +1007,7 @@ proc findValue*(
|
|||||||
proc worker(d: Protocol, destNode: Node, target: NodeId):
|
proc worker(d: Protocol, destNode: Node, target: NodeId):
|
||||||
Future[(seq[Node], seq[byte])] {.async.} =
|
Future[(seq[Node], seq[byte])] {.async.} =
|
||||||
|
|
||||||
|
trace "findValue send", myid = d.localNode, key = target, destNode
|
||||||
let r = await d.sendFindValue(destNode, target)
|
let r = await d.sendFindValue(destNode, target)
|
||||||
|
|
||||||
if r.isOk:
|
if r.isOk:
|
||||||
@ -1013,9 +1017,13 @@ proc findValue*(
|
|||||||
# Attempt to add all nodes discovered
|
# Attempt to add all nodes discovered
|
||||||
for n in nodes:
|
for n in nodes:
|
||||||
discard d.addNode(n)
|
discard d.addNode(n)
|
||||||
|
trace "findValue receive", myid = d.localNode, key = target, nodes, value
|
||||||
|
else:
|
||||||
|
trace "findValue worker timeout", myid = d.localNode, key = target
|
||||||
|
|
||||||
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
|
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
|
||||||
seenOnly = false)
|
seenOnly = false)
|
||||||
|
trace "findValue start", myid = d.localNode, key = target, closestNodes
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(d.localNode.id) # No need to ask our own node
|
asked.incl(d.localNode.id) # No need to ask our own node
|
||||||
@ -1052,6 +1060,7 @@ proc findValue*(
|
|||||||
let (nodes, value) = query.read
|
let (nodes, value) = query.read
|
||||||
# TODO: Remove node on timed-out query?
|
# TODO: Remove node on timed-out query?
|
||||||
if value.len > 0:
|
if value.len > 0:
|
||||||
|
trace "findValue found", myid = d.localNode, key = target, value
|
||||||
return ok(value)
|
return ok(value)
|
||||||
for n in nodes:
|
for n in nodes:
|
||||||
if not seen.containsOrIncl(n.id):
|
if not seen.containsOrIncl(n.id):
|
||||||
@ -1064,6 +1073,7 @@ proc findValue*(
|
|||||||
if closestNodes.len > BUCKET_SIZE:
|
if closestNodes.len > BUCKET_SIZE:
|
||||||
closestNodes.del(closestNodes.high())
|
closestNodes.del(closestNodes.high())
|
||||||
|
|
||||||
|
debug "findValue not found", myid = d.localNode, key = target
|
||||||
d.lastLookup = now(chronos.Moment)
|
d.lastLookup = now(chronos.Moment)
|
||||||
|
|
||||||
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||||
@ -1199,6 +1209,8 @@ proc revalidateNode*(d: Protocol, n: Node) {.async.} =
|
|||||||
# Get IP and port from pong message and add it to the ip votes
|
# Get IP and port from pong message and add it to the ip votes
|
||||||
let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port))
|
let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port))
|
||||||
d.ipVote.insert(n.id, a)
|
d.ipVote.insert(n.id, a)
|
||||||
|
else:
|
||||||
|
debug "ping error", error = pong.error
|
||||||
|
|
||||||
proc revalidateLoop(d: Protocol) {.async.} =
|
proc revalidateLoop(d: Protocol) {.async.} =
|
||||||
## Loop which revalidates the nodes in the routing table by sending the ping
|
## Loop which revalidates the nodes in the routing table by sending the ping
|
||||||
@ -1225,7 +1237,7 @@ proc refreshLoop(d: Protocol) {.async.} =
|
|||||||
if currentTime > (d.lastLookup + RefreshInterval):
|
if currentTime > (d.lastLookup + RefreshInterval):
|
||||||
let randomQuery = await d.queryRandom()
|
let randomQuery = await d.queryRandom()
|
||||||
trace "Discovered nodes in random target query", nodes = randomQuery.len
|
trace "Discovered nodes in random target query", nodes = randomQuery.len
|
||||||
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
|
trace "Total nodes in discv5 routing table", total = d.routingTable.len()
|
||||||
|
|
||||||
await sleepAsync(RefreshInterval)
|
await sleepAsync(RefreshInterval)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
@ -1272,7 +1284,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
|
|||||||
warn "Discovered new external address but SPR auto update is off",
|
warn "Discovered new external address but SPR auto update is off",
|
||||||
majority, previous
|
majority, previous
|
||||||
else:
|
else:
|
||||||
debug "Discovered external address matches current address", majority,
|
trace "Discovered external address matches current address", majority,
|
||||||
current = d.localNode.address
|
current = d.localNode.address
|
||||||
|
|
||||||
await sleepAsync(IpMajorityInterval)
|
await sleepAsync(IpMajorityInterval)
|
||||||
|
|||||||
@ -23,7 +23,7 @@ const
|
|||||||
type
|
type
|
||||||
Transport* [Client] = ref object
|
Transport* [Client] = ref object
|
||||||
client: Client
|
client: Client
|
||||||
bindAddress: Address ## UDP binding address
|
bindAddress*: Address ## UDP binding address
|
||||||
transp: DatagramTransport
|
transp: DatagramTransport
|
||||||
pendingRequests: Table[AESGCMNonce, PendingRequest]
|
pendingRequests: Table[AESGCMNonce, PendingRequest]
|
||||||
handshakeInProgress: HashSet[NodeId]
|
handshakeInProgress: HashSet[NodeId]
|
||||||
@ -176,7 +176,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
|
|||||||
discard t.sendPending(toNode)
|
discard t.sendPending(toNode)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
debug "Timed out or unrequested whoareyou packet", address = a
|
debug "Timed out or unrequested whoareyou packet", myport = t.bindAddress.port, address = a
|
||||||
of HandshakeMessage:
|
of HandshakeMessage:
|
||||||
trace "Received handshake message packet", myport = t.bindAddress.port, srcId = packet.srcIdHs,
|
trace "Received handshake message packet", myport = t.bindAddress.port, srcId = packet.srcIdHs,
|
||||||
address = a, kind = packet.message.kind
|
address = a, kind = packet.message.kind
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user