mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-03 09:31:08 +00:00
rework sending, remove helpers from pubsubpeer, unify in broadcast
This commit is contained in:
parent
2eb5347ad2
commit
c73447cd38
@ -101,7 +101,7 @@ method rpcHandler*(f: FloodSub,
|
||||
trace "exception in message handler", exc = exc.msg
|
||||
|
||||
# forward the message to all peers interested in it
|
||||
let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
|
||||
let published = await f.broadcast(toSendPeers, RPCMsg(messages: m.messages), DefaultSendTimeout)
|
||||
|
||||
trace "forwared message to peers", peers = published
|
||||
|
||||
@ -137,7 +137,7 @@ method publish*(f: FloodSub,
|
||||
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
|
||||
|
||||
# start the future but do not wait yet
|
||||
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)
|
||||
let published = await f.broadcast(f.floodsub.getOrDefault(topic), RPCMsg(messages: @[msg]), timeout)
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||
|
@ -155,10 +155,10 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||
|
||||
# Send changes to peers after table updates to avoid stale state
|
||||
for p in grafts:
|
||||
await p.sendGraft(@[topic])
|
||||
for p in prunes:
|
||||
await p.sendPrune(@[topic])
|
||||
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
|
||||
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
|
||||
discard g.broadcast(grafts, graft, DefaultSendTimeout)
|
||||
discard g.broadcast(prunes, prune, DefaultSendTimeout)
|
||||
|
||||
trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
|
||||
|
||||
@ -229,10 +229,10 @@ proc heartbeat(g: GossipSub) {.async.} =
|
||||
g.replenishFanout(t)
|
||||
|
||||
let peers = g.getGossipPeers()
|
||||
var sent: seq[Future[void]]
|
||||
var sent: seq[Future[int]]
|
||||
for peer, control in peers:
|
||||
g.peers.withValue(peer.peerId, pubsubPeer) do:
|
||||
sent &= pubsubPeer[].send(RPCMsg(control: some(control)))
|
||||
sent &= g.broadcast([pubsubPeer[]], RPCMsg(control: some(control)), DefaultSendTimeout)
|
||||
checkFutures(await allFinished(sent))
|
||||
|
||||
g.mcache.shift() # shift the cache
|
||||
@ -436,7 +436,7 @@ method rpcHandler*(g: GossipSub,
|
||||
trace "exception in message handler", exc = exc.msg
|
||||
|
||||
# forward the message to all peers interested in it
|
||||
let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
|
||||
let published = await g.broadcast(toSendPeers, RPCMsg(messages: m.messages), DefaultSendTimeout)
|
||||
|
||||
trace "forwared message to peers", peers = published
|
||||
|
||||
@ -453,8 +453,7 @@ method rpcHandler*(g: GossipSub,
|
||||
respControl.ihave.len > 0:
|
||||
try:
|
||||
info "sending control message", msg = respControl
|
||||
await peer.send(
|
||||
RPCMsg(control: some(respControl), messages: messages))
|
||||
let _ = await g.broadcast([peer], RPCMsg(control: some(respControl), messages: messages), DefaultSendTimeout)
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
@ -477,10 +476,8 @@ method unsubscribe*(g: GossipSub,
|
||||
let peers = g.mesh.getOrDefault(topic)
|
||||
g.mesh.del(topic)
|
||||
|
||||
var pending = newSeq[Future[void]]()
|
||||
for peer in peers:
|
||||
pending.add(peer.sendPrune(@[topic]))
|
||||
checkFutures(await allFinished(pending))
|
||||
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
|
||||
discard g.broadcast(peers, prune, DefaultSendTimeout)
|
||||
|
||||
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
|
||||
await procCall PubSub(g).unsubscribeAll(topic)
|
||||
@ -489,10 +486,8 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
|
||||
let peers = g.mesh.getOrDefault(topic)
|
||||
g.mesh.del(topic)
|
||||
|
||||
var pending = newSeq[Future[void]]()
|
||||
for peer in peers:
|
||||
pending.add(peer.sendPrune(@[topic]))
|
||||
checkFutures(await allFinished(pending))
|
||||
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
|
||||
discard g.broadcast(peers, prune, DefaultSendTimeout)
|
||||
|
||||
method publish*(g: GossipSub,
|
||||
topic: string,
|
||||
@ -533,7 +528,7 @@ method publish*(g: GossipSub,
|
||||
if msgId notin g.mcache:
|
||||
g.mcache.put(msgId, msg)
|
||||
|
||||
let published = await g.publishHelper(peers, @[msg], timeout)
|
||||
let published = await g.broadcast(peers, RPCMsg(messages: @[msg]), timeout)
|
||||
when defined(libp2p_expensive_metrics):
|
||||
if published > 0:
|
||||
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||
|
@ -72,12 +72,45 @@ method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
|
||||
|
||||
libp2p_pubsub_peers.set(p.peers.len.int64)
|
||||
|
||||
proc broadcast*(p: PubSub,
|
||||
sendPeers: HashSet[PubSubPeer] | seq[PubSubPeer] | array[1, PubSubPeer],
|
||||
msg: RPCMsg,
|
||||
timeout: Duration): Future[int] {.async.} =
|
||||
# send messages and cleanup failed peers
|
||||
var sent: seq[tuple[id: PeerID, fut: Future[void]]]
|
||||
for sendPeer in sendPeers:
|
||||
if sendPeer.isNil:
|
||||
continue
|
||||
|
||||
# avoid sending to self
|
||||
if sendPeer.peerId == p.peerInfo.peerId:
|
||||
continue
|
||||
|
||||
trace "broadcast to peer", peer = sendPeer.id, message = msg
|
||||
sent.add((id: sendPeer.peerId, fut: sendPeer.send(msg, timeout)))
|
||||
|
||||
var broadcasted: seq[PeerID]
|
||||
var failed: seq[PeerID]
|
||||
let futs = await allFinished(sent.mapIt(it.fut))
|
||||
for s in futs:
|
||||
let f = sent.filterIt(it.fut == s)
|
||||
if f.len > 0:
|
||||
if s.failed:
|
||||
trace "broadcast to peer failed", peer = f[0].id
|
||||
p.unsubscribePeer(f[0].id)
|
||||
failed.add(f[0].id)
|
||||
else:
|
||||
trace "broadcast to peer succeeded", peer = f[0].id
|
||||
broadcasted.add(f[0].id)
|
||||
|
||||
return broadcasted.len
|
||||
|
||||
proc sendSubs*(p: PubSub,
|
||||
peer: PubSubPeer,
|
||||
topics: seq[string],
|
||||
subscribe: bool) {.async.} =
|
||||
## send subscriptions to remote peer
|
||||
await peer.sendSubOpts(topics, subscribe)
|
||||
discard await p.broadcast([peer], RPCMsg(subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))), DefaultSendTimeout)
|
||||
|
||||
method subscribeTopic*(p: PubSub,
|
||||
topic: string,
|
||||
@ -218,39 +251,6 @@ method subscribe*(p: PubSub,
|
||||
# metrics
|
||||
libp2p_pubsub_topics.set(p.topics.len.int64)
|
||||
|
||||
proc publishHelper*(p: PubSub,
|
||||
sendPeers: HashSet[PubSubPeer],
|
||||
msgs: seq[Message],
|
||||
timeout: Duration): Future[int] {.async.} =
|
||||
# send messages and cleanup failed peers
|
||||
var sent: seq[tuple[id: PeerID, fut: Future[void]]]
|
||||
for sendPeer in sendPeers:
|
||||
if sendPeer.isNil:
|
||||
continue
|
||||
|
||||
# avoid sending to self
|
||||
if sendPeer.peerId == p.peerInfo.peerId:
|
||||
continue
|
||||
|
||||
trace "sending messages to peer", peer = sendPeer.id, msgs
|
||||
sent.add((id: sendPeer.peerId, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))
|
||||
|
||||
var published: seq[PeerID]
|
||||
var failed: seq[PeerID]
|
||||
let futs = await allFinished(sent.mapIt(it.fut))
|
||||
for s in futs:
|
||||
let f = sent.filterIt(it.fut == s)
|
||||
if f.len > 0:
|
||||
if s.failed:
|
||||
trace "sending messages to peer failed", peer = f[0].id
|
||||
p.unsubscribePeer(f[0].id)
|
||||
failed.add(f[0].id)
|
||||
else:
|
||||
trace "sending messages to peer succeeded", peer = f[0].id
|
||||
published.add(f[0].id)
|
||||
|
||||
return published.len
|
||||
|
||||
method publish*(p: PubSub,
|
||||
topic: string,
|
||||
data: seq[byte],
|
||||
|
@ -190,39 +190,6 @@ proc send*(
|
||||
|
||||
raise exc
|
||||
|
||||
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} =
|
||||
trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics
|
||||
|
||||
try:
|
||||
await p.send(RPCMsg(
|
||||
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception sending subscriptions", exc = exc.msg
|
||||
|
||||
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
|
||||
trace "sending graft to peer", peer = p.id, topicIDs = topics
|
||||
|
||||
try:
|
||||
await p.send(RPCMsg(control: some(
|
||||
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception sending grafts", exc = exc.msg
|
||||
|
||||
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
|
||||
trace "sending prune to peer", peer = p.id, topicIDs = topics
|
||||
|
||||
try:
|
||||
await p.send(RPCMsg(control: some(
|
||||
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception sending prunes", exc = exc.msg
|
||||
|
||||
proc `$`*(p: PubSubPeer): string =
|
||||
p.id
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user