go-libp2p-pubsub/floodsub.go
Adin Schmahmann f9c26c2e68 Added libp2p discovery capabilities to PubSub.
When the WithDiscovery option is passed to PubSub then PubSub will be able to search for more peers that are interested in our topics.

This includes the ability for publishes (via Topic.Publish()) to block until the router is ready to publish. Router readiness is currently defined by a combination of a user-defined MinTopicSize function (passed into Topic.Publish via the WithReadiness publish option) and the properties of the pubsub router in use. The discovery tests show example usage.
2019-10-31 16:39:18 -04:00

104 lines
2.2 KiB
Go

package pubsub
import (
"context"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// Floodsub protocol constants.
const (
// FloodSubID is the protocol ID that NewFloodSub configures the router with.
FloodSubID = protocol.ID("/floodsub/1.0.0")
// FloodSubTopicSearchSize is the threshold EnoughPeers falls back to when
// it is called with a suggested size of 0.
FloodSubTopicSearchSize = 5
)
// NewFloodsubWithProtocols builds a PubSub backed by a FloodSubRouter that
// is configured with the protocol IDs given in ps. The context, host and
// options are handed to NewPubSub unchanged, and its result is returned as is.
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) {
	return NewPubSub(ctx, h, &FloodSubRouter{protocols: ps}, opts...)
}
// NewFloodSub builds a PubSub backed by a FloodSubRouter that is configured
// with the single protocol FloodSubID. It delegates to
// NewFloodsubWithProtocols.
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
	defaultProtocols := []protocol.ID{FloodSubID}
	return NewFloodsubWithProtocols(ctx, h, defaultProtocols, opts...)
}
// FloodSubRouter is the router that the floodsub constructors pass to
// NewPubSub. Its Publish method forwards a message to every known peer
// listed under any of the message's topics.
type FloodSubRouter struct {
// p is the PubSub instance stored by Attach; EnoughPeers and Publish read
// its topics and peers tables.
p *PubSub
// protocols is the list of protocol IDs returned by Protocols.
protocols []protocol.ID
}
// Protocols returns the protocol IDs the router was constructed with.
// The stored slice itself is returned, not a copy.
func (fs *FloodSubRouter) Protocols() []protocol.ID {
return fs.protocols
}
// Attach stores p on the router. EnoughPeers and Publish dereference it, so
// it must be set before either is called.
func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
}
// AddPeer is a no-op: this router does nothing when a peer is added.
func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}
// RemovePeer is a no-op: this router does nothing when a peer is removed.
func (fs *FloodSubRouter) RemovePeer(peer.ID) {}
// EnoughPeers reports whether the number of entries recorded for topic in
// the attached PubSub's topic table has reached the requested threshold.
// A suggested value of 0 selects FloodSubTopicSearchSize as the threshold.
// A topic that is absent from the table always yields false.
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
	members, tracked := fs.p.topics[topic]
	if !tracked {
		return false
	}

	threshold := suggested
	if threshold == 0 {
		threshold = FloodSubTopicSearchSize
	}
	return len(members) >= threshold
}
// HandleRPC is a no-op: this router ignores the rpc it is given.
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
// Publish forwards msg to every peer listed under any of the message's
// topics in the attached PubSub's topic table. The peer the message was
// received from (from) and the peer named in the message's From field are
// skipped. Delivery is best effort: the RPC is offered to each peer's
// outbound channel without blocking, and dropped (with an info-level log
// line) when the send cannot proceed immediately.
func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
	origin := peer.ID(msg.GetFrom())

	// Gather recipients into a set so a peer listed under several of the
	// message's topics is sent only one copy. Ranging over the entry of an
	// unknown topic iterates zero times.
	recipients := make(map[peer.ID]struct{})
	for _, topic := range msg.GetTopicIDs() {
		for pid := range fs.p.topics[topic] {
			if pid == from || pid == origin {
				continue
			}
			recipients[pid] = struct{}{}
		}
	}

	out := rpcWithMessages(msg)
	for pid := range recipients {
		outbound, known := fs.p.peers[pid]
		if !known {
			continue
		}

		select {
		case outbound <- out:
		default:
			// Drop it. The peer is too slow.
			log.Infof("dropping message to peer %s: queue full", pid)
		}
	}
}
// Join is a no-op: this router does nothing when a topic is joined.
func (fs *FloodSubRouter) Join(topic string) {}
// Leave is a no-op: this router does nothing when a topic is left.
func (fs *FloodSubRouter) Leave(topic string) {}