go-libp2p-pubsub/pubsub.go

1443 lines
39 KiB
Go
Raw Normal View History

package pubsub
import (
"context"
"encoding/binary"
2019-11-16 01:14:10 +08:00
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
2019-05-26 17:19:03 +01:00
logging "github.com/ipfs/go-log/v2"
)
// DefaultMaxMessageSize is the default maximum pubsub message size, 1 MiB
// (1 << 20 bytes). NewPubSub uses it as the initial maxMessageSize.
const DefaultMaxMessageSize = 1 << 20
2019-01-15 15:39:13 +02:00
var (
// TimeCacheDuration specifies how long a message ID will be remembered as seen.
// Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
2019-01-15 15:39:13 +02:00
TimeCacheDuration = 120 * time.Second
// TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
// Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
TimeCacheStrategy = timecache.Strategy_FirstSeen
// ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
// subscription has been cancelled.
2020-07-13 13:12:27 -07:00
ErrSubscriptionCancelled = errors.New("subscription cancelled")
2019-01-15 15:39:13 +02:00
)
// log is the package-level logger, registered under the name "pubsub".
var log = logging.Logger("pubsub")
// ProtocolMatchFn builds a protocol matcher: given a base protocol.ID it
// returns a predicate over protocol.ID. NewPubSub calls it once per router
// protocol and hands the resulting predicate to the host's
// SetStreamHandlerMatch.
type ProtocolMatchFn = func(protocol.ID) func(protocol.ID) bool
2019-01-04 13:09:21 +02:00
// PubSub is the implementation of the pubsub system.
type PubSub struct {
// atomic counter for seqnos
// NOTE: Must be declared at the top of the struct as we perform atomic
// operations on this field.
//
// See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
counter uint64
host host.Host
rt PubSubRouter
2019-04-29 22:45:48 +03:00
val *validation
disc *discover
2019-11-04 19:22:14 +02:00
tracer *pubsubTracer
2021-09-20 20:17:33 -07:00
peerFilter PeerFilter
// maxMessageSize is the maximum message size; it applies globally to all
// topics.
maxMessageSize int
2019-11-16 01:14:10 +08:00
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
// incoming messages from other peers
incoming chan *RPC
// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSubReq
// addRelay is a control channel for us to add and remove relays
addRelay chan *addRelayReq
// rmRelay is a relay cancellation channel
rmRelay chan string
// get list of topics we are subscribed to
getTopics chan *topicReq
// get chan of peers we are connected to
getPeers chan *listPeerReq
// send subscription here to cancel it
cancelCh chan *Subscription
// addSub is a channel for us to add a topic
addTopic chan *addTopicReq
// removeTopic is a topic cancellation channel
rmTopic chan *rmTopicReq
// a notification channel for new peer connections accumulated
newPeers chan struct{}
newPeersPrioLk sync.RWMutex
newPeersMx sync.Mutex
newPeersPend map[peer.ID]struct{}
// a notification channel for new outoging peer streams
2019-05-26 17:19:03 +01:00
newPeerStream chan network.Stream
// a notification channel for errors opening new peer streams
newPeerError chan peer.ID
// a notification channel for when our peers die
2021-07-13 20:24:29 +03:00
peerDead chan struct{}
peerDeadPrioLk sync.RWMutex
peerDeadMx sync.Mutex
peerDeadPend map[peer.ID]struct{}
// backoff for retrying new connections to dead peers
deadPeerBackoff *backoff
// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}
// The set of topics we are relaying for
myRelays map[string]int
// The set of topics we are interested in
myTopics map[string]*Topic
// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}
// sendMsg handles messages that have been validated
sendMsg chan *Message
// addVal handles validator registration requests
addVal chan *addValReq
// rmVal handles validator unregistration requests
rmVal chan *rmValReq
2018-02-19 14:50:14 +02:00
// eval thunk in event loop
eval chan func()
2019-01-15 16:07:58 +02:00
// peer blacklist
2019-01-17 14:05:04 +02:00
blacklist Blacklist
2019-01-15 16:07:58 +02:00
blacklistPeer chan peer.ID
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
peers map[peer.ID]*rpcQueue
2021-01-12 14:52:00 +02:00
inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream
seenMessages timecache.TimeCache
seenMsgTTL time.Duration
seenMsgStrategy timecache.Strategy
2022-01-09 18:14:39 +02:00
// generator used to compute the ID for a message
idGen *msgIDGenerator
// key for signing messages; nil when signing is disabled
signKey crypto.PrivKey
// source ID for signed messages; corresponds to signKey, empty when signing is disabled.
// If empty, the author and seq-nr are completely omitted from the messages.
signID peer.ID
2018-08-29 17:19:34 +03:00
// strict mode rejects all unsigned messages prior to validation
signPolicy MessageSignaturePolicy
2020-09-30 14:53:57 +03:00
// filter for tracking subscriptions in topics of interest; if nil, then we track all subscriptions
subFilter SubscriptionFilter
// protoMatchFunc is a matching function for protocol selection.
2021-07-29 17:54:04 -04:00
protoMatchFunc ProtocolMatchFn
ctx context.Context
Adds Application Specific RPC Inspector (#509) * Update go.mod * Refactor GossipSub Construction (#1) * Enables non-atomic validation for peer scoring parameters (#499) * decouples topic scoring parameters * adds skiping atomic validation for topic parameters * cleans up * adds skip atomic validation to peer score threshold * adds skip atomic validation for peer parameters * adds test for non-atomic validation * adds tests for peer score * adds tests for peer score thresholds * refactors tests * chore: Update .github/workflows/stale.yml [skip ci] * adds with gossipsub tracker Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com> * decouples options * fixes conflict * reverts back module * fixes peer score helper * Adds send control message to gossipsub router (#2) * adjusts libp2p version (#3) * Update go.mod (#4) * adds app specific rpc handler * Create ci.yml (#5) * Create Makefile (#7) * Revert "Merge branch 'yahya/gossipsub-router-interface'" (#6) This reverts commit 1c91995b7fbce0e4b9c5990c5bfda0d555267182. * Update ci.yml (#9) * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 352d7471c58580480b7f6592001bc3e9b910fa77. * Revert "Merge remote-tracking branch 'origin/yahya/adds-rpc-inspector' into yahya/adds-rpc-inspector" This reverts commit 586c5cb6eb2a971a1590ea32050de139316984d2. * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 2e13ee8b95dded5a3401dd86f952fae3419bd86b. * moves app specific inspector to pubsub * removes option from gossipsub * moves app specific rpc inspector up * refactors app specific to return an error Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com>
2022-11-30 22:10:07 -08:00
// appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to
// processing them. The inspector is invoked on an accepted RPC right prior to handling it.
// The return value of the inspector function is an error indicating whether the RPC should be processed or not.
// If the error is nil, the RPC is processed as usual. If the error is non-nil, the RPC is dropped.
appSpecificRpcInspector func(peer.ID, *RPC) error
}
2019-01-04 13:09:21 +02:00
// PubSubRouter is the message router component of PubSub.
type PubSubRouter interface {
2018-03-10 10:08:50 +02:00
// Protocols returns the list of protocols supported by the router.
Protocols() []protocol.ID
2018-03-10 10:08:50 +02:00
// Attach is invoked by the PubSub constructor to attach the router to a
// freshly initialized PubSub instance.
Attach(*PubSub)
2018-03-10 10:08:50 +02:00
// AddPeer notifies the router that a new peer has been connected.
AddPeer(peer.ID, protocol.ID)
2018-03-10 10:08:50 +02:00
// RemovePeer notifies the router that a peer has been disconnected.
RemovePeer(peer.ID)
// EnoughPeers returns whether the router needs more peers before it's ready to publish new records.
// Suggested (if greater than 0) is a suggested number of peers that the router should need.
EnoughPeers(topic string, suggested int) bool
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
// AcceptFrom is invoked on any RPC envelope before pushing it to the validation pipeline
// or processing control information.
2020-05-08 10:12:52 -04:00
// Allows routers with internal scoring to vet peers before committing any processing resources
// to the message and implement an effective graylist and react to validation queue overload.
AcceptFrom(peer.ID) AcceptStatus
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
// PreValidation is invoked on messages in the RPC envelope right before pushing it to
// the validation pipeline
PreValidation([]*Message)
2018-03-10 10:08:50 +02:00
// HandleRPC is invoked to process control messages in the RPC envelope.
// It is invoked after subscriptions and payload messages have been processed.
2018-01-27 09:52:35 +02:00
HandleRPC(*RPC)
2018-03-10 10:08:50 +02:00
// Publish is invoked to forward a new message that has been validated.
Publish(*Message)
2018-03-10 10:08:50 +02:00
// Join notifies the router that we want to receive and forward messages in a topic.
// It is invoked after the subscription announcement.
Join(topic string)
2018-03-10 10:08:50 +02:00
// Leave notifies the router that we are no longer interested in a topic.
// It is invoked after the unsubscription announcement.
Leave(topic string)
}
// AcceptStatus is the verdict a router returns from AcceptFrom, telling
// PubSub how much of an incoming RPC should be processed.
type AcceptStatus int

const (
	// AcceptNone signals to drop the incoming RPC
	AcceptNone AcceptStatus = iota
	// AcceptControl signals to accept the incoming RPC only for control message processing by
	// the router. Included payload messages will _not_ be pushed to the validation queue.
	AcceptControl
	// AcceptAll signals to accept the incoming RPC for full processing
	AcceptAll
)
type Message struct {
*pb.Message
ID string
2020-01-27 13:44:03 +01:00
ReceivedFrom peer.ID
2019-12-19 15:15:45 -08:00
ValidatorData interface{}
Local bool
}
// GetFrom returns the From field of the embedded pb.Message, converted to a
// peer.ID.
func (m *Message) GetFrom() peer.ID {
	// Call the embedded type's accessor explicitly; it shares this method's name.
	raw := m.Message.GetFrom()
	return peer.ID(raw)
}
// RPC wraps the wire-level pb.RPC (embedded by value) with one unexported,
// locally kept field.
type RPC struct {
pb.RPC
// from is a peer ID kept alongside the RPC. It is unexported on purpose:
// it is local bookkeeping and is not sent over the wire.
from peer.ID
}
// Option configures a PubSub instance. NewPubSub applies options in the order
// given; the first option that returns a non-nil error aborts construction
// and that error is returned to the caller.
type Option func(*PubSub) error
2019-01-04 13:09:21 +02:00
// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
ps := &PubSub{
2019-11-16 01:14:10 +08:00
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
2021-09-20 20:17:33 -07:00
peerFilter: DefaultPeerFilter,
2019-11-16 01:14:10 +08:00
disc: &discover{},
maxMessageSize: DefaultMaxMessageSize,
2019-11-16 01:14:10 +08:00
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: nil,
signPolicy: StrictSign,
2019-11-16 01:14:10 +08:00
incoming: make(chan *RPC, 32),
newPeers: make(chan struct{}, 1),
newPeersPend: make(map[peer.ID]struct{}),
2019-11-16 01:14:10 +08:00
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
2021-07-13 20:24:29 +03:00
peerDead: make(chan struct{}, 1),
peerDeadPend: make(map[peer.ID]struct{}),
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
2019-11-16 01:14:10 +08:00
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq),
rmRelay: make(chan string),
2019-11-16 01:14:10 +08:00
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
myRelays: make(map[string]int),
2019-11-16 01:14:10 +08:00
topics: make(map[string]map[peer.ID]struct{}),
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
peers: make(map[peer.ID]*rpcQueue),
2021-01-12 14:52:00 +02:00
inboundStreams: make(map[peer.ID]network.Stream),
2019-11-16 01:14:10 +08:00
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMsgTTL: TimeCacheDuration,
seenMsgStrategy: TimeCacheStrategy,
2022-01-09 18:14:39 +02:00
idGen: newMsgIdGenerator(),
2019-11-16 01:14:10 +08:00
counter: uint64(time.Now().UnixNano()),
}
for _, opt := range opts {
err := opt(ps)
if err != nil {
return nil, err
}
}
if ps.signPolicy.mustSign() {
if ps.signID == "" {
return nil, fmt.Errorf("strict signature usage enabled but message author was disabled")
}
ps.signKey = ps.host.Peerstore().PrivKey(ps.signID)
if ps.signKey == nil {
return nil, fmt.Errorf("can't sign for peer %s: no private key", ps.signID)
}
}
ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)
if err := ps.disc.Start(ps); err != nil {
return nil, err
}
rt.Attach(ps)
for _, id := range rt.Protocols() {
if ps.protoMatchFunc != nil {
h.SetStreamHandlerMatch(id, ps.protoMatchFunc(id), ps.handleNewStream)
} else {
h.SetStreamHandler(id, ps.handleNewStream)
}
}
go ps.watchForNewPeers(ctx)
2019-04-29 22:45:48 +03:00
ps.val.Start(ps)
2019-04-29 22:45:48 +03:00
go ps.processLoop(ctx)
return ps, nil
}
// MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any
// implementation of this function by configuring it with the Option from WithMessageIdFn.
// It receives the wire-level *pb.Message and returns the ID as a string.
type MsgIdFunction func(pmsg *pb.Message) string
// WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message.
// The default ID function is DefaultMsgIdFn (concatenate source and seq nr.),
// but it can be customized to e.g. the hash of the message.
func WithMessageIdFn(fn MsgIdFunction) Option {
return func(p *PubSub) error {
2022-01-09 18:14:39 +02:00
p.idGen.Default = fn
return nil
}
}
2021-09-20 20:17:33 -07:00
// PeerFilter is used to filter pubsub peers. It should return true for peers that are accepted for
// a given topic. PubSub can be customized to use any implementation of this function by configuring
// it with the Option from WithPeerFilter. NewPubSub installs DefaultPeerFilter unless overridden.
type PeerFilter func(pid peer.ID, topic string) bool
// WithPeerFilter returns an Option that installs filter as the peer filter
// of the PubSub instance, replacing the default (DefaultPeerFilter, which
// always returns true).
func WithPeerFilter(filter PeerFilter) Option {
	apply := func(ps *PubSub) error {
		ps.peerFilter = filter
		return nil
	}
	return apply
}
2019-11-16 01:14:10 +08:00
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
// We start dropping messages to a peer if the outbound queue if full
func WithPeerOutboundQueueSize(size int) Option {
return func(p *PubSub) error {
2019-11-16 01:39:12 +08:00
if size <= 0 {
return errors.New("outbound queue size must always be positive")
2019-11-16 01:14:10 +08:00
}
p.peerOutboundQueueSize = size
return nil
}
}
// WithMessageSignaturePolicy returns an Option that replaces the instance's
// signature policy, i.e. the mode of operation for producing and verifying
// message signatures.
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option {
	apply := func(ps *PubSub) error {
		ps.signPolicy = policy
		return nil
	}
	return apply
}
// WithMessageSigning enables or disables message signing (enabled by default).
// Deprecated: signature verification without message signing,
// or message signing without verification, are not recommended.
func WithMessageSigning(enabled bool) Option {
2018-08-26 13:40:11 +03:00
return func(p *PubSub) error {
if enabled {
p.signPolicy |= msgSigning
} else {
p.signPolicy &^= msgSigning
}
return nil
}
}
// WithMessageAuthor sets the author for outbound messages to the given peer ID
// (defaults to the host's ID). If message signing is enabled, the private key
// must be available in the host's peerstore.
func WithMessageAuthor(author peer.ID) Option {
return func(p *PubSub) error {
2020-04-23 15:33:19 +03:00
author := author
if author == "" {
author = p.host.ID()
}
p.signID = author
return nil
}
}
// WithNoAuthor returns an Option that clears the message author, so author and
// seq-number data are omitted from messages, and turns off the signing bit of
// the signature policy.
// Not recommended to use with the default message ID function, see WithMessageIdFn.
func WithNoAuthor() Option {
	apply := func(ps *PubSub) error {
		ps.signPolicy = ps.signPolicy &^ msgSigning
		ps.signID = ""
		return nil
	}
	return apply
}
2019-05-02 20:15:55 +03:00
// WithStrictSignatureVerification is an option to enable or disable strict message signing.
// When enabled (which is the default), unsigned messages will be discarded.
// Deprecated: signature verification without message signing,
// or message signing without verification, are not recommended.
func WithStrictSignatureVerification(required bool) Option {
return func(p *PubSub) error {
if required {
p.signPolicy |= msgVerification
} else {
p.signPolicy &^= msgVerification
}
2018-08-26 13:40:11 +03:00
return nil
}
}
2019-01-17 14:05:04 +02:00
// WithBlacklist returns an Option that replaces the instance's blacklist
// implementation with b. Without this option NewPubSub uses a MapBlacklist.
func WithBlacklist(b Blacklist) Option {
	apply := func(ps *PubSub) error {
		ps.blacklist = b
		return nil
	}
	return apply
}
// WithDiscovery returns an Option that configures the discovery mechanism used
// to bootstrap and provide peers into PubSub.
//
// Each time the option is applied it starts from the default discover options,
// runs every DiscoverOpt over them in order, and stops at the first one that
// fails, returning its error. On success both the wrapped discovery service
// and the resolved options are stored on the instance.
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
	return func(ps *PubSub) error {
		cfg := defaultDiscoverOptions()
		for i := range opts {
			if err := opts[i](cfg); err != nil {
				return err
			}
		}

		wrapped := &pubSubDiscovery{Discovery: d, opts: cfg.opts}
		ps.disc.discovery = wrapped
		ps.disc.options = cfg
		return nil
	}
}
2019-11-04 19:22:14 +02:00
// WithEventTracer provides a tracer for the pubsub system
func WithEventTracer(tracer EventTracer) Option {
return func(p *PubSub) error {
if p.tracer != nil {
2020-03-01 14:29:24 +02:00
p.tracer.tracer = tracer
} else {
2022-01-09 18:14:39 +02:00
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), idGen: p.idGen}
2020-03-01 14:29:24 +02:00
}
2019-11-04 19:22:14 +02:00
return nil
}
}
2021-03-10 13:39:56 +02:00
// WithRawTracer adds a raw tracer to the pubsub system.
// Multiple tracers can be added using multiple invocations of the option.
func WithRawTracer(tracer RawTracer) Option {
2020-05-19 10:50:45 -04:00
return func(p *PubSub) error {
if p.tracer != nil {
2021-03-10 13:39:56 +02:00
p.tracer.raw = append(p.tracer.raw, tracer)
2020-05-19 10:50:45 -04:00
} else {
2022-01-09 18:14:39 +02:00
p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), idGen: p.idGen}
2020-05-19 10:50:45 -04:00
}
return nil
}
}
// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//
// Observe the following warnings when setting this option.
//
// WARNING #1: Make sure to change the default protocol prefixes for floodsub
// (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
// the public default network, which uses the default max message size, and
// would therefore cause messages to be dropped.
//
// WARNING #2: Reducing the default max message limit is fine, if you are
// certain that your application messages will not exceed the new limit.
// However, be wary of increasing the limit, as pubsub networks are naturally
// write-amplifying, i.e. for every message we receive, we send D copies of the
// message to our peers. If those messages are large, the bandwidth
// requirements will grow linearly. Note that propagation is sent on the
// uplink, which traditionally is more constrained than the downlink. Instead,
// consider out-of-band retrieval for large messages, by sending a CID
// (Content-ID) or another type of locator, such that messages can be fetched
// on-demand, rather than being pushed proactively. Under this design, you'd
// use the pubsub layer as a signalling system, rather than a data delivery
// system.
func WithMaxMessageSize(maxMessageSize int) Option {
	return func(p *PubSub) error {
		// The value is stored as given; no range validation happens here.
		p.maxMessageSize = maxMessageSize
		return nil
	}
}
2021-07-29 17:54:04 -04:00
// WithProtocolMatchFn sets a custom matching function for protocol selection to
// be used by the protocol handler on the Host's Mux. Should be combined with
// WithGossipSubProtocols feature function for checking if certain protocol features
// are supported
func WithProtocolMatchFn(m ProtocolMatchFn) Option {
return func(ps *PubSub) error {
2021-07-29 17:54:04 -04:00
ps.protoMatchFunc = m
return nil
}
}
// WithSeenMessagesTTL configures how long a previously seen message ID is
// remembered before it can be forgotten about. It overrides, for this
// instance only, the package-wide TimeCacheDuration default.
func WithSeenMessagesTTL(ttl time.Duration) Option {
	return func(p *PubSub) error {
		p.seenMsgTTL = ttl
		return nil
	}
}
// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is
// used by the seen messages cache. It overrides, for this instance only, the
// package-wide TimeCacheStrategy default.
func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
	return func(p *PubSub) error {
		p.seenMsgStrategy = strategy
		return nil
	}
}
// WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
// processing them. The inspector is invoked on an accepted RPC just before it
// is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
// is dropped.
Adds Application Specific RPC Inspector (#509) * Update go.mod * Refactor GossipSub Construction (#1) * Enables non-atomic validation for peer scoring parameters (#499) * decouples topic scoring parameters * adds skiping atomic validation for topic parameters * cleans up * adds skip atomic validation to peer score threshold * adds skip atomic validation for peer parameters * adds test for non-atomic validation * adds tests for peer score * adds tests for peer score thresholds * refactors tests * chore: Update .github/workflows/stale.yml [skip ci] * adds with gossipsub tracker Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com> * decouples options * fixes conflict * reverts back module * fixes peer score helper * Adds send control message to gossipsub router (#2) * adjusts libp2p version (#3) * Update go.mod (#4) * adds app specific rpc handler * Create ci.yml (#5) * Create Makefile (#7) * Revert "Merge branch 'yahya/gossipsub-router-interface'" (#6) This reverts commit 1c91995b7fbce0e4b9c5990c5bfda0d555267182. * Update ci.yml (#9) * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 352d7471c58580480b7f6592001bc3e9b910fa77. * Revert "Merge remote-tracking branch 'origin/yahya/adds-rpc-inspector' into yahya/adds-rpc-inspector" This reverts commit 586c5cb6eb2a971a1590ea32050de139316984d2. * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 2e13ee8b95dded5a3401dd86f952fae3419bd86b. * moves app specific inspector to pubsub * removes option from gossipsub * moves app specific rpc inspector up * refactors app specific to return an error Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com>
2022-11-30 22:10:07 -08:00
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option {
return func(ps *PubSub) error {
ps.appSpecificRpcInspector = inspector
return nil
}
}
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
defer func() {
// Clean up go routines.
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
for _, queue := range p.peers {
queue.Close()
}
p.peers = nil
p.topics = nil
p.seenMessages.Done()
}()
for {
select {
case <-p.newPeers:
p.handlePendingPeers()
case s := <-p.newPeerStream:
pid := s.Conn().RemotePeer()
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
q, ok := p.peers[pid]
if !ok {
2020-05-05 20:38:53 +03:00
log.Warn("new stream for unknown peer: ", pid)
s.Reset()
continue
}
2019-01-17 14:05:04 +02:00
if p.blacklist.Contains(pid) {
2020-05-05 20:38:53 +03:00
log.Warn("closing stream for blacklisted peer: ", pid)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
q.Close()
delete(p.peers, pid)
s.Reset()
continue
}
p.rt.AddPeer(pid, s.Protocol())
case pid := <-p.newPeerError:
delete(p.peers, pid)
2021-07-13 20:24:29 +03:00
case <-p.peerDead:
p.handleDeadPeers()
case treq := <-p.getTopics:
var out []string
for t := range p.mySubs {
out = append(out, t)
}
treq.resp <- out
case topic := <-p.addTopic:
p.handleAddTopic(topic)
case topic := <-p.rmTopic:
p.handleRemoveTopic(topic)
case sub := <-p.cancelCh:
p.handleRemoveSubscription(sub)
case sub := <-p.addSub:
p.handleAddSubscription(sub)
case relay := <-p.addRelay:
p.handleAddRelay(relay)
case topic := <-p.rmRelay:
p.handleRemoveRelay(topic)
case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok {
preq.resp <- nil
continue
}
var peers []peer.ID
for p := range p.peers {
if preq.topic != "" {
_, ok := tmap[p]
if !ok {
continue
}
}
peers = append(peers, p)
}
preq.resp <- peers
case rpc := <-p.incoming:
2018-01-27 09:52:35 +02:00
p.handleIncomingRPC(rpc)
case msg := <-p.sendMsg:
p.publishMessage(msg)
case req := <-p.addVal:
2019-04-29 22:45:48 +03:00
p.val.AddValidator(req)
case req := <-p.rmVal:
2019-04-29 22:45:48 +03:00
p.val.RemoveValidator(req)
2018-02-19 14:50:14 +02:00
case thunk := <-p.eval:
thunk()
2019-01-15 16:07:58 +02:00
case pid := <-p.blacklistPeer:
log.Infof("Blacklisting peer %s", pid)
2019-01-17 14:05:04 +02:00
p.blacklist.Add(pid)
2019-01-15 16:07:58 +02:00
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
q, ok := p.peers[pid]
if ok {
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
q.Close()
delete(p.peers, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}
p.rt.RemovePeer(pid)
}
case <-ctx.Done():
log.Info("pubsub processloop shutting down")
return
}
}
}
func (p *PubSub) handlePendingPeers() {
p.newPeersPrioLk.Lock()
if len(p.newPeersPend) == 0 {
p.newPeersPrioLk.Unlock()
return
}
newPeers := p.newPeersPend
p.newPeersPend = make(map[peer.ID]struct{})
p.newPeersPrioLk.Unlock()
for pid := range newPeers {
// Make sure we have a non-limited connection. We do this late because we may have
// disconnected in the meantime.
if p.host.Network().Connectedness(pid) != network.Connected {
continue
}
if _, ok := p.peers[pid]; ok {
log.Debug("already have connection to peer: ", pid)
continue
}
if p.blacklist.Contains(pid) {
log.Warn("ignoring connection from blacklisted peer: ", pid)
continue
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
rpcQueue := newRpcQueue(p.peerOutboundQueueSize)
rpcQueue.Push(p.getHelloPacket(), true)
go p.handleNewPeer(p.ctx, pid, rpcQueue)
p.peers[pid] = rpcQueue
}
}
2021-07-13 20:24:29 +03:00
func (p *PubSub) handleDeadPeers() {
p.peerDeadPrioLk.Lock()
if len(p.peerDeadPend) == 0 {
p.peerDeadPrioLk.Unlock()
2021-07-13 20:24:29 +03:00
return
}
deadPeers := p.peerDeadPend
p.peerDeadPend = make(map[peer.ID]struct{})
p.peerDeadPrioLk.Unlock()
2021-07-13 20:24:29 +03:00
for pid := range deadPeers {
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
q, ok := p.peers[pid]
2021-07-13 20:24:29 +03:00
if !ok {
continue
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
q.Close()
delete(p.peers, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}
p.rt.RemovePeer(pid)
2021-07-13 20:24:29 +03:00
if p.host.Network().Connectedness(pid) == network.Connected {
backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid)
if err != nil {
log.Debug(err)
continue
}
2021-07-13 20:24:29 +03:00
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
rpcQueue := newRpcQueue(p.peerOutboundQueueSize)
rpcQueue.Push(p.getHelloPacket(), true)
p.peers[pid] = rpcQueue
go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, rpcQueue)
2021-07-13 20:24:29 +03:00
}
}
}
// handleAddTopic registers the tracker carried by req under its topic ID.
// When a tracker is already registered for that ID, the existing tracker is
// sent back on req.resp and the requested one is not stored; otherwise the
// requested tracker is stored and sent back.
// Only called from processLoop.
func (p *PubSub) handleAddTopic(req *addTopicReq) {
	candidate := req.topic
	id := candidate.topic

	if existing, found := p.myTopics[id]; found {
		req.resp <- existing
		return
	}

	p.myTopics[id] = candidate
	req.resp <- candidate
}
// handleRemoveTopic drops the Topic tracker named by req from bookkeeping.
// An untracked topic is answered with nil. A tracked topic is removed (and
// answered with nil) only when it has no event handlers, no subscriptions and
// no relays; otherwise an error is sent on req.resp and nothing is removed.
// Only called from processLoop.
func (p *PubSub) handleRemoveTopic(req *rmTopicReq) {
	id := req.topic.topic

	tracked := p.myTopics[id]
	if tracked == nil {
		req.resp <- nil
		return
	}

	inUse := len(tracked.evtHandlers) != 0 ||
		len(p.mySubs[id]) != 0 ||
		p.myRelays[id] != 0
	if inUse {
		req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions")
		return
	}

	delete(p.myTopics, tracked.topic)
	req.resp <- nil
}
// handleRemoveSubscription removes Subscription sub from bookeeping.
// If this was the last subscription and no more relays exist for a given topic,
// it will also announce that this node is not subscribing to this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
subs := p.mySubs[sub.topic]
if subs == nil {
return
}
sub.err = ErrSubscriptionCancelled
2019-06-21 08:46:41 +02:00
sub.close()
delete(subs, sub)
if len(subs) == 0 {
delete(p.mySubs, sub.topic)
2020-05-01 10:04:51 +02:00
// stop announcing only if there are no more subs and relays
if p.myRelays[sub.topic] == 0 {
p.disc.StopAdvertise(sub.topic)
p.announce(sub.topic, false)
p.rt.Leave(sub.topic)
}
}
}
// handleAddSubscription records the Subscription carried by req for its topic
// and sends it back on req.resp. When the topic has neither subscriptions nor
// relays yet, the node first advertises the topic, announces its interest,
// and has the router join the topic.
// Only called from processLoop.
func (p *PubSub) handleAddSubscription(req *addSubReq) {
	sub := req.sub
	existing := p.mySubs[sub.topic]

	// announce we want this topic if neither subs nor relays exist so far
	firstInterest := len(existing) == 0 && p.myRelays[sub.topic] == 0
	if firstInterest {
		p.disc.Advertise(sub.topic)
		p.announce(sub.topic, true)
		p.rt.Join(sub.topic)
	}

	// create the per-topic subscription set on first use
	if existing == nil {
		p.mySubs[sub.topic] = make(map[*Subscription]struct{})
	}

	sub.cancelCh = p.cancelCh
	p.mySubs[sub.topic][sub] = struct{}{}

	req.resp <- sub
}
// handleAddRelay takes one more relay reference on the requested topic. When
// this is the first relay and the topic has no subscriptions, the node
// advertises the topic, announces its interest, and has the router join it.
// A cancel function is sent back on req.resp; invoking it submits the topic
// on p.rmRelay, and once that send has succeeded further invocations do
// nothing.
// Only called from processLoop.
func (p *PubSub) handleAddRelay(req *addRelayReq) {
	topic := req.topic

	refs := p.myRelays[topic] + 1
	p.myRelays[topic] = refs

	// announce we want this topic if neither relays nor subs exist so far
	if refs == 1 && len(p.mySubs[topic]) == 0 {
		p.disc.Advertise(topic)
		p.announce(topic, true)
		p.rt.Join(topic)
	}

	// released guards against submitting the removal more than once; it is
	// only set when the send on rmRelay went through.
	released := false
	cancel := func() {
		if !released {
			select {
			case p.rmRelay <- topic:
				released = true
			case <-p.ctx.Done():
			}
		}
	}

	req.resp <- cancel
}
// handleRemoveRelay removes one relay reference from bookkeeping.
// If this was the last relay reference and no more subscriptions exist
// for a given topic, it will also announce that this node is not relaying
// for this topic anymore.
// Only called from processLoop.
func (p *PubSub) handleRemoveRelay(topic string) {
if p.myRelays[topic] == 0 {
return
}
p.myRelays[topic]--
if p.myRelays[topic] == 0 {
delete(p.myRelays, topic)
2020-05-01 10:04:51 +02:00
// stop announcing only if there are no more relays and subs
if len(p.mySubs[topic]) == 0 {
p.disc.StopAdvertise(topic)
p.announce(topic, false)
p.rt.Leave(topic)
}
}
}
// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
func (p *PubSub) announce(topic string, sub bool) {
subopt := &pb.RPC_SubOpts{
Topicid: &topic,
Subscribe: &sub,
}
out := rpcWithSubs(subopt)
for pid, peer := range p.peers {
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
err := peer.Push(out, false)
if err != nil {
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
p.tracer.DropRPC(out, pid)
2018-10-29 11:10:31 +02:00
go p.announceRetry(pid, topic, sub)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
continue
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
p.tracer.SendRPC(out, pid)
}
}
2018-10-29 11:10:31 +02:00
func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) {
time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond)
retry := func() {
_, okSubs := p.mySubs[topic]
_, okRelays := p.myRelays[topic]
ok := okSubs || okRelays
if (ok && sub) || (!ok && !sub) {
2018-10-29 11:10:31 +02:00
p.doAnnounceRetry(pid, topic, sub)
}
}
select {
case p.eval <- retry:
case <-p.ctx.Done():
}
}
2018-10-29 11:10:31 +02:00
func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
peer, ok := p.peers[pid]
if !ok {
return
}
subopt := &pb.RPC_SubOpts{
Topicid: &topic,
Subscribe: &sub,
2018-10-29 11:10:31 +02:00
}
out := rpcWithSubs(subopt)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
err := peer.Push(out, false)
if err != nil {
2018-10-29 11:10:31 +02:00
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
p.tracer.DropRPC(out, pid)
2018-10-29 11:10:31 +02:00
go p.announceRetry(pid, topic, sub)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return
2018-10-29 11:10:31 +02:00
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
p.tracer.SendRPC(out, pid)
2018-10-29 11:10:31 +02:00
}
// notifySubs sends a given message to all corresponding subscribers.
// Only called from processLoop.
func (p *PubSub) notifySubs(msg *Message) {
2020-09-29 18:05:54 +03:00
topic := msg.GetTopic()
subs := p.mySubs[topic]
for f := range subs {
select {
case f.ch <- msg:
default:
p.tracer.UndeliverableMessage(msg)
2020-09-29 18:05:54 +03:00
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
}
}
}
// seenMessage reports whether the given message ID is already present in the
// seen-messages cache.
func (p *PubSub) seenMessage(id string) bool {
	alreadySeen := p.seenMessages.Has(id)
	return alreadySeen
}
// markSeen records the given message ID in the seen-messages cache so that a
// later seenMessage call for the same ID returns true.
// It reports whether the ID was freshly marked, i.e. the value returned by
// the cache's Add.
func (p *PubSub) markSeen(id string) bool {
	freshlyMarked := p.seenMessages.Add(id)
	return freshlyMarked
}
// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
if len(p.mySubs) == 0 {
return false
}
2020-09-29 18:05:54 +03:00
topic := msg.GetTopic()
_, ok := p.mySubs[topic]
return ok
}
// canRelayMsg returns whether we are able to relay for one of the topics
// of a given message
func (p *PubSub) canRelayMsg(msg *pb.Message) bool {
if len(p.myRelays) == 0 {
return false
}
2020-09-29 18:05:54 +03:00
topic := msg.GetTopic()
relays := p.myRelays[topic]
return relays > 0
}
// notifyLeave sends a PeerLeave event for pid to the local handle of the
// given topic. It does nothing when there is no such handle in p.myTopics.
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
	t, ok := p.myTopics[topic]
	if !ok {
		return
	}
	t.sendNotification(PeerEvent{PeerLeave, pid})
}
2018-01-27 09:52:35 +02:00
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
Adds Application Specific RPC Inspector (#509) * Update go.mod * Refactor GossipSub Construction (#1) * Enables non-atomic validation for peer scoring parameters (#499) * decouples topic scoring parameters * adds skiping atomic validation for topic parameters * cleans up * adds skip atomic validation to peer score threshold * adds skip atomic validation for peer parameters * adds test for non-atomic validation * adds tests for peer score * adds tests for peer score thresholds * refactors tests * chore: Update .github/workflows/stale.yml [skip ci] * adds with gossipsub tracker Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com> * decouples options * fixes conflict * reverts back module * fixes peer score helper * Adds send control message to gossipsub router (#2) * adjusts libp2p version (#3) * Update go.mod (#4) * adds app specific rpc handler * Create ci.yml (#5) * Create Makefile (#7) * Revert "Merge branch 'yahya/gossipsub-router-interface'" (#6) This reverts commit 1c91995b7fbce0e4b9c5990c5bfda0d555267182. * Update ci.yml (#9) * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 352d7471c58580480b7f6592001bc3e9b910fa77. * Revert "Merge remote-tracking branch 'origin/yahya/adds-rpc-inspector' into yahya/adds-rpc-inspector" This reverts commit 586c5cb6eb2a971a1590ea32050de139316984d2. * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 2e13ee8b95dded5a3401dd86f952fae3419bd86b. * moves app specific inspector to pubsub * removes option from gossipsub * moves app specific rpc inspector up * refactors app specific to return an error Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com>
2022-11-30 22:10:07 -08:00
// pass the rpc through app specific validation (if any available).
if p.appSpecificRpcInspector != nil {
// check if the RPC is allowed by the external inspector
if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil {
log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err)
return // reject the RPC
}
}
2019-11-04 19:22:14 +02:00
p.tracer.RecvRPC(rpc)
2020-09-30 14:53:57 +03:00
subs := rpc.GetSubscriptions()
if len(subs) != 0 && p.subFilter != nil {
var err error
subs, err = p.subFilter.FilterIncomingSubscriptions(rpc.from, subs)
if err != nil {
log.Debugf("subscription filter error: %s; ignoring RPC", err)
return
}
}
for _, subopt := range subs {
t := subopt.GetTopicid()
2020-09-30 14:53:57 +03:00
if subopt.GetSubscribe() {
tmap, ok := p.topics[t]
if !ok {
tmap = make(map[peer.ID]struct{})
p.topics[t] = tmap
}
if _, ok = tmap[rpc.from]; !ok {
tmap[rpc.from] = struct{}{}
if topic, ok := p.myTopics[t]; ok {
2019-06-21 08:46:41 +02:00
peer := rpc.from
topic.sendNotification(PeerEvent{PeerJoin, peer})
}
}
} else {
tmap, ok := p.topics[t]
if !ok {
continue
}
if _, ok := tmap[rpc.from]; ok {
delete(tmap, rpc.from)
p.notifyLeave(t, rpc.from)
}
}
}
// ask the router to vet the peer before commiting any processing resources
switch p.rt.AcceptFrom(rpc.from) {
case AcceptNone:
2020-09-02 20:44:00 +03:00
log.Debugf("received RPC from router graylisted peer %s; dropping RPC", rpc.from)
return
case AcceptControl:
if len(rpc.GetPublish()) > 0 {
2020-09-02 20:44:00 +03:00
log.Debugf("peer %s was throttled by router; ignoring %d payload messages", rpc.from, len(rpc.GetPublish()))
}
p.tracer.ThrottlePeer(rpc.from)
case AcceptAll:
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
var toPush []*Message
for _, pmsg := range rpc.GetPublish() {
if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
2020-09-02 20:44:00 +03:00
log.Debug("received message in topic we didn't subscribe to; ignoring message")
continue
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
msg := &Message{pmsg, "", rpc.from, nil, false}
if p.shouldPush(msg) {
toPush = append(toPush, msg)
}
}
p.rt.PreValidation(toPush)
for _, msg := range toPush {
p.pushMsg(msg)
}
}
2018-01-27 09:52:35 +02:00
p.rt.HandleRPC(rpc)
}
// DefaultMsgIdFn builds the ID of the passed Message by concatenating its
// From field and its Seqno field, in that order, each converted to a string.
func DefaultMsgIdFn(pmsg *pb.Message) string {
	from := string(pmsg.GetFrom())
	seqno := string(pmsg.GetSeqno())
	return from + seqno
}
2021-09-20 20:17:33 -07:00
// DefaultPeerFilter accepts all peers on all topics: it returns true
// regardless of the pid and topic it is given.
func DefaultPeerFilter(pid peer.ID, topic string) bool {
return true
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
// shouldPush filters a message before validating and pushing it
// It returns true if the message can be further validated and pushed
func (p *PubSub) shouldPush(msg *Message) bool {
src := msg.ReceivedFrom
2019-01-15 16:07:58 +02:00
// reject messages from blacklisted peers
2019-01-17 14:05:04 +02:00
if p.blacklist.Contains(src) {
2020-09-01 20:22:30 +03:00
log.Debugf("dropping message from blacklisted peer %s", src)
p.tracer.RejectMessage(msg, RejectBlacklstedPeer)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return false
2019-01-15 16:07:58 +02:00
}
2019-01-16 12:26:26 +02:00
// even if they are forwarded by good peers
2019-01-17 14:05:04 +02:00
if p.blacklist.Contains(msg.GetFrom()) {
2020-09-01 20:22:30 +03:00
log.Debugf("dropping message from blacklisted source %s", src)
p.tracer.RejectMessage(msg, RejectBlacklistedSource)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return false
2019-01-16 12:26:26 +02:00
}
err := p.checkSigningPolicy(msg)
if err != nil {
log.Debugf("dropping message from %s: %s", src, err)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return false
}
// reject messages claiming to be from ourselves but not locally published
self := p.host.ID()
if peer.ID(msg.GetFrom()) == self && src != self {
log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
p.tracer.RejectMessage(msg, RejectSelfOrigin)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return false
}
// have we already seen and validated this message?
2022-01-09 18:14:39 +02:00
id := p.idGen.ID(msg)
if p.seenMessage(id) {
p.tracer.DuplicateMessage(msg)
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return false
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. 
* Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
return true
}
// pushMsg submits msg to the validation front-end and publishes it when it is
// accepted there and has not been marked as seen before.
func (p *PubSub) pushMsg(msg *Message) {
	from := msg.ReceivedFrom
	// The ID is computed up front, before the message is handed to validation.
	msgID := p.idGen.ID(msg)

	if accepted := p.val.Push(from, msg); !accepted {
		return
	}

	if !p.markSeen(msgID) {
		return
	}
	p.publishMessage(msg)
}
func (p *PubSub) checkSigningPolicy(msg *Message) error {
2018-08-29 17:19:34 +03:00
// reject unsigned messages when strict before we even process the id
if p.signPolicy.mustVerify() {
if p.signPolicy.mustSign() {
if msg.Signature == nil {
p.tracer.RejectMessage(msg, RejectMissingSignature)
return ValidationError{Reason: RejectMissingSignature}
}
// Actual signature verification happens in the validation pipeline,
// after checking if the message was already seen or not,
// to avoid unnecessary signature verification processing-cost.
} else {
if msg.Signature != nil {
p.tracer.RejectMessage(msg, RejectUnexpectedSignature)
return ValidationError{Reason: RejectUnexpectedSignature}
}
// If we are expecting signed messages, and not authoring messages,
// then do no accept seq numbers, from data, or key data.
// The default msgID function still relies on Seqno and From,
// but is not used if we are not authoring messages ourselves.
if p.signID == "" {
if msg.Seqno != nil || msg.From != nil || msg.Key != nil {
p.tracer.RejectMessage(msg, RejectUnexpectedAuthInfo)
return ValidationError{Reason: RejectUnexpectedAuthInfo}
}
}
}
2018-08-29 17:19:34 +03:00
}
return nil
}
func (p *PubSub) publishMessage(msg *Message) {
2019-11-04 19:22:14 +02:00
p.tracer.DeliverMessage(msg)
p.notifySubs(msg)
if !msg.Local {
p.rt.Publish(msg)
}
}
// addTopicReq is the request tryJoin sends on the addTopic channel to register
// a topic handle.
type addTopicReq struct {
	// topic is the candidate handle to register.
	topic *Topic
	// resp receives the handle in effect for the topic; tryJoin treats any
	// handle other than the one it sent as "topic already existed".
	resp chan *Topic
}
// rmTopicReq pairs a topic handle with a channel carrying an error result.
// NOTE(review): presumably the removal counterpart of addTopicReq; its sender
// and handler are outside this part of the file — confirm.
type rmTopicReq struct {
	topic *Topic
	resp  chan error
}
// TopicOptions is an empty struct.
// NOTE(review): nothing in this part of the file uses it; confirm whether it is
// kept only for API compatibility.
type TopicOptions struct{}
// TopicOpt is an option applied to a Topic handle while it is being joined.
// tryJoin calls every option on the freshly built handle and aborts the join
// with the first error an option returns.
type TopicOpt func(t *Topic) error
2022-01-22 23:48:23 +02:00
// WithTopicMessageIdFn returns a TopicOpt that installs msgId as the message ID
// function for the topic being joined, so that individual topics can have their
// own message ID generation rules.
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt {
	install := func(t *Topic) error {
		// Registered on the owning PubSub's ID generator, keyed by topic name.
		t.p.idGen.Set(t.topic, msgId)
		return nil
	}
	return install
}
// Join joins the topic and returns its Topic handle.
//
// Only one Topic handle should exist per topic: if a handle for the topic
// already exists, Join returns an error instead of the existing handle. Errors
// from tryJoin are returned unchanged.
func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
	handle, created, err := p.tryJoin(topic, opts...)
	switch {
	case err != nil:
		return nil, err
	case !created:
		return nil, fmt.Errorf("topic already exists")
	default:
		return handle, nil
	}
}
// tryJoin is an internal function that tries to join a topic
// Returns the topic if it can be created or found
// Returns true if the topic was newly created, false otherwise
// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed
func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) {
2020-09-30 14:53:57 +03:00
if p.subFilter != nil && !p.subFilter.CanSubscribe(topic) {
return nil, false, fmt.Errorf("topic is not allowed by the subscription filter")
}
t := &Topic{
p: p,
topic: topic,
evtHandlers: make(map[*TopicEventHandler]struct{}),
}
for _, opt := range opts {
err := opt(t)
if err != nil {
return nil, false, err
}
}
resp := make(chan *Topic, 1)
select {
case t.p.addTopic <- &addTopicReq{
topic: t,
resp: resp,
}:
case <-t.p.ctx.Done():
return nil, false, t.p.ctx.Err()
}
returnedTopic := <-resp
if returnedTopic != t {
return returnedTopic, false, nil
}
return t, true, nil
}
// addSubReq pairs a subscription with a channel on which a subscription is
// sent back.
// NOTE(review): presumably the request used to register a subscription; its
// sender and handler are outside this part of the file — confirm.
type addSubReq struct {
	sub  *Subscription
	resp chan *Subscription
}
// SubOpt is an option that configures a Subscription and may fail with an
// error. PubSub.Subscribe forwards its options to Topic.Subscribe.
type SubOpt func(sub *Subscription) error
// Subscribe returns a new Subscription for the given topic.
2021-07-12 21:56:09 +02:00
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
//
// Deprecated: use pubsub.Join() and topic.Subscribe() instead
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) {
// ignore whether the topic was newly created or not, since either way we have a valid topic to work with
topicHandle, _, err := p.tryJoin(topic)
if err != nil {
return nil, err
}
return topicHandle.Subscribe(opts...)
}
// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
// The default length is 32 but it can be configured to avoid dropping messages if the consumer is not reading fast
// enough.
//
// A negative size is rejected with an error and leaves the subscription
// untouched; creating a channel with a negative capacity would otherwise panic
// at run time.
func WithBufferSize(size int) SubOpt {
	return func(sub *Subscription) error {
		if size < 0 {
			return fmt.Errorf("invalid subscription buffer size %d: must not be negative", size)
		}
		sub.ch = make(chan *Message, size)
		return nil
	}
}
// topicReq is the request GetTopics sends on the getTopics channel.
type topicReq struct {
	// resp receives the list of topic names that GetTopics returns.
	resp chan []string
}
2019-01-04 13:09:21 +02:00
// GetTopics returns the topics this node is subscribed to.
// It returns nil if the pubsub context is done before the request could be
// submitted.
func (p *PubSub) GetTopics() []string {
	req := &topicReq{resp: make(chan []string, 1)}

	select {
	case <-p.ctx.Done():
		return nil
	case p.getTopics <- req:
	}

	return <-req.resp
}
2019-01-04 13:09:21 +02:00
// Publish publishes data to the given topic, joining the topic first if
// needed.
//
// Deprecated: use pubsub.Join() and topic.Publish() instead
func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error {
	// The "newly created" flag is irrelevant: any handle returned without an
	// error is valid to publish through.
	handle, _, joinErr := p.tryJoin(topic)
	if joinErr == nil {
		return handle.Publish(context.TODO(), data, opts...)
	}
	return joinErr
}
2018-03-10 10:14:21 +02:00
// nextSeqno atomically increments the instance counter and returns the new
// value encoded as 8 big-endian bytes.
func (p *PubSub) nextSeqno() []byte {
	next := atomic.AddUint64(&p.counter, 1)

	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, next)
	return buf
}
// listPeerReq is the request ListPeers sends on the getPeers channel.
type listPeerReq struct {
	// resp receives the peer list that ListPeers returns.
	resp chan []peer.ID
	// topic is the topic name passed to ListPeers.
	topic string
}
2019-01-04 13:09:21 +02:00
// ListPeers returns a list of peers we are connected to in the given topic.
// It returns nil if the pubsub context is done before the request could be
// submitted.
func (p *PubSub) ListPeers(topic string) []peer.ID {
	// Buffered like the other request/response channels in this file, so the
	// single response can be sent without waiting for this goroutine to be
	// ready to receive it.
	out := make(chan []peer.ID, 1)

	select {
	case p.getPeers <- &listPeerReq{
		resp:  out,
		topic: topic,
	}:
	case <-p.ctx.Done():
		return nil
	}

	return <-out
}
2019-01-15 16:07:58 +02:00
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
select {
case p.blacklistPeer <- pid:
case <-p.ctx.Done():
}
2019-01-15 16:07:58 +02:00
}
2019-01-04 13:09:21 +02:00
// RegisterTopicValidator registers a validator for topic.
// By default validators are asynchronous, which means they will run in a separate goroutine.
// The number of active goroutines is controlled by global and per topic validator
// throttles; if it exceeds the throttle threshold, messages will be dropped.
2020-04-28 17:40:36 +03:00
func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error {
addVal := &addValReq{
topic: topic,
validate: val,
resp: make(chan error, 1),
}
for _, opt := range opts {
err := opt(addVal)
if err != nil {
return err
}
}
select {
case p.addVal <- addVal:
case <-p.ctx.Done():
return p.ctx.Err()
}
return <-addVal.resp
}
2019-01-04 13:09:21 +02:00
// UnregisterTopicValidator removes a validator from a topic.
// Returns an error if there was no validator registered with the topic, or the
// context error if the pubsub context is done before the request is submitted.
func (p *PubSub) UnregisterTopicValidator(topic string) error {
	result := make(chan error, 1)
	req := &rmValReq{topic: topic, resp: result}

	select {
	case <-p.ctx.Done():
		return p.ctx.Err()
	case p.rmVal <- req:
		return <-result
	}
}
// RelayCancelFunc is a function taking no arguments and returning nothing.
// NOTE(review): presumably cancels a topic relay (it is the response type of
// addRelayReq); its producer is outside this part of the file — confirm.
type RelayCancelFunc func()
// addRelayReq pairs a topic name with a channel on which a RelayCancelFunc is
// sent back.
// NOTE(review): presumably the request used to enable relaying for a topic;
// its sender and handler are outside this part of the file — confirm.
type addRelayReq struct {
	topic string
	resp  chan RelayCancelFunc
}