Improve portal wire logging by adding protocolId (#997)
This commit is contained in:
parent
a3fe57f2b8
commit
b14dfea553
|
@ -12,7 +12,7 @@
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, sets, algorithm],
|
std/[sequtils, sets, algorithm],
|
||||||
stew/results, chronicles, chronos, nimcrypto/hash, bearssl,
|
stew/[results, byteutils], chronicles, chronos, nimcrypto/hash, bearssl,
|
||||||
ssz_serialization,
|
ssz_serialization,
|
||||||
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
|
eth/rlp, eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2,
|
||||||
nodes_verification, lru],
|
nodes_verification, lru],
|
||||||
|
@ -70,6 +70,9 @@ type
|
||||||
of Nodes:
|
of Nodes:
|
||||||
nodes*: seq[Node]
|
nodes*: seq[Node]
|
||||||
|
|
||||||
|
func `$`(id: PortalProtocolId): string =
|
||||||
|
id.toHex()
|
||||||
|
|
||||||
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
|
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
|
||||||
p.routingTable.addNode(node)
|
p.routingTable.addNode(node)
|
||||||
|
|
||||||
|
@ -202,6 +205,9 @@ proc messageHandler(protocol: TalkProtocol, request: seq[byte],
|
||||||
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
|
srcId: NodeId, srcUdpAddress: Address): seq[byte] =
|
||||||
doAssert(protocol of PortalProtocol)
|
doAssert(protocol of PortalProtocol)
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
protocolId = p.protocolId
|
||||||
|
|
||||||
let p = PortalProtocol(protocol)
|
let p = PortalProtocol(protocol)
|
||||||
|
|
||||||
let decoded = decodeMessage(request)
|
let decoded = decodeMessage(request)
|
||||||
|
@ -281,11 +287,16 @@ proc new*(T: type PortalProtocol,
|
||||||
# validates the proper response, and updates the Portal Network routing table.
|
# validates the proper response, and updates the Portal Network routing table.
|
||||||
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
||||||
p: PortalProtocol,
|
p: PortalProtocol,
|
||||||
toNode: Node,
|
dst: Node,
|
||||||
request: Request
|
request: Request
|
||||||
): Future[PortalResult[Response]] {.async.} =
|
): Future[PortalResult[Response]] {.async.} =
|
||||||
|
logScope:
|
||||||
|
protocolId = p.protocolId
|
||||||
|
|
||||||
|
trace "Send message request", dstId = dst.id, kind = messageKind(Request)
|
||||||
|
|
||||||
let talkresp =
|
let talkresp =
|
||||||
await talkreq(p.baseProtocol, toNode, @(p.protocolId), encodeMessage(request))
|
await talkreq(p.baseProtocol, dst, @(p.protocolId), encodeMessage(request))
|
||||||
|
|
||||||
# Note: Failure of `decodeMessage` might also simply mean that the peer is
|
# Note: Failure of `decodeMessage` might also simply mean that the peer is
|
||||||
# not supporting the specific talk protocol, as according to specification
|
# not supporting the specific talk protocol, as according to specification
|
||||||
|
@ -299,13 +310,13 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
|
||||||
)
|
)
|
||||||
|
|
||||||
if messageResponse.isOk():
|
if messageResponse.isOk():
|
||||||
trace "Received message response", srcId = toNode.id,
|
trace "Received message response", srcId = dst.id,
|
||||||
srcAddress = toNode.address, kind = messageKind(Response)
|
srcAddress = dst.address, kind = messageKind(Response)
|
||||||
p.routingTable.setJustSeen(toNode)
|
p.routingTable.setJustSeen(dst)
|
||||||
else:
|
else:
|
||||||
debug "Error receiving message response", error = messageResponse.error,
|
debug "Error receiving message response", error = messageResponse.error,
|
||||||
srcId = toNode.id, srcAddress = toNode.address
|
srcId = dst.id, srcAddress = dst.address
|
||||||
p.routingTable.replaceNode(toNode)
|
p.routingTable.replaceNode(dst)
|
||||||
|
|
||||||
return messageResponse
|
return messageResponse
|
||||||
|
|
||||||
|
@ -315,14 +326,12 @@ proc pingImpl*(p: PortalProtocol, dst: Node):
|
||||||
let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
let ping = PingMessage(enrSeq: p.baseProtocol.localNode.record.seqNum,
|
||||||
customPayload: ByteList(SSZ.encode(customPayload)))
|
customPayload: ByteList(SSZ.encode(customPayload)))
|
||||||
|
|
||||||
trace "Send message request", dstId = dst.id, kind = MessageKind.ping
|
|
||||||
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
|
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
|
||||||
|
|
||||||
proc findNodes*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
|
proc findNodes*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]):
|
||||||
Future[PortalResult[NodesMessage]] {.async.} =
|
Future[PortalResult[NodesMessage]] {.async.} =
|
||||||
let fn = FindNodesMessage(distances: distances)
|
let fn = FindNodesMessage(distances: distances)
|
||||||
|
|
||||||
trace "Send message request", dstId = dst.id, kind = MessageKind.findnodes
|
|
||||||
# TODO Add nodes validation
|
# TODO Add nodes validation
|
||||||
return await reqResponse[FindNodesMessage, NodesMessage](p, dst, fn)
|
return await reqResponse[FindNodesMessage, NodesMessage](p, dst, fn)
|
||||||
|
|
||||||
|
@ -330,15 +339,12 @@ proc findContentImpl*(p: PortalProtocol, dst: Node, contentKey: ByteList):
|
||||||
Future[PortalResult[ContentMessage]] {.async.} =
|
Future[PortalResult[ContentMessage]] {.async.} =
|
||||||
let fc = FindContentMessage(contentKey: contentKey)
|
let fc = FindContentMessage(contentKey: contentKey)
|
||||||
|
|
||||||
trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent
|
|
||||||
return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc)
|
return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc)
|
||||||
|
|
||||||
proc offerImpl*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
proc offerImpl*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
|
||||||
Future[PortalResult[AcceptMessage]] {.async.} =
|
Future[PortalResult[AcceptMessage]] {.async.} =
|
||||||
let offer = OfferMessage(contentKeys: contentKeys)
|
let offer = OfferMessage(contentKeys: contentKeys)
|
||||||
|
|
||||||
trace "Send message request", dstId = dst.id, kind = MessageKind.offer
|
|
||||||
|
|
||||||
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
|
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
|
||||||
|
|
||||||
proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]] =
|
proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]] =
|
||||||
|
@ -744,6 +750,8 @@ proc seedTable*(p: PortalProtocol) =
|
||||||
# bootstrap the portal networks, however it would require a flag in the ENR to
|
# bootstrap the portal networks, however it would require a flag in the ENR to
|
||||||
# be added and there might be none in the routing table due to low amount of
|
# be added and there might be none in the routing table due to low amount of
|
||||||
# Portal nodes versus other nodes.
|
# Portal nodes versus other nodes.
|
||||||
|
logScope:
|
||||||
|
protocolId = p.protocolId
|
||||||
|
|
||||||
for record in p.bootstrapRecords:
|
for record in p.bootstrapRecords:
|
||||||
if p.addNode(record):
|
if p.addNode(record):
|
||||||
|
@ -756,6 +764,9 @@ proc seedTable*(p: PortalProtocol) =
|
||||||
proc populateTable(p: PortalProtocol) {.async.} =
|
proc populateTable(p: PortalProtocol) {.async.} =
|
||||||
## Do a set of initial lookups to quickly populate the table.
|
## Do a set of initial lookups to quickly populate the table.
|
||||||
# start with a self target query (neighbour nodes)
|
# start with a self target query (neighbour nodes)
|
||||||
|
logScope:
|
||||||
|
protocolId = p.protocolId
|
||||||
|
|
||||||
let selfQuery = await p.query(p.baseProtocol.localNode.id)
|
let selfQuery = await p.query(p.baseProtocol.localNode.id)
|
||||||
trace "Discovered nodes in self target query", nodes = selfQuery.len
|
trace "Discovered nodes in self target query", nodes = selfQuery.len
|
||||||
|
|
||||||
|
@ -795,6 +806,9 @@ proc refreshLoop(p: PortalProtocol) {.async.} =
|
||||||
## Loop that refreshes the routing table by starting a random query in case
|
## Loop that refreshes the routing table by starting a random query in case
|
||||||
## no queries were done since `refreshInterval` or more.
|
## no queries were done since `refreshInterval` or more.
|
||||||
## It also refreshes the majority address voted for via pong responses.
|
## It also refreshes the majority address voted for via pong responses.
|
||||||
|
logScope:
|
||||||
|
protocolId = p.protocolId
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while true:
|
while true:
|
||||||
# TODO: It would be nicer and more secure if this was event based and/or
|
# TODO: It would be nicer and more secure if this was event based and/or
|
||||||
|
|
Loading…
Reference in New Issue