refactor: remove some duplication and unneeded code

This commit is contained in:
Richard Ramos 2023-01-08 14:33:30 -04:00 committed by RichΛrd
parent 30e3884000
commit 607bf07198
14 changed files with 86 additions and 139 deletions

View File

@ -40,6 +40,19 @@ type RLNRelayOptions struct {
MembershipContractAddress common.Address
}
func nodePeerID(node *multiaddr.Multiaddr) (peer.ID, error) {
if node == nil {
return peer.ID(""), errors.New("node is nil")
}
peerID, err := (*node).ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return peer.ID(""), err
}
return peer.Decode(peerID)
}
// FilterOptions are settings used to enable filter protocol. This is a protocol
// that enables subscribing to messages that a peer receives. This is a more
// lightweight version of WakuRelay specifically designed for bandwidth
@ -50,16 +63,7 @@ type FilterOptions struct {
}
func (f FilterOptions) NodePeerID() (peer.ID, error) {
if f.Node == nil {
return peer.ID(""), errors.New("node is nil")
}
peerID, err := (*f.Node).ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return peer.ID(""), err
}
return peer.Decode(peerID)
return nodePeerID(f.Node)
}
// LightpushOptions are settings used to enable the lightpush protocol. This is
@ -74,16 +78,7 @@ type LightpushOptions struct {
}
func (f LightpushOptions) NodePeerID() (peer.ID, error) {
if f.Node == nil {
return peer.ID(""), errors.New("node is nil")
}
peerID, err := (*f.Node).ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return peer.ID(""), err
}
return peer.Decode(peerID)
return nodePeerID(f.Node)
}
// StoreOptions are settings used for enabling the store protocol, used to
@ -94,16 +89,7 @@ type StoreOptions struct {
}
func (f StoreOptions) NodePeerID() (peer.ID, error) {
if f.Node == nil {
return peer.ID(""), errors.New("node is nil")
}
peerID, err := (*f.Node).ValueForProtocol(multiaddr.P_P2P)
if err != nil {
return peer.ID(""), err
}
return peer.Decode(peerID)
return nodePeerID(f.Node)
}
// DNSDiscoveryOptions are settings used for enabling DNS-based discovery

View File

