diff --git a/eth/p2p/discoveryv5/encodingv1.nim b/eth/p2p/discoveryv5/encodingv1.nim index 386a855..680b652 100644 --- a/eth/p2p/discoveryv5/encodingv1.nim +++ b/eth/p2p/discoveryv5/encodingv1.nim @@ -337,9 +337,14 @@ proc decodeMessage*(body: openarray[byte]): DecodeResult[Message] = of pong: rlp.decode(message.pong) of findNode: rlp.decode(message.findNode) of nodes: rlp.decode(message.nodes) - of talkreq, talkresp, regtopic, ticket, regconfirmation, topicquery: - # TODO: Implement support for topic advertisement and talkreq/resp - return err(UnsupportedMessage) + of talkreq: rlp.decode(message.talkreq) + of talkresp: rlp.decode(message.talkresp) + of regtopic, ticket, regconfirmation, topicquery: + # We just pass the empty type of this message without attempting to + # decode, so that the protocol knows what was received. + # But we ignore the message as per specification as "the content and + # semantics of this message are not final". + discard except RlpError, ValueError: return err(PacketError) diff --git a/eth/p2p/discoveryv5/protocolv1.nim b/eth/p2p/discoveryv5/protocolv1.nim index 85ab9a8..12a6c3a 100644 --- a/eth/p2p/discoveryv5/protocolv1.nim +++ b/eth/p2p/discoveryv5/protocolv1.nim @@ -284,6 +284,16 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, # with empty nodes. d.sendNodes(fromId, fromAddr, reqId, []) +proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, + talkreq: TalkReqMessage, reqId: RequestId) = + # No support for any protocol yet so an empty response is send as per + # specification. + let talkresp = TalkRespMessage(response: @[]) + let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, + encodeMessage(talkresp, reqId)) + + d.send(fromAddr, data) + proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) {.raises:[Exception].} = case message.kind @@ -291,6 +301,11 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, d.handlePing(srcId, fromAddr, message.ping, message.reqId) of findNode: d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId) + of talkreq: + d.handleTalkReq(srcId, fromAddr, message.talkreq, message.reqId) + of regtopic, topicquery: + trace "Received unimplemented message kind", message = message.kind, + origin = fromAddr else: var waiter: Future[Option[Message]] if d.awaitedMessages.take((srcId, message.reqId), waiter): @@ -343,7 +358,7 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe, case packet.flag of OrdinaryMessage: if packet.messageOpt.isSome(): - trace "Received message" + trace "Received message", address = a, sender = packet.srcId d.handleMessage(packet.srcId, a, packet.messageOpt.get()) else: trace "Not decryptable message packet received, respond with whoareyou", @@ -592,6 +607,22 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): d.replaceNode(toNode) return err(nodes.error) +proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): + Future[DiscResult[TalkRespMessage]] {.async, raises: [Exception, Defect].} = + ## Send a discovery talkreq message. + ## + ## Returns the received talkresp message or an error. + let reqId = d.sendMessage(toNode, + TalkReqMessage(protocol: protocol, request: request)) + let resp = await d.waitMessage(toNode, reqId) + + if resp.isSome() and resp.get().kind == talkresp: + d.routingTable.setJustSeen(toNode) + return ok(resp.get().talkresp) + else: + d.replaceNode(toNode) + return err("Talk response message not received in time") + proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} = let td = logDist(target, dest) result.add(td) diff --git a/eth/p2p/discoveryv5/typesv1.nim b/eth/p2p/discoveryv5/typesv1.nim index 8470178..34220db 100644 --- a/eth/p2p/discoveryv5/typesv1.nim +++ b/eth/p2p/discoveryv5/typesv1.nim @@ -49,7 +49,21 @@ type total*: uint32 enrs*: seq[Record] - SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage + TalkReqMessage* = object + protocol*: seq[byte] + request*: seq[byte] + + TalkRespMessage* = object + response*: seq[byte] + + # Not implemented, specification is not final here. + RegTopicMessage* = object + TicketMessage* = object + RegConfirmationMessage* = object + TopicQueryMessage* = object + + SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or + TalkReqMessage or TalkRespMessage Message* = object reqId*: RequestId @@ -62,8 +76,19 @@ type findNode*: FindNodeMessage of nodes: nodes*: NodesMessage + of talkreq: + talkreq*: TalkReqMessage + of talkresp: + talkresp*: TalkRespMessage + of regtopic: + regtopic*: RegTopicMessage + of ticket: + ticket*: TicketMessage + of regconfirmation: + regconfirmation*: RegConfirmationMessage + of topicquery: + topicquery*: TopicQueryMessage else: - # TODO: Define the rest discard template messageKind*(T: typedesc[SomeMessage]): MessageKind = @@ -71,6 +96,8 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind = elif T is PongMessage: pong elif T is FindNodeMessage: findNode elif T is NodesMessage: nodes + elif T is TalkReqMessage: talkreq + elif T is TalkRespMessage: talkresp proc toBytes*(id: NodeId): array[32, byte] {.inline.} = id.toByteArrayBE()