From 9161c4f7fee3ff5971489547f0d618ae6c158520 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 20 Oct 2023 20:09:20 -0400 Subject: [PATCH] fix: stream closing/reset --- waku/v2/protocol/metadata/waku_metadata.go | 40 ++++++++++++++-------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 0471ecb6..3f0295ea 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -98,16 +98,11 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc return nil, err } - defer stream.Close() - defer func() { - err := stream.Reset() - if err != nil { - logger.Error("resetting connection", zap.Error(err)) - } - }() - clusterID, shards, err := wakuM.getClusterAndShards() if err != nil { + if err := stream.Reset(); err != nil { + wakuM.log.Error("resetting connection", zap.Error(err)) + } return nil, err } @@ -121,6 +116,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc err = writer.WriteMsg(request) if err != nil { logger.Error("writing request", zap.Error(err)) + if err := stream.Reset(); err != nil { + wakuM.log.Error("resetting connection", zap.Error(err)) + } return nil, err } @@ -128,9 +126,14 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc err = reader.ReadMsg(response) if err != nil { logger.Error("reading response", zap.Error(err)) + if err := stream.Reset(); err != nil { + wakuM.log.Error("resetting connection", zap.Error(err)) + } return nil, err } + stream.Close() + if response.ClusterId == nil { return nil, nil // Node is not using sharding } @@ -144,18 +147,20 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc return result, nil } -func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(s network.Stream) { - return func(s network.Stream) { - defer s.Close() - logger := wakuM.log.With(logging.HostID("peer", s.Conn().RemotePeer())) +func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { + return func(stream network.Stream) { + logger := wakuM.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) request := &pb.WakuMetadataRequest{} - writer := pbio.NewDelimitedWriter(s) - reader := pbio.NewDelimitedReader(s, math.MaxInt32) + writer := pbio.NewDelimitedWriter(stream) + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) err := reader.ReadMsg(request) if err != nil { logger.Error("reading request", zap.Error(err)) + if err := stream.Reset(); err != nil { + wakuM.log.Error("resetting connection", zap.Error(err)) + } return } @@ -172,8 +177,13 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(s network.Stream) err = writer.WriteMsg(response) if err != nil { logger.Error("writing response", zap.Error(err)) - _ = s.Reset() + if err := stream.Reset(); err != nil { + wakuM.log.Error("resetting connection", zap.Error(err)) + } + return } + + stream.Close() } }