fix: connect before opening a stream

This commit is contained in:
Richard Ramos 2022-03-03 12:04:03 -04:00
parent b820d797c8
commit f4f307db87
No known key found for this signature in database
GPG Key ID: BD36D48BC9FFC88C
3 changed files with 30 additions and 1 deletions

View File

@ -149,6 +149,12 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(wf.ctx, wf.h.Peerstore().PeerInfo(subscriber.peer))
if err != nil {
return err
}
conn, err := wf.h.NewStream(wf.ctx, subscriber.peer, FilterID_v20beta1) conn, err := wf.h.NewStream(wf.ctx, subscriber.peer, FilterID_v20beta1)
if err != nil { if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer) wf.subscribers.FlagAsFailure(subscriber.peer)
@ -233,6 +239,12 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
} }
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err = wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
return
}
request := pb.FilterRequest{ request := pb.FilterRequest{
Subscribe: true, Subscribe: true,
Topic: filter.Topic, Topic: filter.Topic,
@ -267,8 +279,13 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
} }
func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error { func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error {
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) // We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer))
if err != nil {
return err
}
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
if err != nil { if err != nil {
return err return err
} }

View File

@ -145,6 +145,12 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
return nil, ErrInvalidId return nil, ErrInvalidId
} }
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
return nil, err
}
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil { if err != nil {
wakuLP.log.Info("failed to connect to remote peer", err) wakuLP.log.Info("failed to connect to remote peer", err)

View File

@ -523,6 +523,12 @@ func DefaultOptions() []HistoryRequestOption {
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer))
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
if err != nil {
return nil, err
}
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil { if err != nil {
store.log.Error("Failed to connect to remote peer", err) store.log.Error("Failed to connect to remote peer", err)