fix(filterV2): requestID and log request type

This commit is contained in:
Richard Ramos 2023-08-25 11:40:24 -04:00 committed by richΛrd
parent 8b73eb8ae3
commit 041dc4070a
2 changed files with 5 additions and 4 deletions

View File

@ -254,6 +254,7 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
params := new(FilterUnsubscribeParameters) params := new(FilterUnsubscribeParameters)
params.log = wf.log params.log = wf.log
opts = append(DefaultUnsubscribeOptions(), opts...)
for _, opt := range opts { for _, opt := range opts {
opt(params) opt(params)
} }
@ -353,7 +354,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
defer localWg.Done() defer localWg.Done()
err := wf.request( err := wf.request(
ctx, ctx,
&FilterSubscribeParameters{selectedPeer: peerID}, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
pb.FilterSubscribeRequest_UNSUBSCRIBE, pb.FilterSubscribeRequest_UNSUBSCRIBE,
contentFilter) contentFilter)
if err != nil { if err != nil {
@ -422,7 +423,7 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte
defer localWg.Done() defer localWg.Done()
err := wf.request( err := wf.request(
ctx, ctx,
&FilterSubscribeParameters{selectedPeer: peerID}, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
ContentFilter{}) ContentFilter{})
if err != nil { if err != nil {

View File

@ -118,7 +118,7 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stre
wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start)) wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start))
logger.Info("received request") logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String()))
} }
} }
@ -298,7 +298,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
} else { } else {
wf.metrics.RecordError(writeResponseFailure) wf.metrics.RecordError(writeResponseFailure)
} }
logger.Error("pushing messages to peer", zap.Error(err)) logger.Error("pushing messages to peer", logging.HexBytes("envelopeHash", env.Hash()), zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
wf.subscriptions.FlagAsFailure(peerID) wf.subscriptions.FlagAsFailure(peerID)
return nil return nil
} }