From 66f9dc9167f5ba8e5b6867be54f3e63052b93797 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Wed, 28 Jun 2023 16:44:58 +0200 Subject: [PATCH] Remove all `Result.get()`s & `Option` -> `Opt` (#902) Co-authored-by: Ludovic Chenut Co-authored-by: Diego --- libp2p/builders.nim | 14 +- libp2p/cid.nim | 5 +- libp2p/connmanager.nim | 2 +- libp2p/crypto/crypto.nim | 9 +- libp2p/debugutils.nim | 99 +++---- libp2p/dialer.nim | 32 +-- libp2p/multiaddress.nim | 30 +- libp2p/nameresolving/nameresolver.nim | 4 +- libp2p/observedaddrmanager.nim | 47 ++- libp2p/peerid.nim | 12 +- libp2p/peerinfo.nim | 13 +- libp2p/peerstore.nim | 28 +- libp2p/protobuf/minprotobuf.nim | 32 +-- .../protocols/connectivity/autonat/client.nim | 23 +- .../protocols/connectivity/autonat/core.nim | 93 +++--- .../protocols/connectivity/autonat/server.nim | 64 ++--- .../connectivity/autonat/service.nim | 22 +- .../protocols/connectivity/dcutr/server.nim | 3 +- .../protocols/connectivity/relay/client.nim | 82 +++--- .../protocols/connectivity/relay/messages.nim | 269 ++++++++---------- libp2p/protocols/connectivity/relay/relay.nim | 73 +++-- .../connectivity/relay/rtransport.nim | 20 +- libp2p/protocols/connectivity/relay/utils.nim | 9 +- libp2p/protocols/identify.nim | 100 +++---- libp2p/protocols/pubsub/gossipsub.nim | 6 +- .../protocols/pubsub/gossipsub/behavior.nim | 34 +-- libp2p/protocols/pubsub/gossipsub/scoring.nim | 25 +- libp2p/protocols/pubsub/pubsub.nim | 10 +- libp2p/protocols/pubsub/pubsubpeer.nim | 18 +- libp2p/protocols/pubsub/rpc/message.nim | 15 +- libp2p/protocols/pubsub/rpc/messages.nim | 17 +- libp2p/protocols/pubsub/rpc/protobuf.nim | 18 +- libp2p/protocols/pubsub/timedcache.nim | 8 +- libp2p/protocols/rendezvous.nim | 259 ++++++++--------- libp2p/protocols/secure/noise.nim | 3 +- libp2p/protocols/secure/secio.nim | 9 +- libp2p/routing_record.nim | 6 +- libp2p/services/hpservice.nim | 6 +- libp2p/signed_envelope.nim | 17 +- libp2p/transports/tortransport.nim | 20 +- libp2p/transports/transport.nim | 5 +- libp2p/utility.nim | 35 ++- libp2p/wire.nim | 4 +- tests/testautonat.nim | 8 +- tests/testautonatservice.nim | 44 +-- tests/testrelayv1.nim | 138 ++++----- tests/testrelayv2.nim | 2 +- tests/testutility.nim | 1 - 48 files changed, 820 insertions(+), 973 deletions(-) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index d8e7d5117..2d38c7d59 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -54,7 +54,7 @@ type protoVersion: string agentVersion: string nameResolver: NameResolver - peerStoreCapacity: Option[int] + peerStoreCapacity: Opt[int] autonat: bool circuitRelay: Relay rdv: RendezVous @@ -170,7 +170,7 @@ proc withMaxConnsPerPeer*(b: SwitchBuilder, maxConnsPerPeer: int): SwitchBuilder b proc withPeerStore*(b: SwitchBuilder, capacity: int): SwitchBuilder {.public.} = - b.peerStoreCapacity = some(capacity) + b.peerStoreCapacity = Opt.some(capacity) b proc withProtoVersion*(b: SwitchBuilder, protoVersion: string): SwitchBuilder {.public.} = @@ -242,9 +242,9 @@ proc build*(b: SwitchBuilder): Switch if isNil(b.rng): b.rng = newRng() - let peerStore = - if isSome(b.peerStoreCapacity): - PeerStore.new(identify, b.peerStoreCapacity.get()) + let peerStore = block: + b.peerStoreCapacity.withValue(capacity): + PeerStore.new(identify, capacity) else: PeerStore.new(identify) @@ -316,7 +316,7 @@ proc newStandardSwitch*( .withNameResolver(nameResolver) .withNoise() - if privKey.isSome(): - b = b.withPrivateKey(privKey.get()) + privKey.withValue(pkey): + b = b.withPrivateKey(pkey) b.build() diff --git a/libp2p/cid.nim b/libp2p/cid.nim index 195c1b607..970988814 100644 --- a/libp2p/cid.nim +++ b/libp2p/cid.nim @@ -276,9 +276,6 @@ proc `$`*(cid: Cid): string = BTCBase58.encode(cid.data.buffer) elif cid.cidver == CIDv1: let res = MultiBase.encode("base58btc", cid.data.buffer) - if res.isOk(): - res.get() - else: - "" + res.get("") else: "" diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 619c5b2c0..0ab7d8b12 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import std/[options, tables, sequtils, sets] +import std/[tables, sequtils, sets] import pkg/[chronos, chronicles, metrics] import peerinfo, peerstore, diff --git a/libp2p/crypto/crypto.nim b/libp2p/crypto/crypto.nim index 1c8e5c2a9..088322a0c 100644 --- a/libp2p/crypto/crypto.nim +++ b/libp2p/crypto/crypto.nim @@ -468,7 +468,7 @@ proc init*[T: PrivateKey|PublicKey](key: var T, data: openArray[byte]): bool = var pb = initProtoBuffer(@data) let r1 = pb.getField(1, id) let r2 = pb.getField(2, buffer) - if not(r1.isOk() and r1.get() and r2.isOk() and r2.get()): + if not(r1.get(false) and r2.get(false)): false else: if cast[int8](id) notin SupportedSchemesInt or len(buffer) <= 0: @@ -973,9 +973,8 @@ proc decodeProposal*(message: seq[byte], nonce, pubkey: var seq[byte], let r4 = pb.getField(4, ciphers) let r5 = pb.getField(5, hashes) - r1.isOk() and r1.get() and r2.isOk() and r2.get() and - r3.isOk() and r3.get() and r4.isOk() and r4.get() and - r5.isOk() and r5.get() + r1.get(false) and r2.get(false) and r3.get(false) and + r4.get(false) and r5.get(false) proc createExchange*(epubkey, signature: openArray[byte]): seq[byte] = ## Create SecIO exchange message using ephemeral public key ``epubkey`` and @@ -995,7 +994,7 @@ proc decodeExchange*(message: seq[byte], var pb = initProtoBuffer(message) let r1 = pb.getField(1, pubkey) let r2 = pb.getField(2, signature) - r1.isOk() and r1.get() and r2.isOk() and r2.get() + r1.get(false) and r2.get(false) ## Serialization/Deserialization helpers diff --git a/libp2p/debugutils.nim b/libp2p/debugutils.nim index ecc4c1d80..1bec5292e 100644 --- a/libp2p/debugutils.nim +++ b/libp2p/debugutils.nim @@ -25,7 +25,7 @@ ## 5. LocalAddress: optional bytes ## 6. RemoteAddress: optional bytes ## 7. Message: required bytes -import os, options +import os import nimcrypto/utils, stew/endians2 import protobuf/minprotobuf, stream/connection, protocols/secure/secure, multiaddress, peerid, varint, muxers/mplex/coder @@ -33,7 +33,7 @@ import protobuf/minprotobuf, stream/connection, protocols/secure/secure, from times import getTime, toUnix, fromUnix, nanosecond, format, Time, NanosecondRange, initTime from strutils import toHex, repeat -export peerid, options, multiaddress +export peerid, multiaddress type FlowDirection* = enum @@ -43,10 +43,10 @@ type timestamp*: uint64 direction*: FlowDirection message*: seq[byte] - seqID*: Option[uint64] - mtype*: Option[uint64] - local*: Option[MultiAddress] - remote*: Option[MultiAddress] + seqID*: Opt[uint64] + mtype*: Opt[uint64] + local*: Opt[MultiAddress] + remote*: Opt[MultiAddress] const libp2p_dump_dir* {.strdefine.} = "nim-libp2p" @@ -72,7 +72,8 @@ proc dumpMessage*(conn: SecureConn, direction: FlowDirection, var pb = initProtoBuffer(options = {WithVarintLength}) pb.write(2, getTimestamp()) pb.write(4, uint64(direction)) - pb.write(6, conn.observedAddr) + conn.observedAddr.withValue(oaddr): + pb.write(6, oaddr) pb.write(7, data) pb.finish() @@ -100,7 +101,7 @@ proc dumpMessage*(conn: SecureConn, direction: FlowDirection, finally: close(handle) -proc decodeDumpMessage*(data: openArray[byte]): Option[ProtoMessage] = +proc decodeDumpMessage*(data: openArray[byte]): Opt[ProtoMessage] = ## Decode protobuf's message ProtoMessage from array of bytes ``data``. var pb = initProtoBuffer(data) @@ -108,13 +109,12 @@ proc decodeDumpMessage*(data: openArray[byte]): Option[ProtoMessage] = ma1, ma2: MultiAddress pmsg: ProtoMessage - let res2 = pb.getField(2, pmsg.timestamp) - if res2.isErr() or not(res2.get()): - return none[ProtoMessage]() - - let res4 = pb.getField(4, value) - if res4.isErr() or not(res4.get()): - return none[ProtoMessage]() + let + r2 = pb.getField(2, pmsg.timestamp) + r4 = pb.getField(4, value) + r7 = pb.getField(7, pmsg.message) + if not r2.get(false) or not r4.get(false) or not r7.get(false): + return Opt.none(ProtoMessage) # `case` statement could not work here with an error "selector must be of an # ordinal type, float or string" @@ -124,30 +124,27 @@ proc decodeDumpMessage*(data: openArray[byte]): Option[ProtoMessage] = elif value == uint64(Incoming): Incoming else: - return none[ProtoMessage]() + return Opt.none(ProtoMessage) - let res7 = pb.getField(7, pmsg.message) - if res7.isErr() or not(res7.get()): - return none[ProtoMessage]() + let r1 = pb.getField(1, value) + if r1.get(false): + pmsg.seqID = Opt.some(value) - value = 0'u64 - let res1 = pb.getField(1, value) - if res1.isOk() and res1.get(): - pmsg.seqID = some(value) - value = 0'u64 - let res3 = pb.getField(3, value) - if res3.isOk() and res3.get(): - pmsg.mtype = some(value) - let res5 = pb.getField(5, ma1) - if res5.isOk() and res5.get(): - pmsg.local = some(ma1) - let res6 = pb.getField(6, ma2) - if res6.isOk() and res6.get(): - pmsg.remote = some(ma2) + let r3 = pb.getField(3, value) + if r3.get(false): + pmsg.mtype = Opt.some(value) - some(pmsg) + let + r5 = pb.getField(5, ma1) + r6 = pb.getField(6, ma2) + if r5.get(false): + pmsg.local = Opt.some(ma1) + if r6.get(false): + pmsg.remote = Opt.some(ma2) -iterator messages*(data: seq[byte]): Option[ProtoMessage] = + Opt.some(pmsg) + +iterator messages*(data: seq[byte]): Opt[ProtoMessage] = ## Iterate over sequence of bytes and decode all the ``ProtoMessage`` ## messages we found. var value: uint64 @@ -242,27 +239,19 @@ proc toString*(msg: ProtoMessage, dump = true): string = " >> " let address = block: - let local = - if msg.local.isSome(): - "[" & $(msg.local.get()) & "]" - else: - "[LOCAL]" - let remote = - if msg.remote.isSome(): - "[" & $(msg.remote.get()) & "]" - else: - "[REMOTE]" + let local = block: + msg.local.withValue(loc): "[" & $loc & "]" + else: "[LOCAL]" + let remote = block: + msg.remote.withValue(rem): "[" & $rem & "]" + else: "[REMOTE]" local & direction & remote - let seqid = - if msg.seqID.isSome(): - "seqID = " & $(msg.seqID.get()) & " " - else: - "" - let mtype = - if msg.mtype.isSome(): - "type = " & $(msg.mtype.get()) & " " - else: - "" + let seqid = block: + msg.seqID.wihValue(seqid): "seqID = " & $seqid & " " + else: "" + let mtype = block: + msg.mtype.withValue(typ): "type = " & $typ & " " + else: "" res.add(" ") res.add(address) res.add(" ") diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 2ff052c7c..59c0c0ca6 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -150,7 +150,7 @@ proc dialAndUpgrade( if not isNil(result): return result -proc tryReusingConnection(self: Dialer, peerId: PeerId): Future[Opt[Muxer]] {.async.} = +proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] = let muxer = self.connManager.selectMuxer(peerId) if muxer == nil: return Opt.none(Muxer) @@ -174,10 +174,10 @@ proc internalConnect( try: await lock.acquire() - if peerId.isSome and reuseConnection: - let muxOpt = await self.tryReusingConnection(peerId.get()) - if muxOpt.isSome: - return muxOpt.get() + if reuseConnection: + peerId.withValue(peerId): + self.tryReusingConnection(peerId).withValue(mux): + return mux let slot = self.connManager.getOutgoingSlot(forceDial) let muxed = @@ -225,20 +225,20 @@ method connect*( allowUnknownPeerId = false): Future[PeerId] {.async.} = ## Connects to a peer and retrieve its PeerId - let fullAddress = parseFullAddress(address) - if fullAddress.isOk: + parseFullAddress(address).toOpt().withValue(fullAddress): return (await self.internalConnect( - Opt.some(fullAddress.get()[0]), - @[fullAddress.get()[1]], - false)).connection.peerId - else: - if allowUnknownPeerId == false: - raise newException(DialFailedError, "Address without PeerID and unknown peer id disabled!") - return (await self.internalConnect( - Opt.none(PeerId), - @[address], + Opt.some(fullAddress[0]), + @[fullAddress[1]], false)).connection.peerId + if allowUnknownPeerId == false: + raise newException(DialFailedError, "Address without PeerID and unknown peer id disabled!") + + return (await self.internalConnect( + Opt.none(PeerId), + @[address], + false)).connection.peerId + proc negotiateStream( self: Dialer, conn: Connection, diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 14a32e7cc..02f5c4803 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -1080,19 +1080,15 @@ proc matchPart(pat: MaPattern, protos: seq[MultiCodec]): MaPatResult = proc match*(pat: MaPattern, address: MultiAddress): bool = ## Match full ``address`` using pattern ``pat`` and return ``true`` if ## ``address`` satisfies pattern. - let protos = address.protocols() - if protos.isErr(): - return false - let res = matchPart(pat, protos.get()) + let protos = address.protocols().valueOr: return false + let res = matchPart(pat, protos) res.flag and (len(res.rem) == 0) proc matchPartial*(pat: MaPattern, address: MultiAddress): bool = ## Match prefix part of ``address`` using pattern ``pat`` and return ## ``true`` if ``address`` starts with pattern. - let protos = address.protocols() - if protos.isErr(): - return false - let res = matchPart(pat, protos.get()) + let protos = address.protocols().valueOr: return false + let res = matchPart(pat, protos) res.flag proc `$`*(pat: MaPattern): string = @@ -1121,12 +1117,8 @@ proc getField*(pb: ProtoBuffer, field: int, if not(res): ok(false) else: - let ma = MultiAddress.init(buffer) - if ma.isOk(): - value = ma.get() - ok(true) - else: - err(ProtoError.IncorrectBlob) + value = MultiAddress.init(buffer).valueOr: return err(ProtoError.IncorrectBlob) + ok(true) proc getRepeatedField*(pb: ProtoBuffer, field: int, value: var seq[MultiAddress]): ProtoResult[bool] {. @@ -1142,11 +1134,11 @@ proc getRepeatedField*(pb: ProtoBuffer, field: int, ok(false) else: for item in items: - let ma = MultiAddress.init(item) - if ma.isOk(): - value.add(ma.get()) - else: - debug "Not supported MultiAddress in blob", ma = item + let ma = MultiAddress.init(item).valueOr: + debug "Unsupported MultiAddress in blob", ma = item + continue + + value.add(ma) if value.len == 0: err(ProtoError.IncorrectBlob) else: diff --git a/libp2p/nameresolving/nameresolver.nim b/libp2p/nameresolving/nameresolver.nim index cb58ffadc..015901d61 100644 --- a/libp2p/nameresolving/nameresolver.nim +++ b/libp2p/nameresolving/nameresolver.nim @@ -118,7 +118,7 @@ proc resolveMAddress*( if not DNS.matchPartial(address): res.incl(address) else: - let code = address[0].get().protoCode().get() + let code = address[0].tryGet().protoCode().tryGet() let seq = case code: of multiCodec("dns"): await self.resolveOneAddress(address) @@ -129,7 +129,7 @@ proc resolveMAddress*( of multiCodec("dnsaddr"): await self.resolveDnsAddr(address) else: - doAssert false + assert false @[address] for ad in seq: res.incl(ad) diff --git a/libp2p/observedaddrmanager.nim b/libp2p/observedaddrmanager.nim index b84e905e2..e1644aba1 100644 --- a/libp2p/observedaddrmanager.nim +++ b/libp2p/observedaddrmanager.nim @@ -9,10 +9,9 @@ {.push raises: [].} -import - std/[sequtils, tables], - chronos, chronicles, - multiaddress, multicodec +import std/[sequtils, tables, sugar] +import chronos +import multiaddress, multicodec type ## Manages observed MultiAddresses by reomte peers. It keeps track of the most observed IP and IP/Port. @@ -33,14 +32,16 @@ proc getProtocol(self: ObservedAddrManager, observations: seq[MultiAddress], mul countTable.sort() var orderedPairs = toSeq(countTable.pairs) for (ma, count) in orderedPairs: - let maFirst = ma[0].get() - if maFirst.protoCode.get() == multiCodec and count >= self.minCount: + let protoCode = (ma[0].flatMap(protoCode)).valueOr: continue + if protoCode == multiCodec and count >= self.minCount: return Opt.some(ma) return Opt.none(MultiAddress) proc getMostObservedProtocol(self: ObservedAddrManager, multiCodec: MultiCodec): Opt[MultiAddress] = ## Returns the most observed IP address or none if the number of observations are less than minCount. - let observedIPs = self.observedIPsAndPorts.mapIt(it[0].get()) + let observedIPs = collect: + for observedIp in self.observedIPsAndPorts: + observedIp[0].valueOr: continue return self.getProtocol(observedIPs, multiCodec) proc getMostObservedProtoAndPort(self: ObservedAddrManager, multiCodec: MultiCodec): Opt[MultiAddress] = @@ -51,34 +52,24 @@ proc getMostObservedProtosAndPorts*(self: ObservedAddrManager): seq[MultiAddress ## Returns the most observed IP4/Port and IP6/Port address or an empty seq if the number of observations ## are less than minCount. var res: seq[MultiAddress] - let ip4 = self.getMostObservedProtoAndPort(multiCodec("ip4")) - if ip4.isSome(): - res.add(ip4.get()) - let ip6 = self.getMostObservedProtoAndPort(multiCodec("ip6")) - if ip6.isSome(): - res.add(ip6.get()) + self.getMostObservedProtoAndPort(multiCodec("ip4")).withValue(ip4): + res.add(ip4) + self.getMostObservedProtoAndPort(multiCodec("ip6")).withValue(ip6): + res.add(ip6) return res proc guessDialableAddr*( self: ObservedAddrManager, ma: MultiAddress): MultiAddress = - ## Replaces the first proto valeu of each listen address by the corresponding (matching the proto code) most observed value. + ## Replaces the first proto value of each listen address by the corresponding (matching the proto code) most observed value. ## If the most observed value is not available, the original MultiAddress is returned. - try: - let maFirst = ma[0] - let maRest = ma[1..^1] - if maRest.isErr(): - return ma + let + maFirst = ma[0].valueOr: return ma + maRest = ma[1..^1].valueOr: return ma + maFirstProto = maFirst.protoCode().valueOr: return ma - let observedIP = self.getMostObservedProtocol(maFirst.get().protoCode().get()) - return - if observedIP.isNone() or maFirst.get() == observedIP.get(): - ma - else: - observedIP.get() & maRest.get() - except CatchableError as error: - debug "Error while handling manual port forwarding", msg = error.msg - return ma + let observedIP = self.getMostObservedProtocol(maFirstProto).valueOr: return ma + return concat(observedIP, maRest).valueOr: ma proc `$`*(self: ObservedAddrManager): string = ## Returns a string representation of the ObservedAddrManager. diff --git a/libp2p/peerid.nim b/libp2p/peerid.nim index c705d2e97..4f7e6f9d1 100644 --- a/libp2p/peerid.nim +++ b/libp2p/peerid.nim @@ -185,19 +185,11 @@ proc random*(t: typedesc[PeerId], rng = newRng()): Result[PeerId, cstring] = func match*(pid: PeerId, pubkey: PublicKey): bool = ## Returns ``true`` if ``pid`` matches public key ``pubkey``. - let p = PeerId.init(pubkey) - if p.isErr: - false - else: - pid == p.get() + PeerId.init(pubkey) == Result[PeerId, cstring].ok(pid) func match*(pid: PeerId, seckey: PrivateKey): bool = ## Returns ``true`` if ``pid`` matches private key ``seckey``. - let p = PeerId.init(seckey) - if p.isErr: - false - else: - pid == p.get() + PeerId.init(seckey) == Result[PeerId, cstring].ok(pid) ## Serialization/Deserialization helpers diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index e017bbb63..a31f42eb5 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -10,7 +10,7 @@ {.push raises: [].} {.push public.} -import std/[options, sequtils] +import std/sequtils import pkg/[chronos, chronicles, stew/results] import peerid, multiaddress, multicodec, crypto/crypto, routing_record, errors, utility @@ -53,15 +53,12 @@ proc update*(p: PeerInfo) {.async.} = for mapper in p.addressMappers: p.addrs = await mapper(p.addrs) - let sprRes = SignedPeerRecord.init( + p.signedPeerRecord = SignedPeerRecord.init( p.privateKey, PeerRecord.init(p.peerId, p.addrs) - ) - if sprRes.isOk: - p.signedPeerRecord = sprRes.get() - else: - discard - #info "Can't update the signed peer record" + ).valueOr(): + info "Can't update the signed peer record" + return proc addrs*(p: PeerInfo): seq[MultiAddress] = p.addrs diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index 5c4a212f2..41698b75d 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -16,7 +16,7 @@ runnableExamples: # Create a custom book type type MoodBook = ref object of PeerBook[string] - var somePeerId = PeerId.random().get() + var somePeerId = PeerId.random().expect("get random key") peerStore[MoodBook][somePeerId] = "Happy" doAssert peerStore[MoodBook][somePeerId] == "Happy" @@ -158,20 +158,20 @@ proc updatePeerInfo*( if info.addrs.len > 0: peerStore[AddressBook][info.peerId] = info.addrs - if info.pubkey.isSome: - peerStore[KeyBook][info.peerId] = info.pubkey.get() + info.pubkey.withValue(pubkey): + peerStore[KeyBook][info.peerId] = pubkey - if info.agentVersion.isSome: - peerStore[AgentBook][info.peerId] = info.agentVersion.get().string + info.agentVersion.withValue(agentVersion): + peerStore[AgentBook][info.peerId] = agentVersion.string - if info.protoVersion.isSome: - peerStore[ProtoVersionBook][info.peerId] = info.protoVersion.get().string + info.protoVersion.withValue(protoVersion): + peerStore[ProtoVersionBook][info.peerId] = protoVersion.string if info.protos.len > 0: peerStore[ProtoBook][info.peerId] = info.protos - if info.signedPeerRecord.isSome: - peerStore[SPRBook][info.peerId] = info.signedPeerRecord.get() + info.signedPeerRecord.withValue(signedPeerRecord): + peerStore[SPRBook][info.peerId] = signedPeerRecord let cleanupPos = peerStore.toClean.find(info.peerId) if cleanupPos >= 0: @@ -207,11 +207,11 @@ proc identify*( let info = await peerStore.identify.identify(stream, stream.peerId) when defined(libp2p_agents_metrics): - var knownAgent = "unknown" - if info.agentVersion.isSome and info.agentVersion.get().len > 0: - let shortAgent = info.agentVersion.get().split("/")[0].safeToLowerAscii() - if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()): - knownAgent = shortAgent.get() + var + knownAgent = "unknown" + shortAgent = info.agentVersion.get("").split("/")[0].safeToLowerAscii().get("") + if KnownLibP2PAgentsSeq.contains(shortAgent): + knownAgent = shortAgent muxer.connection.setShortAgent(knownAgent) peerStore.updatePeerInfo(info) diff --git a/libp2p/protobuf/minprotobuf.nim b/libp2p/protobuf/minprotobuf.nim index fa96175d0..31c98d1cf 100644 --- a/libp2p/protobuf/minprotobuf.nim +++ b/libp2p/protobuf/minprotobuf.nim @@ -576,26 +576,18 @@ proc getField*[T: seq[byte]|string](data: ProtoBuffer, field: int, proc getField*(pb: ProtoBuffer, field: int, output: var ProtoBuffer): ProtoResult[bool] {.inline.} = var buffer: seq[byte] - let res = pb.getField(field, buffer) - if res.isOk(): - if res.get(): - output = initProtoBuffer(buffer) - ok(true) - else: - ok(false) + if ? pb.getField(field, buffer): + output = initProtoBuffer(buffer) + ok(true) else: - err(res.error) + ok(false) proc getRequiredField*[T](pb: ProtoBuffer, field: int, output: var T): ProtoResult[void] {.inline.} = - let res = pb.getField(field, output) - if res.isOk(): - if res.get(): - ok() - else: - err(RequiredFieldMissing) + if ? pb.getField(field, output): + ok() else: - err(res.error) + err(RequiredFieldMissing) proc getRepeatedField*[T: seq[byte]|string](data: ProtoBuffer, field: int, output: var seq[T]): ProtoResult[bool] = @@ -675,14 +667,10 @@ proc getRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int, proc getRequiredRepeatedField*[T](pb: ProtoBuffer, field: int, output: var seq[T]): ProtoResult[void] {.inline.} = - let res = pb.getRepeatedField(field, output) - if res.isOk(): - if res.get(): - ok() - else: - err(RequiredFieldMissing) + if ? pb.getRepeatedField(field, output): + ok() else: - err(res.error) + err(RequiredFieldMissing) proc getPackedRepeatedField*[T: ProtoScalar](data: ProtoBuffer, field: int, output: var seq[T]): ProtoResult[bool] = diff --git a/libp2p/protocols/connectivity/autonat/client.nim b/libp2p/protocols/connectivity/autonat/client.nim index cdc8b9917..81a4efe2c 100644 --- a/libp2p/protocols/connectivity/autonat/client.nim +++ b/libp2p/protocols/connectivity/autonat/client.nim @@ -9,7 +9,6 @@ {.push raises: [].} -import std/options import stew/results import chronos, chronicles import ../../../switch, @@ -24,8 +23,8 @@ type AutonatClient* = ref object of RootObj proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = - let pb = AutonatDial(peerInfo: some(AutonatPeerInfo( - id: some(pid), + let pb = AutonatDial(peerInfo: Opt.some(AutonatPeerInfo( + id: Opt.some(pid), addrs: addrs ))).encode() await conn.writeLp(pb.buffer) @@ -33,15 +32,13 @@ proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()): Future[MultiAddress] {.base, async.} = - proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [AutonatError].} = - if autonatMsg.isNone() or - autonatMsg.get().msgType != DialResponse or - autonatMsg.get().response.isNone() or - (autonatMsg.get().response.get().status == Ok and - autonatMsg.get().response.get().ma.isNone()): - raise newException(AutonatError, "Unexpected response") - else: - autonatMsg.get().response.get() + proc getResponseOrRaise(autonatMsg: Opt[AutonatMsg]): AutonatDialResponse {.raises: [AutonatError].} = + autonatMsg.withValue(msg): + if msg.msgType == DialResponse: + msg.response.withValue(res): + if not (res.status == Ok and res.ma.isNone()): + return res + raise newException(AutonatError, "Unexpected response") let conn = try: @@ -66,7 +63,7 @@ method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[Mult let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) return case response.status: of ResponseStatus.Ok: - response.ma.get() + response.ma.tryGet() of ResponseStatus.DialError: raise newException(AutonatUnreachableError, "Peer could not dial us back: " & response.text.get("")) else: diff --git a/libp2p/protocols/connectivity/autonat/core.nim b/libp2p/protocols/connectivity/autonat/core.nim index e97bc0af3..bfb9beaaa 100644 --- a/libp2p/protocols/connectivity/autonat/core.nim +++ b/libp2p/protocols/connectivity/autonat/core.nim @@ -9,7 +9,6 @@ {.push raises: [].} -import std/[options] import stew/[results, objects] import chronos, chronicles import ../../../multiaddress, @@ -39,29 +38,29 @@ type InternalError = 300 AutonatPeerInfo* = object - id*: Option[PeerId] + id*: Opt[PeerId] addrs*: seq[MultiAddress] AutonatDial* = object - peerInfo*: Option[AutonatPeerInfo] + peerInfo*: Opt[AutonatPeerInfo] AutonatDialResponse* = object status*: ResponseStatus - text*: Option[string] - ma*: Option[MultiAddress] + text*: Opt[string] + ma*: Opt[MultiAddress] AutonatMsg* = object msgType*: MsgType - dial*: Option[AutonatDial] - response*: Option[AutonatDialResponse] + dial*: Opt[AutonatDial] + response*: Opt[AutonatDialResponse] NetworkReachability* {.pure.} = enum Unknown, NotReachable, Reachable proc encode(p: AutonatPeerInfo): ProtoBuffer = result = initProtoBuffer() - if p.id.isSome(): - result.write(1, p.id.get()) + p.id.withValue(id): + result.write(1, id) for ma in p.addrs: result.write(2, ma.data.buffer) result.finish() @@ -70,8 +69,8 @@ proc encode*(d: AutonatDial): ProtoBuffer = result = initProtoBuffer() result.write(1, MsgType.Dial.uint) var dial = initProtoBuffer() - if d.peerInfo.isSome(): - dial.write(1, encode(d.peerInfo.get())) + d.peerInfo.withValue(pinfo): + dial.write(1, encode(pinfo)) dial.finish() result.write(2, dial.buffer) result.finish() @@ -81,72 +80,60 @@ proc encode*(r: AutonatDialResponse): ProtoBuffer = result.write(1, MsgType.DialResponse.uint) var bufferResponse = initProtoBuffer() bufferResponse.write(1, r.status.uint) - if r.text.isSome(): - bufferResponse.write(2, r.text.get()) - if r.ma.isSome(): - bufferResponse.write(3, r.ma.get()) + r.text.withValue(text): + bufferResponse.write(2, text) + r.ma.withValue(ma): + bufferResponse.write(3, ma) bufferResponse.finish() result.write(3, bufferResponse.buffer) result.finish() proc encode*(msg: AutonatMsg): ProtoBuffer = - if msg.dial.isSome(): - return encode(msg.dial.get()) - if msg.response.isSome(): - return encode(msg.response.get()) + msg.dial.withValue(dial): + return encode(dial) + msg.response.withValue(res): + return encode(res) -proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] = +proc decode*(_: typedesc[AutonatMsg], buf: seq[byte]): Opt[AutonatMsg] = var msgTypeOrd: uint32 pbDial: ProtoBuffer pbResponse: ProtoBuffer msg: AutonatMsg - let - pb = initProtoBuffer(buf) - r1 = pb.getField(1, msgTypeOrd) - r2 = pb.getField(2, pbDial) - r3 = pb.getField(3, pbResponse) - if r1.isErr() or r2.isErr() or r3.isErr(): return none(AutonatMsg) + let pb = initProtoBuffer(buf) - if r1.get() and not checkedEnumAssign(msg.msgType, msgTypeOrd): - return none(AutonatMsg) - if r2.get(): + if ? pb.getField(1, msgTypeOrd).toOpt() and not checkedEnumAssign(msg.msgType, msgTypeOrd): + return Opt.none(AutonatMsg) + if ? pb.getField(2, pbDial).toOpt(): var pbPeerInfo: ProtoBuffer dial: AutonatDial - let - r4 = pbDial.getField(1, pbPeerInfo) - if r4.isErr(): return none(AutonatMsg) + let r4 = ? pbDial.getField(1, pbPeerInfo).toOpt() var peerInfo: AutonatPeerInfo - if r4.get(): + if r4: var pid: PeerId let - r5 = pbPeerInfo.getField(1, pid) - r6 = pbPeerInfo.getRepeatedField(2, peerInfo.addrs) - if r5.isErr() or r6.isErr(): return none(AutonatMsg) - if r5.get(): peerInfo.id = some(pid) - dial.peerInfo = some(peerInfo) - msg.dial = some(dial) + r5 = ? pbPeerInfo.getField(1, pid).toOpt() + r6 = ? pbPeerInfo.getRepeatedField(2, peerInfo.addrs).toOpt() + if r5: peerInfo.id = Opt.some(pid) + dial.peerInfo = Opt.some(peerInfo) + msg.dial = Opt.some(dial) - if r3.get(): + if ? pb.getField(3, pbResponse).toOpt(): var statusOrd: uint text: string ma: MultiAddress response: AutonatDialResponse - let - r4 = pbResponse.getField(1, statusOrd) - r5 = pbResponse.getField(2, text) - r6 = pbResponse.getField(3, ma) - - if r4.isErr() or r5.isErr() or r6.isErr() or - (r4.get() and not checkedEnumAssign(response.status, statusOrd)): - return none(AutonatMsg) - if r5.get(): response.text = some(text) - if r6.get(): response.ma = some(ma) - msg.response = some(response) - - return some(msg) + if ? pbResponse.getField(1, statusOrd).optValue(): + if not checkedEnumAssign(response.status, statusOrd): + return Opt.none(AutonatMsg) + if ? pbResponse.getField(2, text).optValue(): + response.text = Opt.some(text) + if ? pbResponse.getField(3, ma).optValue(): + response.ma = Opt.some(ma) + msg.response = Opt.some(response) + return Opt.some(msg) diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 82641ea26..a15f0b249 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import std/[options, sets, sequtils] +import std/[sets, sequtils] import stew/results import chronos, chronicles import ../../protocol, @@ -33,8 +33,8 @@ type dialTimeout: Duration proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} = - let pb = AutonatDial(peerInfo: some(AutonatPeerInfo( - id: some(pid), + let pb = AutonatDial(peerInfo: Opt.some(AutonatPeerInfo( + id: Opt.some(pid), addrs: addrs ))).encode() await conn.writeLp(pb.buffer) @@ -42,16 +42,16 @@ proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} proc sendResponseError(conn: Connection, status: ResponseStatus, text: string = "") {.async.} = let pb = AutonatDialResponse( status: status, - text: if text == "": none(string) else: some(text), - ma: none(MultiAddress) + text: if text == "": Opt.none(string) else: Opt.some(text), + ma: Opt.none(MultiAddress) ).encode() await conn.writeLp(pb.buffer) proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = let pb = AutonatDialResponse( status: ResponseStatus.Ok, - text: some("Ok"), - ma: some(ma) + text: Opt.some("Ok"), + ma: Opt.some(ma) ).encode() await conn.writeLp(pb.buffer) @@ -70,8 +70,8 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) let fut = await anyCompleted(futs).wait(autonat.dialTimeout) let ma = await fut - if ma.isSome: - await conn.sendResponseOk(ma.get()) + ma.withValue(maddr): + await conn.sendResponseOk(maddr) else: await conn.sendResponseError(DialError, "Missing observed address") except CancelledError as exc: @@ -92,42 +92,40 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy f.cancel() proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[void] = - if msg.dial.isNone() or msg.dial.get().peerInfo.isNone(): + let dial = msg.dial.valueOr: + return conn.sendResponseError(BadRequest, "Missing Dial") + let peerInfo = dial.peerInfo.valueOr: return conn.sendResponseError(BadRequest, "Missing Peer Info") - let peerInfo = msg.dial.get().peerInfo.get() - if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId: - return conn.sendResponseError(BadRequest, "PeerId mismatch") + peerInfo.id.withValue(id): + if id != conn.peerId: + return conn.sendResponseError(BadRequest, "PeerId mismatch") - if conn.observedAddr.isNone: + let observedAddr = conn.observedAddr.valueOr: return conn.sendResponseError(BadRequest, "Missing observed address") - let observedAddr = conn.observedAddr.get() - var isRelayed = observedAddr.contains(multiCodec("p2p-circuit")) - if isRelayed.isErr() or isRelayed.get(): + var isRelayed = observedAddr.contains(multiCodec("p2p-circuit")).valueOr: + return conn.sendResponseError(DialRefused, "Invalid observed address") + if isRelayed: return conn.sendResponseError(DialRefused, "Refused to dial a relayed observed address") - let hostIp = observedAddr[0] - if hostIp.isErr() or not IP.match(hostIp.get()): - trace "wrong observed address", address=observedAddr + let hostIp = observedAddr[0].valueOr: + return conn.sendResponseError(InternalError, "Wrong observed address") + if not IP.match(hostIp): return conn.sendResponseError(InternalError, "Expected an IP address") var addrs = initHashSet[MultiAddress]() addrs.incl(observedAddr) trace "addrs received", addrs = peerInfo.addrs for ma in peerInfo.addrs: - isRelayed = ma.contains(multiCodec("p2p-circuit")) - if isRelayed.isErr() or isRelayed.get(): - continue - let maFirst = ma[0] - if maFirst.isErr() or not DNS_OR_IP.match(maFirst.get()): - continue + isRelayed = ma.contains(multiCodec("p2p-circuit")).valueOr: continue + let maFirst = ma[0].valueOr: continue + if not DNS_OR_IP.match(maFirst): continue try: addrs.incl( - if maFirst.get() == hostIp.get(): + if maFirst == hostIp: ma else: - let maEnd = ma[1..^1] - if maEnd.isErr(): continue - hostIp.get() & maEnd.get() + let maEnd = ma[1..^1].valueOr: continue + hostIp & maEnd ) except LPError as exc: continue @@ -144,10 +142,10 @@ proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout) proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} = try: - let msgOpt = AutonatMsg.decode(await conn.readLp(1024)) - if msgOpt.isNone() or msgOpt.get().msgType != MsgType.Dial: + let msg = AutonatMsg.decode(await conn.readLp(1024)).valueOr: raise newException(AutonatError, "Received malformed message") - let msg = msgOpt.get() + if msg.msgType != MsgType.Dial: + raise newException(AutonatError, "Message type should be dial") await autonat.handleDial(conn, msg) except CancelledError as exc: raise exc diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 33a7b20e9..7726c6b11 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import std/[options, deques, sequtils] +import std/[deques, sequtils] import chronos, metrics import ../../../switch import ../../../wire @@ -18,7 +18,7 @@ from core import NetworkReachability, AutonatUnreachableError import ../../../utils/heartbeat import ../../../crypto/crypto -export options, core.NetworkReachability +export core.NetworkReachability logScope: topics = "libp2p autonatservice" @@ -31,12 +31,12 @@ type addressMapper: AddressMapper scheduleHandle: Future[void] networkReachability*: NetworkReachability - confidence: Option[float] + confidence: Opt[float] answers: Deque[NetworkReachability] autonatClient: AutonatClient statusAndConfidenceHandler: StatusAndConfidenceHandler rng: ref HmacDrbgContext - scheduleInterval: Option[Duration] + scheduleInterval: Opt[Duration] askNewConnectedPeers: bool numPeersToAsk: int maxQueueSize: int @@ -44,13 +44,13 @@ type dialTimeout: Duration enableAddressMapper: bool - StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [].} + StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Opt[float]): Future[void] {.gcsafe, raises: [].} proc new*( T: typedesc[AutonatService], autonatClient: AutonatClient, rng: ref HmacDrbgContext, - scheduleInterval: Option[Duration] = none(Duration), + scheduleInterval: Opt[Duration] = Opt.none(Duration), askNewConnectedPeers = true, numPeersToAsk: int = 5, maxQueueSize: int = 10, @@ -60,7 +60,7 @@ proc new*( return T( scheduleInterval: scheduleInterval, networkReachability: Unknown, - confidence: none(float), + confidence: Opt.none(float), answers: initDeque[NetworkReachability](), autonatClient: autonatClient, rng: rng, @@ -95,14 +95,14 @@ proc handleAnswer(self: AutonatService, ans: NetworkReachability): Future[bool] self.answers.addLast(ans) self.networkReachability = Unknown - self.confidence = none(float) + self.confidence = Opt.none(float) const reachabilityPriority = [Reachable, NotReachable] for reachability in reachabilityPriority: let confidence = self.answers.countIt(it == reachability) / self.maxQueueSize libp2p_autonat_reachability_confidence.set(value = confidence, labelValues = [$reachability]) if self.confidence.isNone and confidence >= self.minConfidence: self.networkReachability = reachability - self.confidence = some(confidence) + self.confidence = Opt.some(confidence) debug "Current status", currentStats = $self.networkReachability, confidence = $self.confidence, answers = self.answers @@ -189,8 +189,8 @@ method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = self.newConnectedPeerHandler = proc (peerId: PeerId, event: PeerEvent): Future[void] {.async.} = discard askPeer(self, switch, peerId) switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) - if self.scheduleInterval.isSome(): - self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get()) + self.scheduleInterval.withValue(interval): + self.scheduleHandle = schedule(self, switch, interval) if self.enableAddressMapper: switch.peerInfo.addressMappers.add(self.addressMapper) return hasBeenSetup diff --git a/libp2p/protocols/connectivity/dcutr/server.nim b/libp2p/protocols/connectivity/dcutr/server.nim index 4b5a032a4..ecd4d15f0 100644 --- a/libp2p/protocols/connectivity/dcutr/server.nim +++ b/libp2p/protocols/connectivity/dcutr/server.nim @@ -9,8 +9,7 @@ {.push raises: [].} -import std/[options, sets, sequtils] - +import std/[sets, sequtils] import stew/[results, objects] import chronos, chronicles diff --git a/libp2p/protocols/connectivity/relay/client.nim b/libp2p/protocols/connectivity/relay/client.nim index 94d794eb5..5e1537a9b 100644 --- a/libp2p/protocols/connectivity/relay/client.nim +++ b/libp2p/protocols/connectivity/relay/client.nim @@ -9,10 +9,8 @@ {.push raises: [].} -import times, options - +import times import chronos, chronicles - import ./relay, ./messages, ./rconn, @@ -22,8 +20,6 @@ import ./relay, ../../../multiaddress, ../../../stream/connection -export options - logScope: topics = "libp2p relay relay-client" @@ -44,28 +40,27 @@ type Rsvp* = object expire*: uint64 # required, Unix expiration time (UTC) addrs*: seq[MultiAddress] # relay address for reserving peer - voucher*: Option[Voucher] # optional, reservation voucher + voucher*: Opt[Voucher] # optional, reservation voucher limitDuration*: uint32 # seconds limitData*: uint64 # bytes proc sendStopError(conn: Connection, code: StatusV2) {.async.} = trace "send stop status", status = $code & " (" & $ord(code) & ")" - let msg = StopMessage(msgType: StopMessageType.Status, status: some(code)) + let msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code)) await conn.writeLp(encode(msg).buffer) proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {.async.} = - if msg.peer.isNone(): - await sendStopError(conn, MalformedMessage) - return let # TODO: check the go version to see in which way this could fail # it's unclear in the spec - src = msg.peer.get() + src = msg.peer.valueOr: + await sendStopError(conn, MalformedMessage) + return limitDuration = msg.limit.duration limitData = msg.limit.data msg = StopMessage( msgType: StopMessageType.Status, - status: some(Ok)) + status: Opt.some(Ok)) pb = encode(msg) trace "incoming relay connection", src @@ -89,7 +84,7 @@ proc reserve*(cl: RelayClient, pb = encode(HopMessage(msgType: HopMessageType.Reserve)) msg = try: await conn.writeLp(pb.buffer) - HopMessage.decode(await conn.readLp(RelayClientMsgSize)).get() + HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() except CancelledError as exc: raise exc except CatchableError as exc: @@ -100,21 +95,21 @@ proc reserve*(cl: RelayClient, raise newException(ReservationError, "Unexpected relay response type") if msg.status.get(UnexpectedMessage) != Ok: raise newException(ReservationError, "Reservation failed") - if msg.reservation.isNone(): - raise newException(ReservationError, "Missing reservation information") - let reservation = msg.reservation.get() + let reservation = msg.reservation.valueOr: + raise newException(ReservationError, "Missing reservation information") if reservation.expire > int64.high().uint64 or now().utc > reservation.expire.int64.fromUnix.utc: raise newException(ReservationError, "Bad expiration date") result.expire = reservation.expire result.addrs = reservation.addrs - if reservation.svoucher.isSome(): - let svoucher = SignedVoucher.decode(reservation.svoucher.get()) - if svoucher.isErr() or svoucher.get().data.relayPeerId != peerId: + reservation.svoucher.withValue(sv): + let svoucher = SignedVoucher.decode(sv).valueOr: raise newException(ReservationError, "Invalid voucher") - result.voucher = some(svoucher.get().data) + if svoucher.data.relayPeerId != peerId: + raise newException(ReservationError, "Invalid voucher PeerId") + result.voucher = Opt.some(svoucher.data) result.limitDuration = msg.limit.duration result.limitData = msg.limit.data @@ -126,9 +121,9 @@ proc dialPeerV1*( dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} = var msg = RelayMessage( - msgType: some(RelayType.Hop), - srcPeer: some(RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)), - dstPeer: some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs))) + msgType: Opt.some(RelayType.Hop), + srcPeer: Opt.some(RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)), + dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs))) pb = encode(msg) trace "Dial peer", msgSend=msg @@ -151,16 +146,18 @@ proc dialPeerV1*( raise exc try: - if msgRcvFromRelayOpt.isNone: + let msgRcvFromRelay = msgRcvFromRelayOpt.valueOr: raise newException(RelayV1DialError, "Hop can't open destination stream") - let msgRcvFromRelay = msgRcvFromRelayOpt.get() - if msgRcvFromRelay.msgType.isNone or msgRcvFromRelay.msgType.get() != RelayType.Status: + if msgRcvFromRelay.msgType.tryGet() != RelayType.Status: raise newException(RelayV1DialError, "Hop can't open destination stream: wrong message type") - if msgRcvFromRelay.status.isNone or msgRcvFromRelay.status.get() != StatusV1.Success: + if msgRcvFromRelay.status.tryGet() != StatusV1.Success: raise newException(RelayV1DialError, "Hop can't open destination stream: status failed") except RelayV1DialError as exc: await sendStatus(conn, StatusV1.HopCantOpenDstStream) raise exc + except ValueError as exc: + await sendStatus(conn, StatusV1.HopCantOpenDstStream) + raise newException(RelayV1DialError, exc.msg) result = conn proc dialPeerV2*( @@ -170,13 +167,13 @@ proc dialPeerV2*( dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} = let p = Peer(peerId: dstPeerId, addrs: dstAddrs) - pb = encode(HopMessage(msgType: HopMessageType.Connect, peer: some(p))) + pb = encode(HopMessage(msgType: HopMessageType.Connect, peer: Opt.some(p))) trace "Dial peer", p let msgRcvFromRelay = try: await conn.writeLp(pb.buffer) - HopMessage.decode(await conn.readLp(RelayClientMsgSize)).get() + HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() except CancelledError as exc: raise exc except CatchableError as exc: @@ -186,19 +183,17 @@ proc dialPeerV2*( if msgRcvFromRelay.msgType != HopMessageType.Status: raise newException(RelayV2DialError, "Unexpected stop response") if msgRcvFromRelay.status.get(UnexpectedMessage) != Ok: - trace "Relay stop failed", msg = msgRcvFromRelay.status.get() + trace "Relay stop failed", msg = msgRcvFromRelay.status raise newException(RelayV2DialError, "Relay stop failure") conn.limitDuration = msgRcvFromRelay.limit.duration conn.limitData = msgRcvFromRelay.limit.data return conn proc handleStopStreamV2(cl: RelayClient, conn: Connection) {.async, gcsafe.} = - let msgOpt = StopMessage.decode(await conn.readLp(RelayClientMsgSize)) - if msgOpt.isNone(): + let msg = StopMessage.decode(await conn.readLp(RelayClientMsgSize)).valueOr: await sendHopStatus(conn, MalformedMessage) return - trace "client circuit relay v2 handle stream", msg = msgOpt.get() - let msg = msgOpt.get() + trace "client circuit relay v2 handle stream", msg if msg.msgType == StopMessageType.Connect: await cl.handleRelayedConnect(conn, msg) @@ -207,16 +202,14 @@ proc handleStopStreamV2(cl: RelayClient, conn: Connection) {.async, gcsafe.} = await sendStopError(conn, MalformedMessage) proc handleStop(cl: RelayClient, conn: Connection, msg: RelayMessage) {.async, gcsafe.} = - if msg.srcPeer.isNone: + let src = msg.srcPeer.valueOr: await sendStatus(conn, StatusV1.StopSrcMultiaddrInvalid) return - let src = msg.srcPeer.get() - if msg.dstPeer.isNone: + let dst = msg.dstPeer.valueOr: await sendStatus(conn, StatusV1.StopDstMultiaddrInvalid) return - let dst = msg.dstPeer.get() if dst.peerId != cl.switch.peerInfo.peerId: await sendStatus(conn, StatusV1.StopDstMultiaddrInvalid) return @@ -234,13 +227,16 @@ proc handleStop(cl: RelayClient, conn: Connection, msg: RelayMessage) {.async, g else: await conn.close() proc handleStreamV1(cl: RelayClient, conn: Connection) {.async, gcsafe.} = - let msgOpt = RelayMessage.decode(await conn.readLp(RelayClientMsgSize)) - if msgOpt.isNone: + let msg = RelayMessage.decode(await conn.readLp(RelayClientMsgSize)).valueOr: await sendStatus(conn, StatusV1.MalformedMessage) return - trace "client circuit relay v1 handle stream", msg = msgOpt.get() - let msg = msgOpt.get() - case msg.msgType.get: + trace "client circuit relay v1 handle stream", msg + + let typ = msg.msgType.valueOr: + trace "Message type not set" + await sendStatus(conn, StatusV1.MalformedMessage) + return + case typ: of RelayType.Hop: if cl.canHop: await cl.handleHop(conn, msg) else: await sendStatus(conn, StatusV1.HopCantSpeakRelay) diff --git a/libp2p/protocols/connectivity/relay/messages.nim b/libp2p/protocols/connectivity/relay/messages.nim index 6c8e4583c..8cb2bfa65 100644 --- a/libp2p/protocols/connectivity/relay/messages.nim +++ b/libp2p/protocols/connectivity/relay/messages.nim @@ -9,8 +9,8 @@ {.push raises: [].} -import options, macros -import stew/objects +import macros +import stew/[objects, results] import ../../../peerinfo, ../../../signed_envelope @@ -46,36 +46,36 @@ type addrs*: seq[MultiAddress] RelayMessage* = object - msgType*: Option[RelayType] - srcPeer*: Option[RelayPeer] - dstPeer*: Option[RelayPeer] - status*: Option[StatusV1] + msgType*: Opt[RelayType] + srcPeer*: Opt[RelayPeer] + dstPeer*: Opt[RelayPeer] + status*: Opt[StatusV1] proc encode*(msg: RelayMessage): ProtoBuffer = result = initProtoBuffer() - if isSome(msg.msgType): - result.write(1, msg.msgType.get().ord.uint) - if isSome(msg.srcPeer): + msg.msgType.withValue(typ): + result.write(1, typ.ord.uint) + msg.srcPeer.withValue(srcPeer): var peer = initProtoBuffer() - peer.write(1, msg.srcPeer.get().peerId) - for ma in msg.srcPeer.get().addrs: + peer.write(1, srcPeer.peerId) + for ma in srcPeer.addrs: peer.write(2, ma.data.buffer) peer.finish() result.write(2, peer.buffer) - if isSome(msg.dstPeer): + msg.dstPeer.withValue(dstPeer): var peer = initProtoBuffer() - peer.write(1, msg.dstPeer.get().peerId) - for ma in msg.dstPeer.get().addrs: + peer.write(1, dstPeer.peerId) + for ma in dstPeer.addrs: peer.write(2, ma.data.buffer) peer.finish() result.write(3, peer.buffer) - if isSome(msg.status): - result.write(4, msg.status.get().ord.uint) + msg.status.withValue(status): + result.write(4, status.ord.uint) result.finish() -proc decode*(_: typedesc[RelayMessage], buf: seq[byte]): Option[RelayMessage] = +proc decode*(_: typedesc[RelayMessage], buf: seq[byte]): Opt[RelayMessage] = var rMsg: RelayMessage msgTypeOrd: uint32 @@ -85,38 +85,29 @@ proc decode*(_: typedesc[RelayMessage], buf: seq[byte]): Option[RelayMessage] = pbSrc: ProtoBuffer pbDst: ProtoBuffer - let - pb = initProtoBuffer(buf) - r1 = pb.getField(1, msgTypeOrd) - r2 = pb.getField(2, pbSrc) - r3 = pb.getField(3, pbDst) - r4 = pb.getField(4, statusOrd) + let pb = initProtoBuffer(buf) - if r1.isErr() or r2.isErr() or r3.isErr() or r4.isErr(): - return none(RelayMessage) - - if r2.get() and - (pbSrc.getField(1, src.peerId).isErr() or - pbSrc.getRepeatedField(2, src.addrs).isErr()): - return none(RelayMessage) - - if r3.get() and - (pbDst.getField(1, dst.peerId).isErr() or - pbDst.getRepeatedField(2, dst.addrs).isErr()): - return none(RelayMessage) - - if r1.get(): + if ? pb.getField(1, msgTypeOrd).toOpt(): if msgTypeOrd.int notin RelayType: - return none(RelayMessage) - rMsg.msgType = some(RelayType(msgTypeOrd)) - if r2.get(): rMsg.srcPeer = some(src) - if r3.get(): rMsg.dstPeer = some(dst) - if r4.get(): + return Opt.none(RelayMessage) + rMsg.msgType = Opt.some(RelayType(msgTypeOrd)) + + if ? pb.getField(2, pbSrc).toOpt(): + discard ? pbSrc.getField(1, src.peerId).toOpt() + discard ? pbSrc.getRepeatedField(2, src.addrs).toOpt() + rMsg.srcPeer = Opt.some(src) + + if ? pb.getField(3, pbDst).toOpt(): + discard ? pbDst.getField(1, dst.peerId).toOpt() + discard ? pbDst.getRepeatedField(2, dst.addrs).toOpt() + rMsg.dstPeer = Opt.some(dst) + + if ? pb.getField(4, statusOrd).toOpt(): var status: StatusV1 if not checkedEnumAssign(status, statusOrd): - return none(RelayMessage) - rMsg.status = some(status) - some(rMsg) + return Opt.none(RelayMessage) + rMsg.status = Opt.some(status) + Opt.some(rMsg) # Voucher @@ -176,7 +167,7 @@ type Reservation* = object expire*: uint64 # required, Unix expiration time (UTC) addrs*: seq[MultiAddress] # relay address for reserving peer - svoucher*: Option[seq[byte]] # optional, reservation voucher + svoucher*: Opt[seq[byte]] # optional, reservation voucher Limit* = object duration*: uint32 # seconds data*: uint64 # bytes @@ -196,30 +187,29 @@ type Status = 2 HopMessage* = object msgType*: HopMessageType - peer*: Option[Peer] - reservation*: Option[Reservation] + peer*: Opt[Peer] + reservation*: Opt[Reservation] limit*: Limit - status*: Option[StatusV2] + status*: Opt[StatusV2] proc encode*(msg: HopMessage): ProtoBuffer = var pb = initProtoBuffer() pb.write(1, msg.msgType.ord.uint) - if msg.peer.isSome(): + msg.peer.withValue(peer): var ppb = initProtoBuffer() - ppb.write(1, msg.peer.get().peerId) - for ma in msg.peer.get().addrs: + ppb.write(1, peer.peerId) + for ma in peer.addrs: ppb.write(2, ma.data.buffer) ppb.finish() pb.write(2, ppb.buffer) - if msg.reservation.isSome(): - let rsrv = msg.reservation.get() + msg.reservation.withValue(rsrv): var rpb = initProtoBuffer() rpb.write(1, rsrv.expire) for ma in rsrv.addrs: rpb.write(2, ma.data.buffer) - if rsrv.svoucher.isSome(): - rpb.write(3, rsrv.svoucher.get()) + rsrv.svoucher.withValue(vouch): + rpb.write(3, vouch) rpb.finish() pb.write(3, rpb.buffer) if msg.limit.duration > 0 or msg.limit.data > 0: @@ -228,66 +218,51 @@ proc encode*(msg: HopMessage): ProtoBuffer = if msg.limit.data > 0: lpb.write(2, msg.limit.data) lpb.finish() pb.write(4, lpb.buffer) - if msg.status.isSome(): - pb.write(5, msg.status.get().ord.uint) + msg.status.withValue(status): + pb.write(5, status.ord.uint) pb.finish() pb -proc decode*(_: typedesc[HopMessage], buf: seq[byte]): Option[HopMessage] = - var - msg: HopMessage - msgTypeOrd: uint32 - pbPeer: ProtoBuffer - pbReservation: ProtoBuffer - pbLimit: ProtoBuffer - statusOrd: uint32 - peer: Peer - reservation: Reservation - limit: Limit - res: bool - - let - pb = initProtoBuffer(buf) - r1 = pb.getRequiredField(1, msgTypeOrd) - r2 = pb.getField(2, pbPeer) - r3 = pb.getField(3, pbReservation) - r4 = pb.getField(4, pbLimit) - r5 = pb.getField(5, statusOrd) - - if r1.isErr() or r2.isErr() or r3.isErr() or r4.isErr() or r5.isErr(): - return none(HopMessage) - - if r2.get() and - (pbPeer.getRequiredField(1, peer.peerId).isErr() or - pbPeer.getRepeatedField(2, peer.addrs).isErr()): - return none(HopMessage) - - if r3.get(): - var svoucher: seq[byte] - let rSVoucher = pbReservation.getField(3, svoucher) - if pbReservation.getRequiredField(1, reservation.expire).isErr() or - pbReservation.getRepeatedField(2, reservation.addrs).isErr() or - rSVoucher.isErr(): - return none(HopMessage) - if rSVoucher.get(): reservation.svoucher = some(svoucher) - - if r4.get() and - (pbLimit.getField(1, limit.duration).isErr() or - pbLimit.getField(2, limit.data).isErr()): - return none(HopMessage) +proc decode*(_: typedesc[HopMessage], buf: seq[byte]): Opt[HopMessage] = + var msg: HopMessage + let pb = initProtoBuffer(buf) + var msgTypeOrd: uint32 + ? pb.getRequiredField(1, msgTypeOrd).toOpt() if not checkedEnumAssign(msg.msgType, msgTypeOrd): - return none(HopMessage) - if r2.get(): msg.peer = some(peer) - if r3.get(): msg.reservation = some(reservation) - if r4.get(): msg.limit = limit - if r5.get(): + return Opt.none(HopMessage) + + var pbPeer: ProtoBuffer + if ? pb.getField(2, pbPeer).toOpt(): + var peer: Peer + ? pbPeer.getRequiredField(1, peer.peerId).toOpt() + discard ? pbPeer.getRepeatedField(2, peer.addrs).toOpt() + msg.peer = Opt.some(peer) + + var pbReservation: ProtoBuffer + if ? pb.getField(3, pbReservation).toOpt(): + var + svoucher: seq[byte] + reservation: Reservation + if ? pbReservation.getField(3, svoucher).toOpt(): + reservation.svoucher = Opt.some(svoucher) + ? pbReservation.getRequiredField(1, reservation.expire).toOpt() + discard ? pbReservation.getRepeatedField(2, reservation.addrs).toOpt() + msg.reservation = Opt.some(reservation) + + var pbLimit: ProtoBuffer + if ? pb.getField(4, pbLimit).toOpt(): + discard ? pbLimit.getField(1, msg.limit.duration).toOpt() + discard ? pbLimit.getField(2, msg.limit.data).toOpt() + + var statusOrd: uint32 + if ? pb.getField(5, statusOrd).toOpt(): var status: StatusV2 if not checkedEnumAssign(status, statusOrd): - return none(HopMessage) - msg.status = some(status) - some(msg) + return Opt.none(HopMessage) + msg.status = Opt.some(status) + Opt.some(msg) # Circuit Relay V2 Stop Message @@ -297,19 +272,19 @@ type Status = 1 StopMessage* = object msgType*: StopMessageType - peer*: Option[Peer] + peer*: Opt[Peer] limit*: Limit - status*: Option[StatusV2] + status*: Opt[StatusV2] proc encode*(msg: StopMessage): ProtoBuffer = var pb = initProtoBuffer() pb.write(1, msg.msgType.ord.uint) - if msg.peer.isSome(): + msg.peer.withValue(peer): var ppb = initProtoBuffer() - ppb.write(1, msg.peer.get().peerId) - for ma in msg.peer.get().addrs: + ppb.write(1, peer.peerId) + for ma in peer.addrs: ppb.write(2, ma.data.buffer) ppb.finish() pb.write(2, ppb.buffer) @@ -319,52 +294,40 @@ proc encode*(msg: StopMessage): ProtoBuffer = if msg.limit.data > 0: lpb.write(2, msg.limit.data) lpb.finish() pb.write(3, lpb.buffer) - if msg.status.isSome(): - pb.write(4, msg.status.get().ord.uint) + msg.status.withValue(status): + pb.write(4, status.ord.uint) pb.finish() pb -proc decode*(_: typedesc[StopMessage], buf: seq[byte]): Option[StopMessage] = - var - msg: StopMessage - msgTypeOrd: uint32 - pbPeer: ProtoBuffer - pbLimit: ProtoBuffer - statusOrd: uint32 - peer: Peer - limit: Limit - rVoucher: ProtoResult[bool] - res: bool +proc decode*(_: typedesc[StopMessage], buf: seq[byte]): Opt[StopMessage] = + var msg: StopMessage - let - pb = initProtoBuffer(buf) - r1 = pb.getRequiredField(1, msgTypeOrd) - r2 = pb.getField(2, pbPeer) - r3 = pb.getField(3, pbLimit) - r4 = pb.getField(4, statusOrd) + let pb = initProtoBuffer(buf) - if r1.isErr() or r2.isErr() or r3.isErr() or r4.isErr(): - return none(StopMessage) - - if r2.get() and - (pbPeer.getRequiredField(1, peer.peerId).isErr() or - pbPeer.getRepeatedField(2, peer.addrs).isErr()): - return none(StopMessage) - - if r3.get() and - (pbLimit.getField(1, limit.duration).isErr() or - pbLimit.getField(2, limit.data).isErr()): - return none(StopMessage) - - if msgTypeOrd.int notin StopMessageType.low.ord .. StopMessageType.high.ord: - return none(StopMessage) + var msgTypeOrd: uint32 + ? pb.getRequiredField(1, msgTypeOrd).toOpt() + if msgTypeOrd.int notin StopMessageType: + return Opt.none(StopMessage) msg.msgType = StopMessageType(msgTypeOrd) - if r2.get(): msg.peer = some(peer) - if r3.get(): msg.limit = limit - if r4.get(): + + + var pbPeer: ProtoBuffer + if ? pb.getField(2, pbPeer).toOpt(): + var peer: Peer + ? pbPeer.getRequiredField(1, peer.peerId).toOpt() + discard ? pbPeer.getRepeatedField(2, peer.addrs).toOpt() + msg.peer = Opt.some(peer) + + var pbLimit: ProtoBuffer + if ? pb.getField(3, pbLimit).toOpt(): + discard ? pbLimit.getField(1, msg.limit.duration).toOpt() + discard ? pbLimit.getField(2, msg.limit.data).toOpt() + + var statusOrd: uint32 + if ? pb.getField(4, statusOrd).toOpt(): var status: StatusV2 if not checkedEnumAssign(status, statusOrd): - return none(StopMessage) - msg.status = some(status) - some(msg) + return Opt.none(StopMessage) + msg.status = Opt.some(status) + Opt.some(msg) diff --git a/libp2p/protocols/connectivity/relay/relay.nim b/libp2p/protocols/connectivity/relay/relay.nim index 638115901..5165fde53 100644 --- a/libp2p/protocols/connectivity/relay/relay.nim +++ b/libp2p/protocols/connectivity/relay/relay.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import options, sequtils, tables +import sequtils, tables import chronos, chronicles @@ -90,11 +90,11 @@ proc createReserveResponse( rsrv = Reservation(expire: expireUnix, addrs: r.switch.peerInfo.addrs.mapIt( ? it.concat(ma).orErr(CryptoError.KeyError)), - svoucher: some(? sv.encode)) + svoucher: Opt.some(? sv.encode)) msg = HopMessage(msgType: HopMessageType.Status, - reservation: some(rsrv), + reservation: Opt.some(rsrv), limit: r.limit, - status: some(Ok)) + status: Opt.some(Ok)) return ok(msg) proc isRelayed*(conn: Connection): bool = @@ -115,17 +115,16 @@ proc handleReserve(r: Relay, conn: Connection) {.async, gcsafe.} = trace "Too many reservations", pid = conn.peerId await sendHopStatus(conn, ReservationRefused) return + trace "reserving relay slot for", pid = conn.peerId let pid = conn.peerId expire = now().utc + r.reservationTTL - msg = r.createReserveResponse(pid, expire) + msg = r.createReserveResponse(pid, expire).valueOr: + trace "error signing the voucher", pid + return - trace "reserving relay slot for", pid - if msg.isErr(): - trace "error signing the voucher", error = error(msg), pid - return r.rsvp[pid] = expire - await conn.writeLp(encode(msg.get()).buffer) + await conn.writeLp(encode(msg).buffer) proc handleConnect(r: Relay, connSrc: Connection, @@ -134,13 +133,12 @@ proc handleConnect(r: Relay, trace "connection attempt over relay connection" await sendHopStatus(connSrc, PermissionDenied) return - if msg.peer.isNone(): - await sendHopStatus(connSrc, MalformedMessage) - return - let + msgPeer = msg.peer.valueOr: + await sendHopStatus(connSrc, MalformedMessage) + return src = connSrc.peerId - dst = msg.peer.get().peerId + dst = msgPeer.peerId if dst notin r.rsvp: trace "refusing connection, no reservation", src, dst await sendHopStatus(connSrc, NoReservation) @@ -173,16 +171,17 @@ proc handleConnect(r: Relay, proc sendStopMsg() {.async.} = let stopMsg = StopMessage(msgType: StopMessageType.Connect, - peer: some(Peer(peerId: src, addrs: @[])), + peer: Opt.some(Peer(peerId: src, addrs: @[])), limit: r.limit) await connDst.writeLp(encode(stopMsg).buffer) - let msg = StopMessage.decode(await connDst.readLp(r.msgSize)).get() + let msg = StopMessage.decode(await connDst.readLp(r.msgSize)).valueOr: + raise newException(SendStopError, "Malformed message") if msg.msgType != StopMessageType.Status: raise newException(SendStopError, "Unexpected stop response, not a status message") if msg.status.get(UnexpectedMessage) != Ok: raise newException(SendStopError, "Relay stop failure") await connSrc.writeLp(encode(HopMessage(msgType: HopMessageType.Status, - status: some(Ok))).buffer) + status: Opt.some(Ok))).buffer) try: await sendStopMsg() except CancelledError as exc: @@ -202,12 +201,10 @@ proc handleConnect(r: Relay, await bridge(rconnSrc, rconnDst) proc handleHopStreamV2*(r: Relay, conn: Connection) {.async, gcsafe.} = - let msgOpt = HopMessage.decode(await conn.readLp(r.msgSize)) - if msgOpt.isNone(): + let msg = HopMessage.decode(await conn.readLp(r.msgSize)).valueOr: await sendHopStatus(conn, MalformedMessage) return - trace "relayv2 handle stream", msg = msgOpt.get() - let msg = msgOpt.get() + trace "relayv2 handle stream", msg = msg case msg.msgType: of HopMessageType.Reserve: await r.handleReserve(conn) of HopMessageType.Connect: await r.handleConnect(conn, msg) @@ -225,15 +222,14 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async, gcsaf await sendStatus(connSrc, StatusV1.HopCantSpeakRelay) return + var src, dst: RelayPeer proc checkMsg(): Result[RelayMessage, StatusV1] = - if msg.srcPeer.isNone: + src = msg.srcPeer.valueOr: return err(StatusV1.HopSrcMultiaddrInvalid) - let src = msg.srcPeer.get() if src.peerId != connSrc.peerId: return err(StatusV1.HopSrcMultiaddrInvalid) - if msg.dstPeer.isNone: + dst = msg.dstPeer.valueOr: return err(StatusV1.HopDstMultiaddrInvalid) - let dst = msg.dstPeer.get() if dst.peerId == r.switch.peerInfo.peerId: return err(StatusV1.HopCantRelayToSelf) if not r.switch.isConnected(dst.peerId): @@ -245,9 +241,6 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async, gcsaf await sendStatus(connSrc, check.error()) return - let - src = msg.srcPeer.get() - dst = msg.dstPeer.get() if r.peerCount[src.peerId] >= r.maxCircuitPerPeer or r.peerCount[dst.peerId] >= r.maxCircuitPerPeer: trace "refusing connection; too many connection from src or to dst", src, dst @@ -271,9 +264,9 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async, gcsaf await connDst.close() let msgToSend = RelayMessage( - msgType: some(RelayType.Stop), - srcPeer: some(src), - dstPeer: some(dst)) + msgType: Opt.some(RelayType.Stop), + srcPeer: Opt.some(src), + dstPeer: Opt.some(dst)) let msgRcvFromDstOpt = try: await connDst.writeLp(encode(msgToSend).buffer) @@ -285,12 +278,11 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async, gcsaf await sendStatus(connSrc, StatusV1.HopCantOpenDstStream) return - if msgRcvFromDstOpt.isNone: + let msgRcvFromDst = msgRcvFromDstOpt.valueOr: trace "error reading stop response", msg = msgRcvFromDstOpt await sendStatus(connSrc, StatusV1.HopCantOpenDstStream) return - let msgRcvFromDst = msgRcvFromDstOpt.get() if msgRcvFromDst.msgType.get(RelayType.Stop) != RelayType.Status or msgRcvFromDst.status.get(StatusV1.StopRelayRefused) != StatusV1.Success: trace "unexcepted relay stop response", msgRcvFromDst @@ -302,13 +294,16 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async, gcsaf await bridge(connSrc, connDst) proc handleStreamV1(r: Relay, conn: Connection) {.async, gcsafe.} = - let msgOpt = RelayMessage.decode(await conn.readLp(r.msgSize)) - if msgOpt.isNone: + let msg = RelayMessage.decode(await conn.readLp(r.msgSize)).valueOr: await sendStatus(conn, StatusV1.MalformedMessage) return - trace "relay handle stream", msg = msgOpt.get() - let msg = msgOpt.get() - case msg.msgType.get: + trace "relay handle stream", msg + + let typ = msg.msgType.valueOr: + trace "Message type not set" + await sendStatus(conn, StatusV1.MalformedMessage) + return + case typ: of RelayType.Hop: await r.handleHop(conn, msg) of RelayType.Stop: await sendStatus(conn, StatusV1.StopRelayRefused) of RelayType.CanHop: await sendStatus(conn, StatusV1.Success) diff --git a/libp2p/protocols/connectivity/relay/rtransport.nim b/libp2p/protocols/connectivity/relay/rtransport.nim index 123a1e936..3008d9b04 100644 --- a/libp2p/protocols/connectivity/relay/rtransport.nim +++ b/libp2p/protocols/connectivity/relay/rtransport.nim @@ -61,9 +61,9 @@ proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async, var relayPeerId: PeerId dstPeerId: PeerId - if not relayPeerId.init(($(sma[^3].get())).split('/')[2]): + if not relayPeerId.init(($(sma[^3].tryGet())).split('/')[2]): raise newException(RelayV2DialError, "Relay doesn't exist") - if not dstPeerId.init(($(sma[^1].get())).split('/')[2]): + if not dstPeerId.init(($(sma[^1].tryGet())).split('/')[2]): raise newException(RelayV2DialError, "Destination doesn't exist") trace "Dial", relayPeerId, dstPeerId @@ -91,13 +91,17 @@ method dial*( hostname: string, ma: MultiAddress, peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.async, gcsafe.} = - let address = MultiAddress.init($ma & "/p2p/" & $peerId.get()).tryGet() - result = await self.dial(address) + peerId.withValue(pid): + let address = MultiAddress.init($ma & "/p2p/" & $pid).tryGet() + result = await self.dial(address) -method handles*(self: RelayTransport, ma: MultiAddress): bool {.gcsafe} = - if ma.protocols.isOk(): - let sma = toSeq(ma.items()) - result = sma.len >= 2 and CircuitRelay.match(sma[^1].get()) +method handles*(self: RelayTransport, ma: MultiAddress): bool {.gcsafe.} = + try: + if ma.protocols.isOk(): + let sma = toSeq(ma.items()) + result = sma.len >= 2 and CircuitRelay.match(sma[^1].tryGet()) + except CatchableError as exc: + result = false trace "Handles return", ma, result proc new*(T: typedesc[RelayTransport], cl: RelayClient, upgrader: Upgrade): T = diff --git a/libp2p/protocols/connectivity/relay/utils.nim b/libp2p/protocols/connectivity/relay/utils.nim index 50fd6a062..9a337e6d7 100644 --- a/libp2p/protocols/connectivity/relay/utils.nim +++ b/libp2p/protocols/connectivity/relay/utils.nim @@ -9,10 +9,7 @@ {.push raises: [].} -import options - import chronos, chronicles - import ./messages, ../../../stream/connection @@ -27,21 +24,21 @@ const proc sendStatus*(conn: Connection, code: StatusV1) {.async, gcsafe.} = trace "send relay/v1 status", status = $code & "(" & $ord(code) & ")" let - msg = RelayMessage(msgType: some(RelayType.Status), status: some(code)) + msg = RelayMessage(msgType: Opt.some(RelayType.Status), status: Opt.some(code)) pb = encode(msg) await conn.writeLp(pb.buffer) proc sendHopStatus*(conn: Connection, code: StatusV2) {.async, gcsafe.} = trace "send hop relay/v2 status", status = $code & "(" & $ord(code) & ")" let - msg = HopMessage(msgType: HopMessageType.Status, status: some(code)) + msg = HopMessage(msgType: HopMessageType.Status, status: Opt.some(code)) pb = encode(msg) await conn.writeLp(pb.buffer) proc sendStopStatus*(conn: Connection, code: StatusV2) {.async.} = trace "send stop relay/v2 status", status = $code & " (" & $ord(code) & ")" let - msg = StopMessage(msgType: StopMessageType.Status, status: some(code)) + msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code)) pb = encode(msg) await conn.writeLp(pb.buffer) diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 96f7ff694..d677e9729 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -71,9 +71,7 @@ chronicles.expandIt(IdentifyInfo): pubkey = ($it.pubkey).shortLog addresses = it.addrs.map(x => $x).join(",") protocols = it.protos.map(x => $x).join(",") - observable_address = - if it.observedAddr.isSome(): $it.observedAddr.get() - else: "None" + observable_address = $it.observedAddr proto_version = it.protoVersion.get("None") agent_version = it.agentVersion.get("None") signedPeerRecord = @@ -88,13 +86,13 @@ proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: boo let pkey = peerInfo.publicKey - result.write(1, pkey.getBytes().get()) + result.write(1, pkey.getBytes().expect("valid key")) for ma in peerInfo.addrs: result.write(2, ma.data.buffer) for proto in peerInfo.protocols: result.write(3, proto) - if observedAddr.isSome: - result.write(4, observedAddr.get().data.buffer) + observedAddr.withValue(observed): + result.write(4, observed.data.buffer) let protoVersion = ProtoVersion result.write(5, protoVersion) let agentVersion = if peerInfo.agentVersion.len <= 0: @@ -106,13 +104,12 @@ proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: boo ## Optionally populate signedPeerRecord field. ## See https://github.com/libp2p/go-libp2p/blob/ddf96ce1cfa9e19564feb9bd3e8269958bbc0aba/p2p/protocol/identify/pb/identify.proto for reference. if sendSpr: - let sprBuff = peerInfo.signedPeerRecord.envelope.encode() - if sprBuff.isOk(): - result.write(8, sprBuff.get()) + peerInfo.signedPeerRecord.envelope.encode().toOpt().withValue(sprBuff): + result.write(8, sprBuff) result.finish() -proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] = +proc decodeMsg*(buf: seq[byte]): Opt[IdentifyInfo] = var iinfo: IdentifyInfo pubkey: PublicKey @@ -122,37 +119,22 @@ proc decodeMsg*(buf: seq[byte]): Option[IdentifyInfo] = signedPeerRecord: SignedPeerRecord var pb = initProtoBuffer(buf) + if ? pb.getField(1, pubkey).toOpt(): + iinfo.pubkey = some(pubkey) + if ? pb.getField(8, signedPeerRecord).toOpt() and + pubkey == signedPeerRecord.envelope.publicKey: + iinfo.signedPeerRecord = some(signedPeerRecord.envelope) + discard ? pb.getRepeatedField(2, iinfo.addrs).toOpt() + discard ? pb.getRepeatedField(3, iinfo.protos).toOpt() + if ? pb.getField(4, oaddr).toOpt(): + iinfo.observedAddr = some(oaddr) + if ? pb.getField(5, protoVersion).toOpt(): + iinfo.protoVersion = some(protoVersion) + if ? pb.getField(6, agentVersion).toOpt(): + iinfo.agentVersion = some(agentVersion) - let r1 = pb.getField(1, pubkey) - let r2 = pb.getRepeatedField(2, iinfo.addrs) - let r3 = pb.getRepeatedField(3, iinfo.protos) - let r4 = pb.getField(4, oaddr) - let r5 = pb.getField(5, protoVersion) - let r6 = pb.getField(6, agentVersion) - - let r8 = pb.getField(8, signedPeerRecord) - - let res = r1.isOk() and r2.isOk() and r3.isOk() and - r4.isOk() and r5.isOk() and r6.isOk() and - r8.isOk() - - if res: - if r1.get(): - iinfo.pubkey = some(pubkey) - if r4.get(): - iinfo.observedAddr = some(oaddr) - if r5.get(): - iinfo.protoVersion = some(protoVersion) - if r6.get(): - iinfo.agentVersion = some(agentVersion) - if r8.get() and r1.get(): - if iinfo.pubkey.get() == signedPeerRecord.envelope.publicKey: - iinfo.signedPeerRecord = some(signedPeerRecord.envelope) - debug "decodeMsg: decoded identify", iinfo - some(iinfo) - else: - trace "decodeMsg: failed to decode received message" - none[IdentifyInfo]() + debug "decodeMsg: decoded identify", iinfo + Opt.some(iinfo) proc new*( T: typedesc[Identify], @@ -193,26 +175,19 @@ proc identify*(self: Identify, trace "identify: Empty message received!", conn raise newException(IdentityInvalidMsgError, "Empty message received!") - let infoOpt = decodeMsg(message) - if infoOpt.isNone(): - raise newException(IdentityInvalidMsgError, "Incorrect message received!") + var info = decodeMsg(message).valueOr: raise newException(IdentityInvalidMsgError, "Incorrect message received!") + let + pubkey = info.pubkey.valueOr: raise newException(IdentityInvalidMsgError, "No pubkey in identify") + peer = PeerId.init(pubkey).valueOr: raise newException(IdentityInvalidMsgError, $error) - var info = infoOpt.get() - if info.pubkey.isNone(): - raise newException(IdentityInvalidMsgError, "No pubkey in identify") - - let peer = PeerId.init(info.pubkey.get()) - if peer.isErr: - raise newException(IdentityInvalidMsgError, $peer.error) - - if peer.get() != remotePeerId: + if peer != remotePeerId: trace "Peer ids don't match", remote = peer, local = remotePeerId raise newException(IdentityNoMatchError, "Peer ids don't match") - info.peerId = peer.get() + info.peerId = peer - if info.observedAddr.isSome: - if not self.observedAddrManager.addObservation(info.observedAddr.get()): - debug "Observed address is not valid", observedAddr = info.observedAddr.get() + info.observedAddr.withValue(observed): + if not self.observedAddrManager.addObservation(observed): + debug "Observed address is not valid", observedAddr = observed return info proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} = @@ -228,21 +203,18 @@ proc init*(p: IdentifyPush) = try: var message = await conn.readLp(64*1024) - let infoOpt = decodeMsg(message) - if infoOpt.isNone(): + var identInfo = decodeMsg(message).valueOr: raise newException(IdentityInvalidMsgError, "Incorrect message received!") - var indentInfo = infoOpt.get() - - if indentInfo.pubkey.isSome: - let receivedPeerId = PeerId.init(indentInfo.pubkey.get()).tryGet() + identInfo.pubkey.withValue(pubkey): + let receivedPeerId = PeerId.init(pubkey).tryGet() if receivedPeerId != conn.peerId: raise newException(IdentityNoMatchError, "Peer ids don't match") - indentInfo.peerId = receivedPeerId + identInfo.peerId = receivedPeerId trace "triggering peer event", peerInfo = conn.peerId if not isNil(p.identifyHandler): - await p.identifyHandler(conn.peerId, indentInfo) + await p.identifyHandler(conn.peerId, identInfo) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0743aa67a..997fdd9f6 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -185,11 +185,11 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) = return # remove from peer IPs collection too - if pubSubPeer.address.isSome(): - g.peersInIP.withValue(pubSubPeer.address.get(), s): + pubSubPeer.address.withValue(address): + g.peersInIP.withValue(address, s): s[].excl(pubSubPeer.peerId) if s[].len == 0: - g.peersInIP.del(pubSubPeer.address.get()) + g.peersInIP.del(address) for t in toSeq(g.mesh.keys): trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index b89d66804..651ccf8d4 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -49,9 +49,7 @@ proc pruned*(g: GossipSub, backoff = none(Duration)) {.raises: [].} = if setBackoff: let - backoffDuration = - if isSome(backoff): backoff.get() - else: g.parameters.pruneBackoff + backoffDuration = backoff.get(g.parameters.pruneBackoff) backoffMoment = Moment.fromNow(backoffDuration) g.backingOff @@ -191,20 +189,15 @@ proc handleGraft*(g: GossipSub, proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRecord])] = var routingRecords: seq[(PeerId, Option[PeerRecord])] for record in prune.peers: - let peerRecord = - if record.signedPeerRecord.len == 0: - none(PeerRecord) - else: - let signedRecord = SignedPeerRecord.decode(record.signedPeerRecord) - if signedRecord.isErr: - trace "peer sent invalid SPR", peer, error=signedRecord.error - none(PeerRecord) + var peerRecord = none(PeerRecord) + if record.signedPeerRecord.len > 0: + SignedPeerRecord.decode(record.signedPeerRecord).toOpt().withValue(spr): + if record.peerId != spr.data.peerId: + trace "peer sent envelope with wrong public key", peer else: - if record.peerId != signedRecord.get().data.peerId: - trace "peer sent envelope with wrong public key", peer - none(PeerRecord) - else: - some(signedRecord.get().data) + peerRecord = some(spr.data) + else: + trace "peer sent invalid SPR", peer routingRecords.add((record.peerId, peerRecord)) @@ -296,12 +289,11 @@ proc handleIWant*(g: GossipSub, libp2p_gossipsub_received_iwants.inc(1, labelValues=["skipped"]) return messages continue - let msg = g.mcache.get(mid) - if msg.isSome: - libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"]) - messages.add(msg.get()) - else: + let msg = g.mcache.get(mid).valueOr: libp2p_gossipsub_received_iwants.inc(1, labelValues=["unknown"]) + continue + libp2p_gossipsub_received_iwants.inc(1, labelValues=["correct"]) + messages.add(msg) return messages proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} = diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index bebd1250d..9601452d3 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import std/[tables, sets, options] +import std/[tables, sets] import chronos, chronicles, metrics import "."/[types] import ".."/[pubsubpeer] @@ -71,20 +71,17 @@ func `/`(a, b: Duration): float64 = func byScore*(x,y: PubSubPeer): int = system.cmp(x.score, y.score) proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = - if peer.address.isNone(): - 0.0 + let address = peer.address.valueOr: return 0.0 + + g.peersInIP.mgetOrPut(address, initHashSet[PeerId]()).incl(peer.peerId) + let + ipPeers = g.peersInIP.getOrDefault(address).len().float64 + if ipPeers > g.parameters.ipColocationFactorThreshold: + trace "colocationFactor over threshold", peer, address, ipPeers + let over = ipPeers - g.parameters.ipColocationFactorThreshold + over * over else: - let - address = peer.address.get() - g.peersInIP.mgetOrPut(address, initHashSet[PeerId]()).incl(peer.peerId) - let - ipPeers = g.peersInIP.getOrDefault(address).len().float64 - if ipPeers > g.parameters.ipColocationFactorThreshold: - trace "colocationFactor over threshold", peer, address, ipPeers - let over = ipPeers - g.parameters.ipColocationFactorThreshold - over * over - else: - 0.0 + 0.0 {.pop.} diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index d5d71e5ab..02436a9b4 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -170,10 +170,9 @@ proc broadcast*( else: libp2p_pubsub_broadcast_messages.inc(npeers, labelValues = ["generic"]) - if msg.control.isSome(): - libp2p_pubsub_broadcast_iwant.inc(npeers * msg.control.get().iwant.len.int64) + msg.control.withValue(control): + libp2p_pubsub_broadcast_iwant.inc(npeers * control.iwant.len.int64) - let control = msg.control.get() for ihave in control.ihave: if p.knownTopics.contains(ihave.topicId): libp2p_pubsub_broadcast_ihave.inc(npeers, labelValues = [ihave.topicId]) @@ -244,9 +243,8 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) = else: libp2p_pubsub_received_messages.inc(labelValues = ["generic"]) - if rpcMsg.control.isSome(): - libp2p_pubsub_received_iwant.inc(rpcMsg.control.get().iwant.len.int64) - template control: untyped = rpcMsg.control.unsafeGet() + rpcMsg.control.withValue(control): + libp2p_pubsub_received_iwant.inc(control.iwant.len.int64) for ihave in control.ihave: if p.knownTopics.contains(ihave.topicId): libp2p_pubsub_received_ihave.inc(labelValues = [ihave.topicId]) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 371a6f3e7..d10ec1e4b 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -133,28 +133,26 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = conn, peer = p, closed = conn.closed, data = data.shortLog - var rmsg = decodeRpcMsg(data) - data = newSeq[byte]() # Release memory - - if rmsg.isErr(): - notice "failed to decode msg from peer", + var rmsg = decodeRpcMsg(data).valueOr: + debug "failed to decode msg from peer", conn, peer = p, closed = conn.closed, - err = rmsg.error() + err = error break + data = newSeq[byte]() # Release memory trace "decoded msg from peer", conn, peer = p, closed = conn.closed, - msg = rmsg.get().shortLog + msg = rmsg.shortLog # trigger hooks - p.recvObservers(rmsg.get()) + p.recvObservers(rmsg) when defined(libp2p_expensive_metrics): - for m in rmsg.get().messages: + for m in rmsg.messages: for t in m.topicIDs: # metrics libp2p_pubsub_received_messages.inc(labelValues = [$p.peerId, t]) - await p.handler(p, rmsg.get()) + await p.handler(p, rmsg) finally: await conn.close() except CancelledError: diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index d84ef4f74..4c90e3157 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -66,18 +66,17 @@ proc init*( var msg = Message(data: data, topicIDs: @[topic]) # order matters, we want to include seqno in the signature - if seqno.isSome: - msg.seqno = @(seqno.get().toBytesBE()) + seqno.withValue(seqn): + msg.seqno = @(seqn.toBytesBE()) - if peer.isSome: - let peer = peer.get() + peer.withValue(peer): msg.fromPeer = peer.peerId if sign: msg.signature = sign(msg, peer.privateKey).expect("Couldn't sign message!") msg.key = peer.privateKey.getPublicKey().expect("Invalid private key!") .getBytes().expect("Couldn't get public key bytes!") - elif sign: - raise (ref LPError)(msg: "Cannot sign message without peer info") + else: + if sign: raise (ref LPError)(msg: "Cannot sign message without peer info") msg @@ -91,6 +90,6 @@ proc init*( var msg = Message(data: data, topicIDs: @[topic]) msg.fromPeer = peerId - if seqno.isSome: - msg.seqno = @(seqno.get().toBytesBE()) + seqno.withValue(seqn): + msg.seqno = @(seqn.toBytesBE()) msg diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index bd67edfe3..6c3ee794e 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -110,15 +110,8 @@ func shortLog*(msg: Message): auto = ) func shortLog*(m: RPCMsg): auto = - if m.control.isSome: - ( - subscriptions: m.subscriptions, - messages: mapIt(m.messages, it.shortLog), - control: m.control.get().shortLog - ) - else: - ( - subscriptions: m.subscriptions, - messages: mapIt(m.messages, it.shortLog), - control: ControlMessage().shortLog - ) + ( + subscriptions: m.subscriptions, + messages: mapIt(m.messages, it.shortLog), + control: m.control.get(ControlMessage()).shortLog + ) diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 11dc27740..87bc1d1b4 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -314,8 +314,8 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] = pb.write(1, item) for item in msg.messages: pb.write(2, item, anonymize) - if msg.control.isSome(): - pb.write(3, msg.control.get()) + msg.control.withValue(control): + pb.write(3, control) # nim-libp2p extension, using fields which are unlikely to be used # by other extensions if msg.ping.len > 0: @@ -329,10 +329,10 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] = proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} = trace "decodeRpcMsg: decoding message", msg = msg.shortLog() var pb = initProtoBuffer(msg, maxSize = uint.high) - var rpcMsg = ok(RPCMsg()) - assign(rpcMsg.get().messages, ? pb.decodeMessages()) - assign(rpcMsg.get().subscriptions, ? pb.decodeSubscriptions()) - assign(rpcMsg.get().control, ? pb.decodeControl()) - discard ? pb.getField(60, rpcMsg.get().ping) - discard ? pb.getField(61, rpcMsg.get().pong) - rpcMsg + var rpcMsg = RPCMsg() + assign(rpcMsg.messages, ? pb.decodeMessages()) + assign(rpcMsg.subscriptions, ? pb.decodeSubscriptions()) + assign(rpcMsg.control, ? pb.decodeControl()) + discard ? pb.getField(60, rpcMsg.ping) + discard ? pb.getField(61, rpcMsg.pong) + ok(rpcMsg) diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index 868099d74..fbac8db6b 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -13,6 +13,8 @@ import std/[tables] import chronos/timer, stew/results +import ../../utility + const Timeout* = 10.seconds # default timeout in ms type @@ -55,9 +57,9 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = var previous = t.del(k) # Refresh existing item - let addedAt = - if previous.isSome: previous.get().addedAt - else: now + var addedAt = now + previous.withValue(previous): + addedAt = previous.addedAt let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout) diff --git a/libp2p/protocols/rendezvous.nim b/libp2p/protocols/rendezvous.nim index 53cf45277..6823d2459 100644 --- a/libp2p/protocols/rendezvous.nim +++ b/libp2p/protocols/rendezvous.nim @@ -9,12 +9,12 @@ {.push raises: [].} -import tables, sequtils, sugar, sets, options +import tables, sequtils, sugar, sets import metrics except collect import chronos, chronicles, bearssl/rand, - stew/[byteutils, objects] + stew/[byteutils, objects, results] import ./protocol, ../switch, ../routing_record, @@ -68,34 +68,34 @@ type Register = object ns : string signedPeerRecord: seq[byte] - ttl: Option[uint64] # in seconds + ttl: Opt[uint64] # in seconds RegisterResponse = object status: ResponseStatus - text: Option[string] - ttl: Option[uint64] # in seconds + text: Opt[string] + ttl: Opt[uint64] # in seconds Unregister = object ns: string Discover = object ns: string - limit: Option[uint64] - cookie: Option[seq[byte]] + limit: Opt[uint64] + cookie: Opt[seq[byte]] DiscoverResponse = object registrations: seq[Register] - cookie: Option[seq[byte]] + cookie: Opt[seq[byte]] status: ResponseStatus - text: Option[string] + text: Opt[string] Message = object msgType: MessageType - register: Option[Register] - registerResponse: Option[RegisterResponse] - unregister: Option[Unregister] - discover: Option[Discover] - discoverResponse: Option[DiscoverResponse] + register: Opt[Register] + registerResponse: Opt[RegisterResponse] + unregister: Opt[Unregister] + discover: Opt[Discover] + discoverResponse: Opt[DiscoverResponse] proc encode(c: Cookie): ProtoBuffer = result = initProtoBuffer() @@ -107,17 +107,17 @@ proc encode(r: Register): ProtoBuffer = result = initProtoBuffer() result.write(1, r.ns) result.write(2, r.signedPeerRecord) - if r.ttl.isSome(): - result.write(3, r.ttl.get()) + r.ttl.withValue(ttl): + result.write(3, ttl) result.finish() proc encode(rr: RegisterResponse): ProtoBuffer = result = initProtoBuffer() result.write(1, rr.status.uint) - if rr.text.isSome(): - result.write(2, rr.text.get()) - if rr.ttl.isSome(): - result.write(3, rr.ttl.get()) + rr.text.withValue(text): + result.write(2, text) + rr.ttl.withValue(ttl): + result.write(3, ttl) result.finish() proc encode(u: Unregister): ProtoBuffer = @@ -128,48 +128,48 @@ proc encode(u: Unregister): ProtoBuffer = proc encode(d: Discover): ProtoBuffer = result = initProtoBuffer() result.write(1, d.ns) - if d.limit.isSome(): - result.write(2, d.limit.get()) - if d.cookie.isSome(): - result.write(3, d.cookie.get()) + d.limit.withValue(limit): + result.write(2, limit) + d.cookie.withValue(cookie): + result.write(3, cookie) result.finish() -proc encode(d: DiscoverResponse): ProtoBuffer = +proc encode(dr: DiscoverResponse): ProtoBuffer = result = initProtoBuffer() - for reg in d.registrations: + for reg in dr.registrations: result.write(1, reg.encode()) - if d.cookie.isSome(): - result.write(2, d.cookie.get()) - result.write(3, d.status.uint) - if d.text.isSome(): - result.write(4, d.text.get()) + dr.cookie.withValue(cookie): + result.write(2, cookie) + result.write(3, dr.status.uint) + dr.text.withValue(text): + result.write(4, text) result.finish() proc encode(msg: Message): ProtoBuffer = result = initProtoBuffer() result.write(1, msg.msgType.uint) - if msg.register.isSome(): - result.write(2, msg.register.get().encode()) - if msg.registerResponse.isSome(): - result.write(3, msg.registerResponse.get().encode()) - if msg.unregister.isSome(): - result.write(4, msg.unregister.get().encode()) - if msg.discover.isSome(): - result.write(5, msg.discover.get().encode()) - if msg.discoverResponse.isSome(): - result.write(6, msg.discoverResponse.get().encode()) + msg.register.withValue(register): + result.write(2, register.encode()) + msg.registerResponse.withValue(registerResponse): + result.write(3, registerResponse.encode()) + msg.unregister.withValue(unregister): + result.write(4, unregister.encode()) + msg.discover.withValue(discover): + result.write(5, discover.encode()) + msg.discoverResponse.withValue(discoverResponse): + result.write(6, discoverResponse.encode()) result.finish() -proc decode(_: typedesc[Cookie], buf: seq[byte]): Option[Cookie] = +proc decode(_: typedesc[Cookie], buf: seq[byte]): Opt[Cookie] = var c: Cookie let pb = initProtoBuffer(buf) r1 = pb.getRequiredField(1, c.offset) r2 = pb.getRequiredField(2, c.ns) - if r1.isErr() or r2.isErr(): return none(Cookie) - some(c) + if r1.isErr() or r2.isErr(): return Opt.none(Cookie) + Opt.some(c) -proc decode(_: typedesc[Register], buf: seq[byte]): Option[Register] = +proc decode(_: typedesc[Register], buf: seq[byte]): Opt[Register] = var r: Register ttl: uint64 @@ -178,11 +178,11 @@ proc decode(_: typedesc[Register], buf: seq[byte]): Option[Register] = r1 = pb.getRequiredField(1, r.ns) r2 = pb.getRequiredField(2, r.signedPeerRecord) r3 = pb.getField(3, ttl) - if r1.isErr() or r2.isErr() or r3.isErr(): return none(Register) - if r3.get(): r.ttl = some(ttl) - some(r) + if r1.isErr() or r2.isErr() or r3.isErr(): return Opt.none(Register) + if r3.get(false): r.ttl = Opt.some(ttl) + Opt.some(r) -proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Option[RegisterResponse] = +proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Opt[RegisterResponse] = var rr: RegisterResponse statusOrd: uint @@ -194,20 +194,20 @@ proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Option[RegisterRespo r2 = pb.getField(2, text) r3 = pb.getField(3, ttl) if r1.isErr() or r2.isErr() or r3.isErr() or - not checkedEnumAssign(rr.status, statusOrd): return none(RegisterResponse) - if r2.get(): rr.text = some(text) - if r3.get(): rr.ttl = some(ttl) - some(rr) + not checkedEnumAssign(rr.status, statusOrd): return Opt.none(RegisterResponse) + if r2.get(false): rr.text = Opt.some(text) + if r3.get(false): rr.ttl = Opt.some(ttl) + Opt.some(rr) -proc decode(_: typedesc[Unregister], buf: seq[byte]): Option[Unregister] = +proc decode(_: typedesc[Unregister], buf: seq[byte]): Opt[Unregister] = var u: Unregister let pb = initProtoBuffer(buf) r1 = pb.getRequiredField(1, u.ns) - if r1.isErr(): return none(Unregister) - some(u) + if r1.isErr(): return Opt.none(Unregister) + Opt.some(u) -proc decode(_: typedesc[Discover], buf: seq[byte]): Option[Discover] = +proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] = var d: Discover limit: uint64 @@ -217,12 +217,12 @@ proc decode(_: typedesc[Discover], buf: seq[byte]): Option[Discover] = r1 = pb.getRequiredField(1, d.ns) r2 = pb.getField(2, limit) r3 = pb.getField(3, cookie) - if r1.isErr() or r2.isErr() or r3.isErr: return none(Discover) - if r2.get(): d.limit = some(limit) - if r3.get(): d.cookie = some(cookie) - some(d) + if r1.isErr() or r2.isErr() or r3.isErr: return Opt.none(Discover) + if r2.get(false): d.limit = Opt.some(limit) + if r3.get(false): d.cookie = Opt.some(cookie) + Opt.some(d) -proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Option[DiscoverResponse] = +proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Opt[DiscoverResponse] = var dr: DiscoverResponse registrations: seq[seq[byte]] @@ -236,48 +236,47 @@ proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Option[DiscoverRespo r3 = pb.getRequiredField(3, statusOrd) r4 = pb.getField(4, text) if r1.isErr() or r2.isErr() or r3.isErr or r4.isErr() or - not checkedEnumAssign(dr.status, statusOrd): return none(DiscoverResponse) + not checkedEnumAssign(dr.status, statusOrd): return Opt.none(DiscoverResponse) for reg in registrations: var r: Register - let regOpt = Register.decode(reg) - if regOpt.isNone(): return none(DiscoverResponse) - dr.registrations.add(regOpt.get()) - if r2.get(): dr.cookie = some(cookie) - if r4.get(): dr.text = some(text) - some(dr) + let regOpt = Register.decode(reg).valueOr: + return + dr.registrations.add(regOpt) + if r2.get(false): dr.cookie = Opt.some(cookie) + if r4.get(false): dr.text = Opt.some(text) + Opt.some(dr) -proc decode(_: typedesc[Message], buf: seq[byte]): Option[Message] = +proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] = var msg: Message statusOrd: uint pbr, pbrr, pbu, pbd, pbdr: ProtoBuffer - let - pb = initProtoBuffer(buf) - r1 = pb.getRequiredField(1, statusOrd) - r2 = pb.getField(2, pbr) - r3 = pb.getField(3, pbrr) - r4 = pb.getField(4, pbu) - r5 = pb.getField(5, pbd) - r6 = pb.getField(6, pbdr) - if r1.isErr() or r2.isErr() or r3.isErr() or - r4.isErr() or r5.isErr() or r6.isErr() or - not checkedEnumAssign(msg.msgType, statusOrd): return none(Message) - if r2.get(): + let pb = initProtoBuffer(buf) + + ? pb.getRequiredField(1, statusOrd).toOpt + if not checkedEnumAssign(msg.msgType, statusOrd): return Opt.none(Message) + + if ? pb.getField(2, pbr).optValue: msg.register = Register.decode(pbr.buffer) - if msg.register.isNone(): return none(Message) - if r3.get(): + if msg.register.isNone(): return Opt.none(Message) + + if ? pb.getField(3, pbrr).optValue: msg.registerResponse = RegisterResponse.decode(pbrr.buffer) - if msg.registerResponse.isNone(): return none(Message) - if r4.get(): + if msg.registerResponse.isNone(): return Opt.none(Message) + + if ? pb.getField(4, pbu).optValue: msg.unregister = Unregister.decode(pbu.buffer) - if msg.unregister.isNone(): return none(Message) - if r5.get(): + if msg.unregister.isNone(): return Opt.none(Message) + + if ? pb.getField(5, pbd).optValue: msg.discover = Discover.decode(pbd.buffer) - if msg.discover.isNone(): return none(Message) - if r6.get(): + if msg.discover.isNone(): return Opt.none(Message) + + if ? pb.getField(6, pbdr).optValue: msg.discoverResponse = DiscoverResponse.decode(pbdr.buffer) - if msg.discoverResponse.isNone(): return none(Message) - some(msg) + if msg.discoverResponse.isNone(): return Opt.none(Message) + + Opt.some(msg) type @@ -317,7 +316,7 @@ proc sendRegisterResponse(conn: Connection, ttl: uint64) {.async.} = let msg = encode(Message( msgType: MessageType.RegisterResponse, - registerResponse: some(RegisterResponse(status: Ok, ttl: some(ttl))))) + registerResponse: Opt.some(RegisterResponse(status: Ok, ttl: Opt.some(ttl))))) await conn.writeLp(msg.buffer) proc sendRegisterResponseError(conn: Connection, @@ -325,7 +324,7 @@ proc sendRegisterResponseError(conn: Connection, text: string = "") {.async.} = let msg = encode(Message( msgType: MessageType.RegisterResponse, - registerResponse: some(RegisterResponse(status: status, text: some(text))))) + registerResponse: Opt.some(RegisterResponse(status: status, text: Opt.some(text))))) await conn.writeLp(msg.buffer) proc sendDiscoverResponse(conn: Connection, @@ -333,10 +332,10 @@ proc sendDiscoverResponse(conn: Connection, cookie: Cookie) {.async.} = let msg = encode(Message( msgType: MessageType.DiscoverResponse, - discoverResponse: some(DiscoverResponse( + discoverResponse: Opt.some(DiscoverResponse( status: Ok, registrations: s, - cookie: some(cookie.encode().buffer) + cookie: Opt.some(cookie.encode().buffer) )) )) await conn.writeLp(msg.buffer) @@ -346,7 +345,7 @@ proc sendDiscoverResponseError(conn: Connection, text: string = "") {.async.} = let msg = encode(Message( msgType: MessageType.DiscoverResponse, - discoverResponse: some(DiscoverResponse(status: status, text: some(text))))) + discoverResponse: Opt.some(DiscoverResponse(status: status, text: Opt.some(text))))) await conn.writeLp(msg.buffer) proc countRegister(rdv: RendezVous, peerId: PeerId): int = @@ -419,7 +418,7 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} = cookie = if d.cookie.isSome(): try: - Cookie.decode(d.cookie.get()).get() + Cookie.decode(d.cookie.tryGet()).tryGet() except CatchableError: await conn.sendDiscoverResponseError(InvalidCookie) return @@ -450,7 +449,7 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} = break if reg.expiration < n or index.uint64 <= cookie.offset: continue limit.dec() - reg.data.ttl = some((reg.expiration - Moment.now()).seconds.uint64) + reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64) reg.data rdv.rng.shuffle(s) await conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns)) @@ -465,11 +464,10 @@ proc advertisePeer(rdv: RendezVous, await conn.writeLp(msg) let buf = await conn.readLp(4096) - msgRecv = Message.decode(buf).get() + msgRecv = Message.decode(buf).tryGet() if msgRecv.msgType != MessageType.RegisterResponse: trace "Unexpected register response", peer, msgType = msgRecv.msgType - elif msgRecv.registerResponse.isNone() or - msgRecv.registerResponse.get().status != ResponseStatus.Ok: + elif msgRecv.registerResponse.tryGet().status != ResponseStatus.Ok: trace "Refuse to register", peer, response = msgRecv.registerResponse except CatchableError as exc: trace "exception in the advertise", error = exc.msg @@ -481,16 +479,15 @@ proc advertisePeer(rdv: RendezVous, proc advertise*(rdv: RendezVous, ns: string, ttl: Duration = MinimumDuration) {.async.} = - let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode() - if sprBuff.isErr(): + let sprBuff = rdv.switch.peerInfo.signedPeerRecord.encode().valueOr: raise newException(RendezVousError, "Wrong Signed Peer Record") if ns.len notin 1..255: raise newException(RendezVousError, "Invalid namespace") if ttl notin MinimumDuration..MaximumDuration: raise newException(RendezVousError, "Invalid time to live") let - r = Register(ns: ns, signedPeerRecord: sprBuff.get(), ttl: some(ttl.seconds.uint64)) - msg = encode(Message(msgType: MessageType.Register, register: some(r))) + r = Register(ns: ns, signedPeerRecord: sprBuff, ttl: Opt.some(ttl.seconds.uint64)) + msg = encode(Message(msgType: MessageType.Register, register: Opt.some(r))) rdv.save(ns, rdv.switch.peerInfo.peerId, r) let fut = collect(newSeq()): for peer in rdv.peers: @@ -506,7 +503,9 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] = collect(newSeq()): for index in rdv.namespaces[nsSalted]: if rdv.registered[index].expiration > n: - SignedPeerRecord.decode(rdv.registered[index].data.signedPeerRecord).get().data + let res = SignedPeerRecord.decode(rdv.registered[index].data.signedPeerRecord).valueOr: + continue + res.data except KeyError as exc: @[] @@ -527,38 +526,42 @@ proc request*(rdv: RendezVous, proc requestPeer(peer: PeerId) {.async.} = let conn = await rdv.switch.dial(peer, RendezVousCodec) defer: await conn.close() - d.limit = some(limit) + d.limit = Opt.some(limit) d.cookie = try: - some(rdv.cookiesSaved[peer][ns]) + Opt.some(rdv.cookiesSaved[peer][ns]) except KeyError as exc: - none(seq[byte]) + Opt.none(seq[byte]) await conn.writeLp(encode(Message( msgType: MessageType.Discover, - discover: some(d))).buffer) + discover: Opt.some(d))).buffer) let buf = await conn.readLp(65536) - msgRcv = Message.decode(buf).get() - if msgRcv.msgType != MessageType.DiscoverResponse or - msgRcv.discoverResponse.isNone(): + msgRcv = Message.decode(buf).valueOr: + debug "Message undecodable" + return + if msgRcv.msgType != MessageType.DiscoverResponse: debug "Unexpected discover response", msgType = msgRcv.msgType return - let resp = msgRcv.discoverResponse.get() + let resp = msgRcv.discoverResponse.valueOr: + debug "Discover response is empty" + return if resp.status != ResponseStatus.Ok: trace "Cannot discover", ns, status = resp.status, text = resp.text return - if resp.cookie.isSome() and resp.cookie.get().len < 1000: - if rdv.cookiesSaved.hasKeyOrPut(peer, {ns: resp.cookie.get()}.toTable): - rdv.cookiesSaved[peer][ns] = resp.cookie.get() + resp.cookie.withValue(cookie): + if cookie.len() < 1000 and rdv.cookiesSaved.hasKeyOrPut(peer, {ns: cookie}.toTable()): + rdv.cookiesSaved[peer][ns] = cookie for r in resp.registrations: if limit == 0: return - if r.ttl.isNone() or r.ttl.get() > MaximumTTL: continue - let sprRes = SignedPeerRecord.decode(r.signedPeerRecord) - if sprRes.isErr(): continue - let pr = sprRes.get().data + let ttl = r.ttl.get(MaximumTTL + 1) + if ttl > MaximumTTL: continue + let + spr = SignedPeerRecord.decode(r.signedPeerRecord).valueOr: continue + pr = spr.data if s.hasKey(pr.peerId): let (prSaved, rSaved) = s[pr.peerId] - if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get() < r.ttl.get()) or + if (prSaved.seqNo == pr.seqNo and rSaved.ttl.get(MaximumTTL) < ttl) or prSaved.seqNo < pr.seqNo: s[pr.peerId] = (pr, r) else: @@ -597,7 +600,7 @@ proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} = rdv.unsubscribeLocally(ns) let msg = encode(Message( msgType: MessageType.Unregister, - unregister: some(Unregister(ns: ns)))) + unregister: Opt.some(Unregister(ns: ns)))) proc unsubscribePeer(rdv: RendezVous, peerId: PeerId) {.async.} = try: @@ -635,13 +638,13 @@ proc new*(T: typedesc[RendezVous], try: let buf = await conn.readLp(4096) - msg = Message.decode(buf).get() + msg = Message.decode(buf).tryGet() case msg.msgType: - of MessageType.Register: await rdv.register(conn, msg.register.get()) + of MessageType.Register: await rdv.register(conn, msg.register.tryGet()) of MessageType.RegisterResponse: trace "Got an unexpected Register Response", response = msg.registerResponse - of MessageType.Unregister: rdv.unregister(conn, msg.unregister.get()) - of MessageType.Discover: await rdv.discover(conn, msg.discover.get()) + of MessageType.Unregister: rdv.unregister(conn, msg.unregister.tryGet()) + of MessageType.Discover: await rdv.discover(conn, msg.discover.tryGet()) of MessageType.DiscoverResponse: trace "Got an unexpected Discover Response", response = msg.discoverResponse except CancelledError as exc: diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index b5c30abee..4c3de72b7 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -554,8 +554,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool, peerId: Opt[PeerI trace "Remote peer id", pid = $pid - if peerId.isSome(): - let targetPid = peerId.get() + peerId.withValue(targetPid): if not targetPid.validate(): raise newException(NoiseHandshakeError, "Failed to validate expected peerId.") diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index 77e498350..435acaf2c 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -339,8 +339,7 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI remotePeerId = PeerId.init(remotePubkey).tryGet() - if peerId.isSome(): - let targetPid = peerId.get() + peerId.withValue(targetPid): if not targetPid.validate(): raise newException(SecioError, "Failed to validate expected peerId.") @@ -436,14 +435,10 @@ proc new*( T: typedesc[Secio], rng: ref HmacDrbgContext, localPrivateKey: PrivateKey): T = - let pkRes = localPrivateKey.getPublicKey() - if pkRes.isErr: - raise newException(Defect, "Invalid private key") - let secio = Secio( rng: rng, localPrivateKey: localPrivateKey, - localPublicKey: pkRes.get(), + localPublicKey: localPrivateKey.getPublicKey().expect("Invalid private key"), ) secio.init() secio diff --git a/libp2p/routing_record.nim b/libp2p/routing_record.nim index 655c87c8c..e5a059b3a 100644 --- a/libp2p/routing_record.nim +++ b/libp2p/routing_record.nim @@ -42,14 +42,12 @@ proc decode*( ? pb.getRequiredField(2, record.seqNo) var addressInfos: seq[seq[byte]] - let pb3 = ? pb.getRepeatedField(3, addressInfos) - - if pb3: + if ? pb.getRepeatedField(3, addressInfos): for address in addressInfos: var addressInfo = AddressInfo() let subProto = initProtoBuffer(address) let f = subProto.getField(1, addressInfo.address) - if f.isOk() and f.get(): + if f.get(false): record.addresses &= addressInfo if record.addresses.len == 0: diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index ce1e4522e..68055d275 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -45,9 +45,7 @@ proc tryStartingDirectConn(self: HPService, switch: Switch, peerId: PeerId): Fut for address in switch.peerStore[AddressBook][peerId]: try: let isRelayed = address.contains(multiCodec("p2p-circuit")) - if isRelayed.isErr() or isRelayed.get(): - continue - if address.isPublicMA(): + if not isRelayed.get(false) and address.isPublicMA(): return await tryConnect(address) except CatchableError as err: debug "Failed to create direct connection.", err = err.msg @@ -96,7 +94,7 @@ method setup*(self: HPService, switch: Switch): Future[bool] {.async.} = switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) - self.onNewStatusHandler = proc (networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + self.onNewStatusHandler = proc (networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.NotReachable and not self.autoRelayService.isRunning(): discard await self.autoRelayService.setup(switch) elif networkReachability == NetworkReachability.Reachable and self.autoRelayService.isRunning(): diff --git a/libp2p/signed_envelope.nim b/libp2p/signed_envelope.nim index c2add96d5..402d6494c 100644 --- a/libp2p/signed_envelope.nim +++ b/libp2p/signed_envelope.nim @@ -112,19 +112,12 @@ proc getField*(pb: ProtoBuffer, field: int, if not(res): ok(false) else: - let env = Envelope.decode(buffer, domain) - if env.isOk(): - value = env.get() - ok(true) - else: - err(ProtoError.IncorrectBlob) + value = Envelope.decode(buffer, domain).valueOr: return err(ProtoError.IncorrectBlob) + ok(true) proc write*(pb: var ProtoBuffer, field: int, env: Envelope): Result[void, CryptoError] = - let e = env.encode() - - if e.isErr(): - return err(e.error) - pb.write(field, e.get()) + let e = ? env.encode() + pb.write(field, e) ok() type @@ -142,7 +135,7 @@ proc init*[T](_: typedesc[SignedPayload[T]], T.payloadType(), data.encode(), T.payloadDomain) - + ok(SignedPayload[T](data: data, envelope: envelope)) proc getField*[T](pb: ProtoBuffer, field: int, diff --git a/libp2p/transports/tortransport.nim b/libp2p/transports/tortransport.nim index 8543f7113..278c04bd6 100644 --- a/libp2p/transports/tortransport.nim +++ b/libp2p/transports/tortransport.nim @@ -141,7 +141,9 @@ proc parseOnion3(address: MultiAddress): (byte, seq[byte], seq[byte]) {.raises: dstPort = address.data.buffer[37..38] return (Socks5AddressType.FQDN.byte, dstAddr, dstPort) -proc parseIpTcp(address: MultiAddress): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} = +proc parseIpTcp(address: MultiAddress): + (byte, seq[byte], seq[byte]) + {.raises: [LPError, ValueError].} = let (codec, atyp) = if IPv4Tcp.match(address): (multiCodec("ip4"), Socks5AddressType.IPv4.byte) @@ -150,15 +152,17 @@ proc parseIpTcp(address: MultiAddress): (byte, seq[byte], seq[byte]) {.raises: [ else: raise newException(LPError, fmt"IP address not supported {address}") let - dstAddr = address[codec].get().protoArgument().get() - dstPort = address[multiCodec("tcp")].get().protoArgument().get() + dstAddr = address[codec].tryGet().protoArgument().tryGet() + dstPort = address[multiCodec("tcp")].tryGet().protoArgument().tryGet() (atyp, dstAddr, dstPort) -proc parseDnsTcp(address: MultiAddress): (byte, seq[byte], seq[byte]) = +proc parseDnsTcp(address: MultiAddress): + (byte, seq[byte], seq[byte]) + {.raises: [LPError, ValueError].} = let - dnsAddress = address[multiCodec("dns")].get().protoArgument().get() + dnsAddress = address[multiCodec("dns")].tryGet().protoArgument().tryGet() dstAddr = @(uint8(dnsAddress.len).toBytes()) & dnsAddress - dstPort = address[multiCodec("tcp")].get().protoArgument().get() + dstPort = address[multiCodec("tcp")].tryGet().protoArgument().tryGet() (Socks5AddressType.FQDN.byte, dstAddr, dstPort) proc dialPeer( @@ -214,9 +218,9 @@ method start*( warn "Invalid address detected, skipping!", address = ma continue - let listenAddress = ma[0..1].get() + let listenAddress = ma[0..1].tryGet() listenAddrs.add(listenAddress) - let onion3 = ma[multiCodec("onion3")].get() + let onion3 = ma[multiCodec("onion3")].tryGet() onion3Addrs.add(onion3) if len(listenAddrs) != 0 and len(onion3Addrs) != 0: diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 86c31a669..5c4a53503 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -99,9 +99,8 @@ method handles*( # by default we skip circuit addresses to avoid # having to repeat the check in every transport - if address.protocols.isOk: - return address.protocols - .get() + let protocols = address.protocols.valueOr: return false + return protocols .filterIt( it == multiCodec("p2p-circuit") ).len == 0 diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 09c1fac15..85995bee4 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -9,7 +9,10 @@ {.push raises: [].} -import stew/byteutils +import std/options, std/macros +import stew/[byteutils, results] + +export results template public* {.pragma.} @@ -50,9 +53,6 @@ when defined(libp2p_agents_metrics): import strutils export split - import stew/results - export results - proc safeToLowerAscii*(s: string): Result[string, cstring] = try: ok(s.toLowerAscii()) @@ -83,3 +83,30 @@ template exceptionToAssert*(body: untyped): untyped = when defined(nimHasWarnBareExcept): {.pop.} res + +template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped = + if self.isSome: + let value {.inject.} = self.get() + body + +macro withValue*[T](self: Opt[T] | Option[T], value, body, body2: untyped): untyped = + let elseBody = body2[0] + quote do: + if `self`.isSome: + let `value` {.inject.} = `self`.get() + `body` + else: + `elseBody` + +template valueOr*[T](self: Option[T], body: untyped): untyped = + if self.isSome: + self.get() + else: + body + +template toOpt*[T, E](self: Result[T, E]): Opt[T] = + if self.isOk: + when T is void: Result[void, void].ok() + else: Opt.some(self.unsafeGet()) + else: + Opt.none(type(T)) diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 030fa310e..70d5574fd 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -89,7 +89,7 @@ proc connect*( compilesOr: return connect(transportAddress, bufferSize, child, - if localAddress.isSome(): initTAddress(localAddress.get()).tryGet() else : TransportAddress(), + if localAddress.isSome(): initTAddress(localAddress.expect("just checked")).tryGet() else: TransportAddress(), flags) do: # support for older chronos versions @@ -152,7 +152,7 @@ proc createStreamServer*[T](ma: MultiAddress, raise newException(LPError, exc.msg) proc createAsyncSocket*(ma: MultiAddress): AsyncFD - {.raises: [LPError].} = + {.raises: [ValueError, LPError].} = ## Create new asynchronous socket using MultiAddress' ``ma`` socket type and ## protocol information. ## diff --git a/tests/testautonat.nim b/tests/testautonat.nim index 6217d5e0e..579382dbc 100644 --- a/tests/testautonat.nim +++ b/tests/testautonat.nim @@ -43,8 +43,8 @@ proc makeAutonatServicePrivate(): Switch = discard await conn.readLp(1024) await conn.writeLp(AutonatDialResponse( status: DialError, - text: some("dial failed"), - ma: none(MultiAddress)).encode().buffer) + text: Opt.some("dial failed"), + ma: Opt.none(MultiAddress)).encode().buffer) await conn.close() autonatProtocol.codec = AutonatCodec result = newStandardSwitch() @@ -93,8 +93,8 @@ suite "Autonat": await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) let conn = await src.dial(dst.peerInfo.peerId, @[AutonatCodec]) - let buffer = AutonatDial(peerInfo: some(AutonatPeerInfo( - id: some(src.peerInfo.peerId), + let buffer = AutonatDial(peerInfo: Opt.some(AutonatPeerInfo( + id: Opt.some(src.peerInfo.peerId), # we ask to be dialed in the does nothing listener instead addrs: doesNothingListener.addrs ))).encode().buffer diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim index 969d24f83..1d81b91a2 100644 --- a/tests/testautonatservice.nim +++ b/tests/testautonatservice.nim @@ -78,7 +78,7 @@ suite "Autonat Service": asyncTest "Peer must be reachable": - let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds)) + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(1.seconds)) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -87,7 +87,7 @@ suite "Autonat Service": let awaiter = newFuture[void]() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: awaiter.complete() @@ -122,7 +122,7 @@ suite "Autonat Service": let autonatClientStub = AutonatClientStub.new(expectedDials = 6) autonatClientStub.answer = NotReachable - let autonatService = AutonatService.new(autonatClientStub, newRng(), some(1.seconds)) + let autonatService = AutonatService.new(autonatClientStub, newRng(), Opt.some(1.seconds)) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -131,7 +131,7 @@ suite "Autonat Service": let awaiter = newFuture[void]() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: autonatClientStub.answer = Reachable @@ -164,7 +164,7 @@ suite "Autonat Service": asyncTest "Peer must be reachable when one connected peer has autonat disabled": - let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 2) + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(1.seconds), maxQueueSize = 2) let switch1 = createSwitch(autonatService) let switch2 = createSwitch(withAutonat = false) @@ -173,7 +173,7 @@ suite "Autonat Service": let awaiter = newFuture[void]() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter.finished: awaiter.complete() @@ -204,7 +204,7 @@ suite "Autonat Service": let autonatClientStub = AutonatClientStub.new(expectedDials = 6) autonatClientStub.answer = NotReachable - let autonatService = AutonatService.new(autonatClientStub, newRng(), some(1.seconds), maxQueueSize = 3) + let autonatService = AutonatService.new(autonatClientStub, newRng(), Opt.some(1.seconds), maxQueueSize = 3) let switch1 = createSwitch(autonatService) let switch2 = createSwitch() @@ -213,7 +213,7 @@ suite "Autonat Service": let awaiter = newFuture[void]() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: autonatClientStub.answer = Unknown @@ -247,7 +247,7 @@ suite "Autonat Service": asyncTest "Calling setup and stop twice must work": let switch = createSwitch() - let autonatService = AutonatService.new(AutonatClientStub.new(expectedDials = 0), newRng(), some(1.seconds)) + let autonatService = AutonatService.new(AutonatClientStub.new(expectedDials = 0), newRng(), Opt.some(1.seconds)) check (await autonatService.setup(switch)) == true check (await autonatService.setup(switch)) == false @@ -258,7 +258,7 @@ suite "Autonat Service": await allFuturesThrowing(switch.stop()) asyncTest "Must bypass maxConnectionsPerPeer limit": - let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 1) + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(1.seconds), maxQueueSize = 1) let switch1 = createSwitch(autonatService, maxConnsPerPeer = 0) await switch1.setDNSAddr() @@ -267,7 +267,7 @@ suite "Autonat Service": let awaiter = newFuture[void]() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter.finished: awaiter.complete() @@ -290,9 +290,9 @@ suite "Autonat Service": switch1.stop(), switch2.stop()) asyncTest "Must work when peers ask each other at the same time with max 1 conn per peer": - let autonatService1 = AutonatService.new(AutonatClient.new(), newRng(), some(500.millis), maxQueueSize = 3) - let autonatService2 = AutonatService.new(AutonatClient.new(), newRng(), some(500.millis), maxQueueSize = 3) - let autonatService3 = AutonatService.new(AutonatClient.new(), newRng(), some(500.millis), maxQueueSize = 3) + let autonatService1 = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(500.millis), maxQueueSize = 3) + let autonatService2 = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(500.millis), maxQueueSize = 3) + let autonatService3 = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(500.millis), maxQueueSize = 3) let switch1 = createSwitch(autonatService1, maxConnsPerPeer = 0) let switch2 = createSwitch(autonatService2, maxConnsPerPeer = 0) @@ -302,12 +302,12 @@ suite "Autonat Service": let awaiter2 = newFuture[void]() let awaiter3 = newFuture[void]() - proc statusAndConfidenceHandler1(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler1(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter1.finished: awaiter1.complete() - proc statusAndConfidenceHandler2(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler2(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter2.finished: awaiter2.complete() @@ -337,15 +337,15 @@ suite "Autonat Service": switch1.stop(), switch2.stop(), switch3.stop()) asyncTest "Must work for one peer when two peers ask each other at the same time with max 1 conn per peer": - let autonatService1 = AutonatService.new(AutonatClient.new(), newRng(), some(500.millis), maxQueueSize = 3) - let autonatService2 = AutonatService.new(AutonatClient.new(), newRng(), some(500.millis), maxQueueSize = 3) + let autonatService1 = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(500.millis), maxQueueSize = 3) + let autonatService2 = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(500.millis), maxQueueSize = 3) let switch1 = createSwitch(autonatService1, maxConnsPerPeer = 0) let switch2 = createSwitch(autonatService2, maxConnsPerPeer = 0) let awaiter1 = newFuture[void]() - proc statusAndConfidenceHandler1(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler1(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter1.finished: awaiter1.complete() @@ -378,7 +378,7 @@ suite "Autonat Service": switch1.stop(), switch2.stop()) asyncTest "Must work with low maxConnections": - let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 1) + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), Opt.some(1.seconds), maxQueueSize = 1) let switch1 = createSwitch(autonatService, maxConns = 4) let switch2 = createSwitch() @@ -388,7 +388,7 @@ suite "Autonat Service": var awaiter = newFuture[void]() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter.finished: awaiter.complete() @@ -428,7 +428,7 @@ suite "Autonat Service": let switch1 = createSwitch(autonatService) let switch2 = createSwitch() - proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Opt[float]) {.gcsafe, async.} = fail() check autonatService.networkReachability == NetworkReachability.Unknown diff --git a/tests/testrelayv1.nim b/tests/testrelayv1.nim index cda512e99..b461f2d15 100644 --- a/tests/testrelayv1.nim +++ b/tests/testrelayv1.nim @@ -48,20 +48,20 @@ suite "Circuit Relay": r {.threadvar.}: Relay conn {.threadvar.}: Connection msg {.threadvar.}: ProtoBuffer - rcv {.threadvar.}: Option[RelayMessage] + rcv {.threadvar.}: Opt[RelayMessage] proc createMsg( - msgType: Option[RelayType] = RelayType.none, - status: Option[StatusV1] = StatusV1.none, - src: Option[RelayPeer] = RelayPeer.none, - dst: Option[RelayPeer] = RelayPeer.none): ProtoBuffer = + msgType: Opt[RelayType] = Opt.none(RelayType), + status: Opt[StatusV1] = Opt.none(StatusV1), + src: Opt[RelayPeer] = Opt.none(RelayPeer), + dst: Opt[RelayPeer] = Opt.none(RelayPeer)): ProtoBuffer = encode(RelayMessage(msgType: msgType, srcPeer: src, dstPeer: dst, status: status)) - proc checkMsg(msg: Option[RelayMessage], - msgType: Option[RelayType] = none[RelayType](), - status: Option[StatusV1] = none[StatusV1](), - src: Option[RelayPeer] = none[RelayPeer](), - dst: Option[RelayPeer] = none[RelayPeer]()) = + proc checkMsg(msg: Opt[RelayMessage], + msgType: Opt[RelayType] = Opt.none(RelayType), + status: Opt[StatusV1] = Opt.none(StatusV1), + src: Opt[RelayPeer] = Opt.none(RelayPeer), + dst: Opt[RelayPeer] = Opt.none(RelayPeer)) = check: msg.isSome let m = msg.get() check: m.msgType == msgType @@ -119,116 +119,116 @@ suite "Circuit Relay": await srelay.start() asyncTest "Handle CanHop": - msg = createMsg(some(CanHop)) + msg = createMsg(Opt.some(CanHop)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(StatusV1.Success)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(StatusV1.Success)) conn = await src.dial(dst.peerInfo.peerId, dst.peerInfo.addrs, RelayV1Codec) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantSpeakRelay)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantSpeakRelay)) await conn.close() asyncTest "Malformed": conn = await srelay.dial(dst.peerInfo.peerId, dst.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Status)) + msg = createMsg(Opt.some(RelayType.Status)) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) await conn.close() - rcv.checkMsg(some(RelayType.Status), some(StatusV1.MalformedMessage)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(StatusV1.MalformedMessage)) asyncTest "Handle Stop Error": conn = await srelay.dial(dst.peerInfo.peerId, dst.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Stop), - none(StatusV1), - none(RelayPeer), - some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Stop), + Opt.none(StatusV1), + Opt.none(RelayPeer), + Opt.some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(StopSrcMultiaddrInvalid)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(StopSrcMultiaddrInvalid)) conn = await srelay.dial(dst.peerInfo.peerId, dst.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Stop), - none(StatusV1), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), - none(RelayPeer)) + msg = createMsg(Opt.some(RelayType.Stop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), + Opt.none(RelayPeer)) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(StopDstMultiaddrInvalid)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(StopDstMultiaddrInvalid)) conn = await srelay.dial(dst.peerInfo.peerId, dst.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Stop), - none(StatusV1), - some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs)), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Stop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs)), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) await conn.close() - rcv.checkMsg(some(RelayType.Status), some(StopDstMultiaddrInvalid)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(StopDstMultiaddrInvalid)) asyncTest "Handle Hop Error": conn = await src.dial(dst.peerInfo.peerId, dst.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop)) + msg = createMsg(Opt.some(RelayType.Hop)) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantSpeakRelay)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantSpeakRelay)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - none(RelayPeer), - some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.none(RelayPeer), + Opt.some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopSrcMultiaddrInvalid)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopSrcMultiaddrInvalid)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs)), - some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs)), + Opt.some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopSrcMultiaddrInvalid)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopSrcMultiaddrInvalid)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), - none(RelayPeer)) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), + Opt.none(RelayPeer)) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopDstMultiaddrInvalid)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopDstMultiaddrInvalid)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), - some(RelayPeer(peerId: srelay.peerInfo.peerId, addrs: srelay.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), + Opt.some(RelayPeer(peerId: srelay.peerInfo.peerId, addrs: srelay.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantRelayToSelf)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantRelayToSelf)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), - some(RelayPeer(peerId: srelay.peerInfo.peerId, addrs: srelay.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), + Opt.some(RelayPeer(peerId: srelay.peerInfo.peerId, addrs: srelay.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantRelayToSelf)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantRelayToSelf)) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), - some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), + Opt.some(RelayPeer(peerId: dst.peerInfo.peerId, addrs: dst.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopNoConnToDst)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopNoConnToDst)) await srelay.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) @@ -237,7 +237,7 @@ suite "Circuit Relay": conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantSpeakRelay)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantSpeakRelay)) r.maxCircuit = tmp await conn.close() @@ -246,7 +246,7 @@ suite "Circuit Relay": conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantSpeakRelay)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantSpeakRelay)) r.maxCircuitPerPeer = tmp await conn.close() @@ -255,13 +255,13 @@ suite "Circuit Relay": await srelay.connect(dst2.peerInfo.peerId, dst2.peerInfo.addrs) conn = await src.dial(srelay.peerInfo.peerId, srelay.peerInfo.addrs, RelayV1Codec) - msg = createMsg(some(RelayType.Hop), - none(StatusV1), - some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), - some(RelayPeer(peerId: dst2.peerInfo.peerId, addrs: dst2.peerInfo.addrs))) + msg = createMsg(Opt.some(RelayType.Hop), + Opt.none(StatusV1), + Opt.some(RelayPeer(peerId: src.peerInfo.peerId, addrs: src.peerInfo.addrs)), + Opt.some(RelayPeer(peerId: dst2.peerInfo.peerId, addrs: dst2.peerInfo.addrs))) await conn.writeLp(msg.buffer) rcv = RelayMessage.decode(await conn.readLp(1024)) - rcv.checkMsg(some(RelayType.Status), some(HopCantDialDst)) + rcv.checkMsg(Opt.some(RelayType.Status), Opt.some(HopCantDialDst)) await allFutures(dst2.stop()) asyncTest "Dial Peer": diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim index 41267aaa4..6802e8d0e 100644 --- a/tests/testrelayv2.nim +++ b/tests/testrelayv2.nim @@ -81,7 +81,7 @@ suite "Circuit Relay V2": let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() check: msg.msgType == HopMessageType.Status - msg.status == some(StatusV2.ReservationRefused) + msg.status == Opt.some(StatusV2.ReservationRefused) asyncTest "Too many reservations + Reconnect": expect(ReservationError): diff --git a/tests/testutility.nim b/tests/testutility.nim index e6a499fa3..3d655a90a 100644 --- a/tests/testutility.nim +++ b/tests/testutility.nim @@ -9,7 +9,6 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import strformat import ./helpers import ../libp2p/utility