Pass connection status chan to go-waku

Fix conn status chan gorouting never quitting

Fix import cycle

Add keepAlive interval to wakuv2 node

Add CustomNodes

Do not resume wakuv2 store
This commit is contained in:
Vitaliy Vlasov 2021-07-21 13:08:08 +03:00
parent e74e4e1baf
commit 049afe5765
12 changed files with 351 additions and 170 deletions

2
go.mod
View File

@ -47,7 +47,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e
github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a
github.com/status-im/markdown v0.0.0-20201022101546-c0cbdd5763bf
github.com/status-im/migrate/v4 v4.6.2-status.2

4
go.sum
View File

@ -1102,8 +1102,8 @@ github.com/status-im/go-ethereum v1.10.4-status.2 h1:uvcD2U7skYqPQviARFb4w3wZyFS
github.com/status-im/go-ethereum v1.10.4-status.2/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.0 h1:OT84UsUzTCwguqCpJqkrCMiL4VZ1SvUtH9a5MsZupBk=
github.com/status-im/go-multiaddr-ethv4 v1.2.0/go.mod h1:2VQ3C+9zEurcceasz12gPAtmEzCeyLUGPeKLSXYQKHo=
github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e h1:WlCBk7mwXa8tVIOQedx0whIXk9xT4lIKv3UWWABGWpI=
github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e/go.mod h1:plJBn9iDLcklnq2KKf/bBeuRNsKck8QMdr3yoJnh3hM=
github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a h1:PQ/S9OaV3I+peTU0YC7q8/AImudIKfuLNDfMzf+w8DY=
github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a/go.mod h1:XK6wGIMnxhpx9SQLDV9Qw0zfXTjd8jjw6DXGC0mKvA8=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -222,6 +222,9 @@ type WakuV2Config struct {
// EnableConfirmations when true, instructs that confirmation should be sent for received messages
EnableConfirmations bool
// A name->libp2p_addr map for Wakuv2 custom nodes
CustomNodes map[string]string
}
// ----------

View File

