mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
perf: use msgio pooled buffers for received msgs (#500)
This commit is contained in:
parent
1e161006c4
commit
9c56b2deca
23
comm.go
23
comm.go
@ -6,14 +6,14 @@ import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
|
||||
"github.com/libp2p/go-msgio"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
)
|
||||
|
||||
// get the initial RPC containing all of our subscriptions to send to new peers
|
||||
@ -60,11 +60,11 @@ func (p *PubSub) handleNewStream(s network.Stream) {
|
||||
p.inboundStreamsMx.Unlock()
|
||||
}()
|
||||
|
||||
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
|
||||
r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
|
||||
for {
|
||||
rpc := new(RPC)
|
||||
err := r.ReadMsg(&rpc.RPC)
|
||||
msgbytes, err := r.ReadMsg()
|
||||
if err != nil {
|
||||
r.ReleaseMsg(msgbytes)
|
||||
if err != io.EOF {
|
||||
s.Reset()
|
||||
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
|
||||
@ -77,6 +77,15 @@ func (p *PubSub) handleNewStream(s network.Stream) {
|
||||
return
|
||||
}
|
||||
|
||||
rpc := new(RPC)
|
||||
err = rpc.Unmarshal(msgbytes)
|
||||
r.ReleaseMsg(msgbytes)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err)
|
||||
return
|
||||
}
|
||||
|
||||
rpc.from = peer
|
||||
select {
|
||||
case p.incoming <- rpc:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user