diff --git a/flake.nix b/flake.nix index 1109421f..1421c7d4 100644 --- a/flake.nix +++ b/flake.nix @@ -28,7 +28,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-6VBZ0ilGcXhTXCoUmGdrRQqgnRwVJOtAe4JIMUCVw8Y="; + vendorSha256 = "sha256-TU/jog0MZNC4g13gaGm88gsKTRvmlcKkMeXZbaVf3fc="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/mobile/api.go b/mobile/api.go index 815725be..d588cd19 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -92,7 +92,6 @@ func NewNode(configJSON string) string { node.WithPrivateKey(prvKey), node.WithHostAddress(hostAddr), node.WithKeepAlive(time.Duration(*config.KeepAliveInterval) * time.Second), - node.NoDefaultWakuTopic(), } if *config.EnableRelay { diff --git a/waku/node.go b/waku/node.go index 70d9fc50..f779fb45 100644 --- a/waku/node.go +++ b/waku/node.go @@ -227,7 +227,6 @@ func Execute(options Options) { } if options.Store.Enable { - nodeOpts = append(nodeOpts, node.WithWakuStore(options.Store.ResumeNodes...)) nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } @@ -314,27 +313,6 @@ func Execute(options Options) { addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID) addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1) - if options.DiscV5.Enable { - if err = wakuNode.DiscV5().Start(ctx); err != nil { - logger.Fatal("starting discovery v5", zap.Error(err)) - } - } - - // retrieve and connect to peer exchange peers - if options.PeerExchange.Enable && options.PeerExchange.Node != nil { - logger.Info("retrieving peer info via peer exchange protocol") - - peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1) - if err != nil { - logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) - } else { - desiredOutDegree := wakuNode.Relay().Params().D - if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil { - logger.Error("requesting peers via peer exchange", zap.Error(err)) - } - } - } - if len(options.Relay.Topics.Value()) == 0 { options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic) } @@ -398,6 +376,27 @@ func Execute(options Options) { }(ctx, n) } + if options.DiscV5.Enable { + if err = wakuNode.DiscV5().Start(ctx); err != nil { + logger.Fatal("starting discovery v5", zap.Error(err)) + } + } + + // retrieve and connect to peer exchange peers + if options.PeerExchange.Enable && options.PeerExchange.Node != nil { + logger.Info("retrieving peer info via peer exchange protocol") + + peerId, err := wakuNode.AddPeer(*options.PeerExchange.Node, peers.Static, peer_exchange.PeerExchangeID_v20alpha1) + if err != nil { + logger.Error("adding peer exchange peer", logging.MultiAddrs("node", *options.PeerExchange.Node), zap.Error(err)) + } else { + desiredOutDegree := wakuNode.Relay().Params().D + if err = wakuNode.PeerExchange().Request(ctx, desiredOutDegree, peer_exchange.WithPeer(peerId)); err != nil { + logger.Error("requesting peers via peer exchange", zap.Error(err)) + } + } + } + if len(discoveredNodes) != 0 { for _, n := range discoveredNodes { go func(ctx context.Context, info peer.AddrInfo) { @@ -412,14 +411,40 @@ func Execute(options Options) { } } + var wg sync.WaitGroup + + if options.Store.Enable && len(options.Store.ResumeNodes) != 0 { + // TODO: extract this to a function and run it when you go offline + // TODO: determine if a store is listening to a topic + + var peerIDs []peer.ID + for _, n := range options.Store.ResumeNodes { + pID, err := wakuNode.AddPeer(n, peers.Static, store.StoreID_v20beta4) + if err != nil { + logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err)) + } + peerIDs = append(peerIDs, pID) + } + + for _, t := range options.Relay.Topics.Value() { + wg.Add(1) + go func(topic string) { + defer wg.Done() + ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second) + defer ctxCancel() + if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil { + logger.Error("Could not resume history", zap.Error(err)) + } + }(t) + } + } + var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, options.PProf, options.RPCServer.RelayCacheCapacity, logger) rpcServer.Start() } - var wg sync.WaitGroup - var restServer *rest.WakuRest if options.RESTServer.Enable { wg.Add(1) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index ae358caf..38254570 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -373,7 +373,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { return true } - nodeRS, err := enr.RelaySharding(d.localnode.Node().Record()) + nodeRS, err := enr.RelaySharding(n.Record()) if err != nil || nodeRS == nil { return false } @@ -383,7 +383,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { } // Contains any - for _, idx := range nodeRS.Indices { + for _, idx := range localRS.Indices { if nodeRS.Contains(localRS.Cluster, idx) { return true } diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index 536c7b75..847bc4fa 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -14,7 +14,6 @@ import ( ) const maxAllowedPingFailures = 2 -const maxPublishAttempt = 5 func disconnectPeers(host host.Host, logger *zap.Logger) { logger.Warn("keep alive hasnt been executed recently. Killing all connections to peers") diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go index dd9fb7f4..93c1303e 100644 --- a/waku/v2/node/localnode.go +++ b/waku/v2/node/localnode.go @@ -9,7 +9,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" ma "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -56,6 +58,8 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M } } + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + return wenr.Update(localnode, options...) } @@ -269,5 +273,62 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error { return err } + if w.Relay() != nil { + err = w.watchTopicShards(ctx) + if err != nil { + return err + } + } + + return nil + +} + +func (w *WakuNode) watchTopicShards(ctx context.Context) error { + evtRelaySubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelaySubscribed)) + if err != nil { + return err + } + + evtRelayUnsubscribed, err := w.Relay().Events().Subscribe(new(relay.EvtRelayUnsubscribed)) + if err != nil { + return err + } + + go func() { + defer evtRelaySubscribed.Close() + defer evtRelayUnsubscribed.Close() + + for { + select { + case <-ctx.Done(): + return + case <-evtRelayUnsubscribed.Out(): + case <-evtRelaySubscribed.Out(): + rs, err := protocol.TopicsToRelayShards(w.Relay().Topics()...) + if err != nil { + w.log.Warn("could not set ENR shard info", zap.Error(err)) + continue + } + + if len(rs) > 1 { + w.log.Warn("use sharded topics within the same cluster") + continue + } + + if len(rs) == 1 { + w.log.Info("updating advertised relay shards in ENR") + err = wenr.Update(w.localNode, wenr.WithWakuRelaySharding(rs[0])) + if err != nil { + w.log.Warn("could not set ENR shard info", zap.Error(err)) + continue + } + + w.enrChangeCh <- struct{}{} + } + } + } + }() + return nil } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 4729d431..08d2a1c2 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -2,7 +2,6 @@ package node import ( "context" - "errors" "fmt" "math/rand" "net" @@ -32,7 +31,6 @@ import ( "go.opencensus.io/stats" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/try" v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/metrics" @@ -403,14 +401,6 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } - - if !w.opts.noDefaultWakuTopic { - sub, err := w.Relay().Subscribe(ctx) - if err != nil { - return err - } - sub.Unsubscribe() - } } w.store = w.storeFactory(w) @@ -666,34 +656,6 @@ func (w *WakuNode) Broadcaster() relay.Broadcaster { return w.bcaster } -// Publish will attempt to publish a message via WakuRelay if there are enough -// peers available, otherwise it will attempt to publish via Lightpush protocol -func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { - if !w.opts.enableLightPush && !w.opts.enableRelay { - return errors.New("cannot publish message, relay and lightpush are disabled") - } - - hash := msg.Hash(relay.DefaultWakuTopic) - err := try.Do(func(attempt int) (bool, error) { - var err error - - relay := w.Relay() - lightpush := w.Lightpush() - - if relay == nil || !relay.EnoughPeersToPublish() { - w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash)) - _, err = lightpush.Publish(ctx, msg) - } else { - w.log.Debug("publishing message via relay", logging.HexBytes("hash", hash)) - _, err = relay.Publish(ctx, msg) - } - - return attempt < maxPublishAttempt, err - }) - - return err -} - func (w *WakuNode) mountDiscV5() error { discV5Options := []discv5.DiscoveryV5Option{ discv5.WithBootnodes(w.opts.discV5bootnodes), @@ -718,33 +680,6 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error return err } - if len(w.opts.resumeNodes) != 0 { - // TODO: extract this to a function and run it when you go offline - // TODO: determine if a store is listening to a topic - - var peerIDs []peer.ID - for _, n := range w.opts.resumeNodes { - pID, err := w.AddPeer(n, peers.Static, store.StoreID_v20beta4) - if err != nil { - w.log.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err)) - } - peerIDs = append(peerIDs, pID) - } - - if !w.opts.noDefaultWakuTopic { - w.wg.Add(1) - go func() { - defer w.wg.Done() - - ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second) - defer ctxCancel() - if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil { - w.log.Error("Could not resume history", zap.Error(err)) - time.Sleep(10 * time.Second) - } - }() - } - } return nil } diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index e2204c65..3c8278c8 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -154,7 +154,7 @@ func Test500(t *testing.T) { msg := createTestMsg(0) msg.Payload = int2Bytes(i) msg.Timestamp = int64(i) - if err := wakuNode2.Publish(ctx, msg); err != nil { + if _, err := wakuNode2.Relay().Publish(ctx, msg); err != nil { require.Fail(t, "Could not publish all messages") } time.Sleep(5 * time.Millisecond) @@ -182,6 +182,10 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode1.Stop() + subs, err := wakuNode1.Relay().Subscribe(ctx) + require.NoError(t, err) + subs.Unsubscribe() + // NODE2: Filter Client/Store db, migration, err := sqlite.NewDB(":memory:") require.NoError(t, err) @@ -230,7 +234,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { time.Sleep(500 * time.Millisecond) - if err := wakuNode1.Publish(ctx, msg); err != nil { + if _, err := wakuNode1.Relay().Publish(ctx, msg); err != nil { require.Fail(t, "Could not publish all messages") } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 47652ba3..5b39af51 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -66,7 +66,6 @@ type WakuNodeParameters struct { logger *zap.Logger logLevel logging.LogLevel - noDefaultWakuTopic bool enableRelay bool enableLegacyFilter bool isLegacyFilterFullnode bool @@ -79,7 +78,6 @@ type WakuNodeParameters struct { minRelayPeersToPublish int enableStore bool - resumeNodes []multiaddr.Multiaddr messageProvider store.MessageProvider rendezvousNodes []multiaddr.Multiaddr @@ -318,15 +316,6 @@ func WithPeerStore(ps peerstore.Peerstore) WakuNodeOption { } } -// NoDefaultWakuTopic will stop the node from subscribing to the default -// pubsub topic automatically -func NoDefaultWakuTopic() WakuNodeOption { - return func(params *WakuNodeParameters) error { - params.noDefaultWakuTopic = true - return nil - } -} - // WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption // accepts a list of WakuRelay gossipsub option to setup the protocol func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption { @@ -400,12 +389,10 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { } // WithWakuStore enables the Waku V2 Store protocol and if the messages should -// be stored or not in a message provider. If resumeNodes are specified, the -// store will attempt to resume message history using those nodes -func WithWakuStore(resumeNodes ...multiaddr.Multiaddr) WakuNodeOption { +// be stored or not in a message provider. +func WithWakuStore() WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true - params.resumeNodes = resumeNodes return nil } } diff --git a/waku/v2/protocol/enr/localnode.go b/waku/v2/protocol/enr/localnode.go index e7a92dfe..27ebe4b8 100644 --- a/waku/v2/protocol/enr/localnode.go +++ b/waku/v2/protocol/enr/localnode.go @@ -91,7 +91,6 @@ func WithUDPPort(udpPort uint) ENROption { } func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error { - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) for _, opt := range enrOptions { err := opt(localnode) if err != nil { diff --git a/waku/v2/protocol/enr/shards.go b/waku/v2/protocol/enr/shards.go index ede1fb97..fe0c6680 100644 --- a/waku/v2/protocol/enr/shards.go +++ b/waku/v2/protocol/enr/shards.go @@ -8,12 +8,18 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" ) +func deleteShardingENREntries(localnode *enode.LocalNode) { + localnode.Delete(enr.WithEntry(ShardingBitVectorEnrField, struct{}{})) + localnode.Delete(enr.WithEntry(ShardingIndicesListEnrField, struct{}{})) +} + func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption { return func(localnode *enode.LocalNode) error { value, err := rs.IndicesList() if err != nil { return err } + deleteShardingENREntries(localnode) localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value)) return nil } @@ -21,6 +27,7 @@ func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption { func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption { return func(localnode *enode.LocalNode) error { + deleteShardingENREntries(localnode) localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector())) return nil } @@ -55,8 +62,11 @@ func WithWakuRelayShardingTopics(topics ...string) ENROption { func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error) { var field []byte - if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, field)); err != nil { - return nil, nil + if err := record.Load(enr.WithEntry(ShardingIndicesListEnrField, &field)); err != nil { + if enr.IsNotFound(err) { + return nil, nil + } + return nil, err } res, err := protocol.FromIndicesList(field) @@ -69,7 +79,7 @@ func RelayShardingIndicesList(record *enr.Record) (*protocol.RelayShards, error) func RelayShardingBitVector(record *enr.Record) (*protocol.RelayShards, error) { var field []byte - if err := record.Load(enr.WithEntry(ShardingBitVectorEnrField, field)); err != nil { + if err := record.Load(enr.WithEntry(ShardingBitVectorEnrField, &field)); err != nil { if enr.IsNotFound(err) { return nil, nil } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 083791e5..c185f622 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -424,9 +424,7 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } - wf.subscriptions.Lock() delete(wf.subscriptions.items, peerID) - defer wf.subscriptions.Unlock() resultChan <- WakuFilterPushResult{ Err: err, diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 898e0302..83eaa5ac 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -7,9 +7,11 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/zap" @@ -50,11 +52,27 @@ type WakuRelay struct { wakuRelayTopics map[string]*pubsub.Topic relaySubs map[string]*pubsub.Subscription + events event.Bus + emitters struct { + EvtRelaySubscribed event.Emitter + EvtRelayUnsubscribed event.Emitter + } + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } +// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created +type EvtRelaySubscribed struct { + Topic string +} + +// EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed +type EvtRelayUnsubscribed struct { + Topic string +} + func msgIdFn(pmsg *pubsub_pb.Message) string { return string(hash.SHA256(pmsg.Data)) } @@ -69,6 +87,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.minPeersToPublish = minPeersToPublish w.wg = sync.WaitGroup{} w.log = log.Named("relay") + w.events = eventbus.NewBus() cfg := pubsub.DefaultGossipSubParams() cfg.PruneBackoff = time.Minute @@ -200,6 +219,15 @@ func (w *WakuRelay) Start(ctx context.Context) error { } w.pubsub = ps + w.emitters.EvtRelaySubscribed, err = w.events.Emitter(new(EvtRelaySubscribed)) + if err != nil { + return err + } + w.emitters.EvtRelayUnsubscribed, err = w.events.Emitter(new(EvtRelayUnsubscribed)) + if err != nil { + return err + } + w.log.Info("Relay protocol started") return nil } @@ -234,8 +262,8 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { } func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { - defer w.topicsMutex.Unlock() w.topicsMutex.Lock() + defer w.topicsMutex.Unlock() pubSubTopic, ok := w.wakuRelayTopics[topic] if !ok { // Joins topic if node hasn't joined yet @@ -267,7 +295,14 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro if err != nil { return nil, err } + w.relaySubs[topic] = sub + + err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic}) + if err != nil { + return nil, err + } + if w.bcaster != nil { w.wg.Add(1) go w.subscribeToTopic(topic, sub) @@ -328,7 +363,8 @@ func (w *WakuRelay) Stop() { } w.host.RemoveStreamHandler(WakuRelayID_v200) - + w.emitters.EvtRelaySubscribed.Close() + w.emitters.EvtRelayUnsubscribed.Close() w.cancel() w.wg.Wait() } @@ -384,6 +420,11 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { } delete(w.wakuRelayTopics, topic) + err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{topic}) + if err != nil { + return err + } + return nil } @@ -449,3 +490,8 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio func (w *WakuRelay) Params() pubsub.GossipSubParams { return w.params } + +// Events returns the event bus on which WakuRelay events will be emitted +func (w *WakuRelay) Events() event.Bus { + return w.events +} diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 184817b8..5e0bc64c 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -53,8 +53,8 @@ func (rs RelayShards) Contains(cluster uint16, index uint16) bool { } found := false - for _, i := range rs.Indices { - if i == index { + for _, idx := range rs.Indices { + if idx == index { found = true } } diff --git a/waku/v2/protocol/topic.go b/waku/v2/protocol/topic.go index abbdc7b1..dd16c136 100644 --- a/waku/v2/protocol/topic.go +++ b/waku/v2/protocol/topic.go @@ -159,7 +159,6 @@ func (n StaticShardingPubsubTopic) String() string { func (s *StaticShardingPubsubTopic) Parse(topic string) error { if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { - fmt.Println(topic, StaticShardingPubsubTopicPrefix) return ErrInvalidShardedTopicPrefix }