feat: subscribe to shards with --topic and refactors

- Shards in the same cluster are now advertised on the ENR
- `Store().Resume()` was moved to the app layer (a sketch of the app-side resume flow follows the `WithWakuStore` diff below)
- `NoDefaultWakuTopic()` was removed, since it is the application that must decide whether to subscribe to the default waku topic
- Removed `Publish` from `WakuNode`: it was barely used and is easy to implement in the app layer if needed (a sketch of both app-side changes follows this list)
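A minimal sketch (not part of this commit) of what an application might now do instead of relying on the removed `NoDefaultWakuTopic()` and `WakuNode.Publish`; the helper name, error handling and import paths are assumptions, while `Relay().Subscribe` and `Relay().Publish` are used as shown in the diffs below:

```go
package app

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/node"
	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

// publishOnDefaultTopic is a hypothetical helper: the app joins the default
// pubsub topic itself (NoDefaultWakuTopic is gone) and publishes through
// Relay() directly (WakuNode.Publish is gone).
func publishOnDefaultTopic(ctx context.Context, wakuNode *node.WakuNode, msg *pb.WakuMessage) error {
	sub, err := wakuNode.Relay().Subscribe(ctx)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	_, err = wakuNode.Relay().Publish(ctx, msg)
	return err
}
```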
Richard Ramos 2023-07-05 15:17:43 -04:00 committed by richΛrd
parent 7dd02067f8
commit 6ece3c483b
15 changed files with 184 additions and 122 deletions

View File

@@ -28,7 +28,7 @@
];
doCheck = false;
# FIXME: This needs to be manually changed when updating modules.
vendorSha256 = "sha256-6VBZ0ilGcXhTXCoUmGdrRQqgnRwVJOtAe4JIMUCVw8Y=";
vendorSha256 = "sha256-TU/jog0MZNC4g13gaGm88gsKTRvmlcKkMeXZbaVf3fc=";
# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };
};

View File

@@ -92,7 +92,6 @@ func NewNode(configJSON string) string {
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(time.Duration(*config.KeepAliveInterval) * time.Second),
node.NoDefaultWakuTopic(),
}
if *config.EnableRelay {

View File

@@ -227,7 +227,6 @@ func Execute(options Options) {
}
if options.Store.Enable {
nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...))
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
}
@@ -314,27 +313,6 @@ func Execute(options Options) {
addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1)
if options.DiscV5.Enable {
if err = wakuNode.DiscV5().Start(ctx); err != nil {
logger.Fatal("starting discovery v5", zap.Error(err))
}
}
// retrieve and connect to peer exchange peers
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")
peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
desiredOutDegree := wakuNode.Relay().Params().D
if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil {
logger.Error("requesting peers via peer exchange", zap.Error(err))
}
}
}
if len(options.Relay.Topics.Value()) == 0 {
options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic)
}
@@ -398,6 +376,27 @@ func Execute(options Options) {
}(ctx, n)
}
if options.DiscV5.Enable {
if err = wakuNode.DiscV5().Start(ctx); err != nil {
logger.Fatal("starting discovery v5", zap.Error(err))
}
}
// retrieve and connect to peer exchange peers
if options.PeerExchange.Enable && options.PeerExchange.Node != nil {
logger.Info("retrieving peer info via peer exchange protocol")
peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err))
} else {
desiredOutDegree := wakuNode.Relay().Params().D
if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil {
logger.Error("requesting peers via peer exchange", zap.Error(err))
}
}
}
if len(discoveredNodes) != 0 {
for _, n := range discoveredNodes {
go func(ctx context.Context, info peer.AddrInfo) {
@@ -412,14 +411,40 @@ func Execute(options Options) {
}
}
var wg sync.WaitGroup
if options.Store.Enable && len(options.Store.ResumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, peers.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}
for _, t := range options.Relay.Topics.Value() {
wg.Add(1)
go func(topic string) {
defer wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil {
logger.Error("Could not resume history", zap.Error(err))
}
}(t)
}
}
var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
rpcServer.Start()
}
var wg sync.WaitGroup
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)

View File

@@ -373,7 +373,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
return true
}
nodeRS, err := enr.RelaySharding(d.localnode.Node().Record())
nodeRS, err := enr.RelaySharding(n.Record())
if err != nil || nodeRS == nil {
return false
}
@@ -383,7 +383,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
}
// Contains any
for _, idx := range nodeRS.Indices {
for _, idx := range localRS.Indices {
if nodeRS.Contains(localRS.Cluster, idx) {
return true
}

View File

@@ -14,7 +14,6 @@ import (
)
const maxAllowedPingFailures = 2
const maxPublishAttempt = 5
func disconnectPeers(host host.Host, logger *zap.Logger) {
logger.Warn("keep alive hasnt been executed recently. Killing all connections to peers")

View File

@@ -9,7 +9,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@@ -56,6 +58,8 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M
}
}
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
return wenr.Update(localnode, options...)
}
@@ -269,5 +273,62 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err
}
if w.Relay() != nil {
err = w.watchTopicShards(ctx)
if err != nil {
return err
}
}
return nil
}
func (w *WakuNode) watchTopicShards(ctx context.Context) error {
evtRelaySubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelaySubscribed))
if err != nil {
return err
}
evtRelayUnsubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelayUnsubscribed))
if err != nil {
return err
}
go func() {
defer evtRelaySubscribed.Close()
defer evtRelayUnsubscribed.Close()
for {
select {
case <-ctx.Done():
return
case <-evtRelayUnsubscribed.Out():
case <-evtRelaySubscribed.Out():
rs, err := protocol.TopicsToRelayShards(w.Relay().Topics()...)
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue
}
if len(rs) > 1 {
w.log.Warn("use sharded topics within the same cluster")
continue
}
if len(rs) == 1 {
w.log.Info("updating advertised relay shards in ENR")
err = wenr.Update(w.localNode, wenr.WithWakuRelaySharding(rs[0]))
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue
}
w.enrChangeCh <- struct{}{}
}
}
}
}()
return nil
}

View File

@@ -2,7 +2,6 @@ package node
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
@@ -32,7 +31,6 @@ import (
"go.opencensus.io/stats"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/try"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
@@ -403,14 +401,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
if err != nil {
return err
}
if !w.opts.noDefaultWakuTopic {
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
sub.Unsubscribe()
}
}
w.store = w.storeFactory(w)
@@ -666,34 +656,6 @@ func (w *WakuNode) Broadcaster() relay.Broadcaster {
return w.bcaster
}
// Publish will attempt to publish a message via WakuRelay if there are enough
// peers available, otherwise it will attempt to publish via Lightpush protocol
func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
if !w.opts.enableLightPush && !w.opts.enableRelay {
return errors.New("cannot publish message, relay and lightpush are disabled")
}
hash := msg.Hash(relay.DefaultWakuTopic)
err := try.Do(func(attempt int) (bool, error) {
var err error
relay := w.Relay()
lightpush := w.Lightpush()
if relay == nil || !relay.EnoughPeersToPublish() {
w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash))
_, err = lightpush.Publish(ctx, msg)
} else {
w.log.Debug("publishing message via relay", logging.HexBytes("hash", hash))
_, err = relay.Publish(ctx, msg)
}
return attempt < maxPublishAttempt, err
})
return err
}
func (w *WakuNode) mountDiscV5() error {
discV5Options := []discv5.DiscoveryV5Option{
discv5.WithBootnodes(w.opts.discV5bootnodes),
@@ -718,33 +680,6 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
return err
}
if len(w.opts.resumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
var peerIDs []peer.ID
for _, n := range w.opts.resumeNodes {
pID, err := w.AddPeer(n, peers.Static, store.StoreID_v20beta4)
if err != nil {
w.log.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}
if !w.opts.noDefaultWakuTopic {
w.wg.Add(1)
go func() {
defer w.wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
}
}
return nil
}

View File

@@ -154,7 +154,7 @@ func Test500(t *testing.T) {
msg := createTestMsg(0)
msg.Payload = int2Bytes(i)
msg.Timestamp = int64(i)
if err := wakuNode2.Publish(ctx, msg); err != nil {
if _, err := wakuNode2.Relay().Publish(ctx, msg); err != nil {
require.Fail(t, "Could not publish all messages")
}
time.Sleep(5 * time.Millisecond)
@@ -182,6 +182,10 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode1.Stop()
subs, err := wakuNode1.Relay().Subscribe(ctx)
require.NoError(t, err)
subs.Unsubscribe()
// NODE2: Filter Client/Store
db, migration, err := sqlite.NewDB(":memory:")
require.NoError(t, err)
@@ -230,7 +234,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
time.Sleep(500 * time.Millisecond)
if err := wakuNode1.Publish(ctx, msg); err != nil {
if _, err := wakuNode1.Relay().Publish(ctx, msg); err != nil {
require.Fail(t, "Could not publish all messages")
}

View File

@@ -66,7 +66,6 @@ type WakuNodeParameters struct {
logger *zap.Logger
logLevel logging.LogLevel
noDefaultWakuTopic bool
enableRelay bool
enableLegacyFilter bool
isLegacyFilterFullnode bool
@@ -79,7 +78,6 @@ type WakuNodeParameters struct {
minRelayPeersToPublish int
enableStore bool
resumeNodes []multiaddr.Multiaddr
messageProvider store.MessageProvider
rendezvousNodes []multiaddr.Multiaddr
@@ -318,15 +316,6 @@ func WithPeerStore(ps peerstore.Peerstore) WakuNodeOption {
}
}
// NoDefaultWakuTopic will stop the node from subscribing to the default
// pubsub topic automatically
func NoDefaultWakuTopic() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.noDefaultWakuTopic = true
return nil
}
}
// WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption
// accepts a list of WakuRelay gossipsub option to setup the protocol
func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
@@ -400,12 +389,10 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
}
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider. If resumeNodes are specified, the
// store will attempt to resume message history using those nodes
func WithWakuStore(resumeNodes ...multiaddr.Multiaddr) WakuNodeOption {
// be stored or not in a message provider.
func WithWakuStore() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.resumeNodes = resumeNodes
return nil
}
}
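Since `WithWakuStore` no longer takes resume nodes, an application that wants the old behaviour resumes history itself, as cmd/waku now does per topic. A rough sketch under that assumption (the helper name, `storeNode` argument and import paths are illustrative; `AddPeer`, `peers.Static`, `store.StoreID_v20beta4` and `Store().Resume` are used as shown in the diffs above):

```go
package app

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	ma "github.com/multiformats/go-multiaddr"

	"github.com/waku-org/go-waku/waku/v2/node"
	"github.com/waku-org/go-waku/waku/v2/peers"
	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
)

// resumeHistory is a hypothetical helper: add the store node as a static peer
// and resume history for the default pubsub topic from the app layer.
func resumeHistory(ctx context.Context, wakuNode *node.WakuNode, storeNode ma.Multiaddr) error {
	peerID, err := wakuNode.AddPeer(storeNode, peers.Static, store.StoreID_v20beta4)
	if err != nil {
		return err
	}

	ctxWithTimeout, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()

	_, err = wakuNode.Store().Resume(ctxWithTimeout, relay.DefaultWakuTopic, []peer.ID{peerID})
	return err
}
```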

View File

@@ -91,7 +91,6 @@ func WithUDPPort(udpPort uint) ENROption {
}
func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error {
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
for _, opt := range enrOptions {
err := opt(localnode)
if err != nil {

View File

@@ -8,12 +8,18 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
)
func deleteShardingENREntries(localnode *enode.LocalNode) {
localnode.Delete(enr.WithEntry(ShardingBitVectorEnrField, struct{}{}))
localnode.Delete(enr.WithEntry(ShardingIndicesListEnrField, struct{}{}))
}
func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption {
return func(localnode *enode.LocalNode) error {
value, err := rs.IndicesList()
if err != nil {
return err
}
deleteShardingENREntries(localnode)
localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value))
return nil
}
@@ -21,6 +27,7 @@ func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption {
func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption {
return func(localnode *enode.LocalNode) error {
deleteShardingENREntries(localnode)
localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector()))
return nil
}
@@ -55,8 +62,11 @@ func WithWakuRelayShardingTopics(topics ...string) ENROption {
func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error) {
var field []byte
if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, field)); err != nil {
return nil, nil
if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, &field)); err != nil {
if enr.IsNotFound(err) {
return nil, nil
}
return nil, err
}
res, err := protocol.FromIndicesList(field)
@@ -69,7 +79,7 @@ func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error)
func RelayShardingBitVector(record *enr.Record) (*protocol.RelayShards, error) {
var field []byte
if err := record.Load(enr.WithEntry(ShardingBitVectorEnrField, field)); err != nil {
if err := record.Load(enr.WithEntry(ShardingBitVectorEnrField, &field)); err != nil {
if enr.IsNotFound(err) {
return nil, nil
}

View File

@@ -424,9 +424,7 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
wf.subscriptions.Lock()
delete(wf.subscriptions.items, peerID)
defer wf.subscriptions.Unlock()
resultChan <- WakuFilterPushResult{
Err: err,

View File

@@ -7,9 +7,11 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
@@ -50,11 +52,27 @@ type WakuRelay struct {
wakuRelayTopics map[string]*pubsub.Topic
relaySubs map[string]*pubsub.Subscription
events event.Bus
emitters struct {
EvtRelaySubscribed event.Emitter
EvtRelayUnsubscribed event.Emitter
}
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created
type EvtRelaySubscribed struct {
Topic string
}
// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed
type EvtRelayUnsubscribed struct {
Topic string
}
func msgIdFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}
@@ -69,6 +87,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.minPeersToPublish = minPeersToPublish
w.wg = sync.WaitGroup{}
w.log = log.Named("relay")
w.events = eventbus.NewBus()
cfg := pubsub.DefaultGossipSubParams()
cfg.PruneBackoff = time.Minute
@@ -200,6 +219,15 @@ func (w *WakuRelay) Start(ctx context.Context) error {
}
w.pubsub = ps
w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed))
if err != nil {
return err
}
w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed))
if err != nil {
return err
}
w.log.Info("Relay protocol started")
return nil
}
@@ -234,8 +262,8 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
}
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
defer w.topicsMutex.Unlock()
w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()
pubSubTopic, ok := w.wakuRelayTopics[topic]
if !ok { // Joins topic if node hasn't joined yet
@@ -267,7 +295,14 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
if err != nil {
return nil, err
}
w.relaySubs[topic] = sub
err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic})
if err != nil {
return nil, err
}
if w.bcaster != nil {
w.wg.Add(1)
go w.subscribeToTopic(topic, sub)
@@ -328,7 +363,8 @@ func (w *WakuRelay) Stop() {
}
w.host.RemoveStreamHandler(WakuRelayID_v200)
w.emitters.EvtRelaySubscribed.Close()
w.emitters.EvtRelayUnsubscribed.Close()
w.cancel()
w.wg.Wait()
}
@@ -384,6 +420,11 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
}
delete(w.wakuRelayTopics, topic)
err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{topic})
if err != nil {
return err
}
return nil
}
@@ -449,3 +490,8 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio
func (w *WakuRelay) Params() pubsub.GossipSubParams {
return w.params
}
// Events returns the event bus on which WakuRelay events will be emitted
func (w *WakuRelay) Events() event.Bus {
return w.events
}
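For completeness, a small sketch of how the new event bus might be consumed from outside the relay; the function is hypothetical, while `Events()`, `EvtRelaySubscribed` and the libp2p `event.Subscription` methods (`Out`, `Close`) are as used in the diff above:

```go
package app

import (
	"fmt"

	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)

// watchSubscriptions is a hypothetical consumer: it logs every pubsub topic
// the relay subscribes to, using the event bus exposed by WakuRelay.Events().
func watchSubscriptions(r *relay.WakuRelay) error {
	sub, err := r.Events().Subscribe(new(relay.EvtRelaySubscribed))
	if err != nil {
		return err
	}
	defer sub.Close()

	for e := range sub.Out() {
		evt := e.(relay.EvtRelaySubscribed)
		fmt.Println("relay subscribed to pubsub topic:", evt.Topic)
	}
	return nil
}
```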

View File

@@ -53,8 +53,8 @@ func (rs RelayShards) Contains(cluster uint16, index uint16) bool {
}
found := false
for _, i := range rs.Indices {
if i == index {
for _, idx := range rs.Indices {
if idx == index {
found = true
}
}

View File

@@ -159,7 +159,6 @@ func (n StaticShardingPubsubTopic) String() string {
func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
fmt.Println(topic, StaticShardingPubsubTopicPrefix)
return ErrInvalidShardedTopicPrefix
}