mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-08 15:53:07 +00:00
only call fragmentRPC if we're over the size limit
This commit is contained in:
parent
568fa5a244
commit
21a4f8c0c8
35
gossipsub.go
35
gossipsub.go
@ -899,16 +899,31 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// fragment into multiple RPCs if necessary
|
dropRPC := func(rpc *RPC, template string, args ...interface{}) {
|
||||||
outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
|
log.Warnf(template, args...)
|
||||||
if err != nil {
|
gs.tracer.DropRPC(rpc, p)
|
||||||
log.Warnf("unable to fragment message to peer %s: %s", p, err)
|
|
||||||
gs.tracer.DropRPC(out, p)
|
|
||||||
// push control messages that need to be retried
|
// push control messages that need to be retried
|
||||||
ctl := out.GetControl()
|
ctl := rpc.GetControl()
|
||||||
if ctl != nil {
|
if ctl != nil {
|
||||||
gs.pushControl(p, ctl)
|
gs.pushControl(p, ctl)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we're below the max message size, go ahead and send
|
||||||
|
if out.Size() < gs.p.maxMessageSize {
|
||||||
|
select {
|
||||||
|
case mch <- out:
|
||||||
|
gs.tracer.SendRPC(out, p)
|
||||||
|
default:
|
||||||
|
dropRPC(out, "dropping message to peer %s: queue full", p)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we're too big, fragment into multiple RPCs and send each sequentially
|
||||||
|
outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize)
|
||||||
|
if err != nil {
|
||||||
|
dropRPC(out, "unable to fragment message to peer %s: %s", p, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -917,13 +932,7 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
|
|||||||
case mch <- rpc:
|
case mch <- rpc:
|
||||||
gs.tracer.SendRPC(rpc, p)
|
gs.tracer.SendRPC(rpc, p)
|
||||||
default:
|
default:
|
||||||
log.Infof("dropping message to peer %s: queue full", p)
|
dropRPC(rpc, "dropping message to peer %s: queue full", p)
|
||||||
gs.tracer.DropRPC(rpc, p)
|
|
||||||
// push control messages that need to be retried
|
|
||||||
ctl := rpc.GetControl()
|
|
||||||
if ctl != nil {
|
|
||||||
gs.pushControl(p, ctl)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user