feat: wakuV2 light client using lightpush / filter (#2351)

This commit is contained in:
RichΛrd 2021-09-23 12:17:57 -04:00
parent 71555991db
commit da63894a5a
11 changed files with 206 additions and 87 deletions

View File

@ -1 +1 @@
0.88.4 0.88.5

2
go.mod
View File

@ -47,7 +47,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a
github.com/status-im/markdown v0.0.0-20201022101546-c0cbdd5763bf github.com/status-im/markdown v0.0.0-20201022101546-c0cbdd5763bf
github.com/status-im/migrate/v4 v4.6.2-status.2 github.com/status-im/migrate/v4 v4.6.2-status.2

4
go.sum
View File

@ -1102,8 +1102,8 @@ github.com/status-im/go-ethereum v1.10.4-status.2 h1:uvcD2U7skYqPQviARFb4w3wZyFS
github.com/status-im/go-ethereum v1.10.4-status.2/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-ethereum v1.10.4-status.2/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.0 h1:OT84UsUzTCwguqCpJqkrCMiL4VZ1SvUtH9a5MsZupBk= github.com/status-im/go-multiaddr-ethv4 v1.2.0 h1:OT84UsUzTCwguqCpJqkrCMiL4VZ1SvUtH9a5MsZupBk=
github.com/status-im/go-multiaddr-ethv4 v1.2.0/go.mod h1:2VQ3C+9zEurcceasz12gPAtmEzCeyLUGPeKLSXYQKHo= github.com/status-im/go-multiaddr-ethv4 v1.2.0/go.mod h1:2VQ3C+9zEurcceasz12gPAtmEzCeyLUGPeKLSXYQKHo=
github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a h1:PQ/S9OaV3I+peTU0YC7q8/AImudIKfuLNDfMzf+w8DY= github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf h1:r4TenmNYnine3l1qGFOQW74s+27M0HlLipQxxZ3PJbI=
github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a/go.mod h1:XK6wGIMnxhpx9SQLDV9Qw0zfXTjd8jjw6DXGC0mKvA8= github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf/go.mod h1:XK6wGIMnxhpx9SQLDV9Qw0zfXTjd8jjw6DXGC0mKvA8=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -248,8 +248,12 @@ func (b *StatusNode) wakuV2Service(nodeKey string, wakuCfg *params.WakuV2Config,
SoftBlacklistedPeerIDs: wakuCfg.SoftBlacklistedPeerIDs, SoftBlacklistedPeerIDs: wakuCfg.SoftBlacklistedPeerIDs,
Host: wakuCfg.Host, Host: wakuCfg.Host,
Port: wakuCfg.Port, Port: wakuCfg.Port,
BootNodes: clusterCfg.WakuNodes, LightClient: wakuCfg.LightClient,
StoreNodes: clusterCfg.WakuStoreNodes, KeepAliveInterval: wakuCfg.KeepAliveInterval,
RelayNodes: clusterCfg.RelayNodes,
StoreNodes: clusterCfg.StoreNodes,
FilterNodes: clusterCfg.FilterNodes,
LightpushNodes: clusterCfg.LightpushNodes,
} }
if cfg.Host == "" { if cfg.Host == "" {

View File

@ -163,7 +163,10 @@ type WakuV2Config struct {
// Port number in which to start libp2p protocol (0 for random) // Port number in which to start libp2p protocol (0 for random)
Port int Port int
// LightClient should be true if the node should start with an empty bloom filter and not forward messages from other nodes // Interval of time in seconds to send a ping to peers to keep the connection to them alive
KeepAliveInterval int
// LightClient should be true if the node will not relay messages and only rely on lightpush/filter nodes
LightClient bool LightClient bool
// FullNode should be true if waku should always acta as a full node // FullNode should be true if waku should always acta as a full node
@ -274,11 +277,17 @@ type ClusterConfig struct {
// RendezvousNodes is a list rendezvous discovery nodes. // RendezvousNodes is a list rendezvous discovery nodes.
RendezvousNodes []string RendezvousNodes []string
// WakuNodes is a list of wakuv2 libp2p nodes // RelayNodes is a list of wakuv2 relay nodes (libp2p)
WakuNodes []string RelayNodes []string
// WakuStoreNodes is a list of wakuv2 store nodes // StoreNodes is a list of wakuv2 store nodes (libp2p)
WakuStoreNodes []string StoreNodes []string
// FilterNodes is a list of wakuv2 filter nodes (libp2p)
FilterNodes []string
// LightpushNodes is a list of wakuv2 lightpush nodes (libp2p)
LightpushNodes []string
} }
// String dumps config object as nicely indented JSON // String dumps config object as nicely indented JSON

View File

@ -307,11 +307,10 @@ func (w *WakuNode) ID() string {
return w.host.ID().Pretty() return w.host.ID().Pretty()
} }
func (w *WakuNode) GetPeerStats() PeerStats {
return w.peers
}
func (w *WakuNode) IsOnline() bool { func (w *WakuNode) IsOnline() bool {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
hasRelay := false hasRelay := false
hasLightPush := false hasLightPush := false
hasStore := false hasStore := false
@ -340,6 +339,9 @@ func (w *WakuNode) IsOnline() bool {
} }
func (w *WakuNode) HasHistory() bool { func (w *WakuNode) HasHistory() bool {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
for _, v := range w.peers { for _, v := range w.peers {
for _, protocol := range v { for _, protocol := range v {
if protocol == string(store.WakuStoreProtocolId) { if protocol == string(store.WakuStoreProtocolId) {
@ -721,11 +723,6 @@ func (w *WakuNode) DialPeerByID(peerID peer.ID) error {
return w.connect(info) return w.connect(info)
} }
func (w *WakuNode) DialPeerByID(peerID peer.ID) error {
info := w.host.Peerstore().PeerInfo(peerID)
return w.connect(info)
}
func (w *WakuNode) ClosePeerByAddress(address string) error { func (w *WakuNode) ClosePeerByAddress(address string) error {
p, err := ma.NewMultiaddr(address) p, err := ma.NewMultiaddr(address)
if err != nil { if err != nil {
@ -756,6 +753,8 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error {
} }
func (w *WakuNode) PeerCount() int { func (w *WakuNode) PeerCount() int {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
return len(w.peers) return len(w.peers)
} }
@ -771,6 +770,7 @@ func (w *WakuNode) Peers() PeerStats {
} }
func (w *WakuNode) startKeepAlive(t time.Duration) { func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t) log.Info("Setting up ping protocol with duration of ", t)
w.ping = ping.NewPingService(w.host) w.ping = ping.NewPingService(w.host)
@ -804,15 +804,16 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
go func(peer peer.ID) { go func(peer peer.ID) {
peerFound := false peerFound := false
w.peersMutex.Lock()
for p := range w.peers { for p := range w.peers {
if p == peer { if p == peer {
peerFound = true peerFound = true
break break
} }
} }
defer w.peersMutex.Unlock()
log.Debug("###PING before fetching result")
//log.Info("###PING " + s + " before fetching result")
//logwriter.Write([]byte("###PING " + s + " before fetching result"))
pingTicker := time.NewTicker(time.Duration(1) * time.Second) pingTicker := time.NewTicker(time.Duration(1) * time.Second)
isError := false isError := false
select { select {

View File

@ -96,12 +96,10 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption {
} }
} }
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption // WithWakuFilter enables the Waku V2 Filter protocol.
// accepts a list of WakuFilter gossipsub options to setup the protocol func WithWakuFilter() WakuNodeOption {
func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableFilter = true params.enableFilter = true
params.wOpts = opts
return nil return nil
} }
} }

View File

@ -95,22 +95,28 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
return return
} }
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
stats.Record(wf.ctx, metrics.Messages.M(1)) stats.Record(wf.ctx, metrics.Messages.M(1))
if filterRPCRequest.Request != nil { if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages")
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
} else if filterRPCRequest.Request != nil {
// We're on a full node. // We're on a full node.
// This is a filter request coming from a light node. // This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe { if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: string(s.Conn().RemotePeer()), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} subscriber := Subscriber{peer: string(s.Conn().RemotePeer()), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
wf.subscribers = append(wf.subscribers, subscriber) wf.subscribers = append(wf.subscribers, subscriber)
log.Info("Full node, add a filter subscriber ", subscriber) log.Info("filter full node, add a filter subscriber: ", subscriber.peer)
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
} else { } else {
peerId := string(s.Conn().RemotePeer()) peerId := string(s.Conn().RemotePeer())
log.Info("Full node, remove a filter subscriber ", peerId) log.Info("filter full node, remove a filter subscriber: ", peerId)
contentFilters := filterRPCRequest.Request.ContentFilters contentFilters := filterRPCRequest.Request.ContentFilters
var peerIdsToRemove []string var peerIdsToRemove []string
for _, subscriber := range wf.subscribers { for _, subscriber := range wf.subscribers {
@ -151,20 +157,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
} }
} else if filterRPCRequest.Push != nil {
// We're on a light node.
// This is a message push coming from a full node.
log.Info("Light node, received a message push ", *filterRPCRequest.Push)
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
} }
} }
func (wf *WakuFilter) peerListener() { func (wf *WakuFilter) peerListener() {
for e := range wf.peerChan { for e := range wf.peerChan {
if e.Connectedness == network.NotConnected { if e.Connectedness == network.NotConnected {
log.Info("Filter Notification received ", e.Peer) log.Info("filter Notification received ", e.Peer)
i := 0 i := 0
// Delete subscribers matching deleted peer // Delete subscribers matching deleted peer
for _, s := range wf.subscribers { for _, s := range wf.subscribers {
@ -174,7 +173,7 @@ func (wf *WakuFilter) peerListener() {
} }
} }
log.Info("Filter, deleted subscribers: ", len(wf.subscribers)-i) log.Info("filter, deleted subscribers: ", len(wf.subscribers)-i)
wf.subscribers = wf.subscribers[:i] wf.subscribers = wf.subscribers[:i]
} }
} }
@ -217,20 +216,22 @@ func (wf *WakuFilter) FilterListener() {
for _, filter := range subscriber.filter.ContentFilters { for _, filter := range subscriber.filter.ContentFilters {
if msg.ContentTopic == filter.ContentTopic { if msg.ContentTopic == filter.ContentTopic {
log.Info("Found matching contentTopic ", filter, msg) log.Info("found matching contentTopic ", filter, msg)
msgArr := []*pb.WakuMessage{msg} msgArr := []*pb.WakuMessage{msg}
// Do a message push to light node // Do a message push to light node
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}} pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}}
log.Info("Pushing a message to light node: ", pushRPC) log.Info("pushing a message to light node: ", pushRPC)
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId) conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId)
if err != nil { if err != nil {
// @TODO more sophisticated error handling here // @TODO more sophisticated error handling here
log.Error("Failed to open peer stream") log.Error("failed to open peer stream")
//waku_filter_errors.inc(labelValues = [dialFailure]) //waku_filter_errors.inc(labelValues = [dialFailure])
return err return err
} }
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn) writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC) err = writer.WriteMsg(pushRPC)
if err != nil { if err != nil {
@ -262,12 +263,14 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
if conn != nil { if conn != nil {
defer conn.Close()
// This is the only successful path to subscription // This is the only successful path to subscription
id := protocol.GenerateRequestId() id := protocol.GenerateRequestId()
writer := protoio.NewDelimitedWriter(conn) writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
log.Info("Sending filterRPC: ", filterRPC) log.Info("sending filterRPC: ", filterRPC)
err = writer.WriteMsg(filterRPC) err = writer.WriteMsg(filterRPC)
if err != nil { if err != nil {
log.Error("failed to write message", err) log.Error("failed to write message", err)
@ -281,7 +284,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
return "", err return "", err
} }
} else { } else {
log.Info("Error selecting peer: ", err) log.Info("error selecting peer: ", err)
} }
return "", nil return "", nil
@ -294,6 +297,8 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
if conn != nil { if conn != nil {
defer conn.Close()
// This is the only successful path to subscription // This is the only successful path to subscription
id := protocol.GenerateRequestId() id := protocol.GenerateRequestId()
@ -303,7 +308,6 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
if err != nil { if err != nil {
log.Error("failed to write message", err) log.Error("failed to write message", err)
} }
//return some(id)
} else { } else {
// @TODO more sophisticated error handling here // @TODO more sophisticated error handling here
log.Error("failed to connect to remote peer", err) log.Error("failed to connect to remote peer", err)

2
vendor/modules.txt vendored
View File

@ -426,7 +426,7 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.0 # github.com/status-im/go-multiaddr-ethv4 v1.2.0
github.com/status-im/go-multiaddr-ethv4 github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a # github.com/status-im/go-waku v0.0.0-20210927124718-6c4a74fb9cbf
github.com/status-im/go-waku/waku/v2/metrics github.com/status-im/go-waku/waku/v2/metrics
github.com/status-im/go-waku/waku/v2/node github.com/status-im/go-waku/waku/v2/node
github.com/status-im/go-waku/waku/v2/protocol github.com/status-im/go-waku/waku/v2/protocol

View File

@ -28,12 +28,17 @@ type Config struct {
SoftBlacklistedPeerIDs []string `toml:",omitempty"` SoftBlacklistedPeerIDs []string `toml:",omitempty"`
Host string `toml:",omitempty"` Host string `toml:",omitempty"`
Port int `toml:",omitempty"` Port int `toml:",omitempty"`
BootNodes []string `toml:",omitempty"` KeepAliveInterval int `toml:",omitempty"`
LightClient bool `toml:",omitempty"`
RelayNodes []string `toml:",omitempty"`
StoreNodes []string `toml:",omitempty"` StoreNodes []string `toml:",omitempty"`
FilterNodes []string `toml:",omitempty"`
LightpushNodes []string `toml:",omitempty"`
} }
var DefaultConfig = Config{ var DefaultConfig = Config{
MaxMessageSize: common.DefaultMaxMessageSize, MaxMessageSize: common.DefaultMaxMessageSize,
Host: "0.0.0.0", Host: "0.0.0.0",
Port: 60000, Port: 60000,
KeepAliveInterval: 1, // second
} }

View File

@ -47,6 +47,7 @@ import (
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol" wakuprotocol "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
@ -63,6 +64,7 @@ import (
const messageQueueLimit = 1024 const messageQueueLimit = 1024
type settings struct { type settings struct {
LightClient bool // Indicates if the node is a light client
MaxMsgSize uint32 // Maximal message length allowed by the waku node MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations EnableConfirmations bool // Enable sending message confirmations
SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from SoftBlacklistedPeerIDs map[string]bool // SoftBlacklistedPeerIDs is a list of peer ids that we want to keep connected but silently drop any envelope from
@ -80,6 +82,7 @@ type Waku struct {
node *node.WakuNode // reference to a libp2p waku node node *node.WakuNode // reference to a libp2p waku node
filters *common.Filters // Message filters installed with Subscribe function filters *common.Filters // Message filters installed with Subscribe function
filterMsgChannel chan *protocol.Envelope // Channel for wakuv2 filter messages
privateKeys map[string]*ecdsa.PrivateKey // Private key storage privateKeys map[string]*ecdsa.PrivateKey // Private key storage
symKeys map[string][]byte // Symmetric key storage symKeys map[string][]byte // Symmetric key storage
@ -130,10 +133,12 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
waku.settings = settings{ waku.settings = settings{
MaxMsgSize: cfg.MaxMessageSize, MaxMsgSize: cfg.MaxMessageSize,
SoftBlacklistedPeerIDs: make(map[string]bool), SoftBlacklistedPeerIDs: make(map[string]bool),
LightClient: cfg.LightClient,
} }
waku.filters = common.NewFilters() waku.filters = common.NewFilters()
waku.bandwidthCounter = metrics.NewBandwidthCounter() waku.bandwidthCounter = metrics.NewBandwidthCounter()
waku.filterMsgChannel = make(chan *protocol.Envelope, 1024)
var privateKey *ecdsa.PrivateKey var privateKey *ecdsa.PrivateKey
var err error var err error
@ -153,20 +158,35 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
return nil, fmt.Errorf("failed to setup the network interface: %v", err) return nil, fmt.Errorf("failed to setup the network interface: %v", err)
} }
connStatusChan := make(chan node.ConnStatus) connStatusChan := make(chan node.ConnStatus, 100)
keepAliveInt := 1 if cfg.KeepAliveInterval == 0 {
waku.node, err = node.New(context.Background(), cfg.KeepAliveInterval = DefaultConfig.KeepAliveInterval
}
opts := []node.WakuNodeOption{
node.WithLibP2POptions( node.WithLibP2POptions(
libp2p.BandwidthReporter(waku.bandwidthCounter), libp2p.BandwidthReporter(waku.bandwidthCounter),
), ),
node.WithPrivateKey(privateKey), node.WithPrivateKey(privateKey),
node.WithHostAddress([]net.Addr{hostAddr}), node.WithHostAddress([]net.Addr{hostAddr}),
node.WithWakuRelay(wakurelay.WithMaxMessageSize(int(waku.settings.MaxMsgSize))),
node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages) node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages)
node.WithConnStatusChan(connStatusChan), node.WithConnStatusChan(connStatusChan),
node.WithKeepAlive(time.Duration(keepAliveInt)*time.Second), node.WithKeepAlive(time.Duration(cfg.KeepAliveInterval) * time.Second),
) }
if cfg.LightClient {
opts = append(opts, node.WithLightPush())
opts = append(opts, node.WithWakuFilter())
} else {
opts = append(opts, node.WithWakuRelay(wakurelay.WithMaxMessageSize(int(waku.settings.MaxMsgSize))))
}
if waku.node, err = node.New(context.Background(), opts...); err != nil {
return nil, fmt.Errorf("failed to start the go-waku node: %v", err)
}
waku.addPeers(cfg)
go func() { go func() {
for { for {
@ -178,36 +198,55 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
} }
} }
}() }()
if err != nil {
fmt.Println(err)
return nil, fmt.Errorf("failed to start the go-waku node: %v", err)
}
for _, bootnode := range cfg.BootNodes { go waku.runFilterMsgLoop()
err := waku.node.DialPeer(bootnode) go waku.runRelayMsgLoop()
if err != nil {
log.Warn("Could not dial peer", err)
} else {
log.Info("Bootnode dialed successfully", bootnode)
}
}
for _, storenode := range cfg.StoreNodes {
peerID, err := waku.node.AddStorePeer(storenode)
if err != nil {
log.Warn("Could not add store peer", err)
} else {
log.Info("Storepeeer dialed successfully", "peerId", peerID.Pretty())
}
}
go waku.runMsgLoop()
log.Info("setup the go-waku node successfully") log.Info("setup the go-waku node successfully")
return waku, nil return waku, nil
} }
func (w *Waku) addPeers(cfg *Config) {
if !cfg.LightClient {
for _, relaynode := range cfg.RelayNodes {
err := w.node.DialPeer(relaynode)
if err != nil {
log.Warn("could not dial peer", err)
} else {
log.Info("relay peer dialed successfully", relaynode)
}
}
}
for _, storenode := range cfg.StoreNodes {
peerID, err := w.node.AddStorePeer(storenode)
if err != nil {
log.Warn("could not add store peer", err)
} else {
log.Info("store peer added successfully", "peerId", peerID.Pretty())
}
}
for _, filternode := range cfg.FilterNodes {
peerID, err := w.node.AddFilterPeer(filternode)
if err != nil {
log.Warn("could not add filter peer", err)
} else {
log.Info("filter peer added successfully", "peerId", peerID.Pretty())
}
}
for _, lightpushnode := range cfg.LightpushNodes {
peerID, err := w.node.AddLightPushPeer(lightpushnode)
if err != nil {
log.Warn("could not add lightpush peer", err)
} else {
log.Info("lightpush peer added successfully", "peerId", peerID.Pretty())
}
}
}
func (w *Waku) GetStats() types.StatsSummary { func (w *Waku) GetStats() types.StatsSummary {
stats := w.bandwidthCounter.GetBandwidthTotals() stats := w.bandwidthCounter.GetBandwidthTotals()
return types.StatsSummary{ return types.StatsSummary{
@ -216,7 +255,11 @@ func (w *Waku) GetStats() types.StatsSummary {
} }
} }
func (w *Waku) runMsgLoop() { func (w *Waku) runRelayMsgLoop() {
if w.settings.LightClient {
return
}
sub, err := w.node.Subscribe(nil) sub, err := w.node.Subscribe(nil)
if err != nil { if err != nil {
fmt.Println("Could not subscribe:", err) fmt.Println("Could not subscribe:", err)
@ -225,13 +268,49 @@ func (w *Waku) runMsgLoop() {
for env := range sub.C { for env := range sub.C {
envelopeErrors, err := w.OnNewEnvelopes(env) envelopeErrors, err := w.OnNewEnvelopes(env)
// TODO: should these be handled? // TODO: should these be handled?
_ = envelopeErrors _ = envelopeErrors
_ = err _ = err
} }
} }
func (w *Waku) runFilterMsgLoop() {
if !w.settings.LightClient {
return
}
for {
select {
case <-w.quit:
return
case env := <-w.filterMsgChannel:
envelopeErrors, err := w.OnNewEnvelopes(env)
// TODO: should these be handled?
_ = envelopeErrors
_ = err
}
}
}
func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) {
pubsubTopic := relay.GetTopic(nil)
filterRequest := pb.FilterRequest{
Topic: string(pubsubTopic),
Subscribe: true,
}
var contentFilters []*pb.FilterRequest_ContentFilter
for _, topic := range topics {
t := &pb.FilterRequest_ContentFilter{ContentTopic: common.BytesToTopic(topic).ContentTopic()}
contentFilters = append(contentFilters, t)
}
filterRequest.ContentFilters = contentFilters
err := w.node.SubscribeFilter(context.Background(), filterRequest, w.filterMsgChannel)
if err != nil {
w.logger.Warn("could not add wakuv2 filter for topics", zap.Any("topics", topics))
return
}
}
// MaxMessageSize returns the maximum accepted message size. // MaxMessageSize returns the maximum accepted message size.
func (w *Waku) MaxMessageSize() uint32 { func (w *Waku) MaxMessageSize() uint32 {
w.settingsMu.RLock() w.settingsMu.RLock()
@ -527,6 +606,10 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) {
return s, err return s, err
} }
if w.settings.LightClient {
w.subscribeWakuFilterTopic(f.Topics)
}
return s, nil return s, nil
} }
@ -536,9 +619,25 @@ func (w *Waku) GetFilter(id string) *common.Filter {
} }
// Unsubscribe removes an installed message handler. // Unsubscribe removes an installed message handler.
// TODO: This does not seem to update the bloom filter, but does update
// the topic interest map
func (w *Waku) Unsubscribe(id string) error { func (w *Waku) Unsubscribe(id string) error {
filter := w.filters.Get(id)
if filter != nil && w.settings.LightClient {
pubsubTopic := relay.GetTopic(nil)
filterRequest := pb.FilterRequest{
Topic: string(pubsubTopic),
Subscribe: true,
}
var contentFilters []*pb.FilterRequest_ContentFilter
for _, topic := range filter.Topics {
t := &pb.FilterRequest_ContentFilter{ContentTopic: common.BytesToTopic(topic).ContentTopic()}
contentFilters = append(contentFilters, t)
}
filterRequest.ContentFilters = contentFilters
w.node.UnsubscribeFilter(context.Background(), filterRequest)
}
ok := w.filters.Uninstall(id) ok := w.filters.Uninstall(id)
if !ok { if !ok {
return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id) return fmt.Errorf("failed to unsubscribe: invalid ID '%s'", id)
@ -547,8 +646,6 @@ func (w *Waku) Unsubscribe(id string) error {
} }
// Unsubscribe removes an installed message handler. // Unsubscribe removes an installed message handler.
// TODO: This does not seem to update the bloom filter, but does update
// the topic interest map
func (w *Waku) UnsubscribeMany(ids []string) error { func (w *Waku) UnsubscribeMany(ids []string) error {
for _, id := range ids { for _, id := range ids {
w.logger.Debug("cleaning up filter", zap.String("id", id)) w.logger.Debug("cleaning up filter", zap.String("id", id))
@ -605,6 +702,7 @@ func (w *Waku) Start() error {
func (w *Waku) Stop() error { func (w *Waku) Stop() error {
w.node.Stop() w.node.Stop()
close(w.quit) close(w.quit)
close(w.filterMsgChannel)
return nil return nil
} }