merge latest master

This commit is contained in:
Dmitriy Ryajov 2021-05-24 11:55:33 -06:00
parent c949f14a99
commit 1c3616e3a5
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
7 changed files with 31 additions and 11 deletions

View File

@ -15,10 +15,11 @@ import
crypto/crypto, transports/[transport, tcptransport], crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex], muxers/[muxer, mplex/mplex],
protocols/[identify, secure/secure, secure/noise], protocols/[identify, secure/secure, secure/noise],
connmanager, upgrademngrs/muxedupgrade connmanager, upgrademngrs/muxedupgrade,
errors
export export
switch, peerid, peerinfo, connection, multiaddress, crypto switch, peerid, peerinfo, connection, multiaddress, crypto, errors
type type
SecureProtocol* {.pure.} = enum SecureProtocol* {.pure.} = enum

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.push raises: [Defect].}
import chronos import chronos
import peerid, import peerid,
stream/connection stream/connection

View File

@ -19,9 +19,10 @@ import dial,
multistream, multistream,
connmanager, connmanager,
stream/connection, stream/connection,
transports/transport transports/transport,
errors
export dial export dial, errors
logScope: logScope:
topics = "libp2p dialer" topics = "libp2p dialer"
@ -32,7 +33,7 @@ declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades") declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
type type
DialFailedError* = object of CatchableError DialFailedError* = object of LPError
Dialer* = ref object of Dial Dialer* = ref object of Dial
peerInfo*: PeerInfo peerInfo*: PeerInfo

View File

@ -9,8 +9,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import chronos import pkg/[chronos, nimcrypto/utils, chronicles, stew/byteutils]
import nimcrypto/utils, chronicles, stew/byteutils
import ../../stream/connection, import ../../stream/connection,
../../utility, ../../utility,
../../varint, ../../varint,
@ -40,7 +39,7 @@ type
# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream # https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream
const MaxMsgSize* = 1 shl 20 # 1mb const MaxMsgSize* = 1 shl 20 # 1mb
proc newInvalidMplexMsgType(): ref InvalidMplexMsgType = proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType =
newException(InvalidMplexMsgType, "invalid message type") newException(InvalidMplexMsgType, "invalid message type")
proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =

View File

@ -417,10 +417,24 @@ method publish*(g: GossipSub,
topic topic
trace "Publishing message on topic", data = data.shortLog trace "Publishing message on topic", data = data.shortLog
if topic.len <= 0: # data could be 0/empty
debug "Empty topic, skipping publish" debug "Empty topic, skipping publish"
return 0 return 0
var peers: HashSet[PubSubPeer]
if g.parameters.floodPublish:
# With flood publishing enabled, the mesh is used when propagating messages from other peers,
# but a peer's own messages will always be published to all known peers in the topic.
for peer in g.gossipsub.getOrDefault(topic):
if peer.score >= g.parameters.publishThreshold: if peer.score >= g.parameters.publishThreshold:
trace "publish: including flood/high score peer", peer
peers.incl(peer)
# add always direct peers
peers.incl(g.explicit.getOrDefault(topic))
if topic in g.topics: # if we're subscribed use the mesh if topic in g.topics: # if we're subscribed use the mesh
peers.incl(g.mesh.getOrDefault(topic)) peers.incl(g.mesh.getOrDefault(topic))
else: # not subscribed, send to fanout peers else: # not subscribed, send to fanout peers

View File

@ -439,6 +439,8 @@ proc newSecio*(rng: ref BrHmacDrbgContext, localPrivateKey: PrivateKey): Secio =
result = Secio( result = Secio(
rng: rng, rng: rng,
localPrivateKey: localPrivateKey, localPrivateKey: localPrivateKey,
localPublicKey: localPrivateKey.getKey().get(), localPublicKey: localPrivateKey
.getKey()
.expect("Can't fetch local private key"),
) )
result.init() result.init()

View File

@ -249,9 +249,10 @@ proc newSwitch*(peerInfo: PeerInfo,
muxers: Table[string, MuxerProvider], muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = [], secureManagers: openarray[Secure] = [],
connManager: ConnManager, connManager: ConnManager,
ms: MultistreamSelect): Switch = ms: MultistreamSelect): Switch
{.raises: [Defect, LPError].} =
if secureManagers.len == 0: if secureManagers.len == 0:
raise (ref LPError)(msg: "Provide at least one secure manager") raise newException(LPError, "Provide at least one secure manager")
let switch = Switch( let switch = Switch(
peerInfo: peerInfo, peerInfo: peerInfo,