diff --git a/go.mod b/go.mod index 4b0faf20c..2becb31ba 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b + github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 @@ -189,8 +189,6 @@ require ( github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect - github.com/libp2p/go-libp2p-mplex v0.9.0 // indirect - github.com/libp2p/go-mplex v0.7.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.2.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect diff --git a/go.sum b/go.sum index 12c17bbec..548ea5e9e 100644 --- a/go.sum +++ b/go.sum @@ -1362,15 +1362,11 @@ github.com/libp2p/go-libp2p v0.32.2 h1:s8GYN4YJzgUoyeYNPdW7JZeZ5Ee31iNaIBfGYMAY4 github.com/libp2p/go-libp2p v0.32.2/go.mod h1:E0LKe+diV/ZVJVnOJby8VC5xzHF0660osg71skcxJvk= github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s= github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= -github.com/libp2p/go-libp2p-mplex v0.9.0 h1:R58pDRAmuBXkYugbSSXR9wrTX3+1pFM1xP2bLuodIq8= -github.com/libp2p/go-libp2p-mplex v0.9.0/go.mod h1:ro1i4kuwiFT+uMPbIDIFkcLs1KRbNp0QwnUXM+P64Og= github.com/libp2p/go-libp2p-pubsub v0.10.1 h1:/RqOZpEtAolsr8/9CC8KqROJSOZeu7lK7fPftn4MwNg= github.com/libp2p/go-libp2p-pubsub v0.10.1/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= -github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= -github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= github.com/libp2p/go-msgio v0.3.0/go.mod h1:nyRM819GmVaF9LX3l03RMh10QdOroF++NBbxAb0mmDM= github.com/libp2p/go-nat v0.2.0 h1:Tyz+bUFAYqGyJ/ppPPymMGbIgNRH+WqC5QrT5fKrrGk= @@ -2142,8 +2138,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b h1:2NR0UCjuuAFmnkhsvlCKn7PTs4JxUjSq4s7lSWaG0ek= -github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k= +github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f h1:KiDqcxmCi74BGDZzkGT5T83QhEL/rPrUbEiJWOuiuU4= +github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/protocol/chat.go b/protocol/chat.go index edae20dfc..88a22128e 100644 --- a/protocol/chat.go +++ b/protocol/chat.go @@ -288,6 +288,19 @@ func (c *Chat) IsActivePersonalChat() bool { return c.Active && (c.OneToOne() || c.PrivateGroupChat() || c.Public()) && c.CommunityID == "" } +// DefaultResendType returns the resend type for a chat. +// This function currently infers the ResendType from the chat type. +// Note that specific message might have different resent types. At times +// some messages dictate their ResendType based on their own properties and +// context, rather than the chat type it is associated with. +func (c *Chat) DefaultResendType() common.ResendType { + if c.OneToOne() || c.PrivateGroupChat() { + return common.ResendTypeDataSync + } + + return common.ResendTypeRawMessage +} + func (c *Chat) shouldBeSynced() bool { isPublicChat := !c.Timeline() && !c.ProfileUpdates() && c.Public() return isPublicChat || c.OneToOne() || c.PrivateGroupChat() diff --git a/protocol/chat_test.go b/protocol/chat_test.go index a79b251fe..6e4fa2c3e 100644 --- a/protocol/chat_test.go +++ b/protocol/chat_test.go @@ -163,6 +163,47 @@ func (s *ChatTestSuite) TestUpdateFirstMessageTimestamp() { setAndCheck(100, true, 100) } +func (s *ChatTestSuite) TestDefaultResendType() { + testID := "some-id" + testCases := []struct { + Name string + ExpectedResendType common.ResendType + Chat Chat + }{ + { + Name: "one to one chat", + ExpectedResendType: common.ResendTypeDataSync, + Chat: Chat{ + ID: testID, + ChatType: ChatTypeOneToOne, + }, + }, + { + Name: "private group chat", + ExpectedResendType: common.ResendTypeDataSync, + Chat: Chat{ + ID: testID, + ChatType: ChatTypePrivateGroupChat, + }, + }, + { + Name: "community chat", + ExpectedResendType: common.ResendTypeRawMessage, + Chat: Chat{ + ID: testID, + ChatType: ChatTypeCommunityChat, + }, + }, + } + + for _, tc := range testCases { + s.Run(tc.Name, func() { + s.Require().Equal(tc.ExpectedResendType, tc.Chat.DefaultResendType()) + }) + } + +} + func (s *ChatTestSuite) TestDeepLink() { chat := &Chat{ CommunityID: "0x02b1188c997e666cd5505ffd5c4b5fdbe3084b78a486d8e709da3b32ad3708a89e", diff --git a/protocol/messenger.go b/protocol/messenger.go index 481f02232..edcb397ec 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2382,10 +2382,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message SendPushNotification: m.featureFlags.PushNotifications, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, - ResendType: common.ResendTypeRawMessage, - } - if chat.ChatType == ChatTypeOneToOne { - rawMessage.ResendType = common.ResendTypeDataSync + ResendType: chat.DefaultResendType(), } // We want to save the raw message before dispatching it, to avoid race conditions diff --git a/protocol/messenger_messages.go b/protocol/messenger_messages.go index 637d68d11..e497b4478 100644 --- a/protocol/messenger_messages.go +++ b/protocol/messenger_messages.go @@ -235,7 +235,7 @@ func (m *Messenger) DeleteMessageAndSend(ctx context.Context, messageID string) Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_DELETE_MESSAGE, SkipGroupMessageWrap: true, - ResendType: GetResendTypeForChat(chat), + ResendType: chat.DefaultResendType(), } _, err = m.dispatchMessage(ctx, rawMessage) diff --git a/protocol/messenger_pin_messages.go b/protocol/messenger_pin_messages.go index 45df91888..3271073a7 100644 --- a/protocol/messenger_pin_messages.go +++ b/protocol/messenger_pin_messages.go @@ -56,7 +56,7 @@ func (m *Messenger) sendPinMessage(ctx context.Context, message *common.PinMessa Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_PIN_MESSAGE, SkipGroupMessageWrap: true, - ResendType: GetResendTypeForChat(chat), + ResendType: chat.DefaultResendType(), } _, err = m.dispatchMessage(ctx, rawMessage) if err != nil { diff --git a/protocol/messenger_util.go b/protocol/messenger_util.go deleted file mode 100644 index d47d3243a..000000000 --- a/protocol/messenger_util.go +++ /dev/null @@ -1,16 +0,0 @@ -package protocol - -import "github.com/status-im/status-go/protocol/common" - -// GetResendTypeForChat returns the resend type for a chat. -// This function currently infers the ResendType from the chat type. -// However, it is recommended to explicitly determine the ResendType based on -// specific message characteristics to avoid implicit assumptions. This ensures -// that each message dictates its ResendType based on its own properties and -// context, rather than the chat type it is associated with. -func GetResendTypeForChat(chat *Chat) common.ResendType { - if chat.ChatType == ChatTypeOneToOne { - return common.ResendTypeDataSync - } - return common.ResendTypeRawMessage -} diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/.gitignore b/vendor/github.com/libp2p/go-libp2p-mplex/.gitignore deleted file mode 100644 index 1377554eb..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.swp diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/LICENSE b/vendor/github.com/libp2p/go-libp2p-mplex/LICENSE deleted file mode 100644 index c7386b3c9..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2014 Juan Batiz-Benet - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/README.md b/vendor/github.com/libp2p/go-libp2p-mplex/README.md deleted file mode 100644 index 00782d7b9..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# DEPRECATION NOTICE - -mplex has been deprecated. - -see https://github.com/libp2p/specs/issues/553 for details - -# go-libp2p-mplex - a go-stream-muxer shim for multiplex - -[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) ![](https://raw.githubusercontent.com/libp2p/go-stream-muxer/master/img/badge.png) - -This is an implementation of the [go-libp2p muxer](https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.30.0/core/network#Multiplexer) interface for [multiplex](https://github.com/libp2p/go-mplex). For more information, see that repo. diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/conn.go b/vendor/github.com/libp2p/go-libp2p-mplex/conn.go deleted file mode 100644 index ed4e847e5..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/conn.go +++ /dev/null @@ -1,48 +0,0 @@ -package mplex - -import ( - "context" - - "github.com/libp2p/go-libp2p/core/network" - - mp "github.com/libp2p/go-mplex" -) - -type conn mp.Multiplex - -var _ network.MuxedConn = &conn{} - -// NewMuxedConn constructs a new Conn from a *mp.Multiplex. -func NewMuxedConn(m *mp.Multiplex) network.MuxedConn { - return (*conn)(m) -} - -func (c *conn) Close() error { - return c.mplex().Close() -} - -func (c *conn) IsClosed() bool { - return c.mplex().IsClosed() -} - -// OpenStream creates a new stream. -func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { - s, err := c.mplex().NewStream(ctx) - if err != nil { - return nil, err - } - return (*stream)(s), nil -} - -// AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (network.MuxedStream, error) { - s, err := c.mplex().Accept() - if err != nil { - return nil, err - } - return (*stream)(s), nil -} - -func (c *conn) mplex() *mp.Multiplex { - return (*mp.Multiplex)(c) -} diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/stream.go b/vendor/github.com/libp2p/go-libp2p-mplex/stream.go deleted file mode 100644 index 53e9e6daf..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/stream.go +++ /dev/null @@ -1,64 +0,0 @@ -package mplex - -import ( - "time" - - "github.com/libp2p/go-libp2p/core/network" - - mp "github.com/libp2p/go-mplex" -) - -// stream implements network.MuxedStream over mplex.Stream. -type stream mp.Stream - -var _ network.MuxedStream = &stream{} - -func (s *stream) Read(b []byte) (n int, err error) { - n, err = s.mplex().Read(b) - if err == mp.ErrStreamReset { - err = network.ErrReset - } - - return n, err -} - -func (s *stream) Write(b []byte) (n int, err error) { - n, err = s.mplex().Write(b) - if err == mp.ErrStreamReset { - err = network.ErrReset - } - - return n, err -} - -func (s *stream) Close() error { - return s.mplex().Close() -} - -func (s *stream) CloseWrite() error { - return s.mplex().CloseWrite() -} - -func (s *stream) CloseRead() error { - return s.mplex().CloseRead() -} - -func (s *stream) Reset() error { - return s.mplex().Reset() -} - -func (s *stream) SetDeadline(t time.Time) error { - return s.mplex().SetDeadline(t) -} - -func (s *stream) SetReadDeadline(t time.Time) error { - return s.mplex().SetReadDeadline(t) -} - -func (s *stream) SetWriteDeadline(t time.Time) error { - return s.mplex().SetWriteDeadline(t) -} - -func (s *stream) mplex() *mp.Stream { - return (*mp.Stream)(s) -} diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/transport.go b/vendor/github.com/libp2p/go-libp2p-mplex/transport.go deleted file mode 100644 index bcfadfd7e..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/transport.go +++ /dev/null @@ -1,30 +0,0 @@ -// DEPRECATED: mplex has been deprecated. Users should prefer Yamux over mplex. see https://github.com/libp2p/specs/issues/553 -// for details -package mplex - -import ( - "net" - - "github.com/libp2p/go-libp2p/core/network" - - mp "github.com/libp2p/go-mplex" -) - -// DefaultTransport has default settings for Transport -var DefaultTransport = &Transport{} - -const ID = "/mplex/6.7.0" - -var _ network.Multiplexer = &Transport{} - -// Transport implements mux.Multiplexer that constructs -// mplex-backed muxed connections. -type Transport struct{} - -func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { - m, err := mp.NewMultiplex(nc, isServer, scope) - if err != nil { - return nil, err - } - return NewMuxedConn(m), nil -} diff --git a/vendor/github.com/libp2p/go-libp2p-mplex/version.json b/vendor/github.com/libp2p/go-libp2p-mplex/version.json deleted file mode 100644 index 960b84e55..000000000 --- a/vendor/github.com/libp2p/go-libp2p-mplex/version.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "version": "v0.9.0" -} diff --git a/vendor/github.com/libp2p/go-mplex/LICENSE b/vendor/github.com/libp2p/go-mplex/LICENSE deleted file mode 100644 index 26100332b..000000000 --- a/vendor/github.com/libp2p/go-mplex/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2016 Jeromy Johnson - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/vendor/github.com/libp2p/go-mplex/README.md b/vendor/github.com/libp2p/go-mplex/README.md deleted file mode 100644 index e8e016970..000000000 --- a/vendor/github.com/libp2p/go-mplex/README.md +++ /dev/null @@ -1,27 +0,0 @@ -# go-mplex - -[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai) -[![Go Reference](https://pkg.go.dev/badge/github.com/libp2p/go-mplex.svg)](https://pkg.go.dev/github.com/libp2p/go-mplex) -[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/) -[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) -[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io) - -A super simple [stream muxing](https://docs.libp2p.io/concepts/stream-multiplexing/) library implementing [mplex](https://github.com/libp2p/specs/tree/master/mplex). - -## Usage - -```go -mplex := multiplex.NewMultiplex(mysocket) - -s, _ := mplex.NewStream() -s.Write([]byte("Hello World!")) -s.Close() - -os, _ := mplex.Accept() -// echo back everything received -io.Copy(os, os) -``` - ---- - -The last gx published version of this module was: 0.2.35: QmWGQQ6Tz8AdUpxktLf3zgnVN9Vy8fcWVezZJSU3ZmiANj diff --git a/vendor/github.com/libp2p/go-mplex/deadline.go b/vendor/github.com/libp2p/go-mplex/deadline.go deleted file mode 100644 index b251c1a49..000000000 --- a/vendor/github.com/libp2p/go-mplex/deadline.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copied from the go standard library. -// -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE-BSD file. - -package multiplex - -import ( - "sync" - "time" -) - -// pipeDeadline is an abstraction for handling timeouts. -type pipeDeadline struct { - mu sync.Mutex // Guards timer and cancel - timer *time.Timer - cancel chan struct{} // Must be non-nil -} - -func makePipeDeadline() pipeDeadline { - return pipeDeadline{cancel: make(chan struct{})} -} - -// set sets the point in time when the deadline will time out. -// A timeout event is signaled by closing the channel returned by waiter. -// Once a timeout has occurred, the deadline can be refreshed by specifying a -// t value in the future. -// -// A zero value for t prevents timeout. -func (d *pipeDeadline) set(t time.Time) { - d.mu.Lock() - defer d.mu.Unlock() - - // deadline closed - if d.cancel == nil { - return - } - - if d.timer != nil && !d.timer.Stop() { - <-d.cancel // Wait for the timer callback to finish and close cancel - } - d.timer = nil - - // Time is zero, then there is no deadline. - closed := isClosedChan(d.cancel) - if t.IsZero() { - if closed { - d.cancel = make(chan struct{}) - } - return - } - - // Time in the future, setup a timer to cancel in the future. - if dur := time.Until(t); dur > 0 { - if closed { - d.cancel = make(chan struct{}) - } - d.timer = time.AfterFunc(dur, func() { - close(d.cancel) - }) - return - } - - // Time in the past, so close immediately. - if !closed { - close(d.cancel) - } -} - -// wait returns a channel that is closed when the deadline is exceeded. -func (d *pipeDeadline) wait() chan struct{} { - d.mu.Lock() - defer d.mu.Unlock() - return d.cancel -} - -// close closes, the deadline. Any future calls to `set` will do nothing. -func (d *pipeDeadline) close() { - d.mu.Lock() - defer d.mu.Unlock() - - if d.timer != nil && !d.timer.Stop() { - <-d.cancel // Wait for the timer callback to finish and close cancel - } - d.timer = nil - d.cancel = nil -} - -func isClosedChan(c <-chan struct{}) bool { - select { - case <-c: - return true - default: - return false - } -} diff --git a/vendor/github.com/libp2p/go-mplex/multiplex.go b/vendor/github.com/libp2p/go-mplex/multiplex.go deleted file mode 100644 index 90e364beb..000000000 --- a/vendor/github.com/libp2p/go-mplex/multiplex.go +++ /dev/null @@ -1,670 +0,0 @@ -package multiplex - -import ( - "bufio" - "context" - "encoding/binary" - "errors" - "fmt" - "io" - "net" - "os" - "runtime/debug" - "sync" - "time" - - pool "github.com/libp2p/go-buffer-pool" - - logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-varint" -) - -var log = logging.Logger("mplex") - -const ( - MaxMessageSize = 1 << 20 - BufferSize = 4096 - MaxBuffers = 4 - - MinMemoryReservation = 3 * BufferSize -) - -var ( - ChunkSize = BufferSize - 20 -) - -// Max time to block waiting for a slow reader to read from a stream before -// resetting it. Preferably, we'd have some form of back-pressure mechanism but -// we don't have that in this protocol. -var ReceiveTimeout = 5 * time.Second - -// ErrShutdown is returned when operating on a shutdown session -var ErrShutdown = errors.New("session shut down") - -// ErrTwoInitiators is returned when both sides think they're the initiator -var ErrTwoInitiators = errors.New("two initiators") - -// ErrInvalidState is returned when the other side does something it shouldn't. -// In this case, we close the connection to be safe. -var ErrInvalidState = errors.New("received an unexpected message from the peer") - -var errTimeout = timeout{} - -var ResetStreamTimeout = 2 * time.Minute - -var getInputBufferTimeout = time.Minute - -type timeout struct{} - -func (timeout) Error() string { return "i/o deadline exceeded" } -func (timeout) Temporary() bool { return true } -func (timeout) Timeout() bool { return true } - -// The MemoryManager allows management of memory allocations. -type MemoryManager interface { - // ReserveMemory reserves memory / buffer. - ReserveMemory(size int, prio uint8) error - // ReleaseMemory explicitly releases memory previously reserved with ReserveMemory - ReleaseMemory(size int) -} - -type nullMemoryManager struct{} - -func (m *nullMemoryManager) ReserveMemory(size int, prio uint8) error { return nil } -func (m *nullMemoryManager) ReleaseMemory(size int) {} - -// +1 for initiator -const ( - newStreamTag = 0 - messageTag = 2 - closeTag = 4 - resetTag = 6 -) - -// Multiplex is a mplex session. -type Multiplex struct { - con net.Conn - buf *bufio.Reader - nextID uint64 - initiator bool - - memoryManager MemoryManager - - closed chan struct{} - shutdown chan struct{} - shutdownErr error - shutdownLock sync.Mutex - - writeCh chan []byte - nstreams chan *Stream - - channels map[streamID]*Stream - chLock sync.Mutex - - bufIn, bufOut chan struct{} - bufInTimer *time.Timer - reservedMemory int -} - -// NewMultiplex creates a new multiplexer session. -func NewMultiplex(con net.Conn, initiator bool, memoryManager MemoryManager) (*Multiplex, error) { - if memoryManager == nil { - memoryManager = &nullMemoryManager{} - } - mp := &Multiplex{ - con: con, - initiator: initiator, - channels: make(map[streamID]*Stream), - closed: make(chan struct{}), - shutdown: make(chan struct{}), - nstreams: make(chan *Stream, 16), - memoryManager: memoryManager, - } - - // up-front reserve memory for the essential buffers (1 input, 1 output + the reader buffer) - if err := mp.memoryManager.ReserveMemory(MinMemoryReservation, 255); err != nil { - return nil, err - } - - mp.reservedMemory += MinMemoryReservation - bufs := 1 - - // reserve some more memory for buffers if possible - for i := 1; i < MaxBuffers; i++ { - var prio uint8 - if bufs < 2 { - prio = 192 - } else { - prio = 128 - } - - // 2xBufferSize -- one for input and one for output - if err := mp.memoryManager.ReserveMemory(2*BufferSize, prio); err != nil { - break - } - mp.reservedMemory += 2 * BufferSize - bufs++ - } - - mp.buf = bufio.NewReaderSize(con, BufferSize) - mp.writeCh = make(chan []byte, bufs) - mp.bufIn = make(chan struct{}, bufs) - mp.bufOut = make(chan struct{}, bufs) - mp.bufInTimer = time.NewTimer(0) - if !mp.bufInTimer.Stop() { - <-mp.bufInTimer.C - } - - go mp.handleIncoming() - go mp.handleOutgoing() - - return mp, nil -} - -func (mp *Multiplex) newStream(id streamID, name string) (s *Stream) { - s = &Stream{ - id: id, - name: name, - dataIn: make(chan []byte, 1), - rDeadline: makePipeDeadline(), - wDeadline: makePipeDeadline(), - mp: mp, - writeCancel: make(chan struct{}), - readCancel: make(chan struct{}), - } - return -} - -// Accept accepts the next stream from the connection. -func (m *Multiplex) Accept() (*Stream, error) { - select { - case s, ok := <-m.nstreams: - if !ok { - return nil, errors.New("multiplex closed") - } - return s, nil - case <-m.closed: - return nil, m.shutdownErr - } -} - -// Close closes the session. -func (mp *Multiplex) Close() error { - mp.closeNoWait() - - // Wait for the receive loop to finish. - <-mp.closed - - return nil -} - -func (mp *Multiplex) closeNoWait() { - mp.shutdownLock.Lock() - select { - case <-mp.shutdown: - default: - mp.memoryManager.ReleaseMemory(mp.reservedMemory) - mp.con.Close() - close(mp.shutdown) - } - mp.shutdownLock.Unlock() -} - -// IsClosed returns true if the session is closed. -func (mp *Multiplex) IsClosed() bool { - select { - case <-mp.closed: - return true - default: - return false - } -} - -// CloseChan returns a read-only channel which will be closed when the session is closed -func (mp *Multiplex) CloseChan() <-chan struct{} { - return mp.closed -} - -func (mp *Multiplex) sendMsg(timeout, cancel <-chan struct{}, header uint64, data []byte) error { - buf, err := mp.getBufferOutbound(len(data)+20, timeout, cancel) - if err != nil { - return err - } - - n := 0 - n += binary.PutUvarint(buf[n:], header) - n += binary.PutUvarint(buf[n:], uint64(len(data))) - n += copy(buf[n:], data) - - select { - case mp.writeCh <- buf[:n]: - return nil - case <-mp.shutdown: - mp.putBufferOutbound(buf) - return ErrShutdown - case <-timeout: - mp.putBufferOutbound(buf) - return errTimeout - case <-cancel: - mp.putBufferOutbound(buf) - return ErrStreamClosed - } -} - -func (mp *Multiplex) handleOutgoing() { - defer func() { - if rerr := recover(); rerr != nil { - fmt.Fprintf(os.Stderr, "caught panic in handleOutgoing: %s\n%s\n", rerr, debug.Stack()) - } - }() - - for { - select { - case <-mp.shutdown: - return - - case data := <-mp.writeCh: - err := mp.doWriteMsg(data) - mp.putBufferOutbound(data) - if err != nil { - // the connection is closed by this time - log.Warnf("error writing data: %s", err.Error()) - return - } - } - } -} - -func (mp *Multiplex) doWriteMsg(data []byte) error { - if mp.isShutdown() { - return ErrShutdown - } - - _, err := mp.con.Write(data) - if err != nil { - mp.closeNoWait() - } - - return err -} - -func (mp *Multiplex) nextChanID() uint64 { - out := mp.nextID - mp.nextID++ - return out -} - -// NewStream creates a new stream. -func (mp *Multiplex) NewStream(ctx context.Context) (*Stream, error) { - return mp.NewNamedStream(ctx, "") -} - -// NewNamedStream creates a new named stream. -func (mp *Multiplex) NewNamedStream(ctx context.Context, name string) (*Stream, error) { - mp.chLock.Lock() - - // We could call IsClosed but this is faster (given that we already have - // the lock). - if mp.channels == nil { - mp.chLock.Unlock() - return nil, ErrShutdown - } - - sid := mp.nextChanID() - header := (sid << 3) | newStreamTag - - if name == "" { - name = fmt.Sprint(sid) - } - s := mp.newStream(streamID{ - id: sid, - initiator: true, - }, name) - mp.channels[s.id] = s - mp.chLock.Unlock() - - err := mp.sendMsg(ctx.Done(), nil, header, []byte(name)) - if err != nil { - if err == errTimeout { - return nil, ctx.Err() - } - return nil, err - } - - return s, nil -} - -func (mp *Multiplex) cleanup() { - mp.closeNoWait() - - // Take the channels. - mp.chLock.Lock() - channels := mp.channels - mp.channels = nil - mp.chLock.Unlock() - - // Cancel any reads/writes - for _, msch := range channels { - msch.cancelRead(ErrStreamReset) - msch.cancelWrite(ErrStreamReset) - } - - // And... shutdown! - if mp.shutdownErr == nil { - mp.shutdownErr = ErrShutdown - } - close(mp.closed) -} - -func (mp *Multiplex) handleIncoming() { - defer func() { - if rerr := recover(); rerr != nil { - fmt.Fprintf(os.Stderr, "caught panic in handleIncoming: %s\n%s\n", rerr, debug.Stack()) - } - }() - - defer mp.cleanup() - - recvTimeout := time.NewTimer(0) - defer recvTimeout.Stop() - recvTimeoutFired := false - -loop: - for { - chID, tag, err := mp.readNextHeader() - if err != nil { - mp.shutdownErr = err - return - } - - remoteIsInitiator := tag&1 == 0 - ch := streamID{ - // true if *I'm* the initiator. - initiator: !remoteIsInitiator, - id: chID, - } - // Rounds up the tag: - // 0 -> 0 - // 1 -> 2 - // 2 -> 2 - // 3 -> 4 - // etc... - tag += (tag & 1) - - mlen, err := mp.readNextMsgLen() - if err != nil { - mp.shutdownErr = err - return - } - - mp.chLock.Lock() - msch, ok := mp.channels[ch] - mp.chLock.Unlock() - - switch tag { - case newStreamTag: - if ok { - log.Debugf("received NewStream message for existing stream: %d", ch) - mp.shutdownErr = ErrInvalidState - return - } - - // skip stream name, this is not at all useful in the context of libp2p streams - if err := mp.skipNextMsg(mlen); err != nil { - mp.shutdownErr = err - return - } - - msch = mp.newStream(ch, "") - mp.chLock.Lock() - mp.channels[ch] = msch - mp.chLock.Unlock() - select { - case mp.nstreams <- msch: - case <-mp.shutdown: - return - } - - case resetTag: - if err := mp.skipNextMsg(mlen); err != nil { - mp.shutdownErr = err - return - } - - if !ok { - // This is *ok*. We forget the stream on reset. - continue - } - - // Cancel any ongoing reads/writes. - msch.cancelRead(ErrStreamReset) - msch.cancelWrite(ErrStreamReset) - case closeTag: - if err := mp.skipNextMsg(mlen); err != nil { - mp.shutdownErr = err - return - } - - if !ok { - // may have canceled our reads already. - continue - } - - // unregister and throw away future data. - mp.chLock.Lock() - delete(mp.channels, ch) - mp.chLock.Unlock() - - // close data channel, there will be no more data. - close(msch.dataIn) - - // We intentionally don't cancel any deadlines, cancel reads, cancel - // writes, etc. We just deliver the EOF by closing the - // data channel, and unregister the channel so we don't - // receive any more data. The user still needs to call - // `Close()` or `Reset()`. - case messageTag: - if !ok { - // We're not accepting data on this stream, for - // some reason. It's likely that we reset it, or - // simply canceled reads (e.g., called Close). - if err := mp.skipNextMsg(mlen); err != nil { - mp.shutdownErr = err - return - } - continue - } - - read: - for rd := 0; rd < mlen; { - nextChunk := mlen - rd - if nextChunk > BufferSize { - nextChunk = BufferSize - } - - b, err := mp.readNextChunk(nextChunk) - if err != nil { - mp.shutdownErr = err - return - } - - rd += nextChunk - - if !recvTimeout.Stop() && !recvTimeoutFired { - <-recvTimeout.C - } - recvTimeout.Reset(ReceiveTimeout) - recvTimeoutFired = false - - select { - case msch.dataIn <- b: - - case <-msch.readCancel: - // the user has canceled reading. walk away. - mp.putBufferInbound(b) - if err := mp.skipNextMsg(mlen - rd); err != nil { - mp.shutdownErr = err - return - } - break read - - case <-recvTimeout.C: - recvTimeoutFired = true - mp.putBufferInbound(b) - log.Warnf("timed out receiving message into stream queue.") - // Do not do this asynchronously. Otherwise, we - // could drop a message, then receive a message, - // then reset. - msch.Reset() - if err := mp.skipNextMsg(mlen - rd); err != nil { - mp.shutdownErr = err - return - } - continue loop - - case <-mp.shutdown: - mp.putBufferInbound(b) - return - } - } - - default: - log.Debugf("message with unknown header on stream %s", ch) - mp.skipNextMsg(mlen) - if ok { - msch.Reset() - } - } - } -} - -func (mp *Multiplex) isShutdown() bool { - select { - case <-mp.shutdown: - return true - default: - return false - } -} - -func (mp *Multiplex) sendResetMsg(header uint64, hard bool) { - ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout) - defer cancel() - - err := mp.sendMsg(ctx.Done(), nil, header, nil) - if err != nil && !mp.isShutdown() { - if hard { - log.Warnf("error sending reset message: %s; killing connection", err.Error()) - mp.Close() - } else { - log.Debugf("error sending reset message: %s", err.Error()) - } - } -} - -func (mp *Multiplex) readNextHeader() (uint64, uint64, error) { - h, err := varint.ReadUvarint(mp.buf) - if err != nil { - return 0, 0, err - } - - // get channel ID - ch := h >> 3 - - rem := h & 7 - - return ch, rem, nil -} - -func (mp *Multiplex) readNextMsgLen() (int, error) { - l, err := varint.ReadUvarint(mp.buf) - if err != nil { - return 0, err - } - - if l > uint64(MaxMessageSize) { - return 0, fmt.Errorf("message size too large") - } - - if l == 0 { - return 0, nil - } - - return int(l), nil -} - -func (mp *Multiplex) readNextChunk(mlen int) ([]byte, error) { - buf, err := mp.getBufferInbound(mlen) - if err != nil { - return nil, err - } - - _, err = io.ReadFull(mp.buf, buf) - if err != nil { - mp.putBufferInbound(buf) - return nil, err - } - - return buf, nil -} - -func (mp *Multiplex) skipNextMsg(mlen int) error { - if mlen == 0 { - return nil - } - - _, err := mp.buf.Discard(mlen) - return err -} - -func (mp *Multiplex) getBufferInbound(length int) ([]byte, error) { - timerFired := false - defer func() { - if !mp.bufInTimer.Stop() && !timerFired { - <-mp.bufInTimer.C - } - }() - mp.bufInTimer.Reset(getInputBufferTimeout) - - select { - case mp.bufIn <- struct{}{}: - case <-mp.bufInTimer.C: - timerFired = true - return nil, errTimeout - case <-mp.shutdown: - return nil, ErrShutdown - } - - return mp.getBuffer(length), nil -} - -func (mp *Multiplex) getBufferOutbound(length int, timeout, cancel <-chan struct{}) ([]byte, error) { - select { - case mp.bufOut <- struct{}{}: - case <-timeout: - return nil, errTimeout - case <-cancel: - return nil, ErrStreamClosed - case <-mp.shutdown: - return nil, ErrShutdown - } - - return mp.getBuffer(length), nil -} - -func (mp *Multiplex) getBuffer(length int) []byte { - return pool.Get(length) -} - -func (mp *Multiplex) putBufferInbound(b []byte) { - mp.putBuffer(b, mp.bufIn) -} - -func (mp *Multiplex) putBufferOutbound(b []byte) { - mp.putBuffer(b, mp.bufOut) -} - -func (mp *Multiplex) putBuffer(slice []byte, putBuf chan struct{}) { - <-putBuf - pool.Put(slice) -} diff --git a/vendor/github.com/libp2p/go-mplex/stream.go b/vendor/github.com/libp2p/go-mplex/stream.go deleted file mode 100644 index fca3142ae..000000000 --- a/vendor/github.com/libp2p/go-mplex/stream.go +++ /dev/null @@ -1,268 +0,0 @@ -package multiplex - -import ( - "context" - "errors" - "io" - "sync" - "time" - - "go.uber.org/multierr" -) - -var ( - ErrStreamReset = errors.New("stream reset") - ErrStreamClosed = errors.New("closed stream") -) - -// streamID is a convenience type for operating on stream IDs -type streamID struct { - id uint64 - initiator bool -} - -// header computes the header for the given tag -func (id *streamID) header(tag uint64) uint64 { - header := id.id<<3 | tag - if !id.initiator { - header-- - } - return header -} - -type Stream struct { - id streamID - name string - dataIn chan []byte - mp *Multiplex - - extra []byte - - // exbuf is for holding the reference to the beginning of the extra slice - // for later memory pool freeing - exbuf []byte - - rDeadline, wDeadline pipeDeadline - - clLock sync.Mutex - writeCancelErr, readCancelErr error - writeCancel, readCancel chan struct{} -} - -func (s *Stream) Name() string { - return s.name -} - -// tries to preload pending data -func (s *Stream) preloadData() { - select { - case read, ok := <-s.dataIn: - if !ok { - return - } - s.extra = read - s.exbuf = read - default: - } -} - -func (s *Stream) waitForData() error { - select { - case read, ok := <-s.dataIn: - if !ok { - return io.EOF - } - s.extra = read - s.exbuf = read - return nil - case <-s.readCancel: - // This is the only place where it's safe to return these. - s.returnBuffers() - return s.readCancelErr - case <-s.rDeadline.wait(): - return errTimeout - } -} - -func (s *Stream) returnBuffers() { - if s.exbuf != nil { - s.mp.putBufferInbound(s.exbuf) - s.exbuf = nil - s.extra = nil - } - for { - select { - case read, ok := <-s.dataIn: - if !ok { - return - } - if read == nil { - continue - } - s.mp.putBufferInbound(read) - default: - return - } - } -} - -func (s *Stream) Read(b []byte) (int, error) { - select { - case <-s.readCancel: - return 0, s.readCancelErr - default: - } - - if s.extra == nil { - err := s.waitForData() - if err != nil { - return 0, err - } - } - n := 0 - for s.extra != nil && n < len(b) { - read := copy(b[n:], s.extra) - n += read - if read < len(s.extra) { - s.extra = s.extra[read:] - } else { - if s.exbuf != nil { - s.mp.putBufferInbound(s.exbuf) - } - s.extra = nil - s.exbuf = nil - s.preloadData() - } - } - return n, nil -} - -func (s *Stream) Write(b []byte) (int, error) { - var written int - for written < len(b) { - wl := len(b) - written - if wl > ChunkSize { - wl = ChunkSize - } - - n, err := s.write(b[written : written+wl]) - if err != nil { - return written, err - } - - written += n - } - - return written, nil -} - -func (s *Stream) write(b []byte) (int, error) { - select { - case <-s.writeCancel: - return 0, s.writeCancelErr - default: - } - - err := s.mp.sendMsg(s.wDeadline.wait(), s.writeCancel, s.id.header(messageTag), b) - if err != nil { - return 0, err - } - - return len(b), nil -} - -func (s *Stream) cancelWrite(err error) bool { - s.wDeadline.close() - - s.clLock.Lock() - defer s.clLock.Unlock() - select { - case <-s.writeCancel: - return false - default: - s.writeCancelErr = err - close(s.writeCancel) - return true - } -} - -func (s *Stream) cancelRead(err error) bool { - // Always unregister for reading first, even if we're already closed (or - // already closing). When handleIncoming calls this, it expects the - // stream to be unregistered by the time it returns. - s.mp.chLock.Lock() - delete(s.mp.channels, s.id) - s.mp.chLock.Unlock() - - s.rDeadline.close() - - s.clLock.Lock() - defer s.clLock.Unlock() - select { - case <-s.readCancel: - return false - default: - s.readCancelErr = err - close(s.readCancel) - return true - } -} - -func (s *Stream) CloseWrite() error { - if !s.cancelWrite(ErrStreamClosed) { - // Check if we closed the stream _nicely_. If so, we don't need - // to report an error to the user. - if s.writeCancelErr == ErrStreamClosed { - return nil - } - // Closed for some other reason. Report it. - return s.writeCancelErr - } - - ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout) - defer cancel() - - err := s.mp.sendMsg(ctx.Done(), nil, s.id.header(closeTag), nil) - // We failed to close the stream after 2 minutes, something is probably wrong. - if err != nil && !s.mp.isShutdown() { - log.Warnf("Error closing stream: %s; killing connection", err.Error()) - s.mp.Close() - } - return err -} - -func (s *Stream) CloseRead() error { - s.cancelRead(ErrStreamClosed) - return nil -} - -func (s *Stream) Close() error { - return multierr.Combine(s.CloseRead(), s.CloseWrite()) -} - -func (s *Stream) Reset() error { - s.cancelRead(ErrStreamReset) - - if s.cancelWrite(ErrStreamReset) { - // Send a reset in the background. - go s.mp.sendResetMsg(s.id.header(resetTag), true) - } - - return nil -} - -func (s *Stream) SetDeadline(t time.Time) error { - s.rDeadline.set(t) - s.wDeadline.set(t) - return nil -} - -func (s *Stream) SetReadDeadline(t time.Time) error { - s.rDeadline.set(t) - return nil -} - -func (s *Stream) SetWriteDeadline(t time.Time) error { - s.wDeadline.set(t) - return nil -} diff --git a/vendor/github.com/libp2p/go-mplex/version.json b/vendor/github.com/libp2p/go-mplex/version.json deleted file mode 100644 index 9f6d6fca3..000000000 --- a/vendor/github.com/libp2p/go-mplex/version.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "version": "v0.7.0" -} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 323958e58..ca6a16bb4 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -698,7 +698,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics } // AddDiscoveredPeer to add a discovered peer to the node peerStore -func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) { +func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, enr *enode.Node, connectNow bool) { p := service.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ @@ -706,6 +706,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp Addrs: addrs, }, PubsubTopics: pubsubTopics, + ENR: enr, } w.peermanager.AddDiscoveredPeer(p, connectNow) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 60608f779..90f67f066 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" - mplex "github.com/libp2p/go-libp2p-mplex" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/crypto" @@ -559,7 +558,6 @@ var DefaultLibP2POptions = []libp2p.Option{ libp2p.UserAgent(UserAgent), libp2p.ChainOptions( libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), - libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), ), libp2p.EnableNATService(), libp2p.ConnectionManager(newConnManager(200, 300, connmgr.WithGracePeriod(0))), diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go new file mode 100644 index 000000000..a73182383 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go @@ -0,0 +1,172 @@ +package peermanager + +import ( + "context" + "errors" + "sort" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/logging" + "go.uber.org/zap" +) + +type FastestPeerSelector struct { + sync.RWMutex + + host host.Host + + logger *zap.Logger +} + +func NewFastestPeerSelector(logger *zap.Logger) *FastestPeerSelector { + return &FastestPeerSelector{ + logger: logger.Named("rtt-cache"), + } +} + +func (r *FastestPeerSelector) SetHost(h host.Host) { + r.host = h +} + +func (r *FastestPeerSelector) PingPeer(ctx context.Context, peer peer.ID) (time.Duration, error) { + if peer == r.host.ID() { + return 0, errors.New("can't ping yourself") + } + + ctx, cancel := context.WithTimeout(ctx, 7*time.Second) + defer cancel() + + select { + case <-ctx.Done(): + return 0, ctx.Err() + + case result := <-ping.Ping(ctx, r.host, peer): + r.Lock() + defer r.Unlock() + + if result.Error == nil { + return result.RTT, nil + } else { + r.logger.Debug("could not ping", logging.HostID("peer", peer), zap.Error(result.Error)) + return 0, result.Error + } + } + +} + +func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlice) (peer.ID, error) { + var peerRTT []pingResult + var peerRTTMutex sync.Mutex + + wg := sync.WaitGroup{} + pingCh := make(chan peer.ID) + + pinged := make(map[peer.ID]struct{}) + + go func() { + // Ping any peer with no latency recorded + for peerToPing := range pingCh { + go func(p peer.ID) { + defer wg.Done() + rtt := time.Hour + result, err := r.PingPeer(ctx, p) + if err == nil { + rtt = result + } + + peerRTTMutex.Lock() + peerRTT = append(peerRTT, pingResult{ + peerID: p, + rtt: rtt, + connectedness: r.host.Network().Connectedness(p), + }) + peerRTTMutex.Unlock() + }(peerToPing) + } + }() + + for _, p := range peers { + latency := r.host.Peerstore().LatencyEWMA(p) + if latency == 0 { + wg.Add(1) + pinged[p] = struct{}{} // To avoid double pings + pingCh <- p + } else { + peerRTTMutex.Lock() + peerRTT = append(peerRTT, pingResult{ + peerID: p, + rtt: latency, + connectedness: r.host.Network().Connectedness(p), + }) + peerRTTMutex.Unlock() + } + } + + // Wait for pings to be done (if any) + wg.Wait() + close(pingCh) + + sort.Sort(pingSort(peerRTT)) + + for _, p := range peerRTT { + if p.rtt == time.Hour { + break + } + + // Make sure peer is reachable + _, exists := pinged[p.peerID] // Did we just ping the peer? + if !exists { + _, err := r.PingPeer(ctx, p.peerID) + if err != nil { + continue + } else { + if p.rtt != time.Hour { + return p.peerID, nil + } + } + } else { + if p.rtt != time.Hour { + return p.peerID, nil + } + } + } + + return "", ErrNoPeersAvailable +} + +type pingResult struct { + peerID peer.ID + rtt time.Duration + connectedness network.Connectedness +} + +type pingSort []pingResult + +func (a pingSort) Len() int { + return len(a) +} + +func (a pingSort) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +var connectednessPriority map[network.Connectedness]int + +func init() { + // Closer to 0 is prefered + connectednessPriority = map[network.Connectedness]int{ + network.CanConnect: 1, + network.Connected: 1, + network.NotConnected: 2, + network.CannotConnect: 3, + } +} + +func (a pingSort) Less(i, j int) bool { + return connectednessPriority[a[i].connectedness] < connectednessPriority[a[j].connectedness] && a[i].rtt < a[j].rtt +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go index 8dc8e6038..6f40a1711 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_connector.go @@ -193,7 +193,6 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool { c.mux.Lock() defer c.mux.Unlock() val, ok := c.cache.Get(pi.ID) - var cachedPeer *connCacheData if ok { tv := val.(*connCacheData) now := time.Now() @@ -204,15 +203,25 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool { } c.logger.Debug("Proceeding with connecting to peer", zap.Time("currentTime", now), zap.Time("nextTry", tv.nextTry)) - tv.nextTry = now.Add(tv.strat.Delay()) + } + return true +} + +func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) { + c.mux.Lock() + defer c.mux.Unlock() + val, ok := c.cache.Get(peerID) + var cachedPeer *connCacheData + if ok { + tv := val.(*connCacheData) + tv.nextTry = time.Now().Add(tv.strat.Delay()) } else { cachedPeer = &connCacheData{strat: c.backoff()} cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay()) c.logger.Debug("Initializing connectionCache for peer ", - logging.HostID("peerID", pi.ID), zap.Time("until", cachedPeer.nextTry)) - c.cache.Add(pi.ID, cachedPeer) + logging.HostID("peerID", peerID), zap.Time("until", cachedPeer.nextTry)) + c.cache.Add(peerID, cachedPeer) } - return true } func (c *PeerConnectionStrategy) dialPeers() { @@ -255,6 +264,7 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { defer cancel() err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { + c.addConnectionBackoff(pi.ID) c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi) c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go index d1af8ce3c..e811632e6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go @@ -84,6 +84,7 @@ type PeerManager struct { discoveryService *discv5.DiscoveryV5 wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo TopicHealthNotifCh chan<- TopicHealthStatus + rttCache *FastestPeerSelector } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -188,6 +189,7 @@ func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMet subRelayTopics: make(map[string]*NodeTopicDetails), maxPeers: maxPeers, wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{}, + rttCache: NewFastestPeerSelector(logger), } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), @@ -206,6 +208,7 @@ func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) { // SetHost sets the host to be used in order to access the peerStore. func (pm *PeerManager) SetHost(host host.Host) { pm.host = host + pm.rttCache.SetHost(host) } // SetPeerConnector sets the peer connector to be used for establishing relay connections. @@ -215,7 +218,6 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { // Start starts the processing to be done by peer manager. func (pm *PeerManager) Start(ctx context.Context) { - pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) pm.ctx = ctx @@ -429,17 +431,21 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { if err == nil { enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID) // Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen) - if err == nil && enr.Record().Seq() >= p.ENR.Seq() { - return - } - if err != nil { - //Peer is already in peer-store but it doesn't have an enr, but discovered peer has ENR - pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding", - logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq())) + if err == nil { + if p.ENR != nil { + if enr.Record().Seq() >= p.ENR.Seq() { + return + } + //Peer is already in peer-store but stored ENR is older than discovered one. + pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored", + logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq())) + } else { + pm.logger.Info("peer already found in peerstore, but no new ENR", logging.HostID("peer", p.AddrInfo.ID)) + } } else { - //Peer is already in peer-store but stored ENR is older than discovered one. - pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored", - logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq())) + //Peer is in peer-store but it doesn't have an enr + pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding", + logging.HostID("peer", p.AddrInfo.ID)) } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go index 2b4c3807d..4c1268bb6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_selection.go @@ -3,13 +3,9 @@ package peermanager import ( "context" "errors" - "sync" - "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" - "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" @@ -178,11 +174,6 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice } } -type pingResult struct { - p peer.ID - rtt time.Duration -} - // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore @@ -204,54 +195,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) ( if err != nil { return "", err } - wg := sync.WaitGroup{} - waitCh := make(chan struct{}) - pingCh := make(chan pingResult, 1000) - wg.Add(len(peers)) - - go func() { - for _, p := range peers { - go func(p peer.ID) { - defer wg.Done() - ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) - defer cancel() - result := <-ping.Ping(ctx, pm.host, p) - if result.Error == nil { - pingCh <- pingResult{ - p: p, - rtt: result.RTT, - } - } else { - pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) - } - }(p) - } - wg.Wait() - close(waitCh) - close(pingCh) - }() - - select { - case <-waitCh: - var min *pingResult - for p := range pingCh { - if min == nil { - min = &p - } else { - if p.rtt < min.rtt { - min = &p - } - } - } - if min == nil { - return "", ErrNoPeersAvailable - } - - return min.p, nil - case <-criteria.Ctx.Done(): - return "", ErrNoPeersAvailable - } + return pm.rttCache.FastestPeer(criteria.Ctx, peers) } // FilterPeersByProto filters list of peers that support specified protocols. diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.pb.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.pb.go index 10f90e0b1..c4e0bbe28 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.pb.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.pb.go @@ -30,6 +30,7 @@ type WakuMessageKeyValue struct { MessageHash []byte `protobuf:"bytes,1,opt,name=message_hash,json=messageHash,proto3,oneof" json:"message_hash,omitempty"` // Globally unique key for a Waku Message Message *pb.WakuMessage `protobuf:"bytes,2,opt,name=message,proto3,oneof" json:"message,omitempty"` // Full message content as value + PubsubTopic *string `protobuf:"bytes,3,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"` } func (x *WakuMessageKeyValue) Reset() { @@ -78,6 +79,13 @@ func (x *WakuMessageKeyValue) GetMessage() *pb.WakuMessage { return nil } +func (x *WakuMessageKeyValue) GetPubsubTopic() string { + if x != nil && x.PubsubTopic != nil { + return *x.PubsubTopic + } + return "" +} + type StoreQueryRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -285,7 +293,7 @@ var file_storev3_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x76, 0x33, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x1a, 0x1d, 0x77, 0x61, 0x6b, 0x75, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2f, 0x76, 0x31, 0x2f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x97, 0x01, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd0, 0x01, 0x0a, 0x13, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x26, 0x0a, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x0b, 0x6d, @@ -293,60 +301,63 @@ var file_storev3_proto_rawDesc = []byte{ 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x01, 0x52, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x42, 0x0a, 0x0a, 0x08, 0x5f, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xf8, 0x03, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, - 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, - 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, - 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x0b, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, - 0x26, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, - 0x0a, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, - 0x6f, 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x74, 0x65, - 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x22, - 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x0c, 0x20, 0x01, - 0x28, 0x12, 0x48, 0x01, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x88, - 0x01, 0x01, 0x12, 0x1e, 0x0a, 0x08, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x0d, - 0x20, 0x01, 0x28, 0x12, 0x48, 0x02, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x45, 0x6e, 0x64, 0x88, - 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, 0x61, - 0x73, 0x68, 0x65, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0d, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x48, 0x61, 0x73, 0x68, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x70, 0x61, 0x67, - 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x33, - 0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x12, 0x2d, 0x0a, 0x12, 0x70, - 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, - 0x64, 0x18, 0x34, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x12, 0x2e, 0x0a, 0x10, 0x70, 0x61, - 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x35, - 0x20, 0x01, 0x28, 0x04, 0x48, 0x04, 0x52, 0x0f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, - 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, - 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x74, - 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, - 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x42, 0x13, 0x0a, - 0x11, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, - 0x69, 0x74, 0x22, 0xa7, 0x02, 0x0a, 0x12, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x51, 0x75, 0x65, 0x72, - 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, - 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x88, 0x01, 0x01, 0x12, 0x24, - 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, - 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, - 0x63, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, - 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, - 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x33, 0x20, 0x01, 0x28, 0x0c, 0x48, - 0x02, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, - 0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x88, 0x01, 0x01, 0x12, 0x26, 0x0a, 0x0c, 0x70, 0x75, + 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x48, 0x02, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x88, + 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, + 0x61, 0x73, 0x68, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, + 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x22, 0xf8, 0x03, 0x0a, 0x11, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, + 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x6e, 0x63, + 0x6c, 0x75, 0x64, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x26, 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, + 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, + 0x52, 0x0b, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x88, 0x01, 0x01, + 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, + 0x63, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x22, 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x5f, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x12, 0x48, 0x01, 0x52, 0x09, 0x74, + 0x69, 0x6d, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x88, 0x01, 0x01, 0x12, 0x1e, 0x0a, 0x08, 0x74, + 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x12, 0x48, 0x02, 0x52, + 0x07, 0x74, 0x69, 0x6d, 0x65, 0x45, 0x6e, 0x64, 0x88, 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x65, 0x73, 0x18, 0x14, 0x20, + 0x03, 0x28, 0x0c, 0x52, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x61, 0x73, 0x68, + 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x33, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x03, 0x52, + 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, + 0x72, 0x88, 0x01, 0x01, 0x12, 0x2d, 0x0a, 0x12, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x18, 0x34, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x72, 0x77, + 0x61, 0x72, 0x64, 0x12, 0x2e, 0x0a, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x35, 0x20, 0x01, 0x28, 0x04, 0x48, 0x04, 0x52, + 0x0f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x6d, 0x69, 0x74, + 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, + 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, + 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, + 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x42, 0x13, 0x0a, 0x11, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0xa7, 0x02, 0x0a, 0x12, + 0x53, 0x74, 0x6f, 0x72, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, + 0x64, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, + 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x43, 0x6f, 0x64, 0x65, 0x88, 0x01, 0x01, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0a, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a, + 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x2e, + 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x30, 0x0a, + 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, + 0x6f, 0x72, 0x18, 0x33, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x02, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69, + 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x42, + 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x42, + 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x42, + 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, + 0x75, 0x72, 0x73, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.proto b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.proto index c828a713f..f6a89c6db 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.proto +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/storev3.proto @@ -8,6 +8,7 @@ import "waku/message/v1/message.proto"; message WakuMessageKeyValue { optional bytes message_hash = 1; // Globally unique key for a Waku Message optional waku.message.v1.WakuMessage message = 2; // Full message content as value + optional string pubsub_topic = 3; } message StoreQueryRequest { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go index 40bdfade6..f54dea90e 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/pb/validation.go @@ -14,7 +14,6 @@ var ( errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed") errEmptyContentTopic = errors.New("one or more content topics specified is empty") errMissingPubsubTopic = errors.New("missing PubsubTopic field") - errMissingContentTopics = errors.New("missing ContentTopics field") errMissingStatusCode = errors.New("missing StatusCode field") errInvalidTimeRange = errors.New("invalid time range") errInvalidMessageHash = errors.New("invalid message hash") @@ -40,9 +39,7 @@ func (x *StoreQueryRequest) Validate() error { return errMissingPubsubTopic } - if len(x.ContentTopics) == 0 { - return errMissingContentTopics - } else if len(x.ContentTopics) > MaxContentTopics { + if len(x.ContentTopics) > MaxContentTopics { return errMaxContentTopics } else { for _, m := range x.ContentTopics { @@ -83,6 +80,10 @@ func (x *WakuMessageKeyValue) Validate() error { } if x.Message != nil { + if x.GetPubsubTopic() == "" { + return errMissingPubsubTopic + } + return x.Message.Validate() } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go index be67671ec..180d27de1 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/result.go @@ -33,10 +33,6 @@ func (r *Result) Query() *pb.StoreQueryRequest { return r.storeRequest } -func (r *Result) PubsubTopic() string { - return r.storeRequest.GetPubsubTopic() -} - func (r *Result) Next(ctx context.Context) (bool, error) { if !r.started { r.started = true diff --git a/vendor/modules.txt b/vendor/modules.txt index 63f03fbbb..be7703f01 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -568,17 +568,11 @@ github.com/libp2p/go-libp2p/p2p/transport/webtransport # github.com/libp2p/go-libp2p-asn-util v0.3.0 ## explicit; go 1.19 github.com/libp2p/go-libp2p-asn-util -# github.com/libp2p/go-libp2p-mplex v0.9.0 -## explicit; go 1.20 -github.com/libp2p/go-libp2p-mplex # github.com/libp2p/go-libp2p-pubsub v0.10.1 ## explicit; go 1.20 github.com/libp2p/go-libp2p-pubsub github.com/libp2p/go-libp2p-pubsub/pb github.com/libp2p/go-libp2p-pubsub/timecache -# github.com/libp2p/go-mplex v0.7.0 -## explicit; go 1.17 -github.com/libp2p/go-mplex # github.com/libp2p/go-msgio v0.3.0 ## explicit; go 1.18 github.com/libp2p/go-msgio @@ -1029,7 +1023,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b +# github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f ## explicit; go 1.20 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/waku/persistence diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 51b6c0f34..8a34f3559 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -449,7 +449,7 @@ func (w *Waku) discoverAndConnectPeers() error { func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) { // Connection will be prunned eventually by the connection manager if needed // The peer connector in go-waku uses Connect, so it will execute identify as part of its - w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, true) + w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, nil, true) } func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {