diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 8a32d8b..52ee8b8 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -54,7 +54,6 @@ type mcache*: MCache # messages cache heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatRunning: bool - heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats when defined(libp2p_expensive_metrics): declareGauge(libp2p_gossipsub_peers_per_topic_mesh, @@ -244,7 +243,7 @@ proc heartbeat(g: GossipSub) {.async.} = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + warn "exception ocurred in gossipsub heartbeat", exc = exc.msg await sleepAsync(GossipSubHeartbeatInterval) @@ -527,25 +526,18 @@ method publish*(g: GossipSub, method start*(g: GossipSub) {.async.} = trace "gossipsub start" - ## start pubsub - ## start long running/repeating procedures + if not g.heartbeatFut.isNil: + warn "Starting gossipsub twice" + return - # interlock start to to avoid overlapping to stops - await g.heartbeatLock.acquire() - - # setup the heartbeat interval g.heartbeatRunning = true g.heartbeatFut = g.heartbeat() - g.heartbeatLock.release() - method stop*(g: GossipSub) {.async.} = trace "gossipsub stop" - - ## stop pubsub - ## stop long running tasks - - await g.heartbeatLock.acquire() + if g.heartbeatFut.isNil: + warn "Stopping gossipsub without starting it" + return # stop heartbeat interval g.heartbeatRunning = false @@ -553,8 +545,8 @@ method stop*(g: GossipSub) {.async.} = trace "awaiting last heartbeat" await g.heartbeatFut trace "heartbeat stopped" + g.heartbeatFut = nil - g.heartbeatLock.release() method initPubSub*(g: GossipSub) = procCall FloodSub(g).initPubSub() @@ -567,4 +559,3 @@ method initPubSub*(g: GossipSub) = g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.control = initTable[string, ControlMessage]() # pending control messages - g.heartbeatLock = newAsyncLock() diff --git a/libp2p/protocols/pubsub/peertable.nim b/libp2p/protocols/pubsub/peertable.nim index d294c01..15a1bf5 100644 --- a/libp2p/protocols/pubsub/peertable.nim +++ b/libp2p/protocols/pubsub/peertable.nim @@ -14,9 +14,11 @@ type PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool = - let peers = toSeq(t.getOrDefault(topic)) - peers.any do (peer: PubSubPeer) -> bool: - peer.peerId == peerId + if topic in t: + for peer in t[topic]: + if peer.peerId == peerId: + return true + false func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool = # returns true if the peer was added, diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 6946ca4..fed1512 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -70,11 +70,8 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} = libp2p_pubsub_peers.set(p.peers.len.int64) -proc send*( - p: PubSub, - peer: PubSubPeer, - msg: RPCMsg) = - ## send to remote peer +proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) = + ## Attempt to send `msg` to remote peer ## trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg) @@ -84,11 +81,10 @@ proc broadcast*( p: PubSub, sendPeers: openArray[PubSubPeer], msg: RPCMsg) = # raises: [Defect] - ## send messages - returns number of send attempts made. - ## + ## Attempt to send `msg` to the given peers trace "broadcasting messages to peers", - peers = sendPeers.len, message = shortLog(msg) + peers = sendPeers.len, msg = shortLog(msg) for peer in sendPeers: p.send(peer, msg) @@ -201,16 +197,14 @@ method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: - p.topics.withValue(t.topic, subs): - for i, h in subs[].handler: - if h == t.handler: - subs[].handler.del(i) + p.topics.withValue(t.topic, topic): + topic[].handler.keepIf(proc (x: auto): bool = x != t.handler) + + if topic[].handler.len == 0: + # make sure we delete the topic if + # no more handlers are left + p.topics.del(t.topic) - # make sure we delete the topic if - # no more handlers are left - if subs.handler.len <= 0: - p.topics.del(t.topic) # careful, invalidates subs - # metrics libp2p_pubsub_topics.set(p.topics.len.int64) proc unsubscribe*(p: PubSub, @@ -252,6 +246,10 @@ method publish*(p: PubSub, topic: string, data: seq[byte]): Future[int] {.base, async.} = ## publish to a ``topic`` + ## The return value is the number of neighbours that we attempted to send the + ## message to, excluding self. Note that this is an optimistic number of + ## attempts - the number of peers that actually receive the message might + ## be lower. if p.triggerSelf: await handleData(p, topic, data) diff --git a/libp2p/protocols/pubsub/rpc/message.nim b/libp2p/protocols/pubsub/rpc/message.nim index e49b54d..e5d15d0 100644 --- a/libp2p/protocols/pubsub/rpc/message.nim +++ b/libp2p/protocols/pubsub/rpc/message.nim @@ -30,8 +30,8 @@ declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messag func defaultMsgIdProvider*(m: Message): string = byteutils.toHex(m.seqno) & m.fromPeer.pretty -proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] = - ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) +proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] = + ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) proc verify*(m: Message, p: PeerID): bool = if m.signature.len > 0 and m.key.len > 0: @@ -63,6 +63,9 @@ proc init*( seqno: @(seqno.toBytesBE), # unefficient, fine for now topicIDs: @[topic]) - if sign and peer.publicKey.isSome: - result.signature = sign(result, peer).tryGet() - result.key = peer.publicKey.get().getBytes().tryGet() + if sign: + if peer.keyType != KeyType.HasPrivate: + raise (ref CatchableError)(msg: "Cannot sign message without private key") + + result.signature = sign(result, peer.privateKey).tryGet() + result.key = peer.privateKey.getKey().tryGet().getBytes().tryGet()