chore: bump go-waku

This commit is contained in:
Richard Ramos 2022-02-24 09:45:14 -04:00
parent b559c561d8
commit 381c5f6116
20 changed files with 278 additions and 238 deletions

View File

@ -35,9 +35,9 @@ type MessagesRequest struct {
}
type StoreRequestCursor struct {
Digest []byte `json:"digest"`
ReceiverTime float64 `json:"receiverTime"`
SenderTime float64 `json:"senderTime"`
Digest []byte `json:"digest"`
ReceiverTime int64 `json:"receiverTime"`
SenderTime int64 `json:"senderTime"`
}
// SetDefaults sets the From and To defaults

2
go.mod
View File

@ -51,7 +51,7 @@ require (
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b
github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432
github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6
github.com/status-im/migrate/v4 v4.6.2-status.2

6
go.sum
View File

@ -203,6 +203,7 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d h1:bE1UyBQ5aE6FjhNY4lbPtMqh7VDldoVkvZMtFEbd+CE=
@ -1142,6 +1143,7 @@ github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a h1:yVNJFSzkEG8sm
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a/go.mod h1:TPq+fcJOdGrkpZpXF4UVmFjYxH0gGqnxdgZ+OzAmvJk=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
@ -1212,8 +1214,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS
github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE=
github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU=
github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b h1:g8DyQPMeWhHyxNVTyJmdP3Q4vf+WTZ5j7lMe9bPkrpE=
github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b/go.mod h1:dxqyVXn079tFLM50Q4olJGCiHcw52CJeLPesr+ruzQg=
github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18 h1:pykYGtdQZemOQaeqmOTYoERuG2CGRUjKrZhP6ThN37I=
github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18/go.mod h1:JVJzXmxDWPcSg2CAuVBw0WBMLSpyqtyvv/HnTkYky8U=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -28,7 +28,7 @@ type DBStore struct {
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
ReceiverTime int64
Message *pb.WakuMessage
}
@ -93,8 +93,8 @@ func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) {
func (d *DBStore) createTable() error {
sqlStmt := `CREATE TABLE IF NOT EXISTS message (
id BLOB PRIMARY KEY,
receiverTimestamp REAL NOT NULL,
senderTimestamp REAL NOT NULL,
receiverTimestamp INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
@ -161,8 +161,8 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) {
for rows.Next() {
var id []byte
var receiverTimestamp float64
var senderTimestamp float64
var receiverTimestamp int64
var senderTimestamp int64
var contentTopic string
var payload []byte
var version uint32

View File

@ -7,10 +7,17 @@ import (
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120
// by Dustin Sallings (c) 2013, which was released under MIT license
type doneCh chan struct{}
type chOperation struct {
ch chan<- *protocol.Envelope
done doneCh
}
type broadcaster struct {
input chan *protocol.Envelope
reg chan chan<- *protocol.Envelope
unreg chan chan<- *protocol.Envelope
reg chan chOperation
unreg chan chOperation
outputs map[chan<- *protocol.Envelope]bool
}
@ -20,8 +27,12 @@ type broadcaster struct {
type Broadcaster interface {
// Register a new channel to receive broadcasts
Register(chan<- *protocol.Envelope)
// Register a new channel to receive broadcasts and return a channel to wait until this operation is complete
WaitRegister(newch chan<- *protocol.Envelope) doneCh
// Unregister a channel so that it no longer receives broadcasts.
Unregister(chan<- *protocol.Envelope)
// Unregister a subscriptor channel and return a channel to wait until this operation is done
WaitUnregister(newch chan<- *protocol.Envelope) doneCh
// Shut this broadcaster down.
Close()
// Submit a new object to all subscribers
@ -39,14 +50,23 @@ func (b *broadcaster) run() {
select {
case m := <-b.input:
b.broadcast(m)
case ch, ok := <-b.reg:
case broadcastee, ok := <-b.reg:
if ok {
b.outputs[ch] = true
b.outputs[broadcastee.ch] = true
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
} else {
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
return
}
case ch := <-b.unreg:
delete(b.outputs, ch)
case broadcastee := <-b.unreg:
delete(b.outputs, broadcastee.ch)
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
}
}
}
@ -57,8 +77,8 @@ func (b *broadcaster) run() {
func NewBroadcaster(buflen int) Broadcaster {
b := &broadcaster{
input: make(chan *protocol.Envelope, buflen),
reg: make(chan chan<- *protocol.Envelope),
unreg: make(chan chan<- *protocol.Envelope),
reg: make(chan chOperation),
unreg: make(chan chOperation),
outputs: make(map[chan<- *protocol.Envelope]bool),
}
@ -67,14 +87,40 @@ func NewBroadcaster(buflen int) Broadcaster {
return b
}
// Register a subscriptor channel and return a channel to wait until this operation is done
func (b *broadcaster) WaitRegister(newch chan<- *protocol.Envelope) doneCh {
d := make(doneCh)
b.reg <- chOperation{
ch: newch,
done: d,
}
return d
}
// Register a subscriptor channel
func (b *broadcaster) Register(newch chan<- *protocol.Envelope) {
b.reg <- newch
b.reg <- chOperation{
ch: newch,
done: nil,
}
}
// Unregister a subscriptor channel and return a channel to wait until this operation is done
func (b *broadcaster) WaitUnregister(newch chan<- *protocol.Envelope) doneCh {
d := make(doneCh)
b.unreg <- chOperation{
ch: newch,
done: d,
}
return d
}
// Unregister a subscriptor channel
func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) {
b.unreg <- newch
b.unreg <- chOperation{
ch: newch,
done: nil,
}
}
// Closes the broadcaster. Used to stop receiving new subscribers

View File

@ -120,7 +120,7 @@ func (w *WakuNode) Status() (isOnline bool, hasHistory bool) {
if !hasLightPush && protocol == string(lightpush.LightPushID_v20beta1) {
hasLightPush = true
}
if !hasStore && protocol == string(store.StoreID_v20beta3) {
if !hasStore && protocol == string(store.StoreID_v20beta4) {
hasStore = true
}
if !hasFilter && protocol == string(filter.FilterID_v20beta1) {

View File

@ -486,7 +486,7 @@ func (w *WakuNode) startStore() {
case <-w.quit:
return
case <-ticker.C:
_, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3), w.log)
_, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), w.log)
if err == nil {
break peerVerif
}

View File

@ -4,7 +4,6 @@
package pb
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
@ -27,7 +26,7 @@ type WakuMessage struct {
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
ContentTopic string `protobuf:"bytes,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"`
Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Timestamp float64 `protobuf:"fixed64,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Proof []byte `protobuf:"bytes,21,opt,name=proof,proto3" json:"proof,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -88,7 +87,7 @@ func (m *WakuMessage) GetVersion() uint32 {
return 0
}
func (m *WakuMessage) GetTimestamp() float64 {
func (m *WakuMessage) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
@ -117,11 +116,11 @@ var fileDescriptor_6f0a20862b3bf714 = []byte{
0x82, 0x71, 0x85, 0x94, 0xb8, 0x78, 0x92, 0xf3, 0xf3, 0x4a, 0x52, 0xf3, 0x4a, 0x42, 0xf2, 0x0b,
0x32, 0x93, 0x25, 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0xc4, 0x40, 0xba, 0xcb, 0x52, 0x8b,
0x8a, 0x33, 0xf3, 0xf3, 0x24, 0x98, 0x15, 0x18, 0x35, 0x78, 0x83, 0x60, 0x5c, 0x21, 0x19, 0x2e,
0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x16, 0x05, 0x46, 0x0d, 0xc6,
0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x2e, 0x05, 0x46, 0x0d, 0xa1,
0x20, 0x84, 0x80, 0x90, 0x08, 0x17, 0x6b, 0x41, 0x51, 0x7e, 0x7e, 0x9a, 0x84, 0x28, 0xd8, 0x4e,
0x08, 0xc7, 0x49, 0xe0, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63,
0x9c, 0xf1, 0x58, 0x8e, 0x21, 0x89, 0x0d, 0xec, 0x70, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff,
0x0a, 0xc6, 0x91, 0x28, 0xce, 0x00, 0x00, 0x00,
0x33, 0x1b, 0x64, 0x81, 0xce, 0x00, 0x00, 0x00,
}
func (m *WakuMessage) Marshal() (dAtA []byte, err error) {
@ -158,10 +157,9 @@ func (m *WakuMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
dAtA[i] = 0xaa
}
if m.Timestamp != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Timestamp))))
i = encodeVarintWakuMessage(dAtA, i, uint64((uint64(m.Timestamp)<<1)^uint64((m.Timestamp>>63))))
i--
dAtA[i] = 0x21
dAtA[i] = 0x50
}
if m.Version != 0 {
i = encodeVarintWakuMessage(dAtA, i, uint64(m.Version))
@ -214,7 +212,7 @@ func (m *WakuMessage) Size() (n int) {
n += 1 + sovWakuMessage(uint64(m.Version))
}
if m.Timestamp != 0 {
n += 9
n += 1 + sozWakuMessage(uint64(m.Timestamp))
}
l = len(m.Proof)
if l > 0 {
@ -346,17 +344,27 @@ func (m *WakuMessage) Unmarshal(dAtA []byte) error {
break
}
}
case 4:
if wireType != 1 {
case 10:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.Timestamp = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.Timestamp = int64(v)
case 21:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Proof", wireType)

View File

@ -6,6 +6,6 @@ message WakuMessage {
bytes payload = 1;
string contentTopic = 2;
uint32 version = 3;
double timestamp = 4;
sint64 timestamp = 10;
bytes proof = 21;
}

View File

@ -4,7 +4,6 @@
package pb
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
@ -75,8 +74,8 @@ func (HistoryResponse_Error) EnumDescriptor() ([]byte, []int) {
type Index struct {
Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"`
ReceiverTime float64 `protobuf:"fixed64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"`
SenderTime float64 `protobuf:"fixed64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"`
ReceiverTime int64 `protobuf:"zigzag64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"`
SenderTime int64 `protobuf:"zigzag64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -122,14 +121,14 @@ func (m *Index) GetDigest() []byte {
return nil
}
func (m *Index) GetReceiverTime() float64 {
func (m *Index) GetReceiverTime() int64 {
if m != nil {
return m.ReceiverTime
}
return 0
}
func (m *Index) GetSenderTime() float64 {
func (m *Index) GetSenderTime() int64 {
if m != nil {
return m.SenderTime
}
@ -250,8 +249,8 @@ type HistoryQuery struct {
PubsubTopic string `protobuf:"bytes,2,opt,name=pubsubTopic,proto3" json:"pubsubTopic,omitempty"`
ContentFilters []*ContentFilter `protobuf:"bytes,3,rep,name=contentFilters,proto3" json:"contentFilters,omitempty"`
PagingInfo *PagingInfo `protobuf:"bytes,4,opt,name=pagingInfo,proto3" json:"pagingInfo,omitempty"`
StartTime float64 `protobuf:"fixed64,5,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime float64 `protobuf:"fixed64,6,opt,name=endTime,proto3" json:"endTime,omitempty"`
StartTime int64 `protobuf:"zigzag64,5,opt,name=startTime,proto3" json:"startTime,omitempty"`
EndTime int64 `protobuf:"zigzag64,6,opt,name=endTime,proto3" json:"endTime,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -311,14 +310,14 @@ func (m *HistoryQuery) GetPagingInfo() *PagingInfo {
return nil
}
func (m *HistoryQuery) GetStartTime() float64 {
func (m *HistoryQuery) GetStartTime() int64 {
if m != nil {
return m.StartTime
}
return 0
}
func (m *HistoryQuery) GetEndTime() float64 {
func (m *HistoryQuery) GetEndTime() int64 {
if m != nil {
return m.EndTime
}
@ -476,31 +475,31 @@ var fileDescriptor_ca6891f77a46e680 = []byte{
0x25, 0x32, 0x4e, 0x63, 0x2c, 0x24, 0x93, 0x7d, 0xd4, 0xea, 0x35, 0x29, 0xe5, 0x4f, 0x8d, 0xee,
0x06, 0x50, 0x1e, 0x44, 0x21, 0x7d, 0xc4, 0xbb, 0x50, 0x09, 0xc5, 0x94, 0x54, 0xea, 0xb0, 0x16,
0x6b, 0xd7, 0xb9, 0xa9, 0xd0, 0x85, 0xba, 0xa4, 0x80, 0xc4, 0x7b, 0x92, 0x17, 0xe2, 0x9a, 0x9c,
0x42, 0x8b, 0xb5, 0x19, 0xdf, 0xd0, 0xb0, 0x09, 0xa0, 0x28, 0x0a, 0x0d, 0x51, 0xd4, 0xc4, 0x9a,
0xe2, 0x7e, 0x65, 0x00, 0x67, 0xfe, 0x54, 0x44, 0xd3, 0x41, 0xf4, 0x36, 0xc6, 0x7d, 0xa8, 0x26,
0xfe, 0x94, 0xce, 0xc5, 0x27, 0xd2, 0x66, 0x25, 0xbe, 0xac, 0xf1, 0x21, 0x54, 0x82, 0x4c, 0xaa,
0x58, 0x6a, 0xa3, 0x5a, 0xc7, 0xf2, 0x92, 0x89, 0xa7, 0x13, 0x72, 0xd3, 0xc0, 0x27, 0x60, 0x85,
0x42, 0x52, 0x90, 0x8a, 0x38, 0xd2, 0x66, 0x8d, 0x8e, 0x93, 0x53, 0x2b, 0x07, 0xaf, 0xbf, 0xe8,
0xf3, 0x15, 0xea, 0x1e, 0x80, 0xb5, 0xd4, 0xb1, 0x0e, 0xd5, 0xe3, 0x6e, 0xef, 0xf9, 0x65, 0x97,
0xf7, 0xed, 0x1d, 0xac, 0xc1, 0xee, 0xe9, 0x88, 0xeb, 0x82, 0xb9, 0x47, 0xf0, 0x5f, 0x2f, 0x8e,
0x52, 0x8a, 0xd2, 0x53, 0xf1, 0x2e, 0x25, 0x99, 0xaf, 0x20, 0x98, 0x0b, 0x17, 0x71, 0x22, 0x02,
0x9d, 0xd9, 0xe2, 0x1b, 0x9a, 0xfb, 0x93, 0x41, 0xfd, 0x99, 0xc8, 0x37, 0x7e, 0xfb, 0x2a, 0x23,
0x79, 0x8b, 0x2d, 0xa8, 0x25, 0xd9, 0x44, 0x65, 0x93, 0xf9, 0x37, 0x05, 0xfd, 0xcd, 0xba, 0x84,
0x4f, 0xa1, 0x11, 0xac, 0xfb, 0x28, 0xa7, 0xd8, 0x2a, 0xb6, 0x6b, 0x9d, 0xff, 0xf3, 0x61, 0x36,
0x12, 0xf0, 0x2d, 0x10, 0x3d, 0x80, 0x64, 0x39, 0xad, 0x53, 0xd2, 0x9b, 0x6a, 0x6c, 0xee, 0x80,
0xaf, 0x11, 0x78, 0x1f, 0x2c, 0x95, 0xfa, 0x32, 0xd5, 0xf7, 0x29, 0xeb, 0xfb, 0xac, 0x04, 0x74,
0x60, 0x97, 0xa2, 0x50, 0xf7, 0x2a, 0xba, 0xb7, 0x28, 0xdd, 0x6f, 0x0c, 0xf6, 0xcc, 0x54, 0x9c,
0x54, 0x12, 0x47, 0x8a, 0xf0, 0x31, 0x54, 0xcd, 0x13, 0x52, 0x4e, 0x41, 0x07, 0xde, 0xcb, 0x9d,
0x2f, 0xfd, 0xab, 0xec, 0xe5, 0x5c, 0xe7, 0x4b, 0x60, 0x2b, 0x68, 0xf1, 0x9f, 0x41, 0x0f, 0xa1,
0x4c, 0x52, 0xc6, 0x52, 0xcf, 0xd4, 0xe8, 0xdc, 0xcb, 0xd1, 0xad, 0x00, 0xde, 0x49, 0x0e, 0xf0,
0x39, 0xe7, 0x3e, 0x82, 0xb2, 0xae, 0xb1, 0x0a, 0xa5, 0xe1, 0x68, 0x78, 0x62, 0xef, 0x20, 0x42,
0x63, 0x30, 0x7c, 0xd3, 0x7d, 0x31, 0xe8, 0x8f, 0x7b, 0xaf, 0xf9, 0xf9, 0x88, 0xdb, 0xcc, 0xfd,
0xcc, 0x00, 0x16, 0xbf, 0x73, 0xd6, 0xc3, 0x07, 0x00, 0x92, 0x6e, 0x32, 0x52, 0xe9, 0x58, 0x84,
0xe6, 0x9e, 0x96, 0x51, 0x06, 0x21, 0x1e, 0x40, 0xf9, 0x26, 0x3f, 0xa2, 0x79, 0x83, 0xf6, 0x5a,
0x0a, 0x7d, 0x5c, 0x3e, 0x6f, 0xe3, 0x21, 0x54, 0xa5, 0x49, 0x65, 0x66, 0xbb, 0xf3, 0x97, 0xc0,
0x7c, 0x09, 0x1d, 0xdb, 0xdf, 0x67, 0x4d, 0xf6, 0x63, 0xd6, 0x64, 0xbf, 0x66, 0x4d, 0xf6, 0xe5,
0x77, 0x73, 0x67, 0x52, 0xd1, 0x7f, 0xc3, 0xa3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb7, 0x5c,
0xa1, 0x0e, 0xb2, 0x03, 0x00, 0x00,
0x42, 0x8b, 0xb5, 0x91, 0x6f, 0x68, 0xd8, 0x04, 0x50, 0x14, 0x85, 0x86, 0x28, 0x6a, 0x62, 0x4d,
0x71, 0xbf, 0x32, 0x80, 0x33, 0x7f, 0x2a, 0xa2, 0xe9, 0x20, 0x7a, 0x1b, 0xe3, 0x3e, 0x54, 0x13,
0x7f, 0x4a, 0xe7, 0xe2, 0x13, 0x69, 0xb3, 0x12, 0x5f, 0xd6, 0xf8, 0x10, 0x2a, 0x41, 0x26, 0x55,
0x2c, 0xb5, 0x51, 0xad, 0x63, 0x79, 0xc9, 0xc4, 0xd3, 0x09, 0xb9, 0x69, 0xe0, 0x13, 0xb0, 0x42,
0x21, 0x29, 0x48, 0x45, 0x1c, 0x69, 0xb3, 0x46, 0xc7, 0xc9, 0xa9, 0x95, 0x83, 0xd7, 0x5f, 0xf4,
0xf9, 0x0a, 0x75, 0x0f, 0xc0, 0x5a, 0xea, 0x58, 0x87, 0xea, 0x71, 0xb7, 0xf7, 0xfc, 0xb2, 0xcb,
0xfb, 0xf6, 0x0e, 0xd6, 0x60, 0xf7, 0x74, 0xc4, 0x75, 0xc1, 0xdc, 0x23, 0xf8, 0xaf, 0x17, 0x47,
0x29, 0x45, 0xe9, 0xa9, 0x78, 0x97, 0x92, 0xcc, 0x57, 0x10, 0xcc, 0x85, 0x8b, 0x38, 0x11, 0x81,
0xce, 0x6c, 0xf1, 0x0d, 0xcd, 0xfd, 0xc9, 0xa0, 0xfe, 0x4c, 0xe4, 0x1b, 0xbf, 0x7d, 0x95, 0x91,
0xbc, 0xc5, 0x16, 0xd4, 0x92, 0x6c, 0xa2, 0xb2, 0xc9, 0xfc, 0x9b, 0x82, 0xfe, 0x66, 0x5d, 0xc2,
0xa7, 0xd0, 0x08, 0xd6, 0x7d, 0x94, 0x53, 0x6c, 0x15, 0xdb, 0xb5, 0xce, 0xff, 0xf9, 0x30, 0x1b,
0x09, 0xf8, 0x16, 0x88, 0x1e, 0x40, 0xb2, 0x9c, 0xd6, 0x29, 0xe9, 0x4d, 0x35, 0x36, 0x77, 0xc0,
0xd7, 0x08, 0xbc, 0x0f, 0x96, 0x4a, 0x7d, 0x99, 0xea, 0xfb, 0x94, 0xf5, 0x7d, 0x56, 0x02, 0x3a,
0xb0, 0x4b, 0x51, 0xa8, 0x7b, 0x15, 0xdd, 0x5b, 0x94, 0xee, 0x37, 0x06, 0x7b, 0x66, 0x2a, 0x4e,
0x2a, 0x89, 0x23, 0x45, 0xf8, 0x18, 0xaa, 0xe6, 0x09, 0x29, 0xa7, 0xa0, 0x03, 0xef, 0xe5, 0xce,
0x97, 0xfe, 0x55, 0xf6, 0x72, 0xae, 0xf3, 0x25, 0xb0, 0x15, 0xb4, 0xf8, 0xcf, 0xa0, 0x87, 0x50,
0x26, 0x29, 0x63, 0xa9, 0x67, 0x6a, 0x74, 0xee, 0xe5, 0xe8, 0x56, 0x00, 0xef, 0x24, 0x07, 0xf8,
0x9c, 0x73, 0x1f, 0x41, 0x59, 0xd7, 0x58, 0x85, 0xd2, 0x70, 0x34, 0x3c, 0xb1, 0x77, 0x10, 0xa1,
0x31, 0x18, 0xbe, 0xe9, 0xbe, 0x18, 0xf4, 0xc7, 0xbd, 0xd7, 0xfc, 0x7c, 0xc4, 0x6d, 0xe6, 0x7e,
0x66, 0x00, 0x8b, 0xdf, 0x39, 0xeb, 0xe1, 0x03, 0x00, 0x49, 0x37, 0x19, 0xa9, 0x74, 0x2c, 0x42,
0x73, 0x4f, 0xcb, 0x28, 0x83, 0x10, 0x0f, 0xa0, 0x7c, 0x93, 0x1f, 0xd1, 0xbc, 0x41, 0x7b, 0x2d,
0x85, 0x3e, 0x2e, 0x9f, 0xb7, 0xf1, 0x10, 0xaa, 0xd2, 0xa4, 0x32, 0xb3, 0xdd, 0xf9, 0x4b, 0x60,
0xbe, 0x84, 0x8e, 0xed, 0xef, 0xb3, 0x26, 0xfb, 0x31, 0x6b, 0xb2, 0x5f, 0xb3, 0x26, 0xfb, 0xf2,
0xbb, 0xb9, 0x33, 0xa9, 0xe8, 0xbf, 0xe1, 0xd1, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x8f,
0x84, 0xdf, 0xb2, 0x03, 0x00, 0x00,
}
func (m *Index) Marshal() (dAtA []byte, err error) {
@ -528,16 +527,14 @@ func (m *Index) MarshalToSizedBuffer(dAtA []byte) (int, error) {
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.SenderTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.SenderTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.SenderTime)<<1)^uint64((m.SenderTime>>63))))
i--
dAtA[i] = 0x19
dAtA[i] = 0x18
}
if m.ReceiverTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ReceiverTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.ReceiverTime)<<1)^uint64((m.ReceiverTime>>63))))
i--
dAtA[i] = 0x11
dAtA[i] = 0x10
}
if len(m.Digest) > 0 {
i -= len(m.Digest)
@ -657,16 +654,14 @@ func (m *HistoryQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) {
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.EndTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.EndTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.EndTime)<<1)^uint64((m.EndTime>>63))))
i--
dAtA[i] = 0x31
dAtA[i] = 0x30
}
if m.StartTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.StartTime))))
i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.StartTime)<<1)^uint64((m.StartTime>>63))))
i--
dAtA[i] = 0x29
dAtA[i] = 0x28
}
if m.PagingInfo != nil {
{
@ -842,10 +837,10 @@ func (m *Index) Size() (n int) {
n += 1 + l + sovWakuStore(uint64(l))
}
if m.ReceiverTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.ReceiverTime))
}
if m.SenderTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.SenderTime))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
@ -912,10 +907,10 @@ func (m *HistoryQuery) Size() (n int) {
n += 1 + l + sovWakuStore(uint64(l))
}
if m.StartTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.StartTime))
}
if m.EndTime != 0 {
n += 9
n += 1 + sozWakuStore(uint64(m.EndTime))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
@ -1042,27 +1037,47 @@ func (m *Index) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 2:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ReceiverTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.ReceiverTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.ReceiverTime = int64(v)
case 3:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field SenderTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.SenderTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.SenderTime = int64(v)
default:
iNdEx = preIndex
skippy, err := skipWakuStore(dAtA[iNdEx:])
@ -1425,27 +1440,47 @@ func (m *HistoryQuery) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 5:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.StartTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.StartTime = int64(v)
case 6:
if wireType != 1 {
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType)
}
var v uint64
if (iNdEx + 8) > l {
return io.ErrUnexpectedEOF
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuStore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.EndTime = float64(math.Float64frombits(v))
v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63)
m.EndTime = int64(v)
default:
iNdEx = preIndex
skippy, err := skipWakuStore(dAtA[iNdEx:])

View File

@ -6,8 +6,8 @@ import "waku_message.proto";
message Index {
bytes digest = 1;
double receiverTime = 2;
double senderTime = 3;
sint64 receiverTime = 2;
sint64 senderTime = 3;
}
message PagingInfo {
@ -28,8 +28,8 @@ message HistoryQuery {
string pubsubTopic = 2;
repeated ContentFilter contentFilters = 3;
PagingInfo pagingInfo = 4; // used for pagination
double startTime = 5;
double endTime = 6;
sint64 startTime = 5;
sint64 endTime = 6;
}
message HistoryResponse {

View File

@ -8,6 +8,8 @@ import (
// Subscription handles the subscrition to a particular pubsub topic
type Subscription struct {
sync.RWMutex
// C is channel used for receiving envelopes
C chan *protocol.Envelope
@ -19,14 +21,14 @@ type Subscription struct {
// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() {
subs.once.Do(func() {
subs.closed = true
close(subs.quit)
close(subs.C)
})
}
// IsClosed determine whether a Subscription is still open for receiving messages
func (subs *Subscription) IsClosed() bool {
subs.RLock()
defer subs.RUnlock()
return subs.closed
}

View File

@ -300,9 +300,20 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
for {
select {
case <-subscription.quit:
if w.bcaster != nil {
w.bcaster.Unregister(subscription.C) // Remove from broadcast list
}
func() {
subscription.Lock()
defer subscription.Unlock()
if subscription.closed {
return
}
subscription.closed = true
if w.bcaster != nil {
<-w.bcaster.WaitUnregister(subscription.C) // Remove from broadcast list
}
close(subscription.C)
}()
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg := <-subChannel:
if msg == nil {

View File

@ -28,6 +28,7 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) error {
var k [32]byte
copy(k[:], msg.index.Digest)
if _, ok := self.seen[k]; ok {
return ErrDuplicatedMessage
}

View File

@ -26,8 +26,8 @@ import (
"github.com/status-im/go-waku/waku/v2/utils"
)
// StoreID_v20beta3 is the current Waku Store protocol identifier
const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3")
// StoreID_v20beta4 is the current Waku Store protocol identifier
const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4")
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
@ -193,8 +193,8 @@ type MessageProvider interface {
type Query struct {
Topic string
ContentTopics []string
StartTime float64
EndTime float64
StartTime int64
EndTime int64
}
// Result represents a valid response from a store node
@ -266,7 +266,7 @@ func (store *WakuStore) Start(ctx context.Context) {
store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope, 1024)
store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest)
store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest)
store.wg.Add(1)
go store.storeIncomingMessages(ctx)
@ -296,7 +296,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
for _, storedMessage := range storedMessages {
idx := &pb.Index{
Digest: storedMessage.ID,
ReceiverTime: float64(storedMessage.ReceiverTime),
ReceiverTime: storedMessage.ReceiverTime,
}
_ = store.addToMessageQueue(storedMessage.PubsubTopic, idx, storedMessage.Message)
@ -309,21 +309,21 @@ func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg
return store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
}
func (store *WakuStore) storeMessage(env *protocol.Envelope) {
func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
index, err := computeIndex(env)
if err != nil {
store.log.Error("could not calculate message index", err)
return
return err
}
err = store.addToMessageQueue(env.PubsubTopic(), index, env.Message())
if err == ErrDuplicatedMessage {
return
return err
}
if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
return
return err
}
// TODO: Move this to a separate go routine if DB writes becomes a bottleneck
@ -331,16 +331,17 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) {
if err != nil {
store.log.Error("could not store message", err)
metrics.RecordStoreError(store.ctx, "store_failure")
return
return err
}
metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length())
return nil
}
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
defer store.wg.Done()
for envelope := range store.MsgC {
store.storeMessage(envelope)
_ = store.storeMessage(envelope)
}
}
@ -445,7 +446,7 @@ func WithPeer(p peer.ID) HistoryRequestOption {
// to request the message history
func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3), params.s.log)
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log)
if err == nil {
params.selectedPeer = *p
} else {
@ -456,7 +457,7 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3), params.s.log)
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log)
if err == nil {
params.selectedPeer = *p
} else {
@ -503,7 +504,7 @@ func DefaultOptions() []HistoryRequestOption {
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer))
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3)
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
store.log.Error("Failed to connect to remote peer", err)
return nil, err
@ -650,6 +651,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
if err == nil {
resultChan <- result
return
}
store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err))
}()
@ -672,8 +674,8 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
return nil, ErrFailedQuery
}
func (store *WakuStore) findLastSeen() float64 {
var lastSeenTime float64 = 0
func (store *WakuStore) findLastSeen() int64 {
var lastSeenTime int64 = 0
for imsg := range store.messageQueue.Messages() {
if imsg.msg.Timestamp > lastSeenTime {
lastSeenTime = imsg.msg.Timestamp
@ -682,6 +684,13 @@ func (store *WakuStore) findLastSeen() float64 {
return lastSeenTime
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
// Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// messages are stored in the store node's messages field and in the message db
// the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@ -698,9 +707,9 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
currentTime := utils.GetUnixEpoch()
lastSeenTime := store.findLastSeen()
var offset float64 = 200000
var offset int64 = int64(20 * time.Nanosecond)
currentTime = currentTime + offset
lastSeenTime = math.Max(lastSeenTime-offset, 0)
lastSeenTime = max(lastSeenTime-offset, 0)
rpc := &pb.HistoryQuery{
PubsubTopic: pubsubTopic,
@ -713,7 +722,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
}
if len(peerList) == 0 {
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3), store.log)
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log)
if err != nil {
store.log.Info("Error selecting peer: ", err)
return -1, ErrNoPeersAvailable
@ -728,13 +737,16 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
return -1, ErrFailedToResumeHistory
}
msgCount := 0
for _, msg := range messages {
store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic))
if err = store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)); err == nil {
msgCount++
}
}
store.log.Info("Retrieved messages since the last online time: ", len(messages))
return len(messages), nil
return msgCount, nil
}
// TODO: queryWithAccounting
@ -748,7 +760,7 @@ func (store *WakuStore) Stop() {
}
if store.h != nil {
store.h.RemoveStreamHandler(StoreID_v20beta3)
store.h.RemoveStreamHandler(StoreID_v20beta4)
}
store.wg.Wait()

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
@ -125,7 +126,8 @@ func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.Priv
}
func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()})
pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey())
peerID, err := peer.IDFromPublicKey(pubKey)
if err != nil {
return nil, err
}
@ -134,7 +136,8 @@ func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
}
func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) {
peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()})
pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey())
peerID, err := peer.IDFromPublicKey(pubKey)
if err != nil {
return nil, err
}

View File

@ -1,79 +0,0 @@
package utils
import (
"crypto/ecdsa"
"crypto/subtle"
"encoding/asn1"
"errors"
"math/big"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p-core/crypto"
pb "github.com/libp2p/go-libp2p-core/crypto/pb"
"github.com/minio/sha256-simd"
)
// Taken from: https://github.com/libp2p/go-libp2p-core/blob/094b0d3f8ba2934339cb35e1a875b11ab6d08839/crypto/ecdsa.go as
// they don't provide a way to set the key
var ErrNilSig = errors.New("sig is nil")
// ECDSASig holds the r and s values of an ECDSA signature
type ECDSASig struct {
R, S *big.Int
}
// ECDSAPublicKey is an implementation of an ECDSA public key
type ECDSAPublicKey struct {
pub *ecdsa.PublicKey
}
// Type returns the key type
func (ePub *ECDSAPublicKey) Type() pb.KeyType {
return pb.KeyType_Secp256k1
}
// Raw returns x509 bytes from a public key
func (ePub *ECDSAPublicKey) Raw() ([]byte, error) {
return ethcrypto.CompressPubkey(ePub.pub), nil
}
// Bytes returns the public key as protobuf bytes
func (ePub *ECDSAPublicKey) Bytes() ([]byte, error) {
return crypto.MarshalPublicKey(ePub)
}
// Equals compares to public keys
func (ePub *ECDSAPublicKey) Equals(o crypto.Key) bool {
return basicEquals(ePub, o)
}
// Verify compares data to a signature
func (ePub *ECDSAPublicKey) Verify(data, sigBytes []byte) (bool, error) {
sig := new(ECDSASig)
if _, err := asn1.Unmarshal(sigBytes, sig); err != nil {
return false, err
}
if sig == nil {
return false, ErrNilSig
}
hash := sha256.Sum256(data)
return ecdsa.Verify(ePub.pub, hash[:], sig.R, sig.S), nil
}
func basicEquals(k1, k2 crypto.Key) bool {
if k1.Type() != k2.Type() {
return false
}
a, err := k1.Raw()
if err != nil {
return false
}
b, err := k2.Raw()
if err != nil {
return false
}
return subtle.ConstantTimeCompare(a, b) == 1
}

View File

@ -2,14 +2,13 @@ package utils
import "time"
// GetUnixEpoch converts a time into a unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpochFrom(now time.Time) float64 {
return float64(now.UnixNano()) / float64(time.Second)
// GetUnixEpoch converts a time into a unix timestamp with nanoseconds
func GetUnixEpochFrom(now time.Time) int64 {
return now.UnixNano()
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpoch() float64 {
func GetUnixEpoch() int64 {
return GetUnixEpochFrom(time.Now())
}

2
vendor/modules.txt vendored
View File

@ -453,7 +453,7 @@ github.com/status-im/go-discover/discover/v4wire
github.com/status-im/go-discover/discover/v5wire
# github.com/status-im/go-multiaddr-ethv4 v1.2.1
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b
# github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18
github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/try
github.com/status-im/go-waku/waku/v2

View File

@ -377,7 +377,7 @@ func (w *Waku) addWakuV2Peers(cfg *Config) {
log.Info("peer added successfully", peerID)
}
w.addPeers(cfg.StoreNodes, store.StoreID_v20beta3, addToStore)
w.addPeers(cfg.StoreNodes, store.StoreID_v20beta4, addToStore)
w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1, addToStore)
w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1, addToStore)
w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001, addToStore)
@ -869,8 +869,8 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
}
query := store.Query{
StartTime: float64(from),
EndTime: float64(to),
StartTime: int64(from),
EndTime: int64(to),
ContentTopics: strTopics,
Topic: relay.DefaultWakuTopic,
}
@ -1074,7 +1074,7 @@ func (w *Waku) AddStorePeer(address string) (string, error) {
return "", err
}
peerID, err := w.node.AddPeer(addr, store.StoreID_v20beta3)
peerID, err := w.node.AddPeer(addr, store.StoreID_v20beta4)
if err != nil {
return "", err
}