bug_: fix resend type for private group messages (#5258)

* chore_: bump go-waku

* fix_: fix resend type for private group messages

---------

Co-authored-by: Richard Ramos <info@richardramos.me>
Author: Andrea Maria Piana
Date: 2024-05-30 13:18:54 +01:00 (committed by GitHub)
Parent: fb88f541f2
Commit: 0061c563f2
33 changed files with 341 additions and 1441 deletions

go.mod

@@ -93,7 +93,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b
+	github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1
@@ -189,8 +189,6 @@ require (
 	github.com/libp2p/go-cidranger v1.1.0 // indirect
 	github.com/libp2p/go-flow-metrics v0.1.0 // indirect
 	github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
-	github.com/libp2p/go-libp2p-mplex v0.9.0 // indirect
-	github.com/libp2p/go-mplex v0.7.0 // indirect
 	github.com/libp2p/go-msgio v0.3.0 // indirect
 	github.com/libp2p/go-nat v0.2.0 // indirect
 	github.com/libp2p/go-netroute v0.2.1 // indirect

go.sum

@@ -1362,15 +1362,11 @@ github.com/libp2p/go-libp2p v0.32.2 h1:s8GYN4YJzgUoyeYNPdW7JZeZ5Ee31iNaIBfGYMAY4
 github.com/libp2p/go-libp2p v0.32.2/go.mod h1:E0LKe+diV/ZVJVnOJby8VC5xzHF0660osg71skcxJvk=
 github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
 github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
-github.com/libp2p/go-libp2p-mplex v0.9.0 h1:R58pDRAmuBXkYugbSSXR9wrTX3+1pFM1xP2bLuodIq8=
-github.com/libp2p/go-libp2p-mplex v0.9.0/go.mod h1:ro1i4kuwiFT+uMPbIDIFkcLs1KRbNp0QwnUXM+P64Og=
 github.com/libp2p/go-libp2p-pubsub v0.10.1 h1:/RqOZpEtAolsr8/9CC8KqROJSOZeu7lK7fPftn4MwNg=
 github.com/libp2p/go-libp2p-pubsub v0.10.1/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw=
 github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
 github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
 github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
-github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=
-github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU=
 github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0=
 github.com/libp2p/go-msgio v0.3.0/go.mod h1:nyRM819GmVaF9LX3l03RMh10QdOroF++NBbxAb0mmDM=
 github.com/libp2p/go-nat v0.2.0 h1:Tyz+bUFAYqGyJ/ppPPymMGbIgNRH+WqC5QrT5fKrrGk=
@@ -2142,8 +2138,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry
 github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b h1:2NR0UCjuuAFmnkhsvlCKn7PTs4JxUjSq4s7lSWaG0ek=
-github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k=
+github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f h1:KiDqcxmCi74BGDZzkGT5T83QhEL/rPrUbEiJWOuiuU4=
+github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f/go.mod h1:yXnWChXRKTb+NhALbFysluxgSwuxeTF2rhanDJkIx+k=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=


@@ -288,6 +288,19 @@ func (c *Chat) IsActivePersonalChat() bool {
 	return c.Active && (c.OneToOne() || c.PrivateGroupChat() || c.Public()) && c.CommunityID == ""
 }
 
+// DefaultResendType returns the resend type for a chat.
+// This function currently infers the ResendType from the chat type.
+// Note that a specific message might have a different resend type: at times
+// some messages dictate their ResendType based on their own properties and
+// context, rather than the chat type they are associated with.
+func (c *Chat) DefaultResendType() common.ResendType {
+	if c.OneToOne() || c.PrivateGroupChat() {
+		return common.ResendTypeDataSync
+	}
+	return common.ResendTypeRawMessage
+}
+
 func (c *Chat) shouldBeSynced() bool {
 	isPublicChat := !c.Timeline() && !c.ProfileUpdates() && c.Public()
 	return isPublicChat || c.OneToOne() || c.PrivateGroupChat()
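
As a quick illustration of the mapping the new helper encodes, here is a minimal sketch (the Chat type, chat-type constants, and resend-type constants are the ones visible in this diff; the surrounding setup is hypothetical):

```go
// Hypothetical snippet inside package protocol, assuming the types above.
oneToOne := Chat{ID: "a", ChatType: ChatTypeOneToOne}
group := Chat{ID: "b", ChatType: ChatTypePrivateGroupChat}
community := Chat{ID: "c", ChatType: ChatTypeCommunityChat}

_ = oneToOne.DefaultResendType()  // common.ResendTypeDataSync
_ = group.DefaultResendType()     // common.ResendTypeDataSync (the fix: this used to fall through to RawMessage)
_ = community.DefaultResendType() // common.ResendTypeRawMessage
```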


@@ -163,6 +163,47 @@ func (s *ChatTestSuite) TestUpdateFirstMessageTimestamp() {
 	setAndCheck(100, true, 100)
 }
 
+func (s *ChatTestSuite) TestDefaultResendType() {
+	testID := "some-id"
+	testCases := []struct {
+		Name               string
+		ExpectedResendType common.ResendType
+		Chat               Chat
+	}{
+		{
+			Name:               "one to one chat",
+			ExpectedResendType: common.ResendTypeDataSync,
+			Chat: Chat{
+				ID:       testID,
+				ChatType: ChatTypeOneToOne,
+			},
+		},
+		{
+			Name:               "private group chat",
+			ExpectedResendType: common.ResendTypeDataSync,
+			Chat: Chat{
+				ID:       testID,
+				ChatType: ChatTypePrivateGroupChat,
+			},
+		},
+		{
+			Name:               "community chat",
+			ExpectedResendType: common.ResendTypeRawMessage,
+			Chat: Chat{
+				ID:       testID,
+				ChatType: ChatTypeCommunityChat,
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		s.Run(tc.Name, func() {
+			s.Require().Equal(tc.ExpectedResendType, tc.Chat.DefaultResendType())
+		})
+	}
+}
+
 func (s *ChatTestSuite) TestDeepLink() {
 	chat := &Chat{
 		CommunityID: "0x02b1188c997e666cd5505ffd5c4b5fdbe3084b78a486d8e709da3b32ad3708a89e",


@@ -2382,10 +2382,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
 		SendPushNotification: m.featureFlags.PushNotifications,
 		Payload:              encodedMessage,
 		MessageType:          protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
-		ResendType:           common.ResendTypeRawMessage,
+		ResendType:           chat.DefaultResendType(),
 	}
 
-	if chat.ChatType == ChatTypeOneToOne {
-		rawMessage.ResendType = common.ResendTypeDataSync
-	}
 	// We want to save the raw message before dispatching it, to avoid race conditions


@@ -235,7 +235,7 @@ func (m *Messenger) DeleteMessageAndSend(ctx context.Context, messageID string)
 		Payload:              encodedMessage,
 		MessageType:          protobuf.ApplicationMetadataMessage_DELETE_MESSAGE,
 		SkipGroupMessageWrap: true,
-		ResendType:           GetResendTypeForChat(chat),
+		ResendType:           chat.DefaultResendType(),
 	}
 
 	_, err = m.dispatchMessage(ctx, rawMessage)


@@ -56,7 +56,7 @@ func (m *Messenger) sendPinMessage(ctx context.Context, message *common.PinMessa
 		Payload:              encodedMessage,
 		MessageType:          protobuf.ApplicationMetadataMessage_PIN_MESSAGE,
 		SkipGroupMessageWrap: true,
-		ResendType:           GetResendTypeForChat(chat),
+		ResendType:           chat.DefaultResendType(),
 	}
 	_, err = m.dispatchMessage(ctx, rawMessage)
 	if err != nil {


@@ -1,16 +0,0 @@
package protocol
import "github.com/status-im/status-go/protocol/common"
// GetResendTypeForChat returns the resend type for a chat.
// This function currently infers the ResendType from the chat type.
// However, it is recommended to explicitly determine the ResendType based on
// specific message characteristics to avoid implicit assumptions. This ensures
// that each message dictates its ResendType based on its own properties and
// context, rather than the chat type it is associated with.
func GetResendTypeForChat(chat *Chat) common.ResendType {
if chat.ChatType == ChatTypeOneToOne {
return common.ResendTypeDataSync
}
return common.ResendTypeRawMessage
}


@@ -1 +0,0 @@
*.swp


@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.


@@ -1,11 +0,0 @@
# DEPRECATION NOTICE
mplex has been deprecated.
see https://github.com/libp2p/specs/issues/553 for details
# go-libp2p-mplex - a go-stream-muxer shim for multiplex
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) ![](https://raw.githubusercontent.com/libp2p/go-stream-muxer/master/img/badge.png)
This is an implementation of the [go-libp2p muxer](https://pkg.go.dev/github.com/libp2p/go-libp2p@v0.30.0/core/network#Multiplexer) interface for [multiplex](https://github.com/libp2p/go-mplex). For more information, see that repo.


@@ -1,48 +0,0 @@
package mplex
import (
"context"
"github.com/libp2p/go-libp2p/core/network"
mp "github.com/libp2p/go-mplex"
)
type conn mp.Multiplex
var _ network.MuxedConn = &conn{}
// NewMuxedConn constructs a new Conn from a *mp.Multiplex.
func NewMuxedConn(m *mp.Multiplex) network.MuxedConn {
return (*conn)(m)
}
func (c *conn) Close() error {
return c.mplex().Close()
}
func (c *conn) IsClosed() bool {
return c.mplex().IsClosed()
}
// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.mplex().NewStream(ctx)
if err != nil {
return nil, err
}
return (*stream)(s), nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.mplex().Accept()
if err != nil {
return nil, err
}
return (*stream)(s), nil
}
func (c *conn) mplex() *mp.Multiplex {
return (*mp.Multiplex)(c)
}


@@ -1,64 +0,0 @@
package mplex
import (
"time"
"github.com/libp2p/go-libp2p/core/network"
mp "github.com/libp2p/go-mplex"
)
// stream implements network.MuxedStream over mplex.Stream.
type stream mp.Stream
var _ network.MuxedStream = &stream{}
func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.mplex().Read(b)
if err == mp.ErrStreamReset {
err = network.ErrReset
}
return n, err
}
func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.mplex().Write(b)
if err == mp.ErrStreamReset {
err = network.ErrReset
}
return n, err
}
func (s *stream) Close() error {
return s.mplex().Close()
}
func (s *stream) CloseWrite() error {
return s.mplex().CloseWrite()
}
func (s *stream) CloseRead() error {
return s.mplex().CloseRead()
}
func (s *stream) Reset() error {
return s.mplex().Reset()
}
func (s *stream) SetDeadline(t time.Time) error {
return s.mplex().SetDeadline(t)
}
func (s *stream) SetReadDeadline(t time.Time) error {
return s.mplex().SetReadDeadline(t)
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return s.mplex().SetWriteDeadline(t)
}
func (s *stream) mplex() *mp.Stream {
return (*mp.Stream)(s)
}


@@ -1,30 +0,0 @@
// DEPRECATED: mplex has been deprecated. Users should prefer Yamux over mplex. see https://github.com/libp2p/specs/issues/553
// for details
package mplex
import (
"net"
"github.com/libp2p/go-libp2p/core/network"
mp "github.com/libp2p/go-mplex"
)
// DefaultTransport has default settings for Transport
var DefaultTransport = &Transport{}
const ID = "/mplex/6.7.0"
var _ network.Multiplexer = &Transport{}
// Transport implements mux.Multiplexer that constructs
// mplex-backed muxed connections.
type Transport struct{}
func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
m, err := mp.NewMultiplex(nc, isServer, scope)
if err != nil {
return nil, err
}
return NewMuxedConn(m), nil
}


@@ -1,3 +0,0 @@
{
"version": "v0.9.0"
}


@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.


@@ -1,27 +0,0 @@
# go-mplex
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![Go Reference](https://pkg.go.dev/badge/github.com/libp2p/go-mplex.svg)](https://pkg.go.dev/github.com/libp2p/go-mplex)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
A super simple [stream muxing](https://docs.libp2p.io/concepts/stream-multiplexing/) library implementing [mplex](https://github.com/libp2p/specs/tree/master/mplex).
## Usage
```go
mplex := multiplex.NewMultiplex(mysocket)
s, _ := mplex.NewStream()
s.Write([]byte("Hello World!"))
s.Close()
os, _ := mplex.Accept()
// echo back everything received
io.Copy(os, os)
```
---
The last gx published version of this module was: 0.2.35: QmWGQQ6Tz8AdUpxktLf3zgnVN9Vy8fcWVezZJSU3ZmiANj


@@ -1,97 +0,0 @@
// Copied from the go standard library.
//
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE-BSD file.
package multiplex
import (
"sync"
"time"
)
// pipeDeadline is an abstraction for handling timeouts.
type pipeDeadline struct {
mu sync.Mutex // Guards timer and cancel
timer *time.Timer
cancel chan struct{} // Must be non-nil
}
func makePipeDeadline() pipeDeadline {
return pipeDeadline{cancel: make(chan struct{})}
}
// set sets the point in time when the deadline will time out.
// A timeout event is signaled by closing the channel returned by waiter.
// Once a timeout has occurred, the deadline can be refreshed by specifying a
// t value in the future.
//
// A zero value for t prevents timeout.
func (d *pipeDeadline) set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()
// deadline closed
if d.cancel == nil {
return
}
if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
// Time is zero, then there is no deadline.
closed := isClosedChan(d.cancel)
if t.IsZero() {
if closed {
d.cancel = make(chan struct{})
}
return
}
// Time in the future, setup a timer to cancel in the future.
if dur := time.Until(t); dur > 0 {
if closed {
d.cancel = make(chan struct{})
}
d.timer = time.AfterFunc(dur, func() {
close(d.cancel)
})
return
}
// Time in the past, so close immediately.
if !closed {
close(d.cancel)
}
}
// wait returns a channel that is closed when the deadline is exceeded.
func (d *pipeDeadline) wait() chan struct{} {
d.mu.Lock()
defer d.mu.Unlock()
return d.cancel
}
// close closes, the deadline. Any future calls to `set` will do nothing.
func (d *pipeDeadline) close() {
d.mu.Lock()
defer d.mu.Unlock()
if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
d.cancel = nil
}
func isClosedChan(c <-chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}


@@ -1,670 +0,0 @@
package multiplex
import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"os"
"runtime/debug"
"sync"
"time"
pool "github.com/libp2p/go-buffer-pool"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-varint"
)
var log = logging.Logger("mplex")
const (
MaxMessageSize = 1 << 20
BufferSize = 4096
MaxBuffers = 4
MinMemoryReservation = 3 * BufferSize
)
var (
ChunkSize = BufferSize - 20
)
// Max time to block waiting for a slow reader to read from a stream before
// resetting it. Preferably, we'd have some form of back-pressure mechanism but
// we don't have that in this protocol.
var ReceiveTimeout = 5 * time.Second
// ErrShutdown is returned when operating on a shutdown session
var ErrShutdown = errors.New("session shut down")
// ErrTwoInitiators is returned when both sides think they're the initiator
var ErrTwoInitiators = errors.New("two initiators")
// ErrInvalidState is returned when the other side does something it shouldn't.
// In this case, we close the connection to be safe.
var ErrInvalidState = errors.New("received an unexpected message from the peer")
var errTimeout = timeout{}
var ResetStreamTimeout = 2 * time.Minute
var getInputBufferTimeout = time.Minute
type timeout struct{}
func (timeout) Error() string { return "i/o deadline exceeded" }
func (timeout) Temporary() bool { return true }
func (timeout) Timeout() bool { return true }
// The MemoryManager allows management of memory allocations.
type MemoryManager interface {
// ReserveMemory reserves memory / buffer.
ReserveMemory(size int, prio uint8) error
// ReleaseMemory explicitly releases memory previously reserved with ReserveMemory
ReleaseMemory(size int)
}
type nullMemoryManager struct{}
func (m *nullMemoryManager) ReserveMemory(size int, prio uint8) error { return nil }
func (m *nullMemoryManager) ReleaseMemory(size int) {}
// +1 for initiator
const (
newStreamTag = 0
messageTag = 2
closeTag = 4
resetTag = 6
)
// Multiplex is a mplex session.
type Multiplex struct {
con net.Conn
buf *bufio.Reader
nextID uint64
initiator bool
memoryManager MemoryManager
closed chan struct{}
shutdown chan struct{}
shutdownErr error
shutdownLock sync.Mutex
writeCh chan []byte
nstreams chan *Stream
channels map[streamID]*Stream
chLock sync.Mutex
bufIn, bufOut chan struct{}
bufInTimer *time.Timer
reservedMemory int
}
// NewMultiplex creates a new multiplexer session.
func NewMultiplex(con net.Conn, initiator bool, memoryManager MemoryManager) (*Multiplex, error) {
if memoryManager == nil {
memoryManager = &nullMemoryManager{}
}
mp := &Multiplex{
con: con,
initiator: initiator,
channels: make(map[streamID]*Stream),
closed: make(chan struct{}),
shutdown: make(chan struct{}),
nstreams: make(chan *Stream, 16),
memoryManager: memoryManager,
}
// up-front reserve memory for the essential buffers (1 input, 1 output + the reader buffer)
if err := mp.memoryManager.ReserveMemory(MinMemoryReservation, 255); err != nil {
return nil, err
}
mp.reservedMemory += MinMemoryReservation
bufs := 1
// reserve some more memory for buffers if possible
for i := 1; i < MaxBuffers; i++ {
var prio uint8
if bufs < 2 {
prio = 192
} else {
prio = 128
}
// 2xBufferSize -- one for input and one for output
if err := mp.memoryManager.ReserveMemory(2*BufferSize, prio); err != nil {
break
}
mp.reservedMemory += 2 * BufferSize
bufs++
}
mp.buf = bufio.NewReaderSize(con, BufferSize)
mp.writeCh = make(chan []byte, bufs)
mp.bufIn = make(chan struct{}, bufs)
mp.bufOut = make(chan struct{}, bufs)
mp.bufInTimer = time.NewTimer(0)
if !mp.bufInTimer.Stop() {
<-mp.bufInTimer.C
}
go mp.handleIncoming()
go mp.handleOutgoing()
return mp, nil
}
func (mp *Multiplex) newStream(id streamID, name string) (s *Stream) {
s = &Stream{
id: id,
name: name,
dataIn: make(chan []byte, 1),
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
writeCancel: make(chan struct{}),
readCancel: make(chan struct{}),
}
return
}
// Accept accepts the next stream from the connection.
func (m *Multiplex) Accept() (*Stream, error) {
select {
case s, ok := <-m.nstreams:
if !ok {
return nil, errors.New("multiplex closed")
}
return s, nil
case <-m.closed:
return nil, m.shutdownErr
}
}
// Close closes the session.
func (mp *Multiplex) Close() error {
mp.closeNoWait()
// Wait for the receive loop to finish.
<-mp.closed
return nil
}
func (mp *Multiplex) closeNoWait() {
mp.shutdownLock.Lock()
select {
case <-mp.shutdown:
default:
mp.memoryManager.ReleaseMemory(mp.reservedMemory)
mp.con.Close()
close(mp.shutdown)
}
mp.shutdownLock.Unlock()
}
// IsClosed returns true if the session is closed.
func (mp *Multiplex) IsClosed() bool {
select {
case <-mp.closed:
return true
default:
return false
}
}
// CloseChan returns a read-only channel which will be closed when the session is closed
func (mp *Multiplex) CloseChan() <-chan struct{} {
return mp.closed
}
func (mp *Multiplex) sendMsg(timeout, cancel <-chan struct{}, header uint64, data []byte) error {
buf, err := mp.getBufferOutbound(len(data)+20, timeout, cancel)
if err != nil {
return err
}
n := 0
n += binary.PutUvarint(buf[n:], header)
n += binary.PutUvarint(buf[n:], uint64(len(data)))
n += copy(buf[n:], data)
select {
case mp.writeCh <- buf[:n]:
return nil
case <-mp.shutdown:
mp.putBufferOutbound(buf)
return ErrShutdown
case <-timeout:
mp.putBufferOutbound(buf)
return errTimeout
case <-cancel:
mp.putBufferOutbound(buf)
return ErrStreamClosed
}
}
func (mp *Multiplex) handleOutgoing() {
defer func() {
if rerr := recover(); rerr != nil {
fmt.Fprintf(os.Stderr, "caught panic in handleOutgoing: %s\n%s\n", rerr, debug.Stack())
}
}()
for {
select {
case <-mp.shutdown:
return
case data := <-mp.writeCh:
err := mp.doWriteMsg(data)
mp.putBufferOutbound(data)
if err != nil {
// the connection is closed by this time
log.Warnf("error writing data: %s", err.Error())
return
}
}
}
}
func (mp *Multiplex) doWriteMsg(data []byte) error {
if mp.isShutdown() {
return ErrShutdown
}
_, err := mp.con.Write(data)
if err != nil {
mp.closeNoWait()
}
return err
}
func (mp *Multiplex) nextChanID() uint64 {
out := mp.nextID
mp.nextID++
return out
}
// NewStream creates a new stream.
func (mp *Multiplex) NewStream(ctx context.Context) (*Stream, error) {
return mp.NewNamedStream(ctx, "")
}
// NewNamedStream creates a new named stream.
func (mp *Multiplex) NewNamedStream(ctx context.Context, name string) (*Stream, error) {
mp.chLock.Lock()
// We could call IsClosed but this is faster (given that we already have
// the lock).
if mp.channels == nil {
mp.chLock.Unlock()
return nil, ErrShutdown
}
sid := mp.nextChanID()
header := (sid << 3) | newStreamTag
if name == "" {
name = fmt.Sprint(sid)
}
s := mp.newStream(streamID{
id: sid,
initiator: true,
}, name)
mp.channels[s.id] = s
mp.chLock.Unlock()
err := mp.sendMsg(ctx.Done(), nil, header, []byte(name))
if err != nil {
if err == errTimeout {
return nil, ctx.Err()
}
return nil, err
}
return s, nil
}
func (mp *Multiplex) cleanup() {
mp.closeNoWait()
// Take the channels.
mp.chLock.Lock()
channels := mp.channels
mp.channels = nil
mp.chLock.Unlock()
// Cancel any reads/writes
for _, msch := range channels {
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
}
// And... shutdown!
if mp.shutdownErr == nil {
mp.shutdownErr = ErrShutdown
}
close(mp.closed)
}
func (mp *Multiplex) handleIncoming() {
defer func() {
if rerr := recover(); rerr != nil {
fmt.Fprintf(os.Stderr, "caught panic in handleIncoming: %s\n%s\n", rerr, debug.Stack())
}
}()
defer mp.cleanup()
recvTimeout := time.NewTimer(0)
defer recvTimeout.Stop()
recvTimeoutFired := false
loop:
for {
chID, tag, err := mp.readNextHeader()
if err != nil {
mp.shutdownErr = err
return
}
remoteIsInitiator := tag&1 == 0
ch := streamID{
// true if *I'm* the initiator.
initiator: !remoteIsInitiator,
id: chID,
}
// Rounds up the tag:
// 0 -> 0
// 1 -> 2
// 2 -> 2
// 3 -> 4
// etc...
tag += (tag & 1)
mlen, err := mp.readNextMsgLen()
if err != nil {
mp.shutdownErr = err
return
}
mp.chLock.Lock()
msch, ok := mp.channels[ch]
mp.chLock.Unlock()
switch tag {
case newStreamTag:
if ok {
log.Debugf("received NewStream message for existing stream: %d", ch)
mp.shutdownErr = ErrInvalidState
return
}
// skip stream name, this is not at all useful in the context of libp2p streams
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}
msch = mp.newStream(ch, "")
mp.chLock.Lock()
mp.channels[ch] = msch
mp.chLock.Unlock()
select {
case mp.nstreams <- msch:
case <-mp.shutdown:
return
}
case resetTag:
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}
if !ok {
// This is *ok*. We forget the stream on reset.
continue
}
// Cancel any ongoing reads/writes.
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
case closeTag:
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}
if !ok {
// may have canceled our reads already.
continue
}
// unregister and throw away future data.
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
// close data channel, there will be no more data.
close(msch.dataIn)
// We intentionally don't cancel any deadlines, cancel reads, cancel
// writes, etc. We just deliver the EOF by closing the
// data channel, and unregister the channel so we don't
// receive any more data. The user still needs to call
// `Close()` or `Reset()`.
case messageTag:
if !ok {
// We're not accepting data on this stream, for
// some reason. It's likely that we reset it, or
// simply canceled reads (e.g., called Close).
if err := mp.skipNextMsg(mlen); err != nil {
mp.shutdownErr = err
return
}
continue
}
read:
for rd := 0; rd < mlen; {
nextChunk := mlen - rd
if nextChunk > BufferSize {
nextChunk = BufferSize
}
b, err := mp.readNextChunk(nextChunk)
if err != nil {
mp.shutdownErr = err
return
}
rd += nextChunk
if !recvTimeout.Stop() && !recvTimeoutFired {
<-recvTimeout.C
}
recvTimeout.Reset(ReceiveTimeout)
recvTimeoutFired = false
select {
case msch.dataIn <- b:
case <-msch.readCancel:
// the user has canceled reading. walk away.
mp.putBufferInbound(b)
if err := mp.skipNextMsg(mlen - rd); err != nil {
mp.shutdownErr = err
return
}
break read
case <-recvTimeout.C:
recvTimeoutFired = true
mp.putBufferInbound(b)
log.Warnf("timed out receiving message into stream queue.")
// Do not do this asynchronously. Otherwise, we
// could drop a message, then receive a message,
// then reset.
msch.Reset()
if err := mp.skipNextMsg(mlen - rd); err != nil {
mp.shutdownErr = err
return
}
continue loop
case <-mp.shutdown:
mp.putBufferInbound(b)
return
}
}
default:
log.Debugf("message with unknown header on stream %s", ch)
mp.skipNextMsg(mlen)
if ok {
msch.Reset()
}
}
}
}
func (mp *Multiplex) isShutdown() bool {
select {
case <-mp.shutdown:
return true
default:
return false
}
}
func (mp *Multiplex) sendResetMsg(header uint64, hard bool) {
ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout)
defer cancel()
err := mp.sendMsg(ctx.Done(), nil, header, nil)
if err != nil && !mp.isShutdown() {
if hard {
log.Warnf("error sending reset message: %s; killing connection", err.Error())
mp.Close()
} else {
log.Debugf("error sending reset message: %s", err.Error())
}
}
}
func (mp *Multiplex) readNextHeader() (uint64, uint64, error) {
h, err := varint.ReadUvarint(mp.buf)
if err != nil {
return 0, 0, err
}
// get channel ID
ch := h >> 3
rem := h & 7
return ch, rem, nil
}
func (mp *Multiplex) readNextMsgLen() (int, error) {
l, err := varint.ReadUvarint(mp.buf)
if err != nil {
return 0, err
}
if l > uint64(MaxMessageSize) {
return 0, fmt.Errorf("message size too large")
}
if l == 0 {
return 0, nil
}
return int(l), nil
}
func (mp *Multiplex) readNextChunk(mlen int) ([]byte, error) {
buf, err := mp.getBufferInbound(mlen)
if err != nil {
return nil, err
}
_, err = io.ReadFull(mp.buf, buf)
if err != nil {
mp.putBufferInbound(buf)
return nil, err
}
return buf, nil
}
func (mp *Multiplex) skipNextMsg(mlen int) error {
if mlen == 0 {
return nil
}
_, err := mp.buf.Discard(mlen)
return err
}
func (mp *Multiplex) getBufferInbound(length int) ([]byte, error) {
timerFired := false
defer func() {
if !mp.bufInTimer.Stop() && !timerFired {
<-mp.bufInTimer.C
}
}()
mp.bufInTimer.Reset(getInputBufferTimeout)
select {
case mp.bufIn <- struct{}{}:
case <-mp.bufInTimer.C:
timerFired = true
return nil, errTimeout
case <-mp.shutdown:
return nil, ErrShutdown
}
return mp.getBuffer(length), nil
}
func (mp *Multiplex) getBufferOutbound(length int, timeout, cancel <-chan struct{}) ([]byte, error) {
select {
case mp.bufOut <- struct{}{}:
case <-timeout:
return nil, errTimeout
case <-cancel:
return nil, ErrStreamClosed
case <-mp.shutdown:
return nil, ErrShutdown
}
return mp.getBuffer(length), nil
}
func (mp *Multiplex) getBuffer(length int) []byte {
return pool.Get(length)
}
func (mp *Multiplex) putBufferInbound(b []byte) {
mp.putBuffer(b, mp.bufIn)
}
func (mp *Multiplex) putBufferOutbound(b []byte) {
mp.putBuffer(b, mp.bufOut)
}
func (mp *Multiplex) putBuffer(slice []byte, putBuf chan struct{}) {
<-putBuf
pool.Put(slice)
}


@@ -1,268 +0,0 @@
package multiplex
import (
"context"
"errors"
"io"
"sync"
"time"
"go.uber.org/multierr"
)
var (
ErrStreamReset = errors.New("stream reset")
ErrStreamClosed = errors.New("closed stream")
)
// streamID is a convenience type for operating on stream IDs
type streamID struct {
id uint64
initiator bool
}
// header computes the header for the given tag
func (id *streamID) header(tag uint64) uint64 {
header := id.id<<3 | tag
if !id.initiator {
header--
}
return header
}
type Stream struct {
id streamID
name string
dataIn chan []byte
mp *Multiplex
extra []byte
// exbuf is for holding the reference to the beginning of the extra slice
// for later memory pool freeing
exbuf []byte
rDeadline, wDeadline pipeDeadline
clLock sync.Mutex
writeCancelErr, readCancelErr error
writeCancel, readCancel chan struct{}
}
func (s *Stream) Name() string {
return s.name
}
// tries to preload pending data
func (s *Stream) preloadData() {
select {
case read, ok := <-s.dataIn:
if !ok {
return
}
s.extra = read
s.exbuf = read
default:
}
}
func (s *Stream) waitForData() error {
select {
case read, ok := <-s.dataIn:
if !ok {
return io.EOF
}
s.extra = read
s.exbuf = read
return nil
case <-s.readCancel:
// This is the only place where it's safe to return these.
s.returnBuffers()
return s.readCancelErr
case <-s.rDeadline.wait():
return errTimeout
}
}
func (s *Stream) returnBuffers() {
if s.exbuf != nil {
s.mp.putBufferInbound(s.exbuf)
s.exbuf = nil
s.extra = nil
}
for {
select {
case read, ok := <-s.dataIn:
if !ok {
return
}
if read == nil {
continue
}
s.mp.putBufferInbound(read)
default:
return
}
}
}
func (s *Stream) Read(b []byte) (int, error) {
select {
case <-s.readCancel:
return 0, s.readCancelErr
default:
}
if s.extra == nil {
err := s.waitForData()
if err != nil {
return 0, err
}
}
n := 0
for s.extra != nil && n < len(b) {
read := copy(b[n:], s.extra)
n += read
if read < len(s.extra) {
s.extra = s.extra[read:]
} else {
if s.exbuf != nil {
s.mp.putBufferInbound(s.exbuf)
}
s.extra = nil
s.exbuf = nil
s.preloadData()
}
}
return n, nil
}
func (s *Stream) Write(b []byte) (int, error) {
var written int
for written < len(b) {
wl := len(b) - written
if wl > ChunkSize {
wl = ChunkSize
}
n, err := s.write(b[written : written+wl])
if err != nil {
return written, err
}
written += n
}
return written, nil
}
func (s *Stream) write(b []byte) (int, error) {
select {
case <-s.writeCancel:
return 0, s.writeCancelErr
default:
}
err := s.mp.sendMsg(s.wDeadline.wait(), s.writeCancel, s.id.header(messageTag), b)
if err != nil {
return 0, err
}
return len(b), nil
}
func (s *Stream) cancelWrite(err error) bool {
s.wDeadline.close()
s.clLock.Lock()
defer s.clLock.Unlock()
select {
case <-s.writeCancel:
return false
default:
s.writeCancelErr = err
close(s.writeCancel)
return true
}
}
func (s *Stream) cancelRead(err error) bool {
// Always unregister for reading first, even if we're already closed (or
// already closing). When handleIncoming calls this, it expects the
// stream to be unregistered by the time it returns.
s.mp.chLock.Lock()
delete(s.mp.channels, s.id)
s.mp.chLock.Unlock()
s.rDeadline.close()
s.clLock.Lock()
defer s.clLock.Unlock()
select {
case <-s.readCancel:
return false
default:
s.readCancelErr = err
close(s.readCancel)
return true
}
}
func (s *Stream) CloseWrite() error {
if !s.cancelWrite(ErrStreamClosed) {
// Check if we closed the stream _nicely_. If so, we don't need
// to report an error to the user.
if s.writeCancelErr == ErrStreamClosed {
return nil
}
// Closed for some other reason. Report it.
return s.writeCancelErr
}
ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout)
defer cancel()
err := s.mp.sendMsg(ctx.Done(), nil, s.id.header(closeTag), nil)
// We failed to close the stream after 2 minutes, something is probably wrong.
if err != nil && !s.mp.isShutdown() {
log.Warnf("Error closing stream: %s; killing connection", err.Error())
s.mp.Close()
}
return err
}
func (s *Stream) CloseRead() error {
s.cancelRead(ErrStreamClosed)
return nil
}
func (s *Stream) Close() error {
return multierr.Combine(s.CloseRead(), s.CloseWrite())
}
func (s *Stream) Reset() error {
s.cancelRead(ErrStreamReset)
if s.cancelWrite(ErrStreamReset) {
// Send a reset in the background.
go s.mp.sendResetMsg(s.id.header(resetTag), true)
}
return nil
}
func (s *Stream) SetDeadline(t time.Time) error {
s.rDeadline.set(t)
s.wDeadline.set(t)
return nil
}
func (s *Stream) SetReadDeadline(t time.Time) error {
s.rDeadline.set(t)
return nil
}
func (s *Stream) SetWriteDeadline(t time.Time) error {
s.wDeadline.set(t)
return nil
}


@@ -1,3 +0,0 @@
{
"version": "v0.7.0"
}


@@ -698,7 +698,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics
 }
 
 // AddDiscoveredPeer to add a discovered peer to the node peerStore
-func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) {
+func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, enr *enode.Node, connectNow bool) {
 	p := service.PeerData{
 		Origin: origin,
 		AddrInfo: peer.AddrInfo{
@@ -706,6 +706,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
 			Addrs: addrs,
 		},
 		PubsubTopics: pubsubTopics,
+		ENR:          enr,
 	}
 	w.peermanager.AddDiscoveredPeer(p, connectNow)
 }


@@ -12,7 +12,6 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-libp2p"
-	mplex "github.com/libp2p/go-libp2p-mplex"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/config"
 	"github.com/libp2p/go-libp2p/core/crypto"
@@ -559,7 +558,6 @@ var DefaultLibP2POptions = []libp2p.Option{
 	libp2p.UserAgent(UserAgent),
 	libp2p.ChainOptions(
 		libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
-		libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
 	),
 	libp2p.EnableNATService(),
 	libp2p.ConnectionManager(newConnManager(200, 300, connmgr.WithGracePeriod(0))),


@@ -0,0 +1,172 @@
package peermanager
import (
"context"
"errors"
"sort"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
)
type FastestPeerSelector struct {
sync.RWMutex
host host.Host
logger *zap.Logger
}
func NewFastestPeerSelector(logger *zap.Logger) *FastestPeerSelector {
return &FastestPeerSelector{
logger: logger.Named("rtt-cache"),
}
}
func (r *FastestPeerSelector) SetHost(h host.Host) {
r.host = h
}
func (r *FastestPeerSelector) PingPeer(ctx context.Context, peer peer.ID) (time.Duration, error) {
if peer == r.host.ID() {
return 0, errors.New("can't ping yourself")
}
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
defer cancel()
select {
case <-ctx.Done():
return 0, ctx.Err()
case result := <-ping.Ping(ctx, r.host, peer):
r.Lock()
defer r.Unlock()
if result.Error == nil {
return result.RTT, nil
} else {
r.logger.Debug("could not ping", logging.HostID("peer", peer), zap.Error(result.Error))
return 0, result.Error
}
}
}
func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlice) (peer.ID, error) {
var peerRTT []pingResult
var peerRTTMutex sync.Mutex
wg := sync.WaitGroup{}
pingCh := make(chan peer.ID)
pinged := make(map[peer.ID]struct{})
go func() {
// Ping any peer with no latency recorded
for peerToPing := range pingCh {
go func(p peer.ID) {
defer wg.Done()
rtt := time.Hour
result, err := r.PingPeer(ctx, p)
if err == nil {
rtt = result
}
peerRTTMutex.Lock()
peerRTT = append(peerRTT, pingResult{
peerID: p,
rtt: rtt,
connectedness: r.host.Network().Connectedness(p),
})
peerRTTMutex.Unlock()
}(peerToPing)
}
}()
for _, p := range peers {
latency := r.host.Peerstore().LatencyEWMA(p)
if latency == 0 {
wg.Add(1)
pinged[p] = struct{}{} // To avoid double pings
pingCh <- p
} else {
peerRTTMutex.Lock()
peerRTT = append(peerRTT, pingResult{
peerID: p,
rtt: latency,
connectedness: r.host.Network().Connectedness(p),
})
peerRTTMutex.Unlock()
}
}
// Wait for pings to be done (if any)
wg.Wait()
close(pingCh)
sort.Sort(pingSort(peerRTT))
for _, p := range peerRTT {
if p.rtt == time.Hour {
break
}
// Make sure peer is reachable
_, exists := pinged[p.peerID] // Did we just ping the peer?
if !exists {
_, err := r.PingPeer(ctx, p.peerID)
if err != nil {
continue
} else {
if p.rtt != time.Hour {
return p.peerID, nil
}
}
} else {
if p.rtt != time.Hour {
return p.peerID, nil
}
}
}
return "", ErrNoPeersAvailable
}
type pingResult struct {
peerID peer.ID
rtt time.Duration
connectedness network.Connectedness
}
type pingSort []pingResult
func (a pingSort) Len() int {
return len(a)
}
func (a pingSort) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
var connectednessPriority map[network.Connectedness]int
func init() {
// Closer to 0 is preferred
connectednessPriority = map[network.Connectedness]int{
network.CanConnect: 1,
network.Connected: 1,
network.NotConnected: 2,
network.CannotConnect: 3,
}
}
func (a pingSort) Less(i, j int) bool {
return connectednessPriority[a[i].connectedness] < connectednessPriority[a[j].connectedness] && a[i].rtt < a[j].rtt
}
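
A hedged sketch of how this new selector is meant to be used, based only on the API shown above (NewFastestPeerSelector, SetHost, FastestPeer); the logger, h, ctx, and candidates values are assumed to exist in the caller:

```go
// Hypothetical wiring, mirroring what PeerManager does later in this diff.
selector := NewFastestPeerSelector(logger) // logger: an existing *zap.Logger
selector.SetHost(h)                        // h: the libp2p host.Host

// FastestPeer pings candidates that have no cached LatencyEWMA, then sorts
// results by connectedness priority and RTT, skipping unreachable peers.
fastest, err := selector.FastestPeer(ctx, candidates) // candidates: peer.IDSlice
if err != nil {
	// ErrNoPeersAvailable when no candidate is reachable.
}
_ = fastest
```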


@@ -193,7 +193,6 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
 	c.mux.Lock()
 	defer c.mux.Unlock()
 	val, ok := c.cache.Get(pi.ID)
-	var cachedPeer *connCacheData
 	if ok {
 		tv := val.(*connCacheData)
 		now := time.Now()
@@ -204,15 +203,25 @@
 		}
 
 		c.logger.Debug("Proceeding with connecting to peer",
 			zap.Time("currentTime", now), zap.Time("nextTry", tv.nextTry))
-		tv.nextTry = now.Add(tv.strat.Delay())
+	}
+	return true
+}
+
+func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
+	c.mux.Lock()
+	defer c.mux.Unlock()
+	val, ok := c.cache.Get(peerID)
+	var cachedPeer *connCacheData
+	if ok {
+		tv := val.(*connCacheData)
+		tv.nextTry = time.Now().Add(tv.strat.Delay())
 	} else {
 		cachedPeer = &connCacheData{strat: c.backoff()}
 		cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
 		c.logger.Debug("Initializing connectionCache for peer ",
-			logging.HostID("peerID", pi.ID), zap.Time("until", cachedPeer.nextTry))
-		c.cache.Add(pi.ID, cachedPeer)
+			logging.HostID("peerID", peerID), zap.Time("until", cachedPeer.nextTry))
+		c.cache.Add(peerID, cachedPeer)
 	}
-	return true
 }
 
@@ -255,6 +264,7 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
 	defer cancel()
 	err := c.host.Connect(ctx, pi)
 	if err != nil && !errors.Is(err, context.Canceled) {
+		c.addConnectionBackoff(pi.ID)
 		c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
 		c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
 	}


@@ -84,6 +84,7 @@ type PeerManager struct {
 	discoveryService       *discv5.DiscoveryV5
 	wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
 	TopicHealthNotifCh     chan<- TopicHealthStatus
+	rttCache               *FastestPeerSelector
 }
 
 // PeerSelection provides various options based on which Peer is selected from a list of peers.
@@ -188,6 +189,7 @@ func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMet
 		subRelayTopics:         make(map[string]*NodeTopicDetails),
 		maxPeers:               maxPeers,
 		wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{},
+		rttCache:               NewFastestPeerSelector(logger),
 	}
 	logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
 		zap.Int("maxRelayPeers", maxRelayPeers),
@@ -206,6 +208,7 @@ func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) {
 // SetHost sets the host to be used in order to access the peerStore.
 func (pm *PeerManager) SetHost(host host.Host) {
 	pm.host = host
+	pm.rttCache.SetHost(host)
 }
 
 // SetPeerConnector sets the peer connector to be used for establishing relay connections.
@@ -215,7 +218,6 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
 // Start starts the processing to be done by peer manager.
 func (pm *PeerManager) Start(ctx context.Context) {
 	pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
-
 	pm.ctx = ctx
@@ -429,17 +431,21 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
 	if err == nil {
 		enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID)
 		// Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen)
-		if err == nil && enr.Record().Seq() >= p.ENR.Seq() {
-			return
-		}
-		if err != nil {
-			//Peer is already in peer-store but it doesn't have an enr, but discovered peer has ENR
-			pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
-				logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()))
-		} else {
-			//Peer is already in peer-store but stored ENR is older than discovered one.
-			pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
-				logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq()))
+		if err == nil {
+			if p.ENR != nil {
+				if enr.Record().Seq() >= p.ENR.Seq() {
+					return
+				}
+				//Peer is already in peer-store but stored ENR is older than discovered one.
+				pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
+					logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq()))
+			} else {
+				pm.logger.Info("peer already found in peerstore, but no new ENR", logging.HostID("peer", p.AddrInfo.ID))
+			}
+		} else {
+			//Peer is in peer-store but it doesn't have an enr
+			pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
+				logging.HostID("peer", p.AddrInfo.ID))
 		}
 	}
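
The rewritten branch guards against a discovered peer that carries no ENR (p.ENR == nil), which the old comparison dereferenced unconditionally. A condensed, hypothetical restatement of the new rule (stored == nil standing in for the peerstore lookup failing; shouldSkipReAdd is not part of the diff):

```go
// shouldSkipReAdd restates the branch above: skip re-adding only when both
// ENRs exist and the stored record is at least as recent as the discovered one.
func shouldSkipReAdd(stored, discovered *enode.Node) bool {
	if stored == nil || discovered == nil {
		return false
	}
	return stored.Record().Seq() >= discovered.Record().Seq()
}
```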


@@ -3,13 +3,9 @@ package peermanager
 import (
 	"context"
 	"errors"
-	"sync"
-	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/protocol"
-	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
-	"github.com/waku-org/go-waku/logging"
 	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
 	waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
 	"go.uber.org/zap"
@@ -178,11 +174,6 @@ func (pm *PeerManager) SelectPeers(criteria PeerSelectionCriteria) (peer.IDSlice
 	}
 }
 
-type pingResult struct {
-	p   peer.ID
-	rtt time.Duration
-}
-
 // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
 // If a list of specific peers is passed, the peer will be chosen from that list assuming
 // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
@@ -204,54 +195,8 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
 	if err != nil {
 		return "", err
 	}
-	wg := sync.WaitGroup{}
-	waitCh := make(chan struct{})
-	pingCh := make(chan pingResult, 1000)
 
-	wg.Add(len(peers))
-	go func() {
-		for _, p := range peers {
-			go func(p peer.ID) {
-				defer wg.Done()
-				ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second)
-				defer cancel()
-				result := <-ping.Ping(ctx, pm.host, p)
-				if result.Error == nil {
-					pingCh <- pingResult{
-						p:   p,
-						rtt: result.RTT,
-					}
-				} else {
-					pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error))
-				}
-			}(p)
-		}
-		wg.Wait()
-		close(waitCh)
-		close(pingCh)
-	}()
-
-	select {
-	case <-waitCh:
-		var min *pingResult
-		for p := range pingCh {
-			if min == nil {
-				min = &p
-			} else {
-				if p.rtt < min.rtt {
-					min = &p
-				}
-			}
-		}
-		if min == nil {
-			return "", ErrNoPeersAvailable
-		}
-		return min.p, nil
-	case <-criteria.Ctx.Done():
-		return "", ErrNoPeersAvailable
-	}
+	return pm.rttCache.FastestPeer(criteria.Ctx, peers)
 }
 
 // FilterPeersByProto filters list of peers that support specified protocols.


@@ -30,6 +30,7 @@ type WakuMessageKeyValue struct {
 	MessageHash []byte          `protobuf:"bytes,1,opt,name=message_hash,json=messageHash,proto3,oneof" json:"message_hash,omitempty"` // Globally unique key for a Waku Message
 	Message     *pb.WakuMessage `protobuf:"bytes,2,opt,name=message,proto3,oneof" json:"message,omitempty"`                            // Full message content as value
+	PubsubTopic *string         `protobuf:"bytes,3,opt,name=pubsub_topic,json=pubsubTopic,proto3,oneof" json:"pubsub_topic,omitempty"`
 }
 
 func (x *WakuMessageKeyValue) Reset() {
@@ -78,6 +79,13 @@ func (x *WakuMessageKeyValue) GetMessage() *pb.WakuMessage {
 	return nil
 }
 
+func (x *WakuMessageKeyValue) GetPubsubTopic() string {
+	if x != nil && x.PubsubTopic != nil {
+		return *x.PubsubTopic
+	}
+	return ""
+}
+
 type StoreQueryRequest struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
@@ -285,7 +293,7 @@ var file_storev3_proto_rawDesc = []byte{
 	[generated descriptor bytes for storev3.proto, regenerated to encode the new optional pubsub_topic field; raw byte diff omitted]
0x20, 0x01, 0x28, 0x04, 0x48, 0x04, 0x52, 0x0f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x18, 0x34, 0x20, 0x01, 0x28, 0x08,
0x6f, 0x6e, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x52, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x72, 0x77,
0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x61, 0x72, 0x64, 0x12, 0x2e, 0x0a, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f,
0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x74, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x35, 0x20, 0x01, 0x28, 0x04, 0x48, 0x04, 0x52,
0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x0f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x6d, 0x69, 0x74,
0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x42, 0x13, 0x0a, 0x88, 0x01, 0x01, 0x42, 0x0f, 0x0a, 0x0d, 0x5f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x74,
0x11, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x73, 0x74,
0x69, 0x74, 0x22, 0xa7, 0x02, 0x0a, 0x12, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x51, 0x75, 0x65, 0x72, 0x61, 0x72, 0x74, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x5f, 0x65, 0x6e, 0x64,
0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x42, 0x13, 0x0a, 0x11, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0xa7, 0x02, 0x0a, 0x12,
0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x88, 0x01, 0x01, 0x12, 0x24, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64,
0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49,
0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x64, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65,
0x63, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x88, 0x01, 0x01, 0x12, 0x24, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x74, 0x75,
0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0a,
0x67, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x44, 0x65, 0x73, 0x63, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a,
0x61, 0x67, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x33, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x33, 0x2e,
0x02, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4b, 0x65, 0x79, 0x56, 0x61,
0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x30, 0x0a,
0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x11, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73,
0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x42, 0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x6f, 0x72, 0x18, 0x33, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x02, 0x52, 0x10, 0x70, 0x61, 0x67, 0x69,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x42,
0x6f, 0x74, 0x6f, 0x33, 0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x42,
0x0e, 0x0a, 0x0c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x64, 0x65, 0x73, 0x63, 0x42,
0x14, 0x0a, 0x12, 0x5f, 0x70, 0x61, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63,
0x75, 0x72, 0x73, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
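
Aside from the appended bytes encoding the new field, the substantive change in this regenerated descriptor is the length prefix of the WakuMessageKeyValue descriptor (following its 0x22 message_type tag) growing from 0x97, 0x01 to 0xd0, 0x01. Both are protobuf varints; a minimal check using only the standard library:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Length prefix of the WakuMessageKeyValue descriptor before and after
	// pubsub_topic was added (bytes copied from the first hunk above).
	before, _ := binary.Uvarint([]byte{0x97, 0x01})
	after, _ := binary.Uvarint([]byte{0xd0, 0x01})
	fmt.Println(before, after) // 151 208: the new field adds 57 descriptor bytes
}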


@@ -8,6 +8,7 @@ import "waku/message/v1/message.proto";
message WakuMessageKeyValue { message WakuMessageKeyValue {
optional bytes message_hash = 1; // Globally unique key for a Waku Message optional bytes message_hash = 1; // Globally unique key for a Waku Message
optional waku.message.v1.WakuMessage message = 2; // Full message content as value optional waku.message.v1.WakuMessage message = 2; // Full message content as value
optional string pubsub_topic = 3;
} }
message StoreQueryRequest { message StoreQueryRequest {
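
Carrying the pubsub topic per key-value makes each store entry self-describing about where it was routed. A minimal sketch of populating the regenerated type; the import paths follow go-waku's package layout and are assumptions here, not part of the diff:

package main

import (
	"fmt"

	wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
	storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
	"google.golang.org/protobuf/proto"
)

func main() {
	kv := &storepb.WakuMessageKeyValue{
		MessageHash: make([]byte, 32), // placeholder; real hashes are 32 bytes
		Message: &wpb.WakuMessage{
			Payload:      []byte("hello"),
			ContentTopic: "/example/1/chat/proto",
		},
		// New optional field 3: the topic now travels with each entry.
		PubsubTopic: proto.String("/waku/2/default-waku/proto"),
	}
	fmt.Println(kv.GetPubsubTopic())
}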


@@ -14,7 +14,6 @@ var (
errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed") errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed")
errEmptyContentTopic = errors.New("one or more content topics specified is empty") errEmptyContentTopic = errors.New("one or more content topics specified is empty")
errMissingPubsubTopic = errors.New("missing PubsubTopic field") errMissingPubsubTopic = errors.New("missing PubsubTopic field")
errMissingContentTopics = errors.New("missing ContentTopics field")
errMissingStatusCode = errors.New("missing StatusCode field") errMissingStatusCode = errors.New("missing StatusCode field")
errInvalidTimeRange = errors.New("invalid time range") errInvalidTimeRange = errors.New("invalid time range")
errInvalidMessageHash = errors.New("invalid message hash") errInvalidMessageHash = errors.New("invalid message hash")
@@ -40,9 +39,7 @@ func (x *StoreQueryRequest) Validate() error {
return errMissingPubsubTopic return errMissingPubsubTopic
} }
if len(x.ContentTopics) == 0 { if len(x.ContentTopics) > MaxContentTopics {
return errMissingContentTopics
} else if len(x.ContentTopics) > MaxContentTopics {
return errMaxContentTopics return errMaxContentTopics
} else { } else {
for _, m := range x.ContentTopics { for _, m := range x.ContentTopics {
@@ -83,6 +80,10 @@ func (x *WakuMessageKeyValue) Validate() error {
} }
if x.Message != nil { if x.Message != nil {
if x.GetPubsubTopic() == "" {
return errMissingPubsubTopic
}
return x.Message.Validate() return x.Message.Validate()
} }
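
Taken together, the validation changes mean a store query may omit content topics entirely (as hash-based lookups do), while any key-value carrying a full message must now name its pubsub topic. A sketch of both outcomes, with the same assumed import paths as above:

package main

import (
	"fmt"

	wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
	storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)

func main() {
	// Hash-based lookup with no ContentTopics: this no longer fails the
	// content-topics check (errMissingContentTopics was removed); only the
	// MaxContentTopics cap remains.
	q := &storepb.StoreQueryRequest{
		RequestId:     "example-request",
		MessageHashes: [][]byte{make([]byte, 32)},
	}
	fmt.Println(q.Validate())

	// A full message without a pubsub topic is now rejected.
	kv := &storepb.WakuMessageKeyValue{
		MessageHash: make([]byte, 32),
		Message:     &wpb.WakuMessage{Payload: []byte("hi"), ContentTopic: "/example/1/chat/proto"},
	}
	fmt.Println(kv.Validate()) // missing PubsubTopic field
}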


@@ -33,10 +33,6 @@ func (r *Result) Query() *pb.StoreQueryRequest {
return r.storeRequest return r.storeRequest
} }
func (r *Result) PubsubTopic() string {
return r.storeRequest.GetPubsubTopic()
}
func (r *Result) Next(ctx context.Context) (bool, error) { func (r *Result) Next(ctx context.Context) (bool, error) {
if !r.started { if !r.started {
r.started = true r.started = true
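
With the result-level accessor removed, callers read the topic from each returned entry instead. A sketch under the assumption that the store client's Result still exposes Messages() as a slice of WakuMessageKeyValue:

package storeexample

import (
	"fmt"

	"github.com/waku-org/go-waku/waku/v2/protocol/store"
)

// printTopics shows the per-entry replacement for the removed
// Result.PubsubTopic(): each key-value now carries its own topic.
func printTopics(r *store.Result) {
	for _, kv := range r.Messages() {
		fmt.Println(kv.GetPubsubTopic(), kv.GetMessage().GetContentTopic())
	}
}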

vendor/modules.txt (vendored)

@@ -568,17 +568,11 @@ github.com/libp2p/go-libp2p/p2p/transport/webtransport
# github.com/libp2p/go-libp2p-asn-util v0.3.0 # github.com/libp2p/go-libp2p-asn-util v0.3.0
## explicit; go 1.19 ## explicit; go 1.19
github.com/libp2p/go-libp2p-asn-util github.com/libp2p/go-libp2p-asn-util
# github.com/libp2p/go-libp2p-mplex v0.9.0
## explicit; go 1.20
github.com/libp2p/go-libp2p-mplex
# github.com/libp2p/go-libp2p-pubsub v0.10.1 # github.com/libp2p/go-libp2p-pubsub v0.10.1
## explicit; go 1.20 ## explicit; go 1.20
github.com/libp2p/go-libp2p-pubsub github.com/libp2p/go-libp2p-pubsub
github.com/libp2p/go-libp2p-pubsub/pb github.com/libp2p/go-libp2p-pubsub/pb
github.com/libp2p/go-libp2p-pubsub/timecache github.com/libp2p/go-libp2p-pubsub/timecache
# github.com/libp2p/go-mplex v0.7.0
## explicit; go 1.17
github.com/libp2p/go-mplex
# github.com/libp2p/go-msgio v0.3.0 # github.com/libp2p/go-msgio v0.3.0
## explicit; go 1.18 ## explicit; go 1.18
github.com/libp2p/go-msgio github.com/libp2p/go-msgio
@@ -1029,7 +1023,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240507175626-19d27befd98b # github.com/waku-org/go-waku v0.8.1-0.20240529181619-d1cb6b0eaa7f
## explicit; go 1.20 ## explicit; go 1.20
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence github.com/waku-org/go-waku/waku/persistence


@@ -449,7 +449,7 @@ func (w *Waku) discoverAndConnectPeers() error {
func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) { func (w *Waku) connect(peerInfo peer.AddrInfo, origin wps.Origin) {
// Connection will be pruned eventually by the connection manager if needed // Connection will be pruned eventually by the connection manager if needed
// The peer connector in go-waku uses Connect, so it will execute identify as part of its // The peer connector in go-waku uses Connect, so it will execute identify as part of its
w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, true) w.node.AddDiscoveredPeer(peerInfo.ID, peerInfo.Addrs, origin, []string{w.cfg.DefaultShardPubsubTopic}, nil, true)
} }
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
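
The bumped go-waku adds a fifth parameter to AddDiscoveredPeer, which status-go passes as nil here. An annotated restatement of the new call; the per-argument meanings are inferred from this call site, and the purpose of the new parameter is an assumption, not confirmed by the diff:

w.node.AddDiscoveredPeer(
	peerInfo.ID,                             // discovered peer ID
	peerInfo.Addrs,                          // its known multiaddrs
	origin,                                  // discovery origin (wps.Origin)
	[]string{w.cfg.DefaultShardPubsubTopic}, // pubsub topics of interest
	nil,                                     // new argument in this go-waku version; unused here
	true,                                    // connect immediately
)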