@ -1,6 +1,3 @@
//go:build !gowaku_skip_migrations
// +build !gowaku_skip_migrations
package migrations
import (

View File

@ -1,13 +0,0 @@
//go:build gowaku_skip_migrations
// +build gowaku_skip_migrations
package migrations
import (
"database/sql"
)
// Skip migration code
func Migrate(db *sql.DB) error {
return nil
}

View File

@ -1,6 +1,3 @@
//go:build !gowaku_skip_migrations
// +build !gowaku_skip_migrations
package migrations
import (

View File

@ -1,13 +0,0 @@
//go:build gowaku_skip_migrations
// +build gowaku_skip_migrations
package migrations
import (
"database/sql"
)
// Skip migration code
func Migrate(db *sql.DB) error {
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
@ -14,52 +15,54 @@ import (
const maxAllowedPingFailures = 2
const maxPublishAttempt = 5
func disconnectPeers(host host.Host, logger *zap.Logger) {
logger.Warn("keep alive hasnt been executed recently. Killing all connections to peers")
for _, p := range host.Network().Peers() {
err := host.Network().ClosePeer(p)
if err != nil {
logger.Warn("while disconnecting peer", zap.Error(err))
}
}
}
// startKeepAlive creates a go routine that periodically pings connected peers.
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
func (w *WakuNode) startKeepAlive(ctx context.Context, t time.Duration) {
go func() {
defer w.wg.Done()
w.log.Info("setting up ping protocol", zap.Duration("duration", t))
ticker := time.NewTicker(t)
defer ticker.Stop()
defer w.wg.Done()
w.log.Info("setting up ping protocol", zap.Duration("duration", t))
ticker := time.NewTicker(t)
defer ticker.Stop()
lastTimeExecuted := w.timesource.Now()
lastTimeExecuted := w.timesource.Now()
sleepDetectionInterval := int64(t) * 3
for {
select {
case <-ticker.C:
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
if difference > sleepDetectionInterval {
w.log.Warn("keep alive hasnt been executed recently. Killing all connections to peers")
for _, p := range w.host.Network().Peers() {
err := w.host.Network().ClosePeer(p)
if err != nil {
w.log.Warn("while disconnecting peer", zap.Error(err))
}
}
lastTimeExecuted = w.timesource.Now()
continue
}
// Network's peers collection,
// contains only currently active peers
for _, p := range w.host.Network().Peers() {
if p != w.host.ID() {
w.wg.Add(1)
go w.pingPeer(ctx, p)
}
}
sleepDetectionInterval := int64(t) * 3
for {
select {
case <-ticker.C:
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
if difference > sleepDetectionInterval {
disconnectPeers(w.host, w.log)
lastTimeExecuted = w.timesource.Now()
case <-ctx.Done():
w.log.Info("stopping ping protocol")
return
continue
}
// Network's peers collection,
// contains only currently active peers
for _, p := range w.host.Network().Peers() {
if p != w.host.ID() {
w.wg.Add(1)
go w.pingPeer(ctx, p)
}
}
lastTimeExecuted = w.timesource.Now()
case <-ctx.Done():
w.log.Info("stopping ping protocol")
return
}
}()
}
}
func (w *WakuNode) pingPeer(ctx context.Context, peer peer.ID) {

View File

@ -286,7 +286,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
if w.opts.keepAliveInterval > time.Duration(0) {
w.wg.Add(1)
w.startKeepAlive(ctx, w.opts.keepAliveInterval)
go w.startKeepAlive(ctx, w.opts.keepAliveInterval)
}
if w.opts.enableNTP {

View File

@ -45,7 +45,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
@ -60,7 +60,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Fi
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}

View File

@ -81,31 +81,24 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
}
logger.Info("request received")
if requestPushRPC.Query != nil {
logger.Info("push request")
response := new(pb.PushResponse)
if !wakuLP.relayIsNotAvailable() {
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
pubSubTopic := requestPushRPC.Query.PubsubTopic
message := requestPushRPC.Query.Message
_, err := wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic)
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
if err != nil {
logger.Error("publishing message", zap.Error(err))
response.IsSuccess = false
response.Info = "Could not publish message"
} else {
response.IsSuccess = true
response.Info = "Totally" // TODO: ask about this
}
_, err := wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic)
if err != nil {
logger.Error("publishing message", zap.Error(err))
response.Info = "Could not publish message"
} else {
logger.Debug("no relay protocol present, unsuccessful push")
response.IsSuccess = false
response.Info = "No relay protocol"
response.IsSuccess = true
response.Info = "Totally" // TODO: ask about this
}
responsePushRPC := &pb.PushRPC{}
@ -136,8 +129,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
params.host = wakuLP.h
params.log = wakuLP.log
optList := DefaultOptions(wakuLP.h)
optList = append(optList, opts...)
optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {
opt(params)
}

View File

@ -34,7 +34,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) LightPushOption {
return func(params *LightPushParameters) {
p, err := utils.SelectPeer(params.host, string(LightPushID_v20beta1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
@ -49,7 +49,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Li
return func(params *LightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}

View File

@ -32,7 +32,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
@ -47,7 +47,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Pe
return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}

View File

@ -0,0 +1 @@
package protocol

View File

@ -107,7 +107,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
@ -122,7 +122,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Hi
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = *p
params.selectedPeer = p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}

View File

@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
)
@ -21,7 +20,7 @@ var ErrNoPeersAvailable = errors.New("no suitable peers found")
// SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) {
func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
@ -38,8 +37,7 @@ func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log
for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer))
return nil, err
return "", err
}
if len(protocols) > 0 {
@ -49,10 +47,10 @@ func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
return &peers[rand.Intn(len(peers))], nil // nolint: gosec
return peers[rand.Intn(len(peers))], nil // nolint: gosec
}
return nil, ErrNoPeersAvailable
return "", ErrNoPeersAvailable
}
type pingResult struct {
@ -63,7 +61,7 @@ type pingResult struct {
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) {
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
var peers peer.IDSlice
peerSet := specificPeers
@ -74,8 +72,7 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str
for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
log.Error("error obtaining the protocols supported by peers", zap.Error(err))
return nil, err
return "", err
}
if len(protocols) > 0 {
@ -122,11 +119,11 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str
}
}
if min == nil {
return nil, ErrNoPeersAvailable
return "", ErrNoPeersAvailable
}
return &min.p, nil
return min.p, nil
case <-ctx.Done():
return nil, ErrNoPeersAvailable
return "", ErrNoPeersAvailable
}
}