@ -1,7 +1,5 @@
package signal
import "github.com/status-im/status-go/protocol/communities"
const (
// EventMesssageDelivered triggered when we got acknowledge from datasync level, that means peer got message
@ -32,6 +30,6 @@ func SendMessageDelivered(chatID string, messageID string) {
}
// SendMessageDelivered notifies about delivered message
func SendCommunityInfoFound(community *communities.Community) {
func SendCommunityInfoFound(community interface{}) {
send(EventCommunityInfoFound, community)
}

12
signal/events_wakuv2.go Normal file
View File

@ -0,0 +1,12 @@
package signal
const (
// EventPeerStats is sent when peer is added or removed.
// it will be a map with capability=peer count k/v's.
EventPeerStats = "wakuv2.peerstats"
)
// SendPeerStats sends discovery.summary signal.
func SendPeerStats(peerStats interface{}) {
send(EventPeerStats, peerStats)
}

View File

@ -4,17 +4,23 @@ import (
"context"
"errors"
"fmt"
//"log/syslog"
//"strconv"
"sync"
"time"
proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-core/peerstore"
p2pproto "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
@ -32,6 +38,8 @@ import (
var log = logging.Logger("wakunode")
//var logwriter, _ = syslog.New(syslog.LOG_ERR|syslog.LOG_LOCAL0, "WAKU")
type Message []byte
// A map of peer IDs to supported protocols
@ -40,11 +48,13 @@ type PeerStats map[peer.ID][]string
type ConnStatus struct {
IsOnline bool
HasHistory bool
Peers PeerStats
}
type WakuNode struct {
host host.Host
opts *WakuNodeParameters
host host.Host
idService *identify.IDService
opts *WakuNodeParameters
relay *relay.WakuRelay
filter *filter.WakuFilter
@ -68,85 +78,118 @@ type WakuNode struct {
quit chan struct{}
// Map of peers and their supported protocols
peers PeerStats
peers PeerStats
peersMutex sync.Mutex
// Internal protocol implementations that wish
// to listen to peer added/removed events (e.g. Filter)
peerListeners []chan *event.EvtPeerConnectednessChanged
// Channel passed to WakuNode constructor
// receiving connection status notifications
connStatusChan chan ConnStatus
pingEventsChan chan interface{}
}
func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) {
log.Debug("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness)
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
if ev.Connectedness == network.Connected {
_, ok := w.peers[ev.Peer]
if !ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Debug("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
} else {
log.Debug("### Peer already exists")
}
} else if ev.Connectedness == network.NotConnected {
log.Debug("Peer down: ", ev.Peer)
delete(w.peers, ev.Peer)
// for _, pl := range w.peerListeners {
// pl <- &ev
// }
// TODO
// There seems to be no proper way to
// remove a dropped peer from Host's Peerstore
// https://github.com/libp2p/go-libp2p-host/issues/13
//w.Host().Network().ClosePeer(ev.Peer)
}
}
func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) {
log.Debug("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed)
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Debug("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
}
}
func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) {
log.Debug("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer)
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Debug("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
}
func (w *WakuNode) processHostEvent(e interface{}) {
if e == nil {
log.Debug("processHostEvent nil event")
return
}
isOnline := w.IsOnline()
hasHistory := w.HasHistory()
switch e := e.(type) {
case event.EvtPeerConnectednessChanged:
w.handleConnectednessChanged(e)
case event.EvtPeerProtocolsUpdated:
w.handleProtocolsUpdated(e)
case event.EvtPeerIdentificationCompleted:
w.handlePeerIdentificationCompleted(e)
}
log.Debug("###processHostEvent before isOnline()")
newIsOnline := w.IsOnline()
log.Debug("###processHostEvent before hasHistory()")
newHasHistory := w.HasHistory()
log.Debug("###ConnStatus isOnline: ", isOnline, "/", newIsOnline, " hasHistory: ",
hasHistory, "/", newHasHistory)
if w.connStatusChan != nil {
connStatus := ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory, Peers: w.Peers()}
log.Debug("New ConnStatus: ", connStatus)
w.connStatusChan <- connStatus
}
}
func (w *WakuNode) connectednessListener() {
for {
isOnline := w.IsOnline()
hasHistory := w.HasHistory()
var e interface{}
log.Debug("connectednessListener before select")
select {
case e := <-w.connectednessEventSub.Out():
if e == nil {
break
}
ev := e.(event.EvtPeerConnectednessChanged)
log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness)
if ev.Connectedness == network.Connected {
_, ok := w.peers[ev.Peer]
if !ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
} else {
log.Info("### Peer already exists")
}
} else if ev.Connectedness == network.NotConnected {
log.Info("Peer down: ", ev.Peer)
delete(w.peers, ev.Peer)
for _, pl := range w.peerListeners {
pl <- &ev
}
// TODO
// There seems to be no proper way to
// remove a dropped peer from Host's Peerstore
// https://github.com/libp2p/go-libp2p-host/issues/13
//w.Host().Network().ClosePeer(ev.Peer)
}
case e := <-w.protocolEventSub.Out():
if e == nil {
break
}
ev := e.(event.EvtPeerProtocolsUpdated)
log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
}
case e := <-w.identificationEventSub.Out():
if e == nil {
break
}
ev := e.(event.EvtPeerIdentificationCompleted)
log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer)
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
w.peers[ev.Peer] = peerProtocols
}
}
newIsOnline := w.IsOnline()
newHasHistory := w.HasHistory()
if w.connStatusChan != nil &&
(isOnline != newIsOnline || hasHistory != newHasHistory) {
w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory}
case e = <-w.connectednessEventSub.Out():
log.Debug("connectednessListener connectednessEvent")
case e = <-w.protocolEventSub.Out():
log.Debug("connectednessListener protocolEvent")
case e = <-w.identificationEventSub.Out():
log.Debug("connectednessListener identificationEvent")
case e = <-w.pingEventsChan:
log.Debug("connectednessListener pingEvent")
}
log.Debug("connectednessListener after select")
w.processHostEvent(e)
log.Debug("connectednessListener after processHostEvent")
}
}
@ -181,6 +224,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w := new(WakuNode)
w.bcaster = NewBroadcaster(1024)
w.host = host
w.idService = identify.NewIDService(host)
w.cancel = cancel
w.ctx = ctx
w.subscriptions = make(map[relay.Topic][]*Subscription)
@ -189,8 +233,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.peers = make(PeerStats)
// Subscribe to Connectedness events
log.Info("### host.ID(): ", host.ID())
connectednessEventSub, _ := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
w.connectednessEventSub = connectednessEventSub
@ -203,6 +245,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
if params.connStatusChan != nil {
w.connStatusChan = params.connStatusChan
}
w.pingEventsChan = make(chan interface{})
go w.connectednessListener()
if params.enableStore {
@ -357,85 +400,49 @@ func (w *WakuNode) mountLightPush() {
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
}
func (w *WakuNode) AddPeer(p peer.ID, addrs []ma.Multiaddr, protocolId string) error {
log.Info("AddPeer: ", protocolId)
func (w *WakuNode) AddPeer(info *peer.AddrInfo, protocolId string) error {
log.Info(fmt.Sprintf("adding peer %s with protocol %s", info.ID.Pretty(), protocolId))
for _, addr := range addrs {
w.host.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL)
}
err := w.host.Peerstore().AddProtocols(p, protocolId)
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
if err != nil {
return err
}
return nil
return w.host.Peerstore().AddProtocols(info.ID, protocolId)
}
func (w *WakuNode) startStore() {
peerChan := make(chan *event.EvtPeerConnectednessChanged)
w.opts.store.Start(w.ctx, w.host, peerChan)
w.peerListeners = append(w.peerListeners, peerChan)
w.opts.store.Resume(string(relay.GetTopic(nil)), nil)
if w.opts.shouldResume {
if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil {
log.Error("failed to resume", err)
}
}
}
func (w *WakuNode) addPeerWithProtocol(address string, proto p2pproto.ID) (*peer.ID, error) {
info, err := addrInfoFromMultiaddrString(address)
if err != nil {
return nil, err
}
return &info.ID, w.AddPeer(info, string(proto))
}
func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) {
if w.opts.store == nil {
return nil, errors.New("WakuStore is not set")
}
storePeer, err := ma.NewMultiaddr(address)
if err != nil {
return nil, err
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(storePeer)
if err != nil {
return nil, err
}
return &info.ID, w.AddPeer(info.ID, info.Addrs, string(store.WakuStoreProtocolId))
return w.addPeerWithProtocol(address, store.WakuStoreProtocolId)
}
func (w *WakuNode) AddRelayPeer(address string) (*peer.ID, error) {
return w.addPeerWithProtocol(address, wakurelay.WakuRelayID_v200)
}
// TODO Remove code duplication
func (w *WakuNode) AddFilterPeer(address string) (*peer.ID, error) {
if w.filter == nil {
return nil, errors.New("WakuFilter is not set")
}
filterPeer, err := ma.NewMultiaddr(address)
if err != nil {
return nil, err
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(filterPeer)
if err != nil {
return nil, err
}
return &info.ID, w.AddPeer(info.ID, info.Addrs, string(filter.WakuFilterProtocolId))
return w.addPeerWithProtocol(address, filter.WakuFilterProtocolId)
}
// TODO Remove code duplication
func (w *WakuNode) AddLightPushPeer(address string) (*peer.ID, error) {
if w.filter == nil {
return nil, errors.New("WakuFilter is not set")
}
lightPushPeer, err := ma.NewMultiaddr(address)
if err != nil {
return nil, err
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(lightPushPeer)
if err != nil {
return nil, err
}
return &info.ID, w.AddPeer(info.ID, info.Addrs, string(lightpush.WakuLightPushProtocolId))
return w.addPeerWithProtocol(address, lightpush.WakuLightPushProtocolId)
}
func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) {
@ -687,13 +694,31 @@ func (w *WakuNode) DialPeer(address string) error {
return err
}
// Extract the peer ID from the multiaddr.
info, err := peer.AddrInfoFromP2pAddr(p)
if err != nil {
return err
}
return w.host.Connect(w.ctx, *info)
return w.connect(*info)
}
func (w *WakuNode) connect(info peer.AddrInfo) error {
err := w.host.Connect(w.ctx, info)
if err != nil {
return err
}
w.processHostEvent(event.EvtPeerConnectednessChanged{
Peer: info.ID,
Connectedness: network.Connected,
})
return nil
}
func (w *WakuNode) DialPeerByID(peerID peer.ID) error {
info := w.host.Peerstore().PeerInfo(peerID)
return w.connect(info)
}
func (w *WakuNode) ClosePeerByAddress(address string) error {
@ -712,25 +737,118 @@ func (w *WakuNode) ClosePeerByAddress(address string) error {
}
func (w *WakuNode) ClosePeerById(id peer.ID) error {
return w.host.Network().ClosePeer(id)
err := w.host.Network().ClosePeer(id)
if err != nil {
return err
}
w.processHostEvent(event.EvtPeerConnectednessChanged{
Peer: id,
Connectedness: network.NotConnected,
})
return nil
}
func (w *WakuNode) PeerCount() int {
return len(w.host.Network().Peers())
return len(w.peers)
}
func (w *WakuNode) Peers() PeerStats {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
p := make(PeerStats)
for k, v := range w.peers {
p[k] = v
}
return p
}
func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of", t)
log.Info("Setting up ping protocol with duration of ", t)
w.ping = ping.NewPingService(w.host)
ticker := time.NewTicker(t)
go func() {
// This map contains peers that we're
// waiting for the ping response from
peerMap := make(map[peer.ID]<-chan ping.Result)
var mu sync.Mutex
for {
select {
case <-ticker.C:
for _, peer := range w.host.Network().Peers() {
log.Info("Pinging", peer)
w.ping.Ping(w.ctx, peer)
for _, p := range w.host.Peerstore().Peers() {
if p == w.host.ID() {
log.Info("###PING skip ", p)
continue
}
mu.Lock()
_, ok := peerMap[p]
mu.Unlock()
var s = p.Pretty()
s = s[len(s)-4:]
if !ok {
log.Info("###PING ", s)
result := w.ping.Ping(w.ctx, p)
mu.Lock()
peerMap[p] = result
mu.Unlock()
go func(peer peer.ID) {
peerFound := false
for p := range w.peers {
if p == peer {
peerFound = true
break
}
}
//log.Info("###PING " + s + " before fetching result")
//logwriter.Write([]byte("###PING " + s + " before fetching result"))
pingTicker := time.NewTicker(time.Duration(1) * time.Second)
isError := false
select {
case resVal := <-result:
isError = resVal.Error != nil
case <-pingTicker.C:
isError = true
}
pingTicker.Stop()
if !peerFound && !isError {
//EventBus Emitter doesn't seem to work when there's no connection
w.pingEventsChan <- event.EvtPeerConnectednessChanged{
Peer: peer,
Connectedness: network.Connected,
}
peerConns := w.host.Network().ConnsToPeer(peer)
if len(peerConns) > 0 {
// log.Info("###PING " + s + " IdentifyWait")
// logwriter.Write([]byte("###PING " + s + " IdentifyWait"))
//w.idService.IdentifyWait(peerConns[0])
} else {
w.DialPeerByID(peer)
}
} else if peerFound && isError {
// log.Info("###PING " + s + " peer removed")
// logwriter.Write([]byte("###PING " + s + " peer removed"))
w.pingEventsChan <- event.EvtPeerConnectednessChanged{
Peer: peer,
Connectedness: network.NotConnected,
}
}
mu.Lock()
delete(peerMap, peer)
mu.Unlock()
}(p)
} else {
log.Info("###PING " + s + " already pinged")
// logwriter.Write([]byte("###PING " + s + " already pinged"))
}
}
case <-w.quit:
ticker.Stop()
@ -738,5 +856,13 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}
}
}()
}
func addrInfoFromMultiaddrString(address string) (*peer.AddrInfo, error) {
ma, err := ma.NewMultiaddr(address)
if err != nil {
return nil, err
}
return peer.AddrInfoFromP2pAddr(ma)
}

View File

@ -26,9 +26,10 @@ type WakuNodeParameters struct {
enableFilter bool
wOpts []wakurelay.Option
enableStore bool
storeMsgs bool
store *store.WakuStore
enableStore bool
shouldResume bool
storeMsgs bool
store *store.WakuStore
// filter *filter.WakuFilter
keepAliveInterval time.Duration
@ -107,11 +108,12 @@ func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption {
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider
func WithWakuStore(shouldStoreMessages bool) WakuNodeOption {
func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.storeMsgs = shouldStoreMessages
params.store = store.NewWakuStore(shouldStoreMessages, nil)
params.shouldResume = shouldResume
return nil
}
}

View File

@ -59,12 +59,6 @@ const WakuFilterCodec = "/vac/waku/filter/2.0.0-beta1"
const WakuFilterProtocolId = libp2pProtocol.ID(WakuFilterCodec)
// Error types (metric label values)
const (
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
)
func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) {
for key, filter := range *filters {
envelope := protocol.NewEnvelope(msg, filter.Topic)
@ -252,7 +246,9 @@ func (wf *WakuFilter) FilterListener() {
}
for m := range wf.MsgC {
handle(m)
if err := handle(m); err != nil {
log.Error("failed to handle message", err)
}
}
}
@ -273,6 +269,10 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
log.Info("Sending filterRPC: ", filterRPC)
err = writer.WriteMsg(filterRPC)
if err != nil {
log.Error("failed to write message", err)
return "", err
}
return string(id), nil
} else {
// @TODO more sophisticated error handling here
@ -300,6 +300,9 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
err = writer.WriteMsg(filterRPC)
if err != nil {
log.Error("failed to write message", err)
}
//return some(id)
} else {
// @TODO more sophisticated error handling here

View File

@ -177,7 +177,12 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
}
defer connOpt.Close()
defer connOpt.Reset()
defer func() {
err := connOpt.Reset()
if err != nil {
log.Error("failed to reset connection", err)
}
}()
pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req}

View File

@ -260,7 +260,10 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e
storedMessages, err := store.msgProvider.GetAll()
if err != nil {
log.Error("could not load DBProvider messages", err)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1))
err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1))
if err != nil {
log.Error("failed to record with tags")
}
return
}
@ -272,7 +275,9 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages))))
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil {
log.Error("failed to record with tags")
}
}
go store.peerListener()
@ -312,11 +317,16 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
if err != nil {
log.Error("could not store message", err)
stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1))
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
return
}
stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages))))
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil {
log.Error("failed to record with tags", err)
}
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
@ -336,7 +346,9 @@ func (store *WakuStore) onRequest(s network.Stream) {
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
log.Error("error reading request", err)
stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1))
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil {
log.Error("failed to record with tags", err)
}
return
}
@ -497,11 +509,15 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
log.Error("could not read response", err)
stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1))
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil {
log.Error("failed to record with tags")
}
return nil, err
}
stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages))))
if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil {
log.Error("failed to record with tags", err)
}
return historyResponseRPC.Response, nil
}

