bump go-waku (#2442)

This commit is contained in:
Richard Ramos 2021-11-25 10:37:06 -04:00 committed by GitHub
parent dad02df69b
commit 87a033c249
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 851 additions and 49 deletions

View File

@ -1 +1 @@
0.91.6
0.91.7

2
go.mod
View File

@ -49,7 +49,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5
github.com/status-im/go-waku v0.0.0-20211125141833-23cbb24a94d6
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
github.com/status-im/migrate/v4 v4.6.2-status.2

4
go.sum
View File

@ -1207,8 +1207,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5 h1:ur43GiEbW0iI+n+Iql3i1+wvgKRun/J10YcEsx985X0=
github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
github.com/status-im/go-waku v0.0.0-20211125141833-23cbb24a94d6 h1:U2wIiROz4VvHzY8SwT/7AHiNWH4/RGXw9PpEF940yjQ=
github.com/status-im/go-waku v0.0.0-20211125141833-23cbb24a94d6/go.mod h1:egfHY9n6ATRVAJ8atFPUuBqlECqDcHemfzD7VOgwZv8=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nat"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
@ -33,6 +34,10 @@ type DiscoveryV5 struct {
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
NAT nat.Interface
quit chan struct{}
wg *sync.WaitGroup
peerCache peerCache
}
@ -130,9 +135,16 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv
return nil, err
}
var NAT nat.Interface = nil
if params.advertiseAddr == nil {
NAT = nat.Any()
}
return &DiscoveryV5{
host: host,
params: params,
NAT: NAT,
wg: &sync.WaitGroup{},
peerCache: peerCache{
rng: rand.New(rand.NewSource(rand.Int63())),
recs: make(map[peer.ID]peerRecord),
@ -175,6 +187,7 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort in
if advertiseAddr != nil {
localnode.SetStaticIP(*advertiseAddr)
}
return localnode, nil
}
@ -184,6 +197,19 @@ func (d *DiscoveryV5) listen() error {
return err
}
d.udpAddr = conn.LocalAddr().(*net.UDPAddr)
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.wg.Add(1)
go func() {
defer d.wg.Done()
nat.Map(d.NAT, d.quit, "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}()
}
d.localnode.SetFallbackUDP(d.udpAddr.Port)
listener, err := discover.ListenV5(conn, d.localnode, d.config)
if err != nil {
return err
@ -191,6 +217,9 @@ func (d *DiscoveryV5) listen() error {
d.listener = listener
log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort))
log.Info("Discovery V5 ", d.localnode.Node())
return nil
}
@ -198,14 +227,15 @@ func (d *DiscoveryV5) Start() error {
d.Lock()
defer d.Unlock()
d.wg.Wait() // Waiting for other go routines to stop
d.quit = make(chan struct{}, 1)
err := d.listen()
if err != nil {
return err
}
log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort))
log.Info("Discovery V5 ", d.localnode.Node())
return nil
}
@ -213,9 +243,14 @@ func (d *DiscoveryV5) Stop() {
d.Lock()
defer d.Unlock()
close(d.quit)
d.listener.Close()
d.listener = nil
log.Info("Stopped Discovery V5")
d.wg.Wait()
}
// IsPrivate reports whether ip is a private address, according to
@ -328,6 +363,8 @@ func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discover
}
func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int, doneCh chan struct{}) {
defer d.wg.Done()
for {
if len(d.peerCache.recs) >= limit {
break
@ -409,6 +446,8 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
defer iterator.Close()
doneCh := make(chan struct{})
d.wg.Add(1)
go d.iterate(ctx, iterator, limit, doneCh)
select {

View File

@ -84,6 +84,8 @@ func (w *WakuNode) sendConnStatus() {
}
func (w *WakuNode) connectednessListener() {
defer w.wg.Done()
for {
select {
case <-w.quit:

View File

@ -5,6 +5,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"time"
logging "github.com/ipfs/go-log"
@ -34,6 +35,8 @@ import (
var log = logging.Logger("wakunode")
const maxAllowedPingFailures = 2
type Message []byte
type Peer struct {
@ -64,9 +67,13 @@ type WakuNode struct {
identificationEventSub event.Subscription
addressChangesSub event.Subscription
keepAliveMutex sync.Mutex
keepAliveFails map[peer.ID]int
ctx context.Context
cancel context.CancelFunc
quit chan struct{}
wg *sync.WaitGroup
// Channel passed to WakuNode constructor
// receiving connection status notifications
@ -122,7 +129,9 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.ctx = ctx
w.opts = params
w.quit = make(chan struct{})
w.wg = &sync.WaitGroup{}
w.addrChan = make(chan ma.Multiaddr, 1024)
w.keepAliveFails = make(map[peer.ID]int)
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return nil, err
@ -143,15 +152,16 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.connectionNotif = NewConnectionNotifier(ctx, host)
w.host.Network().Notify(w.connectionNotif)
w.wg.Add(2)
go w.connectednessListener()
if w.opts.keepAliveInterval > time.Duration(0) {
w.startKeepAlive(w.opts.keepAliveInterval)
}
go w.checkForAddressChanges()
go w.onAddrChange()
if w.opts.keepAliveInterval > time.Duration(0) {
w.wg.Add(1)
w.startKeepAlive(w.opts.keepAliveInterval)
}
return w, nil
}
@ -190,6 +200,8 @@ func (w *WakuNode) logAddress(addr ma.Multiaddr) {
}
func (w *WakuNode) checkForAddressChanges() {
defer w.wg.Done()
addrs := w.ListenAddresses()
first := make(chan struct{}, 1)
first <- struct{}{}
@ -311,6 +323,8 @@ func (w *WakuNode) Stop() {
w.store.Stop()
w.host.Close()
w.wg.Wait()
}
func (w *WakuNode) Host() host.Host {
@ -425,7 +439,10 @@ func (w *WakuNode) startStore() {
if w.opts.shouldResume {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
w.wg.Add(1)
go func() {
defer w.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
@ -577,12 +594,11 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t)
ticker := time.NewTicker(t)
defer ticker.Stop()
go func() {
defer w.wg.Done()
log.Info("Setting up ping protocol with duration of ", t)
ticker := time.NewTicker(t)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@ -594,7 +610,8 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
// through Network's peer collection, as it will be empty
for _, p := range w.host.Peerstore().Peers() {
if p != w.host.ID() {
go pingPeer(w.ctx, w.host, p)
w.wg.Add(1)
go w.pingPeer(p)
}
}
case <-w.quit:
@ -604,18 +621,34 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}()
}
func pingPeer(ctx context.Context, host host.Host, peer peer.ID) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
func (w *WakuNode) pingPeer(peer peer.ID) {
w.keepAliveMutex.Lock()
defer w.keepAliveMutex.Unlock()
defer w.wg.Done()
ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second)
defer cancel()
log.Debug("Pinging ", peer)
pr := ping.Ping(ctx, host, peer)
pr := ping.Ping(ctx, w.host, peer)
select {
case res := <-pr:
if res.Error != nil {
w.keepAliveFails[peer]++
log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error()))
} else {
w.keepAliveFails[peer] = 0
}
case <-ctx.Done():
w.keepAliveFails[peer]++
log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err()))
}
if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected {
log.Info("Disconnecting peer ", peer)
if err := w.host.Network().ClosePeer(peer); err != nil {
log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err))
}
w.keepAliveFails[peer] = 0
}
}

View File

@ -22,21 +22,21 @@ func NewSubscribers() *Subscribers {
return &Subscribers{}
}
func (self *Subscribers) Append(s Subscriber) int {
self.Lock()
defer self.Unlock()
func (sub *Subscribers) Append(s Subscriber) int {
sub.Lock()
defer sub.Unlock()
self.subscribers = append(self.subscribers, s)
return len(self.subscribers)
sub.subscribers = append(sub.subscribers, s)
return len(sub.subscribers)
}
func (self *Subscribers) Items() <-chan Subscriber {
func (sub *Subscribers) Items() <-chan Subscriber {
c := make(chan Subscriber)
f := func() {
self.RLock()
defer self.RUnlock()
for _, value := range self.subscribers {
sub.RLock()
defer sub.RUnlock()
for _, value := range sub.subscribers {
c <- value
}
close(c)
@ -46,17 +46,17 @@ func (self *Subscribers) Items() <-chan Subscriber {
return c
}
func (self *Subscribers) Length() int {
self.RLock()
defer self.RUnlock()
func (sub *Subscribers) Length() int {
sub.RLock()
defer sub.RUnlock()
return len(self.subscribers)
return len(sub.subscribers)
}
func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
var peerIdsToRemove []peer.ID
for _, subscriber := range self.subscribers {
for _, subscriber := range sub.subscribers {
if subscriber.peer != peerID {
continue
}
@ -82,11 +82,11 @@ func (self *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*
// make sure we delete the subscriber
// if no more content filters left
for _, peerId := range peerIdsToRemove {
for i, s := range self.subscribers {
for i, s := range sub.subscribers {
if s.peer == peerId {
l := len(self.subscribers) - 1
self.subscribers[l], self.subscribers[i] = self.subscribers[i], self.subscribers[l]
self.subscribers = self.subscribers[:l]
l := len(sub.subscribers) - 1
sub.subscribers[l], sub.subscribers[i] = sub.subscribers[i], sub.subscribers[l]
sub.subscribers = sub.subscribers[:l]
break
}
}

View File

@ -5,6 +5,8 @@ import (
"encoding/hex"
"errors"
"fmt"
"math"
"sync"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
@ -48,6 +50,7 @@ type (
h host.Host
isFullNode bool
MsgC chan *protocol.Envelope
wg *sync.WaitGroup
filters *FilterMap
subscribers *Subscribers
@ -67,13 +70,16 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi
wf := new(WakuFilter)
wf.ctx = ctx
wf.MsgC = make(chan *protocol.Envelope)
wf.wg = &sync.WaitGroup{}
wf.MsgC = make(chan *protocol.Envelope, 1024)
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.subscribers = NewSubscribers()
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
wf.wg.Add(1)
go wf.FilterListener()
if wf.isFullNode {
@ -90,7 +96,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
filterRPCRequest := &pb.FilterRPC{}
reader := protoio.NewDelimitedReader(s, 64*1024)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(filterRPCRequest)
if err != nil {
@ -155,6 +161,8 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er
}
func (wf *WakuFilter) FilterListener() {
defer wf.wg.Done()
// This function is invoked for each message received
// on the full node in context of Waku2-Filter
handle := func(envelope *protocol.Envelope) error { // async
@ -189,7 +197,6 @@ func (wf *WakuFilter) FilterListener() {
log.Error("failed to handle message", err)
}
}
}
// Having a FilterRequest struct,
@ -281,8 +288,11 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
}
func (wf *WakuFilter) Stop() {
close(wf.MsgC)
wf.h.RemoveStreamHandler(FilterID_v20beta1)
wf.filters.RemoveAll()
wf.wg.Wait()
}
func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) {

View File

@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"math"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
@ -62,7 +63,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
requestPushRPC := &pb.PushRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, 64*1024)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(requestPushRPC)
if err != nil {
@ -158,7 +159,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, 64*1024)
reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32)
err = writer.WriteMsg(pushRequestRPC)
if err != nil {

View File

@ -4,3 +4,4 @@ package pb
//go:generate protoc -I. --gofast_out=. ./waku_lightpush.proto
//go:generate protoc -I. --gofast_out=. ./waku_message.proto
//go:generate protoc -I. --gofast_out=. ./waku_store.proto
//go:generate protoc -I. --gofast_out=. ./waku_swap.proto

View File

@ -0,0 +1,687 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: waku_swap.proto
package pb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Cheque struct {
IssuerAddress string `protobuf:"bytes,1,opt,name=issuerAddress,proto3" json:"issuerAddress,omitempty"`
Beneficiary []byte `protobuf:"bytes,2,opt,name=beneficiary,proto3" json:"beneficiary,omitempty"`
Date uint32 `protobuf:"varint,3,opt,name=date,proto3" json:"date,omitempty"`
Amount uint32 `protobuf:"varint,4,opt,name=amount,proto3" json:"amount,omitempty"`
Signature []byte `protobuf:"bytes,5,opt,name=signature,proto3" json:"signature,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Cheque) Reset() { *m = Cheque{} }
func (m *Cheque) String() string { return proto.CompactTextString(m) }
func (*Cheque) ProtoMessage() {}
func (*Cheque) Descriptor() ([]byte, []int) {
return fileDescriptor_8ec987fcc28cf932, []int{0}
}
func (m *Cheque) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Cheque) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Cheque.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Cheque) XXX_Merge(src proto.Message) {
xxx_messageInfo_Cheque.Merge(m, src)
}
func (m *Cheque) XXX_Size() int {
return m.Size()
}
func (m *Cheque) XXX_DiscardUnknown() {
xxx_messageInfo_Cheque.DiscardUnknown(m)
}
var xxx_messageInfo_Cheque proto.InternalMessageInfo
func (m *Cheque) GetIssuerAddress() string {
if m != nil {
return m.IssuerAddress
}
return ""
}
func (m *Cheque) GetBeneficiary() []byte {
if m != nil {
return m.Beneficiary
}
return nil
}
func (m *Cheque) GetDate() uint32 {
if m != nil {
return m.Date
}
return 0
}
func (m *Cheque) GetAmount() uint32 {
if m != nil {
return m.Amount
}
return 0
}
func (m *Cheque) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
type Handshake struct {
Beneficiary []byte `protobuf:"bytes,1,opt,name=beneficiary,proto3" json:"beneficiary,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Handshake) Reset() { *m = Handshake{} }
func (m *Handshake) String() string { return proto.CompactTextString(m) }
func (*Handshake) ProtoMessage() {}
func (*Handshake) Descriptor() ([]byte, []int) {
return fileDescriptor_8ec987fcc28cf932, []int{1}
}
func (m *Handshake) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Handshake) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Handshake.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Handshake) XXX_Merge(src proto.Message) {
xxx_messageInfo_Handshake.Merge(m, src)
}
func (m *Handshake) XXX_Size() int {
return m.Size()
}
func (m *Handshake) XXX_DiscardUnknown() {
xxx_messageInfo_Handshake.DiscardUnknown(m)
}
var xxx_messageInfo_Handshake proto.InternalMessageInfo
func (m *Handshake) GetBeneficiary() []byte {
if m != nil {
return m.Beneficiary
}
return nil
}
func init() {
proto.RegisterType((*Cheque)(nil), "pb.Cheque")
proto.RegisterType((*Handshake)(nil), "pb.Handshake")
}
func init() { proto.RegisterFile("waku_swap.proto", fileDescriptor_8ec987fcc28cf932) }
var fileDescriptor_8ec987fcc28cf932 = []byte{
// 200 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2f, 0x4f, 0xcc, 0x2e,
0x8d, 0x2f, 0x2e, 0x4f, 0x2c, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52,
0x9a, 0xc5, 0xc8, 0xc5, 0xe6, 0x9c, 0x91, 0x5a, 0x58, 0x9a, 0x2a, 0xa4, 0xc2, 0xc5, 0x9b, 0x59,
0x5c, 0x5c, 0x9a, 0x5a, 0xe4, 0x98, 0x92, 0x52, 0x94, 0x5a, 0x5c, 0x2c, 0xc1, 0xa8, 0xc0, 0xa8,
0xc1, 0x19, 0x84, 0x2a, 0x28, 0xa4, 0xc0, 0xc5, 0x9d, 0x94, 0x9a, 0x97, 0x9a, 0x96, 0x99, 0x9c,
0x99, 0x58, 0x54, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x13, 0x84, 0x2c, 0x24, 0x24, 0xc4, 0xc5,
0x92, 0x92, 0x58, 0x92, 0x2a, 0xc1, 0xac, 0xc0, 0xa8, 0xc1, 0x1b, 0x04, 0x66, 0x0b, 0x89, 0x71,
0xb1, 0x25, 0xe6, 0xe6, 0x97, 0xe6, 0x95, 0x48, 0xb0, 0x80, 0x45, 0xa1, 0x3c, 0x21, 0x19, 0x2e,
0xce, 0xe2, 0xcc, 0xf4, 0xbc, 0xc4, 0x92, 0xd2, 0xa2, 0x54, 0x09, 0x56, 0xb0, 0x59, 0x08, 0x01,
0x25, 0x5d, 0x2e, 0x4e, 0x8f, 0xc4, 0xbc, 0x94, 0xe2, 0x8c, 0xc4, 0xec, 0x54, 0x74, 0x8b, 0x19,
0x31, 0x2c, 0x76, 0x12, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4,
0x18, 0x67, 0x3c, 0x96, 0x63, 0x48, 0x62, 0x03, 0x7b, 0xd4, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff,
0xce, 0x34, 0x72, 0xd2, 0xfb, 0x00, 0x00, 0x00,
}
func (m *Cheque) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Cheque) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Cheque) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Signature) > 0 {
i -= len(m.Signature)
copy(dAtA[i:], m.Signature)
i = encodeVarintWakuSwap(dAtA, i, uint64(len(m.Signature)))
i--
dAtA[i] = 0x2a
}
if m.Amount != 0 {
i = encodeVarintWakuSwap(dAtA, i, uint64(m.Amount))
i--
dAtA[i] = 0x20
}
if m.Date != 0 {
i = encodeVarintWakuSwap(dAtA, i, uint64(m.Date))
i--
dAtA[i] = 0x18
}
if len(m.Beneficiary) > 0 {
i -= len(m.Beneficiary)
copy(dAtA[i:], m.Beneficiary)
i = encodeVarintWakuSwap(dAtA, i, uint64(len(m.Beneficiary)))
i--
dAtA[i] = 0x12
}
if len(m.IssuerAddress) > 0 {
i -= len(m.IssuerAddress)
copy(dAtA[i:], m.IssuerAddress)
i = encodeVarintWakuSwap(dAtA, i, uint64(len(m.IssuerAddress)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *Handshake) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Handshake) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Handshake) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Beneficiary) > 0 {
i -= len(m.Beneficiary)
copy(dAtA[i:], m.Beneficiary)
i = encodeVarintWakuSwap(dAtA, i, uint64(len(m.Beneficiary)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintWakuSwap(dAtA []byte, offset int, v uint64) int {
offset -= sovWakuSwap(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Cheque) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.IssuerAddress)
if l > 0 {
n += 1 + l + sovWakuSwap(uint64(l))
}
l = len(m.Beneficiary)
if l > 0 {
n += 1 + l + sovWakuSwap(uint64(l))
}
if m.Date != 0 {
n += 1 + sovWakuSwap(uint64(m.Date))
}
if m.Amount != 0 {
n += 1 + sovWakuSwap(uint64(m.Amount))
}
l = len(m.Signature)
if l > 0 {
n += 1 + l + sovWakuSwap(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Handshake) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Beneficiary)
if l > 0 {
n += 1 + l + sovWakuSwap(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovWakuSwap(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozWakuSwap(x uint64) (n int) {
return sovWakuSwap(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Cheque) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Cheque: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Cheque: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field IssuerAddress", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthWakuSwap
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthWakuSwap
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.IssuerAddress = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Beneficiary", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthWakuSwap
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthWakuSwap
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Beneficiary = append(m.Beneficiary[:0], dAtA[iNdEx:postIndex]...)
if m.Beneficiary == nil {
m.Beneficiary = []byte{}
}
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Date", wireType)
}
m.Date = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Date |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Amount", wireType)
}
m.Amount = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Amount |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Signature", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthWakuSwap
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthWakuSwap
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Signature = append(m.Signature[:0], dAtA[iNdEx:postIndex]...)
if m.Signature == nil {
m.Signature = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuSwap(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuSwap
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Handshake) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Handshake: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Handshake: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Beneficiary", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthWakuSwap
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthWakuSwap
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Beneficiary = append(m.Beneficiary[:0], dAtA[iNdEx:postIndex]...)
if m.Beneficiary == nil {
m.Beneficiary = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuSwap(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuSwap
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipWakuSwap(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuSwap
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthWakuSwap
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupWakuSwap
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthWakuSwap
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthWakuSwap = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowWakuSwap = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupWakuSwap = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,15 @@
syntax = "proto3";
package pb;
message Cheque {
string issuerAddress = 1;
bytes beneficiary = 2;
uint32 date = 3;
uint32 amount = 4;
bytes signature = 5;
}
message Handshake {
bytes beneficiary = 1;
}

View File

@ -16,6 +16,7 @@ type MessageQueue struct {
maxDuration time.Duration
quit chan struct{}
wg *sync.WaitGroup
}
func (self *MessageQueue) Push(msg IndexedWakuMessage) {
@ -73,6 +74,8 @@ func (self *MessageQueue) cleanOlderRecords() {
}
func (self *MessageQueue) checkForOlderRecords(d time.Duration) {
defer self.wg.Done()
ticker := time.NewTicker(d)
defer ticker.Stop()
@ -98,9 +101,11 @@ func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue {
maxDuration: maxDuration,
seen: make(map[[32]byte]struct{}),
quit: make(chan struct{}),
wg: &sync.WaitGroup{},
}
if maxDuration != 0 {
result.wg.Add(1)
go result.checkForOlderRecords(10 * time.Second) // is 10s okay?
}
@ -109,4 +114,5 @@ func NewMessageQueue(maxMessages int, maxDuration time.Duration) *MessageQueue {
func (self *MessageQueue) Stop() {
close(self.quit)
self.wg.Wait()
}

View File

@ -8,6 +8,7 @@ import (
"fmt"
"math"
"sort"
"sync"
"time"
logging "github.com/ipfs/go-log"
@ -227,6 +228,7 @@ type IndexedWakuMessage struct {
type WakuStore struct {
ctx context.Context
MsgC chan *protocol.Envelope
wg *sync.WaitGroup
started bool
@ -240,6 +242,7 @@ func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, ma
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host
wakuStore.wg = &sync.WaitGroup{}
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
return wakuStore
}
@ -261,6 +264,7 @@ func (store *WakuStore) Start(ctx context.Context) {
store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest)
store.wg.Add(1)
go store.storeIncomingMessages(ctx)
if store.msgProvider == nil {
@ -327,6 +331,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) {
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC {
store.storeMessage(envelope)
}
@ -338,7 +343,7 @@ func (store *WakuStore) onRequest(s network.Stream) {
historyRPCRequest := &pb.HistoryRPC{}
writer := protoio.NewDelimitedWriter(s)
reader := protoio.NewDelimitedReader(s, 64*1024)
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(historyRPCRequest)
if err != nil {
@ -505,7 +510,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)}
writer := protoio.NewDelimitedWriter(connOpt)
reader := protoio.NewDelimitedReader(connOpt, 64*1024)
reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32)
err = writer.WriteMsg(historyRequest)
if err != nil {
@ -721,4 +726,6 @@ func (store *WakuStore) Stop() {
if store.h != nil {
store.h.RemoveStreamHandler(StoreID_v20beta3)
}
store.wg.Wait()
}

View File

@ -79,9 +79,10 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str
waitCh := make(chan struct{})
pingCh := make(chan pingResult, 1000)
wg.Add(len(peers))
go func() {
for _, p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)

2
vendor/modules.txt vendored
View File

@ -447,7 +447,7 @@ github.com/spacemonkeygo/spacelog
github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.1
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20211121140431-79bb101787c5
# github.com/status-im/go-waku v0.0.0-20211125141833-23cbb24a94d6
github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/v2
github.com/status-im/go-waku/waku/v2/discv5