avoid send deadlock by not allowing send to block (#342)

* avoid send deadlock by not allowing send to block

* handle message issues more consistently
Jacek Sieka 2020-09-01 09:33:03 +02:00 committed by GitHub
parent d3182c4dba
commit cd1c68dbc5
7 changed files with 225 additions and 246 deletions
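
Editorial note: the core of the fix is that pubsub sends no longer await the network write and no longer report delivery success; the write is detached with asyncCheck, so a stalled or dead peer can never block, or deadlock, the task that triggered the send (for example the mplex read loop). Below is a minimal, self-contained sketch of that fire-and-forget pattern; the Peer type and all names are illustrative, not the library code. One consequence, visible throughout the diff, is that publish can only return the number of send attempts (peers.len), not the number of successful deliveries.

import chronos

type Peer = ref object
  name: string

proc sendImpl(p: Peer, msg: string) {.async.} =
  try:
    # stand-in for conn.writeLp(encoded); may stall or fail per peer
    await sleepAsync(10.milliseconds)
    echo p.name, " <- ", msg
  except CatchableError as exc:
    # failures are contained here, per peer - they never reach the caller
    echo "send to ", p.name, " failed: ", exc.msg

proc send(p: Peer, msg: string) =
  # detach the write from the calling task; there is nothing to await,
  # so the caller cannot block no matter how slow the peer is
  asyncCheck sendImpl(p, msg)

when isMainModule:
  let peers = @[Peer(name: "a"), Peer(name: "b")]
  for p in peers:
    p.send("hello")                    # returns immediately for every peer
  waitFor sleepAsync(50.milliseconds)  # give the detached sends time to run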

libp2p/protocols/pubsub/floodsub.nim

@@ -63,50 +63,38 @@ method unsubscribePeer*(f: FloodSub, peer: PeerID) =
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
-                   rpcMsgs: seq[RPCMsg]) {.async.} =
-  await procCall PubSub(f).rpcHandler(peer, rpcMsgs)
-
-  for m in rpcMsgs:                          # for all RPC messages
-    if m.messages.len > 0:                   # if there are any messages
-      var toSendPeers = initHashSet[PubSubPeer]()
-      for msg in m.messages:                 # for every message
-        let msgId = f.msgIdProvider(msg)
-        logScope: msgId
+                   rpcMsg: RPCMsg) {.async.} =
+  await procCall PubSub(f).rpcHandler(peer, rpcMsg)
+
+  if rpcMsg.messages.len > 0:                # if there are any messages
+    var toSendPeers = initHashSet[PubSubPeer]()
+    for msg in rpcMsg.messages:              # for every message
+      let msgId = f.msgIdProvider(msg)
+      logScope: msgId

       if msgId notin f.seen:
         f.seen.put(msgId)                    # add the message to the seen cache

         if f.verifySignature and not msg.verify(peer.peerId):
           trace "dropping message due to failed signature verification"
           continue

         if not (await f.validate(msg)):
           trace "dropping message due to failed validation"
           continue

         for t in msg.topicIDs:               # for every topic in the message
           if t in f.floodsub:
             toSendPeers.incl(f.floodsub[t])  # get all the peers interested in this topic
-          if t in f.topics:                   # check that we're subscribed to it
-            for h in f.topics[t].handler:
-              trace "calling handler for message", topicId = t,
-                                                   localPeer = f.peerInfo.id,
-                                                   fromPeer = msg.fromPeer.pretty
-              try:
-                await h(t, msg.data)          # trigger user provided handler
-              except CancelledError as exc:
-                raise exc
-              except CatchableError as exc:
-                trace "exception in message handler", exc = exc.msg
+
+          await handleData(f, t, msg.data)

     # forward the message to all peers interested in it
-    let published = await f.broadcast(
-      toSeq(toSendPeers),
-      RPCMsg(messages: m.messages),
-      DefaultSendTimeout)
-
-    trace "forwared message to peers", peers = published
+    f.broadcast(
+      toSeq(toSendPeers),
+      RPCMsg(messages: rpcMsg.messages))
+
+    trace "forwared message to peers", peers = toSendPeers.len

 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
@@ -122,14 +110,13 @@ method init*(f: FloodSub) =
 method publish*(f: FloodSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
+                data: seq[byte]): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(f).publish(topic, data, timeout)
+  discard await procCall PubSub(f).publish(topic, data)

   if data.len <= 0 or topic.len <= 0:
     trace "topic or data missing, skipping publish"
-    return
+    return 0

   if topic notin f.floodsub:
     trace "missing peers for topic, skipping publish"
@@ -137,33 +124,34 @@ method publish*(f: FloodSub,
   trace "publishing on topic", name = topic
   inc f.msgSeqno
-  let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+  let
+    msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
+    peers = toSeq(f.floodsub.getOrDefault(topic))

   # start the future but do not wait yet
-  let published = await f.broadcast(
-    toSeq(f.floodsub.getOrDefault(topic)),
-    RPCMsg(messages: @[msg]),
-    timeout)
+  f.broadcast(
+    peers,
+    RPCMsg(messages: @[msg]))

   when defined(libp2p_expensive_metrics):
     libp2p_pubsub_messages_published.inc(labelValues = [topic])

-  trace "published message to peers", peers = published,
+  trace "published message to peers", peers = peers.len,
                                       msg = msg.shortLog()
-  return published
+  return peers.len

 method unsubscribe*(f: FloodSub,
                     topics: seq[TopicPair]) {.async.} =
   await procCall PubSub(f).unsubscribe(topics)

   for p in f.peers.values:
-    discard await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
+    f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)

 method unsubscribeAll*(f: FloodSub, topic: string) {.async.} =
   await procCall PubSub(f).unsubscribeAll(topic)

   for p in f.peers.values:
-    discard await f.sendSubs(p, @[topic], false)
+    f.sendSubs(p, @[topic], false)

 method initPubSub*(f: FloodSub) =
   procCall PubSub(f).initPubSub()

libp2p/protocols/pubsub/gossipsub.nim

@@ -157,10 +157,10 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
   # Send changes to peers after table updates to avoid stale state
   if grafts.len > 0:
     let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
-    discard await g.broadcast(grafts, graft, DefaultSendTimeout)
+    g.broadcast(grafts, graft)

   if prunes.len > 0:
     let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-    discard await g.broadcast(prunes, prune, DefaultSendTimeout)
+    g.broadcast(prunes, prune)

 proc dropFanoutPeers(g: GossipSub) =
   # drop peers that we haven't published to in
@@ -229,14 +229,11 @@ proc heartbeat(g: GossipSub) {.async.} =
       g.replenishFanout(t)

     let peers = g.getGossipPeers()
-    var sent: seq[Future[bool]]
     for peer, control in peers:
       g.peers.withValue(peer.peerId, pubsubPeer) do:
-        sent &= g.send(
-          pubsubPeer[],
-          RPCMsg(control: some(control)),
-          DefaultSendTimeout)
-    checkFutures(await allFinished(sent))
+        g.send(
+          pubsubPeer[],
+          RPCMsg(control: some(control)))

     g.mcache.shift() # shift the cache
   except CancelledError as exc:
@@ -379,89 +376,68 @@ proc handleIWant(g: GossipSub,
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
-                   rpcMsgs: seq[RPCMsg]) {.async.} =
-  await procCall PubSub(g).rpcHandler(peer, rpcMsgs)
-
-  for m in rpcMsgs:                          # for all RPC messages
-    if m.messages.len > 0:                   # if there are any messages
-      var toSendPeers: HashSet[PubSubPeer]
-      for msg in m.messages:                 # for every message
-        let msgId = g.msgIdProvider(msg)
-        logScope: msgId
+                   rpcMsg: RPCMsg) {.async.} =
+  await procCall PubSub(g).rpcHandler(peer, rpcMsg)
+
+  if rpcMsg.messages.len > 0:                # if there are any messages
+    var toSendPeers: HashSet[PubSubPeer]
+    for msg in rpcMsg.messages:              # for every message
+      let msgId = g.msgIdProvider(msg)
+      logScope: msgId

       if msgId in g.seen:
         trace "message already processed, skipping"
         continue

       trace "processing message"

       g.seen.put(msgId)                      # add the message to the seen cache

       if g.verifySignature and not msg.verify(peer.peerId):
         trace "dropping message due to failed signature verification"
         continue

       if not (await g.validate(msg)):
         trace "dropping message due to failed validation"
         continue

       # this shouldn't happen
       if g.peerInfo.peerId == msg.fromPeer:
         trace "skipping messages from self"
         continue

       for t in msg.topicIDs:                 # for every topic in the message
         if t in g.floodsub:
           toSendPeers.incl(g.floodsub[t])    # get all floodsub peers for topic

         if t in g.mesh:
           toSendPeers.incl(g.mesh[t])        # get all mesh peers for topic

-        if t in g.topics:                    # if we're subscribed to the topic
-          for h in g.topics[t].handler:
-            trace "calling handler for message", topicId = t,
-                                                 localPeer = g.peerInfo.id,
-                                                 fromPeer = msg.fromPeer.pretty
-            try:
-              await h(t, msg.data)           # trigger user provided handler
-            except CancelledError as exc:
-              raise exc
-            except CatchableError as exc:
-              trace "exception in message handler", exc = exc.msg
+        await handleData(g, t, msg.data)

     # forward the message to all peers interested in it
-    let published = await g.broadcast(
-      toSeq(toSendPeers),
-      RPCMsg(messages: m.messages),
-      DefaultSendTimeout)
-
-    trace "forwared message to peers", peers = published
-
-    var respControl: ControlMessage
-    if m.control.isSome:
-      let control = m.control.get()
-      g.handlePrune(peer, control.prune)
-      respControl.iwant.add(g.handleIHave(peer, control.ihave))
-      respControl.prune.add(g.handleGraft(peer, control.graft))
-      let messages = g.handleIWant(peer, control.iwant)
-
-      if respControl.graft.len > 0 or respControl.prune.len > 0 or
-        respControl.ihave.len > 0:
-        try:
-          info "sending control message", msg = respControl
-          let sent = await g.send(
-            peer,
-            RPCMsg(control: some(respControl), messages: messages),
-            DefaultSendTimeout)
-          if not sent:
-            g.unsubscribePeer(peer.peerId)
-        except CancelledError as exc:
-          raise exc
-        except CatchableError as exc:
-          trace "exception forwarding control messages", exc = exc.msg
+    g.broadcast(
+      toSeq(toSendPeers),
+      RPCMsg(messages: rpcMsg.messages))
+
+    trace "forwared message to peers", peers = toSendPeers.len
+
+  if rpcMsg.control.isSome:
+    let control = rpcMsg.control.get()
+    g.handlePrune(peer, control.prune)
+
+    var respControl: ControlMessage
+    respControl.iwant.add(g.handleIHave(peer, control.ihave))
+    respControl.prune.add(g.handleGraft(peer, control.graft))
+    let messages = g.handleIWant(peer, control.iwant)
+
+    if respControl.graft.len > 0 or respControl.prune.len > 0 or
+      respControl.ihave.len > 0 or messages.len > 0:
+
+      debug "sending control message", msg = shortLog(respControl)
+      g.send(
+        peer,
+        RPCMsg(control: some(respControl), messages: messages))

 method subscribe*(g: GossipSub,
                   topic: string,
@@ -481,7 +457,7 @@ method unsubscribe*(g: GossipSub,
       g.mesh.del(topic)

       let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-      discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
+      g.broadcast(toSeq(peers), prune)

 method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
   await procCall PubSub(g).unsubscribeAll(topic)
@@ -491,14 +467,13 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
     g.mesh.del(topic)

     let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
-    discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
+    g.broadcast(toSeq(peers), prune)

 method publish*(g: GossipSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.async.} =
+                data: seq[byte]): Future[int] {.async.} =
   # base returns always 0
-  discard await procCall PubSub(g).publish(topic, data, timeout)
+  discard await procCall PubSub(g).publish(topic, data)

   trace "publishing message on topic", topic, data = data.shortLog
   if topic.len <= 0: # data could be 0/empty
@@ -533,14 +508,14 @@ method publish*(g: GossipSub,
   g.mcache.put(msgId, msg)

   if peers.len > 0:
-    let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
+    g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]))
     when defined(libp2p_expensive_metrics):
-      if published > 0:
+      if peers.len > 0:
         libp2p_pubsub_messages_published.inc(labelValues = [topic])

-    trace "published message to peers", peers = published,
+    trace "published message to peers", peers = peers.len,
                                         msg = msg.shortLog()
-    return published
+    return peers.len
   else:
     debug "No peers for gossip message", topic, msg = msg.shortLog()
     return 0

libp2p/protocols/pubsub/pubsub.nim

@@ -66,54 +66,38 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
   ##

   trace "unsubscribing pubsub peer", peer = $peerId
-  if peerId in p.peers:
-    p.peers.del(peerId)
+  p.peers.del(peerId)

   libp2p_pubsub_peers.set(p.peers.len.int64)

 proc send*(
   p: PubSub,
   peer: PubSubPeer,
-  msg: RPCMsg,
-  timeout: Duration): Future[bool] {.async.} =
+  msg: RPCMsg) =
   ## send to remote peer
   ##

   trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg)
-  try:
-    await peer.send(msg, timeout)
-    return true
-  except CancelledError as exc:
-    raise exc
-  except CatchableError as exc:
-    trace "exception sending pubsub message to peer",
-      peer = $peer, msg = shortLog(msg)
-    p.unsubscribePeer(peer.peerId)
+  peer.send(msg)

 proc broadcast*(
   p: PubSub,
-  sendPeers: seq[PubSubPeer],
-  msg: RPCMsg,
-  timeout: Duration): Future[int] {.async.} =
-  ## send messages and cleanup failed peers
+  sendPeers: openArray[PubSubPeer],
+  msg: RPCMsg) = # raises: [Defect]
+  ## send messages - returns number of send attempts made.
   ##

   trace "broadcasting messages to peers",
     peers = sendPeers.len, message = shortLog(msg)
-  let sent = await allFinished(
-    sendPeers.mapIt( p.send(it, msg, timeout) ))
-  return sent.filterIt( it.finished and it.read ).len
+  for peer in sendPeers:
+    p.send(peer, msg)

 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
                topics: seq[string],
-               subscribe: bool): Future[bool] =
+               subscribe: bool) =
   ## send subscriptions to remote peer
-  p.send(
-    peer,
-    RPCMsg(
-      subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
-    DefaultSendTimeout)
+  p.send(peer, RPCMsg.withSubs(topics, subscribe))

 method subscribeTopic*(p: PubSub,
                        topic: string,
@@ -124,16 +108,14 @@ method subscribeTopic*(p: PubSub,
 method rpcHandler*(p: PubSub,
                    peer: PubSubPeer,
-                   rpcMsgs: seq[RPCMsg]) {.async, base.} =
+                   rpcMsg: RPCMsg) {.async, base.} =
   ## handle rpc messages
-  trace "processing RPC message", peer = peer.id, msgs = rpcMsgs.len
-  for m in rpcMsgs:                       # for all RPC messages
-    trace "processing messages", msg = m.shortLog
-    if m.subscriptions.len > 0:           # if there are any subscriptions
-      for s in m.subscriptions:           # subscribe/unsubscribe the peer for each topic
-        trace "about to subscribe to topic", topicId = s.topic
-        p.subscribeTopic(s.topic, s.subscribe, peer)
+  logScope: peer = peer.id
+  trace "processing RPC message", msg = rpcMsg.shortLog
+  for s in rpcMsg.subscriptions:          # subscribe/unsubscribe the peer for each topic
+    trace "about to subscribe to topic", topicId = s.topic
+    p.subscribeTopic(s.topic, s.subscribe, peer)

 proc getOrCreatePeer*(
   p: PubSub,
@@ -142,16 +124,36 @@ proc getOrCreatePeer*(
   if peer in p.peers:
     return p.peers[peer]

+  proc getConn(): Future[(Connection, RPCMsg)] {.async.} =
+    let conn = await p.switch.dial(peer, proto)
+    return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true))
+
   # create new pubsub peer
-  let pubSubPeer = newPubSubPeer(peer, p.switch, proto)
+  let pubSubPeer = newPubSubPeer(peer, getConn, proto)
   trace "created new pubsub peer", peerId = $peer

   p.peers[peer] = pubSubPeer
   pubSubPeer.observers = p.observers

   libp2p_pubsub_peers.set(p.peers.len.int64)

+  pubsubPeer.connect()
+
   return pubSubPeer

+proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
+  if topic notin p.topics: return # Not subscribed
+
+  for h in p.topics[topic].handler:
+    trace "triggering handler", topicID = topic
+    try:
+      await h(topic, data)
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      # Handlers should never raise exceptions
+      warn "Error in topic handler", msg = exc.msg
+
 method handleConn*(p: PubSub,
                    conn: Connection,
                    proto: string) {.base, async.} =
@@ -171,15 +173,13 @@ method handleConn*(p: PubSub,
     await conn.close()
     return

-  proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
+  proc handler(peer: PubSubPeer, msg: RPCMsg): Future[void] =
     # call pubsub rpc handler
-    await p.rpcHandler(peer, msgs)
+    p.rpcHandler(peer, msg)

+  let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
+
   try:
-    let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto)
-    if p.topics.len > 0:
-      discard await p.sendSubs(peer, toSeq(p.topics.keys), true)
-
     peer.handler = handler
     await peer.handle(conn) # spawn peer read loop
     trace "pubsub peer handler ended", peer = peer.id
@@ -195,15 +195,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
   ## messages
   ##

-  let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
-  if p.topics.len > 0:
-    # TODO sendSubs may raise, but doing asyncCheck here causes the exception
-    # to escape to the poll loop.
-    # With a bit of luck, it may be harmless to ignore exceptions here -
-    # some cleanup is eventually done in PubSubPeer.send
-    asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
-
-  pubsubPeer.subscribed = true
+  discard p.getOrCreatePeer(peer, p.codec)

 method unsubscribe*(p: PubSub,
                     topics: seq[TopicPair]) {.base, async.} =
@@ -249,32 +241,18 @@ method subscribe*(p: PubSub,
   p.topics[topic].handler.add(handler)

-  var sent: seq[Future[bool]]
-  for peer in toSeq(p.peers.values):
-    sent.add(p.sendSubs(peer, @[topic], true))
-
-  checkFutures(await allFinished(sent))
+  for _, peer in p.peers:
+    p.sendSubs(peer, @[topic], true)

   # metrics
   libp2p_pubsub_topics.set(p.topics.len.int64)

 method publish*(p: PubSub,
                 topic: string,
-                data: seq[byte],
-                timeout: Duration = InfiniteDuration): Future[int] {.base, async.} =
+                data: seq[byte]): Future[int] {.base, async.} =
   ## publish to a ``topic``
-  if p.triggerSelf and topic in p.topics:
-    for h in p.topics[topic].handler:
-      trace "triggering handler", topicID = topic
-      try:
-        await h(topic, data)
-      except CancelledError as exc:
-        raise exc
-      except CatchableError as exc:
-        # TODO these exceptions are ignored since it's likely that if writes are
-        # are failing, the underlying connection is already closed - this needs
-        # more cleanup though
-        debug "Could not write to pubsub connection", msg = exc.msg
+  if p.triggerSelf:
+    await handleData(p, topic, data)

   return 0

libp2p/protocols/pubsub/pubsubpeer.nim

@@ -28,27 +28,26 @@ when defined(libp2p_expensive_metrics):
   declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
   declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

-const
-  DefaultSendTimeout* = 10.seconds
-
 type
   PubSubObserver* = ref object
     onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
     onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}

+  GetConn* = proc(): Future[(Connection, RPCMsg)] {.gcsafe.}
+
   PubSubPeer* = ref object of RootObj
-    switch*: Switch                     # switch instance to dial peers
+    getConn*: GetConn                   # callback to establish a new send connection
     codec*: string                      # the protocol that this peer joined from
     sendConn: Connection                # cached send connection
-    connections*: seq[Connection]       # connections to this peer
     peerId*: PeerID
     handler*: RPCHandler
     sentRpcCache: TimedCache[string]    # cache for already sent messages
     recvdRpcCache: TimedCache[string]   # cache for already received messages
     observers*: ref seq[PubSubObserver] # ref as in smart_ptr
-    subscribed*: bool                   # are we subscribed to this peer
     dialLock: AsyncLock

-  RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
+  RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}

 func hash*(p: PubSubPeer): Hash =
   # int is either 32/64, so intptr basically, pubsubpeer is a ref
@@ -78,13 +77,15 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
 proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
   logScope:
+    oid = $conn.oid
     peer = p.id
+    closed = conn.closed

-  debug "starting pubsub read loop for peer", closed = conn.closed
+  debug "starting pubsub read loop"
   try:
     try:
       while not conn.atEof:
-        trace "waiting for data", closed = conn.closed
+        trace "waiting for data"
         let data = await conn.readLp(64 * 1024)
         let digest = $(sha256.digest(data))
         trace "read data from peer", data = data.shortLog
@@ -111,10 +112,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
           # metrics
           libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])

-        await p.handler(p, @[msg])
+        await p.handler(p, msg)
         p.recvdRpcCache.put(digest)
     finally:
-      debug "exiting pubsub peer read loop"
       await conn.close()

       if p.sendConn == conn:
@@ -124,9 +124,14 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
     raise exc
   except CatchableError as exc:
     trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
+  finally:
+    debug "exiting pubsub read loop"

 proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
-  # get a cached send connection or create a new one
+  ## get a cached send connection or create a new one - will return nil if
+  ## getting a new connection fails
+  ##

   block: # check if there's an existing connection that can be reused
     let current = p.sendConn
@@ -147,7 +152,8 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
       # and later close one of them, other implementations such as rust-libp2p
      # become deaf to our messages (potentially due to the clean-up associated
      # with closing connections). To prevent this, we use a lock that ensures
-      # that only a single dial will be performed for each peer.
+      # that only a single dial will be performed for each peer and send the
+      # subscription table every time we reconnect.
       #
       # Nevertheless, this approach is still quite problematic because the gossip
       # sends and their respective dials may be started from the mplex read loop.
@@ -172,24 +178,35 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
       return current

     # Grab a new send connection
-    let newConn = await p.switch.dial(p.peerId, p.codec) # ...and here
+    let (newConn, handshake) = await p.getConn() # ...and here
     if newConn.isNil:
       return nil

+    trace "Sending handshake", oid = $newConn.oid, handshake = shortLog(handshake)
+    await newConn.writeLp(encodeRpcMsg(handshake))
+
     trace "Caching new send connection", oid = $newConn.oid
     p.sendConn = newConn
     asyncCheck p.handle(newConn) # start a read loop on the new connection
     return newConn
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    return nil
   finally:
     if p.dialLock.locked:
       p.dialLock.release()

-proc send*(
-  p: PubSubPeer,
-  msg: RPCMsg,
-  timeout: Duration = DefaultSendTimeout) {.async.} =
+proc connectImpl*(p: PubSubPeer) {.async.} =
+  try:
+    discard await getSendConn(p)
+  except CatchableError as exc:
+    debug "Could not connect to pubsub peer", err = exc.msg
+
+proc connect*(p: PubSubPeer) =
+  asyncCheck(connectImpl(p))
+
+proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} =
   doAssert(not isNil(p), "pubsubpeer nil!")

   logScope:
@@ -217,16 +234,15 @@ proc send*(
     libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
     return

-  var conn: Connection
+  var conn = await p.getSendConn()
   try:
     trace "about to send message"
-    conn = await p.getSendConn()
     if conn == nil:
       debug "Couldn't get send connection, dropping message"
       return

     trace "sending encoded msgs to peer", connId = $conn.oid
-    await conn.writeLp(encoded).wait(timeout)
+    await conn.writeLp(encoded)
     p.sentRpcCache.put(digest)
     trace "sent pubsub message to remote", connId = $conn.oid
@@ -238,22 +254,32 @@ proc send*(
         libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])

   except CatchableError as exc:
+    # Because we detach the send call from the currently executing task using
+    # asyncCheck, no exceptions may leak out of it
     trace "unable to send to remote", exc = exc.msg
     # Next time sendConn is used, it will be have its close flag set and thus
     # will be recycled
     if not isNil(conn):
-      await conn.close()
+      await conn.close() # This will clean up the send connection

-    raise exc
+    if exc is CancelledError: # TODO not handled
+      debug "Send cancelled"
+
+    # We'll ask for a new send connection whenever possible
+    if p.sendConn == conn:
+      p.sendConn = nil
+
+proc send*(p: PubSubPeer, msg: RPCMsg) =
+  asyncCheck sendImpl(p, msg)

 proc `$`*(p: PubSubPeer): string =
-  p.id
+  $p.peerId

 proc newPubSubPeer*(peerId: PeerID,
-                    switch: Switch,
+                    getConn: GetConn,
                     codec: string): PubSubPeer =
   new result
-  result.switch = switch
+  result.getConn = getConn
   result.codec = codec
   result.peerId = peerId
   result.sentRpcCache = newTimedCache[string](2.minutes)

libp2p/protocols/pubsub/rpc/messages.nim

@@ -50,6 +50,11 @@ type
     messages*: seq[Message]
     control*: Option[ControlMessage]

+func withSubs*(
+    T: type RPCMsg, topics: openArray[string], subscribe: bool): T =
+  T(
+    subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it)))
+
 func shortLog*(s: ControlIHave): auto =
   (
     topicID: s.topicID.shortLog,

libp2p/stream/bufferstream.nim

@@ -43,7 +43,7 @@ logScope:
   topics = "bufferstream"

 const
-  DefaultBufferSize* = 102400
+  DefaultBufferSize* = 128

 const
   BufferStreamTrackerName* = "libp2p.bufferstream"

tests/pubsub/testgossipinternal.nim

@@ -16,6 +16,13 @@ type
 proc noop(data: seq[byte]) {.async, gcsafe.} = discard

+proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto =
+  proc getConn(): Future[(Connection, RPCMsg)] {.async.} =
+    let conn = await p.switch.dial(peerId, GossipSubCodec)
+    return (conn, RPCMsg.withSubs(toSeq(p.topics.keys), true))
+
+  newPubSubPeer(peerId, getConn, GossipSubCodec)
+
 proc randomPeerInfo(): PeerInfo =
   PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())

@@ -39,7 +46,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         gossipSub.peers[peerInfo.peerId] = peer
         gossipSub.mesh[topic].incl(peer)

@@ -69,7 +76,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         gossipSub.peers[peerInfo.peerId] = peer
         gossipSub.mesh[topic].incl(peer)

@@ -89,7 +96,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
       let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic = "foobar"

@@ -101,7 +108,7 @@ suite "GossipSub internal":
         conns &= conn
         var peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         gossipSub.gossipsub[topic].incl(peer)

@@ -121,7 +128,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
       let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic = "foobar"

@@ -135,7 +142,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         gossipSub.fanout[topic].incl(peer)

@@ -156,7 +163,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
       let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic1 = "foobar1"

@@ -173,7 +180,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         gossipSub.fanout[topic1].incl(peer)
         gossipSub.fanout[topic2].incl(peer)

@@ -197,7 +204,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
       let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic = "foobar"

@@ -212,7 +219,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         if i mod 2 == 0:
           gossipSub.fanout[topic].incl(peer)

@@ -225,7 +232,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         gossipSub.gossipsub[topic].incl(peer)

@@ -262,7 +269,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
       let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic = "foobar"

@@ -274,7 +281,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipsub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         if i mod 2 == 0:
           gossipSub.fanout[topic].incl(peer)

@@ -307,7 +314,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
      let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic = "foobar"

@@ -319,7 +326,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         if i mod 2 == 0:
           gossipSub.mesh[topic].incl(peer)

@@ -352,7 +359,7 @@ suite "GossipSub internal":
     proc testRun(): Future[bool] {.async.} =
       let gossipSub = TestGossipSub.init(newStandardSwitch())

-      proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
+      proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
         discard

       let topic = "foobar"

@@ -364,7 +371,7 @@ suite "GossipSub internal":
         conns &= conn
         let peerInfo = randomPeerInfo()
         conn.peerInfo = peerInfo
-        let peer = newPubSubPeer(peerInfo.peerId, gossipSub.switch, GossipSubCodec)
+        let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
         peer.handler = handler
         if i mod 2 == 0:
           gossipSub.mesh[topic].incl(peer)