2
vendor/modules.txt vendored
View File

@ -426,7 +426,7 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.0
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e
# github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a
github.com/status-im/go-waku/waku/v2/metrics
github.com/status-im/go-waku/waku/v2/node
github.com/status-im/go-waku/waku/v2/protocol

View File

@ -50,6 +50,7 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/signal"
"github.com/status-im/status-go/wakuv2/common"
node "github.com/status-im/go-waku/waku/v2/node"
@ -145,6 +146,9 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
return nil, fmt.Errorf("failed to setup the network interface: %v", err)
}
connStatusChan := make(chan node.ConnStatus)
keepAliveInt := 1
waku.node, err = node.New(context.Background(),
node.WithLibP2POptions(
libp2p.BandwidthReporter(waku.bandwidthCounter),
@ -152,9 +156,21 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
node.WithPrivateKey(privateKey),
node.WithHostAddress([]net.Addr{hostAddr}),
node.WithWakuRelay(wakurelay.WithMaxMessageSize(int(waku.settings.MaxMsgSize))),
node.WithWakuStore(false), // Mounts the store protocol (without storing the messages)
node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages)
node.WithConnStatusChan(connStatusChan),
node.WithKeepAlive(time.Duration(keepAliveInt)*time.Second),
)
go func() {
for {
select {
case <-waku.quit:
return
case c := <-connStatusChan:
signal.SendPeerStats(c)
}
}
}()
if err != nil {
fmt.Println(err)
return nil, fmt.Errorf("failed to start the go-waku node: %v", err)