refactor: inject host on node start

This commit is contained in:
Richard Ramos 2023-04-16 20:04:12 -04:00 committed by RichΛrd
parent fbd58df2e8
commit 2c3566377a
30 changed files with 212 additions and 112 deletions

View File

@ -46,7 +46,7 @@ type PeerConnectionStrategy struct {
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
func NewPeerConnectionStrategy(cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
@ -54,7 +54,6 @@ func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTim
return &PeerConnectionStrategy{
cache: cache,
host: h,
wg: sync.WaitGroup{},
minPeers: minPeers,
dialTimeout: dialTimeout,
@ -73,6 +72,11 @@ func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo {
return c.peerCh
}
// SetHost sets the libp2p host, allowing the strategy to dial peers.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (c *PeerConnectionStrategy) SetHost(h host.Host) {
	c.host = h
}
// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {

View File

@ -87,7 +87,7 @@ type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
optList = append(optList, opts...)
@ -103,7 +103,6 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
}
return &DiscoveryV5{
host: host,
peerConnector: peerConnector,
params: params,
NAT: NAT,
@ -161,6 +160,11 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
return nil
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (d *DiscoveryV5) SetHost(h host.Host) {
	d.host = h
}
func (d *DiscoveryV5) Start(ctx context.Context) error {
d.Lock()
defer d.Unlock()

View File

@ -106,8 +106,9 @@ func TestDiscV5(t *testing.T) {
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn1 := tests.NewTestPeerDiscoverer()
d1, err := NewDiscoveryV5(host1, prvKey1, l1, peerconn1, utils.Logger(), WithUDPPort(uint(udpPort1)))
d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, utils.Logger(), WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
d1.SetHost(host1)
// H2
host2, _, prvKey2 := createHost(t)
@ -117,8 +118,9 @@ func TestDiscV5(t *testing.T) {
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn2 := tests.NewTestPeerDiscoverer()
d2, err := NewDiscoveryV5(host2, prvKey2, l2, peerconn2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err)
d2.SetHost(host2)
// H3
host3, _, prvKey3 := createHost(t)
@ -128,8 +130,9 @@ func TestDiscV5(t *testing.T) {
l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn3 := tests.NewTestPeerDiscoverer()
d3, err := NewDiscoveryV5(host3, prvKey3, l3, peerconn3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err)
d3.SetHost(host3)
defer d1.Stop()
defer d2.Stop()

View File

@ -3,11 +3,13 @@ package node
import (
"context"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Service is implemented by node components whose libp2p host is
// injected after construction: the node creates the host during Start
// and hands it to each service via SetHost before calling Start on it.
type Service interface {
	// SetHost provides the libp2p host the service mounts or consumes protocols on.
	SetHost(h host.Host)
	// Start begins the service; the host must have been set beforehand.
	Start(ctx context.Context) error
	// Stop shuts the service down.
	Stop()
}

View File

@ -114,7 +114,7 @@ type WakuNode struct {
}
func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.opts.messageProvider, w.timesource, w.log)
return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log)
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
@ -167,19 +167,15 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
params.libP2POpts = append(params.libP2POpts, libp2p.AddrsFactory(params.addressFactory))
}
host, err := libp2p.New(params.libP2POpts...)
if err != nil {
return nil, err
}
var err error
w := new(WakuNode)
w.bcaster = v2.NewBroadcaster(1024)
w.host = host
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLefacyFilter, w.opts.enableStore, w.opts.enableRelay)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay)
if params.enableNTP {
w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log)
@ -197,7 +193,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*30, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
w.peerConnector, err = v2.NewPeerConnectionStrategy(host, cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log)
w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
@ -209,7 +205,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
}
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.peerConnector, w.log)
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.log)
if err != nil {
return nil, err
}
@ -223,12 +219,12 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
rendezvousPoints = append(rendezvousPoints, peerID)
}
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.host, w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.log)
if params.storeFactory != nil {
w.storeFactory = params.storeFactory
@ -236,18 +232,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.storeFactory = defaultStoreFactory
}
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return nil, err
}
if w.identificationEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)); err != nil {
return nil, err
}
if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil {
return nil, err
}
if params.connStatusC != nil {
w.connStatusChan = params.connStatusC
}
@ -296,6 +280,25 @@ func (w *WakuNode) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
host, err := libp2p.New(w.opts.libP2POpts...)
if err != nil {
return err
}
w.host = host
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return err
}
if w.identificationEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)); err != nil {
return err
}
if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil {
return err
}
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
w.host.Network().Notify(w.connectionNotif)
@ -306,7 +309,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
go w.watchMultiaddressChanges(ctx)
go w.watchENRChanges(ctx)
err := w.bcaster.Start(ctx)
err = w.bcaster.Start(ctx)
if err != nil {
return err
}
@ -316,6 +319,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
go w.startKeepAlive(ctx, w.opts.keepAliveInterval)
}
w.peerConnector.SetHost(host)
err = w.peerConnector.Start(ctx)
if err != nil {
return err
@ -328,6 +332,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
}
w.relay.SetHost(host)
if w.opts.enableRelay {
err := w.relay.Start(ctx)
if err != nil {
@ -345,6 +350,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
w.store = w.storeFactory(w)
w.store.SetHost(host)
if w.opts.enableStore {
err := w.startStore(ctx)
if err != nil {
@ -355,13 +361,15 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.bcaster.Register(nil, w.store.MessageChannel())
}
w.lightPush.SetHost(host)
if w.opts.enableLightPush {
if err := w.lightPush.Start(ctx); err != nil {
return err
}
}
if w.opts.enableLefacyFilter {
w.legacyFilter.SetHost(host)
if w.opts.enableLegacyFilter {
err := w.legacyFilter.Start(ctx)
if err != nil {
return err
@ -371,7 +379,8 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.bcaster.Register(nil, w.legacyFilter.MessageChannel())
}
if w.opts.enableFilterFullnode {
w.filterFullnode.SetHost(host)
if w.opts.enableFilterFullNode {
err := w.filterFullnode.Start(ctx)
if err != nil {
return err
@ -381,6 +390,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.bcaster.Register(nil, w.filterFullnode.MessageChannel())
}
w.filterLightnode.SetHost(host)
if w.opts.enableFilterLightNode {
err := w.filterLightnode.Start(ctx)
if err != nil {
@ -393,6 +403,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
return err
}
w.peerExchange.SetHost(host)
if w.opts.enablePeerExchange {
err := w.peerExchange.Start(ctx)
if err != nil {
@ -400,6 +411,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
}
w.rendezvous.SetHost(host)
if w.opts.enableRendezvousServer || w.opts.enableRendezvous {
err := w.rendezvous.Start(ctx)
if err != nil {
@ -624,7 +636,7 @@ func (w *WakuNode) mountDiscV5() error {
}
var err error
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...)
w.discoveryV5, err = discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...)
return err
}

View File

@ -66,10 +66,10 @@ type WakuNodeParameters struct {
noDefaultWakuTopic bool
enableRelay bool
enableLefacyFilter bool
enableLegacyFilter bool
isLegacyFilterFullnode bool
enableFilterLightNode bool
enableFilterFullnode bool
enableFilterFullNode bool
legacyFilterOpts []legacy_filter.Option
filterOpts []filter.Option
wOpts []pubsub.Option
@ -323,7 +323,7 @@ func WithPeerExchange() WakuNodeOption {
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableLefacyFilter = true
params.enableLegacyFilter = true
params.isLegacyFilterFullnode = fullnode
params.legacyFilterOpts = filterOpts
return nil
@ -342,7 +342,7 @@ func WithWakuFilterLightNode() WakuNodeOption {
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterFullnode = true
params.enableFilterFullNode = true
params.filterOpts = filterOpts
return nil
}

View File

@ -28,7 +28,7 @@ func TestWakuOptions(t *testing.T) {
require.NoError(t, err)
storeFactory := func(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.opts.messageProvider, w.timesource, w.log)
return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log)
}
options := []WakuNodeOption{

View File

@ -55,17 +55,21 @@ type WakuFilterPushResult struct {
}
// NewWakuFilterLightnode returns a new instance of the Waku Filter lightnode, set up according to the chosen parameters and options
func NewWakuFilterLightnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
func NewWakuFilterLightnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
wf := new(WakuFilterLightnode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.wg = &sync.WaitGroup{}
wf.h = host
return wf
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (wf *WakuFilterLightnode) SetHost(h host.Host) {
	wf.h = h
}
func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop

View File

@ -25,7 +25,8 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
@ -44,7 +45,8 @@ func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) {
b := v2.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filterPush := NewWakuFilterLightnode(host, b, timesource.NewDefaultClock(), utils.Logger())
filterPush := NewWakuFilterLightnode(b, timesource.NewDefaultClock(), utils.Logger())
filterPush.SetHost(host)
err = filterPush.Start(context.Background())
require.NoError(t, err)
@ -77,7 +79,8 @@ func TestWakuFilter(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger())
node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
require.NoError(t, err)
@ -166,7 +169,8 @@ func TestSubscriptionPing(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger())
node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
require.NoError(t, err)
@ -208,7 +212,8 @@ func TestWakuFilterPeerFailure(t *testing.T) {
broadcaster2 := v2.NewBroadcaster(10)
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilterFullnode(host2, broadcaster2, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second))
node2Filter := NewWakuFilterFullnode(broadcaster2, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second))
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
require.NoError(t, err)

View File

@ -44,7 +44,7 @@ type (
)
// NewWakuFilterFullnode returns a new instance of the Waku Filter fullnode, set up according to the chosen parameters and options
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
func NewWakuFilterFullnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
wf := new(WakuFilterFullNode)
wf.log = log.Named("filterv2-fullnode")
@ -56,13 +56,17 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc
}
wf.wg = &sync.WaitGroup{}
wf.h = host
wf.subscriptions = NewSubscribersMap(params.Timeout)
wf.maxSubscriptions = params.MaxSubscribers
return wf
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (wf *WakuFilterFullNode) SetHost(h host.Host) {
	wf.h = h
}
func (wf *WakuFilterFullNode) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop

View File

@ -66,7 +66,7 @@ type (
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
// NewWakuFilter returns a new instance of the Waku Filter struct, set up according to the chosen parameters and options
func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter {
func NewWakuFilter(broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter {
wf := new(WakuFilter)
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
@ -78,7 +78,6 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool,
}
wf.wg = &sync.WaitGroup{}
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.Timeout)
@ -86,6 +85,11 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool,
return wf
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (wf *WakuFilter) SetHost(h host.Host) {
	wf.h = h
}
func (wf *WakuFilter) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop

View File

@ -24,7 +24,8 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
@ -43,7 +44,8 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
b := v2.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filter := NewWakuFilter(host, b, false, timesource.NewDefaultClock(), utils.Logger())
filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), utils.Logger())
filter.SetHost(host)
err = filter.Start(context.Background())
require.NoError(t, err)
@ -76,7 +78,8 @@ func TestWakuFilter(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilter(host2, broadcaster, true, timesource.NewDefaultClock(), utils.Logger())
node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
require.NoError(t, err)
@ -167,7 +170,8 @@ func TestWakuFilterPeerFailure(t *testing.T) {
broadcaster2 := v2.NewBroadcaster(10)
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilter(host2, broadcaster2, true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second))
node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second))
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx)
require.NoError(t, err)

View File

@ -36,15 +36,19 @@ type WakuLightPush struct {
}
// NewWakuLightPush returns a new instance of the Waku Lightpush struct
func NewWakuLightPush(h host.Host, relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
func NewWakuLightPush(relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.h = h
wakuLP.log = log.Named("lightpush")
return wakuLP
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (wakuLP *WakuLightPush) SetHost(h host.Host) {
	wakuLP.h = h
}
// Start initializes the lightpush protocol
func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
if wakuLP.relayIsNotAvailable() {

View File

@ -28,7 +28,8 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
b := v2.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
relay := relay.NewWakuRelay(host, b, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
require.NoError(t, err)
err = relay.Start(context.Background())
require.NoError(t, err)
@ -61,7 +62,8 @@ func TestWakuLightPush(t *testing.T) {
defer sub2.Unsubscribe()
ctx := context.Background()
lightPushNode2 := NewWakuLightPush(host2, node2, utils.Logger())
lightPushNode2 := NewWakuLightPush(node2, utils.Logger())
lightPushNode2.SetHost(host2)
err := lightPushNode2.Start(ctx)
require.NoError(t, err)
defer lightPushNode2.Stop()
@ -71,7 +73,8 @@ func TestWakuLightPush(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(clientHost, nil, utils.Logger())
client := NewWakuLightPush(nil, utils.Logger())
client.SetHost(clientHost)
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200)
@ -127,7 +130,8 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(clientHost, nil, utils.Logger())
client := NewWakuLightPush(nil, utils.Logger())
client.SetHost(clientHost)
err = client.Start(ctx)
require.Errorf(t, err, "relay is required")
@ -141,8 +145,8 @@ func TestWakuLightPushNoPeers(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(clientHost, nil, utils.Logger())
client := NewWakuLightPush(nil, utils.Logger())
client.SetHost(clientHost)
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic)
require.Errorf(t, err, "no suitable remote peers")
}

View File

@ -28,7 +28,8 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) {
b := v2.NewBroadcaster(1024)
require.NoError(t, b.Start(context.Background()))
relay := relay.NewWakuRelay(host, b, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)

View File

@ -62,9 +62,8 @@ type PeerConnector interface {
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange)
wakuPX.h = h
wakuPX.disc = disc
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = make(map[enode.ID]peerRecord)
@ -73,6 +72,11 @@ func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector Pe
return wakuPX, nil
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (wakuPX *WakuPeerExchange) SetHost(h host.Host) {
	wakuPX.h = h
}
// Start initializes the peer exchange protocol
func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
if wakuPX.cancel != nil {

View File

@ -106,8 +106,9 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
discv5PeerConn1 := tests.NewTestPeerDiscoverer()
d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, discv5PeerConn1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
d1.SetHost(host1)
// H2
host2, _, prvKey2 := createHost(t)
@ -117,8 +118,9 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
discv5PeerConn2 := tests.NewTestPeerDiscoverer()
d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, discv5PeerConn2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
require.NoError(t, err)
d2.SetHost(host2)
// H3
host3, _, _ := createHost(t)
@ -139,12 +141,14 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
// mount peer exchange
pxPeerConn1 := tests.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(host1, d1, pxPeerConn1, utils.Logger())
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, utils.Logger())
require.NoError(t, err)
px1.SetHost(host1)
pxPeerConn3 := tests.NewTestPeerDiscoverer()
px3, err := NewWakuPeerExchange(host3, nil, pxPeerConn3, utils.Logger())
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, utils.Logger())
require.NoError(t, err)
px3.SetHost(host3)
err = px1.Start(context.Background())
require.NoError(t, err)

View File

@ -61,9 +61,8 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
func NewWakuRelay(bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
w := new(WakuRelay)
w.host = h
w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
w.relaySubs = make(map[string]*pubsub.Subscription)
@ -96,6 +95,11 @@ func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, ti
return w
}
// SetHost sets the libp2p host to be able to mount or consume a protocol.
// Must be called before Start, since the host is no longer supplied at
// construction time.
func (w *WakuRelay) SetHost(h host.Host) {
	w.host = h
}
func (w *WakuRelay) Start(ctx context.Context) error {
w.wg.Wait()
ctx, cancel := context.WithCancel(ctx)

View File

@ -22,7 +22,8 @@ func TestWakuRelay(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay := NewWakuRelay(nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()

View File

@ -35,7 +35,8 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
s.Require().NoError(err)
defer relay.Stop()

View File

@ -22,7 +22,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test")
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(msg1)
_ = s.storeMessage(msg3)
_ = s.storeMessage(msg5)
@ -42,7 +42,8 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -62,7 +63,8 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
@ -99,7 +101,8 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -112,7 +115,8 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
@ -138,7 +142,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -151,7 +156,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)

View File

@ -63,10 +63,9 @@ type WakuStore struct {
}
// NewWakuStore creates a WakuStore using a specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
func NewWakuStore(p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host
wakuStore.wg = &sync.WaitGroup{}
wakuStore.log = log.Named("store")
wakuStore.timesource = timesource

View File

@ -14,7 +14,7 @@ import (
func TestStorePersistence(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(nil, db, timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
defaultPubSubTopic := "test"
defaultContentTopic := "1"

View File

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
@ -85,6 +86,7 @@ type MessageProvider interface {
}
type Store interface {
SetHost(h host.Host)
Start(ctx context.Context) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error)
@ -99,6 +101,11 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) {
store.msgProvider = p
}
// SetHost sets the libp2p host to be used to mount or consume a protocol.
// It must be called before Start, since starting the store registers the
// protocol stream handler on this host.
func (store *WakuStore) SetHost(h host.Host) {
store.h = h
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context) error {
if store.started {

View File

@ -22,7 +22,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -43,7 +44,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
@ -71,7 +73,8 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -111,7 +114,8 @@ func TestWakuStoreProtocolNext(t *testing.T) {
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(host1, db, timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -137,7 +141,8 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4)
require.NoError(t, err)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
@ -181,7 +186,8 @@ func TestWakuStoreResult(t *testing.T) {
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(host1, db, timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
@ -207,7 +213,8 @@ func TestWakuStoreResult(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4)
require.NoError(t, err)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
@ -266,7 +273,8 @@ func TestWakuStoreProtocolFind(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
@ -301,7 +309,8 @@ func TestWakuStoreProtocolFind(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4)
require.NoError(t, err)
s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()

View File

@ -21,7 +21,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -47,7 +47,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -80,7 +80,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -112,7 +112,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -134,7 +134,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
@ -153,7 +153,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
@ -177,7 +177,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
for i := 0; i < 10; i++ {
msg := &wpb.WakuMessage{
Payload: []byte{byte(i)},
@ -203,7 +203,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
}
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
var messages []*wpb.WakuMessage
for i := 0; i < 10; i++ {

View File

@ -43,7 +43,7 @@ type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
var rendevousPoints []*rendezvousPoint
@ -54,7 +54,6 @@ func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool
}
return &Rendezvous{
host: host,
enableServer: enableServer,
db: db,
discoverPeers: discoverPeers,
@ -64,6 +63,11 @@ func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool
}
}
// SetHost sets the libp2p host to be used to mount or consume a protocol.
// It must be called before Start so the rendezvous service/client can use
// this host for its streams.
func (r *Rendezvous) SetHost(h host.Host) {
r.host = h
}
func (r *Rendezvous) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

View File

@ -45,7 +45,8 @@ func TestRendezvous(t *testing.T) {
require.NoError(t, err)
rdb := NewDB(context.Background(), db, utils.Logger())
rendezvousPoint := NewRendezvous(host1, true, rdb, false, nil, nil, utils.Logger())
rendezvousPoint := NewRendezvous(true, rdb, false, nil, nil, utils.Logger())
rendezvousPoint.SetHost(host1)
err = rendezvousPoint.Start(context.Background())
require.NoError(t, err)
defer rendezvousPoint.Stop()
@ -65,7 +66,8 @@ func TestRendezvous(t *testing.T) {
err = host2.Peerstore().AddProtocols(info.ID, RendezvousID)
require.NoError(t, err)
rendezvousClient1 := NewRendezvous(host2, false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger())
rendezvousClient1 := NewRendezvous(false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger())
rendezvousClient1.SetHost(host2)
err = rendezvousClient1.Start(context.Background())
require.NoError(t, err)
defer rendezvousClient1.Stop()
@ -81,7 +83,8 @@ func TestRendezvous(t *testing.T) {
myPeerConnector := NewPeerConn()
rendezvousClient2 := NewRendezvous(host3, false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger())
rendezvousClient2 := NewRendezvous(false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger())
rendezvousClient2.SetHost(host3)
err = rendezvousClient2.Start(context.Background())
require.NoError(t, err)
defer rendezvousClient2.Stop()

View File

@ -34,7 +34,8 @@ func TestV1Peers(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()

View File

@ -52,7 +52,8 @@ func TestFilterSubscription(t *testing.T) {
b := v2.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
node := relay.NewWakuRelay(host, b, 0, timesource.NewDefaultClock(), utils.Logger())
node := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger())
node.SetHost(host)
err = node.Start(context.Background())
require.NoError(t, err)
@ -61,7 +62,8 @@ func TestFilterSubscription(t *testing.T) {
b2 := v2.NewBroadcaster(10)
require.NoError(t, b2.Start(context.Background()))
f := legacy_filter.NewWakuFilter(host, b2, false, timesource.NewDefaultClock(), utils.Logger())
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), utils.Logger())
f.SetHost(host)
err = f.Start(context.Background())
require.NoError(t, err)