refactor: rename connOpt to stream

This commit is contained in:
Richard Ramos 2023-10-20 17:15:51 -04:00 committed by richΛrd
parent 19ba25ffcb
commit 519fa2977a
4 changed files with 19 additions and 19 deletions

View File

@ -156,16 +156,16 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil { if err != nil {
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure) wakuLP.metrics.RecordError(dialFailure)
return nil, err return nil, err
} }
defer connOpt.Close() defer stream.Close()
defer func() { defer func() {
err := connOpt.Reset() err := stream.Reset()
if err != nil { if err != nil {
wakuLP.metrics.RecordError(dialFailure) wakuLP.metrics.RecordError(dialFailure)
logger.Error("resetting connection", zap.Error(err)) logger.Error("resetting connection", zap.Error(err))
@ -174,8 +174,8 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestID), Query: req} pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestID), Query: req}
writer := pbio.NewDelimitedWriter(connOpt) writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err = writer.WriteMsg(pushRequestRPC) err = writer.WriteMsg(pushRequestRPC)
if err != nil { if err != nil {

View File

@ -92,15 +92,15 @@ func (wakuM *WakuMetadata) getClusterAndShards() (*uint32, []uint32, error) {
func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protocol.RelayShards, error) { func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protocol.RelayShards, error) {
logger := wakuM.log.With(logging.HostID("peer", peerID)) logger := wakuM.log.With(logging.HostID("peer", peerID))
connOpt, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1) stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1)
if err != nil { if err != nil {
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
return nil, err return nil, err
} }
defer connOpt.Close() defer stream.Close()
defer func() { defer func() {
err := connOpt.Reset() err := stream.Reset()
if err != nil { if err != nil {
logger.Error("resetting connection", zap.Error(err)) logger.Error("resetting connection", zap.Error(err))
} }
@ -115,8 +115,8 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc
request.ClusterId = clusterID request.ClusterId = clusterID
request.Shards = shards request.Shards = shards
writer := pbio.NewDelimitedWriter(connOpt) writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err = writer.WriteMsg(request) err = writer.WriteMsg(request)
if err != nil { if err != nil {

View File

@ -53,19 +53,19 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
}, },
} }
connOpt, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil { if err != nil {
return err return err
} }
defer connOpt.Close() defer stream.Close()
writer := pbio.NewDelimitedWriter(connOpt) writer := pbio.NewDelimitedWriter(stream)
err = writer.WriteMsg(requestRPC) err = writer.WriteMsg(requestRPC)
if err != nil { if err != nil {
return err return err
} }
reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
responseRPC := &pb.PeerExchangeRPC{} responseRPC := &pb.PeerExchangeRPC{}
err = reader.ReadMsg(responseRPC) err = reader.ReadMsg(responseRPC)
if err != nil { if err != nil {

View File

@ -174,22 +174,22 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
logger := store.log.With(logging.HostID("peer", selectedPeer)) logger := store.log.With(logging.HostID("peer", selectedPeer))
logger.Info("querying message history") logger.Info("querying message history")
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil { if err != nil {
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
store.metrics.RecordError(dialFailure) store.metrics.RecordError(dialFailure)
return nil, err return nil, err
} }
defer connOpt.Close() defer stream.Close()
defer func() { defer func() {
_ = connOpt.Reset() _ = stream.Reset()
}() }()
historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestID)} historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestID)}
writer := pbio.NewDelimitedWriter(connOpt) writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err = writer.WriteMsg(historyRequest) err = writer.WriteMsg(historyRequest)
if err != nil { if err != nil {