diff --git a/fluffy/network/chain_history/messages.nim b/fluffy/network/chain_history/messages.nim index db5108b01..67d9dc3e8 100644 --- a/fluffy/network/chain_history/messages.nim +++ b/fluffy/network/chain_history/messages.nim @@ -10,7 +10,7 @@ {.push raises: [Defect].} import - options, + std/options, stint, stew/[results, objects], eth/ssz/ssz_serialization, eth/common/eth_types diff --git a/fluffy/network/state/messages.nim b/fluffy/network/state/messages.nim index f2c4c754b..6a1445b95 100644 --- a/fluffy/network/state/messages.nim +++ b/fluffy/network/state/messages.nim @@ -11,7 +11,7 @@ {.push raises: [Defect].} import - options, + std/options, stint, stew/[results, objects], eth/ssz/ssz_serialization @@ -159,13 +159,13 @@ template innerMessage[T: SomeMessage](message: Message, expected: MessageKind): else: none[T]() -# All our Message variants coresponds to enum MessageKind, therefore we are able to +# All our Message variants correspond to enum MessageKind, therefore we are able to # zoom in on inner structure of message by defining expected type T. -# If expected variant is not active, retrun None +# If expected variant is not active, return None proc getInnnerMessage*[T: SomeMessage](m: Message): Option[T] = innerMessage[T](m, messageKind(T)) -# Simple conversion from Option to Result, looks like somethif which coul live in +# Simple conversion from Option to Result, looks like something which could live in # Result library. proc optToResult*[T, E](opt: Option[T], e: E): Result[T, E] = if (opt.isSome()): diff --git a/fluffy/network/state/portal_network.nim b/fluffy/network/state/portal_network.nim index 745402462..266ebb774 100644 --- a/fluffy/network/state/portal_network.nim +++ b/fluffy/network/state/portal_network.nim @@ -4,8 +4,8 @@ import eth/p2p/discoveryv5/[protocol, node], ./content, ./portal_protocol -# TODO expose function in domain specific way i.e operating od state network objects i.e -# nodes, tries, hashes +# TODO expose function in domain specific way i.e operating od state network +# objects i.e nodes, tries, hashes type PortalNetwork* = ref object storage: ContentStorage portalProtocol*: PortalProtocol @@ -22,15 +22,20 @@ proc getHandler(storage: ContentStorage): ContentHandler = # 1. Return proper domain types instead of bytes # 2. First check if item is in storage instead of doing lookup # 3. Put item into storage (if in radius) after succesful lookup -proc getContent*(p:PortalNetwork, key: ContentKey): Future[Option[seq[byte]]] {.async.} = +proc getContent*(p:PortalNetwork, key: ContentKey): + Future[Option[seq[byte]]] {.async.} = let keyAsBytes = encodeKeyAsList(key) let id = contentIdAsUint256(toContentId(keyAsBytes)) let result = await p.portalProtocol.contentLookup(keyAsBytes, id) - # for now returning bytes, ultimatly it would be nice to return proper domain types from here + # for now returning bytes, ultimatly it would be nice to return proper domain + # types from here return result.map(x => x.asSeq()) -proc new*(T: type PortalNetwork, baseProtocol: protocol.Protocol, storage: ContentStorage , dataRadius = UInt256.high()): T = - let portalProto = PortalProtocol.new(baseProtocol, getHandler(storage), dataRadius) +proc new*(T: type PortalNetwork, baseProtocol: protocol.Protocol, + storage: ContentStorage , dataRadius = UInt256.high()): T = + let portalProto = + PortalProtocol.new(baseProtocol, getHandler(storage), dataRadius) + return PortalNetwork(storage: storage, portalProtocol: portalProto) proc start*(p: PortalNetwork) = @@ -38,4 +43,3 @@ proc start*(p: PortalNetwork) = proc stop*(p: PortalNetwork) = p.portalProtocol.stop() - diff --git a/fluffy/network/state/portal_protocol.nim b/fluffy/network/state/portal_protocol.nim index 22d43f98d..7cceb1248 100644 --- a/fluffy/network/state/portal_protocol.nim +++ b/fluffy/network/state/portal_protocol.nim @@ -42,9 +42,11 @@ type of ContentKeyValidationFailure: error*: string - # Treating Result as typed union type. If content is present handler should return - # it, if not it should return content id so that closest neighbours can be localized. - ContentHandler* = proc (contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.} + # Treating Result as typed union type. If the content is present the handler + # should return it, if not it should return the content id so that closest + # neighbours can be localized. + ContentHandler* = + proc(contentKey: ByteList): ContentResult {.raises: [Defect], gcsafe.} PortalProtocol* = ref object of TalkProtocol routingTable: RoutingTable @@ -187,27 +189,30 @@ proc new*(T: type PortalProtocol, baseProtocol: protocol.Protocol, return proto -# Requests, decoedes result, validates that proper response has been received -# and updates the routing table. -# in original disoveryv5 bootstrap node are not replaced in case of failure, but -# for now portal protocol has no notion of bootstrap nodes +# Sends the discv5 talkreq nessage with provided Portal message, awaits and +# validates the proper response, and updates the Portal Network routing table. +# In discoveryv5 bootstrap nodes are not replaced in case of failure, but +# for now the Portal protocol has no notion of bootstrap nodes. proc reqResponse[Request: SomeMessage, Response: SomeMessage]( - p: PortalProtocol, - toNode: Node, - protocol: seq[byte], - request: Request - ): Future[PortalResult[Response]] {.async.} = - let respResult = await talkreq(p.baseProtocol, toNode, protocol, encodeMessage(request)) - return respResult - .flatMap(proc (x: seq[byte]): Result[Message, cstring] = decodeMessage(x)) - .flatMap(proc (m: Message): Result[Response, cstring] = - let reqResult = getInnerMessageResult[Response](m, cstring"Invalid message response received") - if reqResult.isOk(): - p.routingTable.setJustSeen(toNode) - else: - p.routingTable.replaceNode(toNode) - reqResult - ) + p: PortalProtocol, + toNode: Node, + protocol: seq[byte], + request: Request + ): Future[PortalResult[Response]] {.async.} = + let respResult = + await talkreq(p.baseProtocol, toNode, protocol, encodeMessage(request)) + + return respResult + .flatMap(proc (x: seq[byte]): Result[Message, cstring] = decodeMessage(x)) + .flatMap(proc (m: Message): Result[Response, cstring] = + let reqResult = getInnerMessageResult[Response]( + m, cstring"Invalid message response received") + if reqResult.isOk(): + p.routingTable.setJustSeen(toNode) + else: + p.routingTable.replaceNode(toNode) + reqResult + ) proc ping*(p: PortalProtocol, dst: Node): Future[PortalResult[PongMessage]] {.async.} = @@ -215,7 +220,8 @@ proc ping*(p: PortalProtocol, dst: Node): dataRadius: p.dataRadius) trace "Send message request", dstId = dst.id, kind = MessageKind.ping - return await reqResponse[PingMessage, PongMessage](p, dst, PortalProtocolId, ping) + return await reqResponse[PingMessage, PongMessage]( + p, dst, PortalProtocolId, ping) proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]): Future[PortalResult[NodesMessage]] {.async.} = @@ -223,14 +229,16 @@ proc findNode*(p: PortalProtocol, dst: Node, distances: List[uint16, 256]): trace "Send message request", dstId = dst.id, kind = MessageKind.findnode # TODO Add nodes validation - return await reqResponse[FindNodeMessage, NodesMessage](p, dst, PortalProtocolId, fn) + return await reqResponse[FindNodeMessage, NodesMessage]( + p, dst, PortalProtocolId, fn) proc findContent*(p: PortalProtocol, dst: Node, contentKey: ByteList): Future[PortalResult[FoundContentMessage]] {.async.} = let fc = FindContentMessage(contentKey: contentKey) trace "Send message request", dstId = dst.id, kind = MessageKind.findcontent - return await reqResponse[FindContentMessage, FoundContentMessage](p, dst, PortalProtocolId, fc) + return await reqResponse[FindContentMessage, FoundContentMessage]( + p, dst, PortalProtocolId, fc) proc recordsFromBytes(rawRecords: List[ByteList, 32]): seq[Record] = var records: seq[Record] @@ -327,13 +335,14 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} = p.lastLookup = now(chronos.Moment) return closestNodes -proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage, dst: Node, nodes: var seq[Node]): LookupResult = +proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage, + dst: Node, nodes: var seq[Node]): LookupResult = if (m.enrs.len() != 0 and m.payload.len() == 0): let records = recordsFromBytes(m.enrs) # TODO cannot use verifyNodesRecords(records, destNode, @[0'u16]) as it - # also verify logdistances distances, but with content query those are not + # also verifies logdistances distances, but with content query those are not # used. - # Implement version of verifyNodesRecords wchich do not validate distances. + # Implement version of verifyNodesRecords which do not validate distances. for r in records: let node = newNode(r) if node.isOk(): @@ -360,14 +369,16 @@ proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ByteList): let contentMessageResponse = await p.findContent(destNode, target) if contentMessageResponse.isOk(): - return handleFoundContentMessage(p, contentMessageResponse.get(), destNode, nodes) + return handleFoundContentMessage( + p, contentMessageResponse.get(), destNode, nodes) else: return LookupResult(kind: Nodes, nodes: nodes) # TODO ContentLookup and Lookup look almost exactly the same, also lookups in other # networks will probably be very similar. Extract lookup function to separate module # and make it more generaic -proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): Future[Option[ByteList]] {.async.} = +proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256): + Future[Option[ByteList]] {.async.} = ## Perform a lookup for the given target, return the closest n nodes to the ## target. Maximum value for n is `BUCKET_SIZE`. # `closestNodes` holds the k closest nodes to target found, sorted by distance