diff --git a/examples/chat2/options.go b/examples/chat2/options.go index eeabbac7..2ea62e81 100644 --- a/examples/chat2/options.go +++ b/examples/chat2/options.go @@ -40,6 +40,19 @@ type RLNRelayOptions struct { MembershipContractAddress common.Address } +func nodePeerID(node *multiaddr.Multiaddr) (peer.ID, error) { + if node == nil { + return peer.ID(""), errors.New("node is nil") + } + + peerID, err := (*node).ValueForProtocol(multiaddr.P_P2P) + if err != nil { + return peer.ID(""), err + } + + return peer.Decode(peerID) +} + // FilterOptions are settings used to enable filter protocol. This is a protocol // that enables subscribing to messages that a peer receives. This is a more // lightweight version of WakuRelay specifically designed for bandwidth @@ -50,16 +63,7 @@ type FilterOptions struct { } func (f FilterOptions) NodePeerID() (peer.ID, error) { - if f.Node == nil { - return peer.ID(""), errors.New("node is nil") - } - - peerID, err := (*f.Node).ValueForProtocol(multiaddr.P_P2P) - if err != nil { - return peer.ID(""), err - } - - return peer.Decode(peerID) + return nodePeerID(f.Node) } // LightpushOptions are settings used to enable the lightpush protocol. This is @@ -74,16 +78,7 @@ type LightpushOptions struct { } func (f LightpushOptions) NodePeerID() (peer.ID, error) { - if f.Node == nil { - return peer.ID(""), errors.New("node is nil") - } - - peerID, err := (*f.Node).ValueForProtocol(multiaddr.P_P2P) - if err != nil { - return peer.ID(""), err - } - - return peer.Decode(peerID) + return nodePeerID(f.Node) } // StoreOptions are settings used for enabling the store protocol, used to @@ -94,16 +89,7 @@ type StoreOptions struct { } func (f StoreOptions) NodePeerID() (peer.ID, error) { - if f.Node == nil { - return peer.ID(""), errors.New("node is nil") - } - - peerID, err := (*f.Node).ValueForProtocol(multiaddr.P_P2P) - if err != nil { - return peer.ID(""), err - } - - return peer.Decode(peerID) + return nodePeerID(f.Node) } // DNSDiscoveryOptions are settings used for enabling DNS-based discovery diff --git a/waku/persistence/postgres/migrations/migrate.go b/waku/persistence/postgres/migrations/migrate.go index dd5c733b..427e73bf 100644 --- a/waku/persistence/postgres/migrations/migrate.go +++ b/waku/persistence/postgres/migrations/migrate.go @@ -1,6 +1,3 @@ -//go:build !gowaku_skip_migrations -// +build !gowaku_skip_migrations - package migrations import ( diff --git a/waku/persistence/postgres/migrations/no_migrations.go b/waku/persistence/postgres/migrations/no_migrations.go deleted file mode 100644 index ae2297f3..00000000 --- a/waku/persistence/postgres/migrations/no_migrations.go +++ /dev/null @@ -1,13 +0,0 @@ -//go:build gowaku_skip_migrations -// +build gowaku_skip_migrations - -package migrations - -import ( - "database/sql" -) - -// Skip migration code -func Migrate(db *sql.DB) error { - return nil -} diff --git a/waku/persistence/sqlite/migrations/migrate.go b/waku/persistence/sqlite/migrations/migrate.go index dd5c733b..427e73bf 100644 --- a/waku/persistence/sqlite/migrations/migrate.go +++ b/waku/persistence/sqlite/migrations/migrate.go @@ -1,6 +1,3 @@ -//go:build !gowaku_skip_migrations -// +build !gowaku_skip_migrations - package migrations import ( diff --git a/waku/persistence/sqlite/migrations/no_migrations.go b/waku/persistence/sqlite/migrations/no_migrations.go deleted file mode 100644 index ae2297f3..00000000 --- a/waku/persistence/sqlite/migrations/no_migrations.go +++ /dev/null @@ -1,13 +0,0 @@ -//go:build gowaku_skip_migrations -// +build gowaku_skip_migrations - -package migrations - -import ( - "database/sql" -) - -// Skip migration code -func Migrate(db *sql.DB) error { - return nil -} diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index cc8165e2..0fccf964 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" @@ -14,52 +15,54 @@ import ( const maxAllowedPingFailures = 2 const maxPublishAttempt = 5 +func disconnectPeers(host host.Host, logger *zap.Logger) { + logger.Warn("keep alive hasnt been executed recently. Killing all connections to peers") + for _, p := range host.Network().Peers() { + err := host.Network().ClosePeer(p) + if err != nil { + logger.Warn("while disconnecting peer", zap.Error(err)) + } + } +} + // startKeepAlive creates a go routine that periodically pings connected peers. // This is necessary because TCP connections are automatically closed due to inactivity, // and doing a ping will avoid this (with a small bandwidth cost) func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) { - go func() { - defer w.wg.Done() - w.log.Info("setting up ping protocol", zap.Duration("duration", t)) - ticker := time.NewTicker(t) - defer ticker.Stop() + defer w.wg.Done() + w.log.Info("setting up ping protocol", zap.Duration("duration", t)) + ticker := time.NewTicker(t) + defer ticker.Stop() - lastTimeExecuted := w.timesource.Now() + lastTimeExecuted := w.timesource.Now() - sleepDetectionInterval := int64(t) * 3 - - for { - select { - case <-ticker.C: - difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() - if difference > sleepDetectionInterval { - w.log.Warn("keep alive hasnt been executed recently. Killing all connections to peers") - for _, p := range w.host.Network().Peers() { - err := w.host.Network().ClosePeer(p) - if err != nil { - w.log.Warn("while disconnecting peer", zap.Error(err)) - } - } - lastTimeExecuted = w.timesource.Now() - continue - } - - // Network's peers collection, - // contains only currently active peers - for _, p := range w.host.Network().Peers() { - if p != w.host.ID() { - w.wg.Add(1) - go w.pingPeer(ctx, p) - } - } + sleepDetectionInterval := int64(t) * 3 + for { + select { + case <-ticker.C: + difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() + if difference > sleepDetectionInterval { + disconnectPeers(w.host, w.log) lastTimeExecuted = w.timesource.Now() - case <-ctx.Done(): - w.log.Info("stopping ping protocol") - return + continue } + + // Network's peers collection, + // contains only currently active peers + for _, p := range w.host.Network().Peers() { + if p != w.host.ID() { + w.wg.Add(1) + go w.pingPeer(ctx, p) + } + } + + lastTimeExecuted = w.timesource.Now() + case <-ctx.Done(): + w.log.Info("stopping ping protocol") + return } - }() + } } func (w *WakuNode) pingPeer(ctx context.Context, peer peer.ID) { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index ecef83f0..40336b14 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -286,7 +286,7 @@ func (w *WakuNode) Start(ctx context.Context) error { if w.opts.keepAliveInterval > time.Duration(0) { w.wg.Add(1) - w.startKeepAlive(ctx, w.opts.keepAliveInterval) + go w.startKeepAlive(ctx, w.opts.keepAliveInterval) } if w.opts.enableNTP { diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go index b0b7c37f..653d28f8 100644 --- a/waku/v2/protocol/filter/waku_filter_option.go +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -45,7 +45,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption return func(params *FilterSubscribeParameters) { p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), fromThesePeers, params.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.log.Info("selecting peer", zap.Error(err)) } @@ -60,7 +60,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Fi return func(params *FilterSubscribeParameters) { p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), fromThesePeers, params.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.log.Info("selecting peer", zap.Error(err)) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index fed84791..1ec42904 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -81,31 +81,24 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea } logger.Info("request received") - if requestPushRPC.Query != nil { logger.Info("push request") response := new(pb.PushResponse) - if !wakuLP.relayIsNotAvailable() { - pubSubTopic := requestPushRPC.Query.PubsubTopic - message := requestPushRPC.Query.Message - // TODO: Assumes success, should probably be extended to check for network, peers, etc - // It might make sense to use WithReadiness option here? + pubSubTopic := requestPushRPC.Query.PubsubTopic + message := requestPushRPC.Query.Message - _, err := wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic) + // TODO: Assumes success, should probably be extended to check for network, peers, etc + // It might make sense to use WithReadiness option here? - if err != nil { - logger.Error("publishing message", zap.Error(err)) - response.IsSuccess = false - response.Info = "Could not publish message" - } else { - response.IsSuccess = true - response.Info = "Totally" // TODO: ask about this - } + _, err := wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic) + + if err != nil { + logger.Error("publishing message", zap.Error(err)) + response.Info = "Could not publish message" } else { - logger.Debug("no relay protocol present, unsuccessful push") - response.IsSuccess = false - response.Info = "No relay protocol" + response.IsSuccess = true + response.Info = "Totally" // TODO: ask about this } responsePushRPC := &pb.PushRPC{} @@ -136,8 +129,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o params.host = wakuLP.h params.log = wakuLP.log - optList := DefaultOptions(wakuLP.h) - optList = append(optList, opts...) + optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { opt(params) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 093dac04..b962ee58 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -34,7 +34,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) LightPushOption { return func(params *LightPushParameters) { p, err := utils.SelectPeer(params.host, string(LightPushID_v20beta1), fromThesePeers, params.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.log.Info("selecting peer", zap.Error(err)) } @@ -49,7 +49,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Li return func(params *LightPushParameters) { p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), fromThesePeers, params.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.log.Info("selecting peer", zap.Error(err)) } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 0b1f738b..68a12fbb 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -32,7 +32,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.log.Info("selecting peer", zap.Error(err)) } @@ -47,7 +47,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Pe return func(params *PeerExchangeParameters) { p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.log.Info("selecting peer", zap.Error(err)) } diff --git a/waku/v2/protocol/peer_options.go b/waku/v2/protocol/peer_options.go new file mode 100644 index 00000000..2d0eaff9 --- /dev/null +++ b/waku/v2/protocol/peer_options.go @@ -0,0 +1 @@ +package protocol diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 00f70849..ab450c9b 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -107,7 +107,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption return func(params *HistoryRequestParameters) { p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.s.log.Info("selecting peer", zap.Error(err)) } @@ -122,7 +122,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Hi return func(params *HistoryRequestParameters) { p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log) if err == nil { - params.selectedPeer = *p + params.selectedPeer = p } else { params.s.log.Info("selecting peer", zap.Error(err)) } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 20f87fb3..284bc7c4 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -10,7 +10,6 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" - "github.com/waku-org/go-waku/logging" "go.uber.org/zap" ) @@ -21,7 +20,7 @@ var ErrNoPeersAvailable = errors.New("no suitable peers found") // SelectPeer is used to return a random peer that supports a given protocol. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) { +func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: @@ -38,8 +37,7 @@ func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log for _, peer := range peerSet { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) if err != nil { - log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer)) - return nil, err + return "", err } if len(protocols) > 0 { @@ -49,10 +47,10 @@ func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log if len(peers) >= 1 { // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return &peers[rand.Intn(len(peers))], nil // nolint: gosec + return peers[rand.Intn(len(peers))], nil // nolint: gosec } - return nil, ErrNoPeersAvailable + return "", ErrNoPeersAvailable } type pingResult struct { @@ -63,7 +61,7 @@ type pingResult struct { // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) { +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { var peers peer.IDSlice peerSet := specificPeers @@ -74,8 +72,7 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str for _, peer := range peerSet { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) if err != nil { - log.Error("error obtaining the protocols supported by peers", zap.Error(err)) - return nil, err + return "", err } if len(protocols) > 0 { @@ -122,11 +119,11 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str } } if min == nil { - return nil, ErrNoPeersAvailable + return "", ErrNoPeersAvailable } - return &min.p, nil + return min.p, nil case <-ctx.Done(): - return nil, ErrNoPeersAvailable + return "", ErrNoPeersAvailable } }