perf: use pooled buffers for message writes (#507)

This commit is contained in:
Hlib Kanunnikov 2022-11-19 15:59:45 +01:00 committed by GitHub
parent 9c56b2deca
commit aed7fc42c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

20
comm.go
View File

@ -1,12 +1,14 @@
package pubsub
import (
"bufio"
"context"
"encoding/binary"
"io"
"time"
"github.com/gogo/protobuf/proto"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-varint"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@ -156,16 +158,20 @@ func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
}
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
bufw := bufio.NewWriter(s)
wc := protoio.NewDelimitedWriter(bufw)
writeRpc := func(rpc *RPC) error {
size := uint64(rpc.Size())
writeMsg := func(msg proto.Message) error {
err := wc.WriteMsg(msg)
buf := pool.Get(varint.UvarintSize(size) + int(size))
defer pool.Put(buf)
n := binary.PutUvarint(buf, size)
_, err := rpc.MarshalTo(buf[n:])
if err != nil {
return err
}
return bufw.Flush()
_, err = s.Write(buf)
return err
}
defer s.Close()
@ -176,7 +182,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
return
}
err := writeMsg(&rpc.RPC)
err := writeRpc(rpc)
if err != nil {
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)