go-libp2p-pubsub/randomsub.go

170 lines
3.2 KiB
Go
Raw Normal View History

package pubsub
2018-10-10 14:17:27 +00:00
import (
"context"
2020-04-23 16:37:05 +00:00
"math"
2018-10-10 14:17:27 +00:00
2019-05-26 16:19:03 +00:00
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
2018-10-10 14:17:27 +00:00
)
// RandomSubID is the protocol ID of the randomsub router; it is the first
// protocol returned by RandomSubRouter.Protocols.
const (
RandomSubID = protocol.ID("/randomsub/1.0.0")
)
// RandomSubD is the baseline peer count for randomsub. Publish forwards to
// every non-floodsub topic peer when there are at most RandomSubD of them,
// and EnoughPeers uses it both as the default for a zero `suggested` value
// and as the number of randomsub peers that is sufficient on its own.
var (
RandomSubD = 6
)
2019-01-04 11:09:21 +00:00
// NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error) {
2018-10-10 14:17:27 +00:00
rt := &RandomSubRouter{
size: size,
2018-10-10 14:17:27 +00:00
peers: make(map[peer.ID]protocol.ID),
}
return NewPubSub(ctx, h, rt, opts...)
}
// RandomSubRouter is a router that implements a random propagation strategy.
// For each message, it selects the square root of the network size peers, with a min of RandomSubD,
2020-04-23 16:37:05 +00:00
// and forwards the message to them.
2018-10-10 14:17:27 +00:00
type RandomSubRouter struct {
2019-11-04 18:43:48 +00:00
p *PubSub
peers map[peer.ID]protocol.ID
size int
2019-11-04 18:43:48 +00:00
tracer *pubsubTracer
2018-10-10 14:17:27 +00:00
}
func (rs *RandomSubRouter) Protocols() []protocol.ID {
2018-10-10 14:39:38 +00:00
return []protocol.ID{RandomSubID, FloodSubID}
2018-10-10 14:17:27 +00:00
}
func (rs *RandomSubRouter) Attach(p *PubSub) {
rs.p = p
rs.tracer = p.tracer
2018-10-10 14:17:27 +00:00
}
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
2019-11-04 18:43:48 +00:00
rs.tracer.AddPeer(p, proto)
2018-10-10 14:17:27 +00:00
rs.peers[p] = proto
}
func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
2019-11-04 18:43:48 +00:00
rs.tracer.RemovePeer(p)
2018-10-10 14:17:27 +00:00
delete(rs.peers, p)
}
// EnoughPeers reports whether the router knows enough peers in topic.
//
// It returns false when the topic is unknown to the attached PubSub.
// Otherwise it counts the topic's peers that were added as floodsub or
// randomsub peers and returns true when either their combined count reaches
// suggested (RandomSubD when suggested is 0) or the randomsub count alone
// reaches RandomSubD.
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
	members, found := rs.p.topics[topic]
	if !found {
		return false
	}

	// Tally peers by protocol; peers with any other (or no) recorded
	// protocol are not counted.
	var floodCount, randomCount int
	for id := range members {
		if proto := rs.peers[id]; proto == FloodSubID {
			floodCount++
		} else if proto == RandomSubID {
			randomCount++
		}
	}

	if suggested == 0 {
		suggested = RandomSubD
	}

	return floodCount+randomCount >= suggested || randomCount >= RandomSubD
}
// AcceptFrom always returns true, regardless of the peer passed in.
func (rs *RandomSubRouter) AcceptFrom(_ peer.ID) bool {
	return true
}
2018-10-10 14:17:27 +00:00
// HandleRPC is a no-op; the router does nothing with incoming RPCs.
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {
	// Intentionally empty.
}
func (rs *RandomSubRouter) Publish(msg *Message) {
from := msg.ReceivedFrom
2018-10-10 14:17:27 +00:00
tosend := make(map[peer.ID]struct{})
rspeers := make(map[peer.ID]struct{})
src := peer.ID(msg.GetFrom())
for _, topic := range msg.GetTopicIDs() {
tmap, ok := rs.p.topics[topic]
if !ok {
continue
}
for p := range tmap {
if p == from || p == src {
continue
}
if rs.peers[p] == FloodSubID {
tosend[p] = struct{}{}
} else {
rspeers[p] = struct{}{}
}
}
}
if len(rspeers) > RandomSubD {
2020-04-23 16:37:05 +00:00
target := RandomSubD
sqrt := int(math.Ceil(math.Sqrt(float64(rs.size))))
2020-04-23 16:37:05 +00:00
if sqrt > target {
target = sqrt
}
if target > len(rspeers) {
target = len(rspeers)
}
2018-10-10 14:17:27 +00:00
xpeers := peerMapToList(rspeers)
shufflePeers(xpeers)
xpeers = xpeers[:RandomSubD]
for _, p := range xpeers {
tosend[p] = struct{}{}
}
} else {
for p := range rspeers {
tosend[p] = struct{}{}
}
}
out := rpcWithMessages(msg.Message)
2018-10-10 14:17:27 +00:00
for p := range tosend {
mch, ok := rs.p.peers[p]
if !ok {
continue
}
select {
case mch <- out:
2019-11-04 18:43:48 +00:00
rs.tracer.SendRPC(out, p)
2018-10-10 14:17:27 +00:00
default:
log.Infof("dropping message to peer %s: queue full", p)
2019-11-04 18:43:48 +00:00
rs.tracer.DropRPC(out, p)
2018-10-10 14:17:27 +00:00
}
}
}
2019-11-04 18:43:48 +00:00
// Join reports the topic join to the tracer; the router keeps no per-topic
// state of its own.
func (rs *RandomSubRouter) Join(topic string) {
	rs.tracer.Join(topic)
}
2018-10-10 14:17:27 +00:00
2019-11-04 18:43:48 +00:00
// Leave reports the topic leave to the tracer; the router keeps no per-topic
// state of its own.
func (rs *RandomSubRouter) Leave(topic string) {
	// Trace this as a leave event; calling tracer.Join here would record
	// leaving a topic as a join.
	rs.tracer.Leave(topic)
}