diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 2de94bc2..0ac60dee 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -156,16 +156,16 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) - connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) return nil, err } - defer connOpt.Close() + defer stream.Close() defer func() { - err := connOpt.Reset() + err := stream.Reset() if err != nil { wakuLP.metrics.RecordError(dialFailure) logger.Error("resetting connection", zap.Error(err)) @@ -174,8 +174,8 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestID), Query: req} - writer := pbio.NewDelimitedWriter(connOpt) - reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) + writer := pbio.NewDelimitedWriter(stream) + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) err = writer.WriteMsg(pushRequestRPC) if err != nil { diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index d5c2d832..0471ecb6 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -92,15 +92,15 @@ func (wakuM *WakuMetadata) getClusterAndShards() (*uint32, []uint32, error) { func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protocol.RelayShards, error) { logger := wakuM.log.With(logging.HostID("peer", peerID)) - connOpt, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1) + stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) return nil, err } - defer connOpt.Close() + defer stream.Close() defer func() { - err := connOpt.Reset() + err := stream.Reset() if err != nil { logger.Error("resetting connection", zap.Error(err)) } @@ -115,8 +115,8 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc request.ClusterId = clusterID request.Shards = shards - writer := pbio.NewDelimitedWriter(connOpt) - reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) + writer := pbio.NewDelimitedWriter(stream) + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) err = writer.WriteMsg(request) if err != nil { diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 65a7305f..69a607dc 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -53,19 +53,19 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts }, } - connOpt, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) + stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { return err } - defer connOpt.Close() + defer stream.Close() - writer := pbio.NewDelimitedWriter(connOpt) + writer := pbio.NewDelimitedWriter(stream) err = writer.WriteMsg(requestRPC) if err != nil { return err } - reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) responseRPC := &pb.PeerExchangeRPC{} err = reader.ReadMsg(responseRPC) if err != nil { diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 68a0b390..31041743 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -174,22 +174,22 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec logger := store.log.With(logging.HostID("peer", selectedPeer)) logger.Info("querying message history") - connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) + stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) return nil, err } - defer connOpt.Close() + defer stream.Close() defer func() { - _ = connOpt.Reset() + _ = stream.Reset() }() historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestID)} - writer := pbio.NewDelimitedWriter(connOpt) - reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) + writer := pbio.NewDelimitedWriter(stream) + reader := pbio.NewDelimitedReader(stream, math.MaxInt32) err = writer.WriteMsg(historyRequest) if err != nil {