small cleanups & docs (#347)

* simplify gossipsub heartbeat start / stop
* avoid alloc in peerid check
* stop iterating over seq after unsubscribing item (could crash)
* don't crash on missing private key with enabled sigs (shouldn't happen
but...)
This commit is contained in:
Jacek Sieka 2020-09-04 18:31:43 +02:00 committed by GitHub
parent 0b85192119
commit 6d91d61844
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 36 additions and 42 deletions

View File

@ -54,7 +54,6 @@ type
mcache*: MCache # messages cache mcache*: MCache # messages cache
heartbeatFut: Future[void] # cancellation future for heartbeat interval heartbeatFut: Future[void] # cancellation future for heartbeat interval
heartbeatRunning: bool heartbeatRunning: bool
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
@ -244,7 +243,7 @@ proc heartbeat(g: GossipSub) {.async.} =
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg warn "exception ocurred in gossipsub heartbeat", exc = exc.msg
await sleepAsync(GossipSubHeartbeatInterval) await sleepAsync(GossipSubHeartbeatInterval)
@ -527,25 +526,18 @@ method publish*(g: GossipSub,
method start*(g: GossipSub) {.async.} = method start*(g: GossipSub) {.async.} =
trace "gossipsub start" trace "gossipsub start"
## start pubsub if not g.heartbeatFut.isNil:
## start long running/repeating procedures warn "Starting gossipsub twice"
return
# interlock start to to avoid overlapping to stops
await g.heartbeatLock.acquire()
# setup the heartbeat interval
g.heartbeatRunning = true g.heartbeatRunning = true
g.heartbeatFut = g.heartbeat() g.heartbeatFut = g.heartbeat()
g.heartbeatLock.release()
method stop*(g: GossipSub) {.async.} = method stop*(g: GossipSub) {.async.} =
trace "gossipsub stop" trace "gossipsub stop"
if g.heartbeatFut.isNil:
## stop pubsub warn "Stopping gossipsub without starting it"
## stop long running tasks return
await g.heartbeatLock.acquire()
# stop heartbeat interval # stop heartbeat interval
g.heartbeatRunning = false g.heartbeatRunning = false
@ -553,8 +545,8 @@ method stop*(g: GossipSub) {.async.} =
trace "awaiting last heartbeat" trace "awaiting last heartbeat"
await g.heartbeatFut await g.heartbeatFut
trace "heartbeat stopped" trace "heartbeat stopped"
g.heartbeatFut = nil
g.heartbeatLock.release()
method initPubSub*(g: GossipSub) = method initPubSub*(g: GossipSub) =
procCall FloodSub(g).initPubSub() procCall FloodSub(g).initPubSub()
@ -567,4 +559,3 @@ method initPubSub*(g: GossipSub) =
g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics g.lastFanoutPubSub = initTable[string, Moment]() # last publish time for fanout topics
g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip g.gossip = initTable[string, seq[ControlIHave]]() # pending gossip
g.control = initTable[string, ControlMessage]() # pending control messages g.control = initTable[string, ControlMessage]() # pending control messages
g.heartbeatLock = newAsyncLock()

View File

@ -14,9 +14,11 @@ type
PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map PeerTable* = Table[string, HashSet[PubSubPeer]] # topic string to peer map
proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool = proc hasPeerID*(t: PeerTable, topic: string, peerId: PeerID): bool =
let peers = toSeq(t.getOrDefault(topic)) if topic in t:
peers.any do (peer: PubSubPeer) -> bool: for peer in t[topic]:
peer.peerId == peerId if peer.peerId == peerId:
return true
false
func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool = func addPeer*(table: var PeerTable, topic: string, peer: PubSubPeer): bool =
# returns true if the peer was added, # returns true if the peer was added,

View File

@ -70,11 +70,8 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
libp2p_pubsub_peers.set(p.peers.len.int64) libp2p_pubsub_peers.set(p.peers.len.int64)
proc send*( proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) =
p: PubSub, ## Attempt to send `msg` to remote peer
peer: PubSubPeer,
msg: RPCMsg) =
## send to remote peer
## ##
trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg) trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg)
@ -84,11 +81,10 @@ proc broadcast*(
p: PubSub, p: PubSub,
sendPeers: openArray[PubSubPeer], sendPeers: openArray[PubSubPeer],
msg: RPCMsg) = # raises: [Defect] msg: RPCMsg) = # raises: [Defect]
## send messages - returns number of send attempts made. ## Attempt to send `msg` to the given peers
##
trace "broadcasting messages to peers", trace "broadcasting messages to peers",
peers = sendPeers.len, message = shortLog(msg) peers = sendPeers.len, msg = shortLog(msg)
for peer in sendPeers: for peer in sendPeers:
p.send(peer, msg) p.send(peer, msg)
@ -201,16 +197,14 @@ method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} = topics: seq[TopicPair]) {.base, async.} =
## unsubscribe from a list of ``topic`` strings ## unsubscribe from a list of ``topic`` strings
for t in topics: for t in topics:
p.topics.withValue(t.topic, subs): p.topics.withValue(t.topic, topic):
for i, h in subs[].handler: topic[].handler.keepIf(proc (x: auto): bool = x != t.handler)
if h == t.handler:
subs[].handler.del(i) if topic[].handler.len == 0:
# make sure we delete the topic if
# no more handlers are left
p.topics.del(t.topic)
# make sure we delete the topic if
# no more handlers are left
if subs.handler.len <= 0:
p.topics.del(t.topic) # careful, invalidates subs
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64) libp2p_pubsub_topics.set(p.topics.len.int64)
proc unsubscribe*(p: PubSub, proc unsubscribe*(p: PubSub,
@ -252,6 +246,10 @@ method publish*(p: PubSub,
topic: string, topic: string,
data: seq[byte]): Future[int] {.base, async.} = data: seq[byte]): Future[int] {.base, async.} =
## publish to a ``topic`` ## publish to a ``topic``
## The return value is the number of neighbours that we attempted to send the
## message to, excluding self. Note that this is an optimistic number of
## attempts - the number of peers that actually receive the message might
## be lower.
if p.triggerSelf: if p.triggerSelf:
await handleData(p, topic, data) await handleData(p, topic, data)

View File

@ -30,8 +30,8 @@ declareCounter(libp2p_pubsub_sig_verify_failure, "pubsub failed validated messag
func defaultMsgIdProvider*(m: Message): string = func defaultMsgIdProvider*(m: Message): string =
byteutils.toHex(m.seqno) & m.fromPeer.pretty byteutils.toHex(m.seqno) & m.fromPeer.pretty
proc sign*(msg: Message, p: PeerInfo): CryptoResult[seq[byte]] = proc sign*(msg: Message, privateKey: PrivateKey): CryptoResult[seq[byte]] =
ok((? p.privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes()) ok((? privateKey.sign(PubSubPrefix & encodeMessage(msg))).getBytes())
proc verify*(m: Message, p: PeerID): bool = proc verify*(m: Message, p: PeerID): bool =
if m.signature.len > 0 and m.key.len > 0: if m.signature.len > 0 and m.key.len > 0:
@ -63,6 +63,9 @@ proc init*(
seqno: @(seqno.toBytesBE), # unefficient, fine for now seqno: @(seqno.toBytesBE), # unefficient, fine for now
topicIDs: @[topic]) topicIDs: @[topic])
if sign and peer.publicKey.isSome: if sign:
result.signature = sign(result, peer).tryGet() if peer.keyType != KeyType.HasPrivate:
result.key = peer.publicKey.get().getBytes().tryGet() raise (ref CatchableError)(msg: "Cannot sign message without private key")
result.signature = sign(result, peer.privateKey).tryGet()
result.key = peer.privateKey.getKey().tryGet().getBytes().tryGet()