2016-09-10 03:13:50 +00:00
package floodsub
2016-09-10 00:15:39 +00:00
import (
2016-09-10 23:03:53 +00:00
"context"
2016-09-11 03:47:12 +00:00
"encoding/binary"
2016-09-10 00:15:39 +00:00
"fmt"
2018-01-03 09:25:20 +00:00
"sync/atomic"
2016-09-10 00:15:39 +00:00
"time"
2016-09-13 02:59:24 +00:00
pb "github.com/libp2p/go-floodsub/pb"
2016-09-10 15:14:17 +00:00
2017-11-16 15:13:31 +00:00
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
timecache "github.com/whyrusleeping/timecache"
2016-09-10 00:15:39 +00:00
)
2017-11-22 17:40:45 +00:00
const (
2017-11-23 18:12:59 +00:00
ID = protocol . ID ( "/floodsub/1.0.0" )
2017-12-16 12:12:23 +00:00
defaultMaxConcurrency = 10
2017-11-23 18:12:59 +00:00
defaultValidateTimeout = 150 * time . Millisecond
2017-11-22 17:40:45 +00:00
)
2016-09-10 00:15:39 +00:00
2016-09-10 03:13:50 +00:00
var log = logging . Logger ( "floodsub" )
2016-09-10 00:15:39 +00:00
type PubSub struct {
host host . Host
2016-09-11 20:56:07 +00:00
// incoming messages from other peers
2016-09-10 00:15:39 +00:00
incoming chan * RPC
2016-09-11 20:56:07 +00:00
// messages we are publishing out to our peers
publish chan * Message
// addSub is a control channel for us to add and remove subscriptions
2016-10-19 23:01:06 +00:00
addSub chan * addSubReq
2016-09-11 20:56:07 +00:00
2016-10-19 23:01:06 +00:00
// get list of topics we are subscribed to
2016-09-14 22:11:41 +00:00
getTopics chan * topicReq
2016-10-19 23:01:06 +00:00
// get chan of peers we are connected to
2016-10-18 18:13:12 +00:00
getPeers chan * listPeerReq
2016-09-15 01:07:30 +00:00
2016-10-19 23:01:06 +00:00
// send subscription here to cancel it
cancelCh chan * Subscription
2016-09-14 21:12:20 +00:00
2016-09-11 20:56:07 +00:00
// a notification channel for incoming streams from other peers
2016-09-10 00:15:39 +00:00
newPeers chan inet . Stream
2016-09-11 20:56:07 +00:00
// a notification channel for when our peers die
2016-09-10 00:15:39 +00:00
peerDead chan peer . ID
2016-09-11 20:56:07 +00:00
// The set of topics we are subscribed to
2016-10-19 23:01:06 +00:00
myTopics map [ string ] map [ * Subscription ] struct { }
2016-09-10 00:15:39 +00:00
2016-09-11 20:56:07 +00:00
// topics tracks which topics each of our peers are subscribed to
topics map [ string ] map [ peer . ID ] struct { }
2017-11-08 19:00:52 +00:00
// sendMsg handles messages that have been validated
sendMsg chan sendReq
2016-09-11 03:47:12 +00:00
peers map [ peer . ID ] chan * RPC
seenMessages * timecache . TimeCache
2016-09-10 00:15:39 +00:00
2016-09-10 23:03:53 +00:00
ctx context . Context
2018-01-03 09:25:20 +00:00
// atomic counter for seqnos
counter uint64
2016-09-10 00:15:39 +00:00
}
type Message struct {
2016-09-10 15:14:17 +00:00
* pb . Message
2016-09-10 00:15:39 +00:00
}
2016-09-10 15:14:17 +00:00
func ( m * Message ) GetFrom ( ) peer . ID {
return peer . ID ( m . Message . GetFrom ( ) )
2016-09-10 00:15:39 +00:00
}
type RPC struct {
2016-09-10 15:14:17 +00:00
pb . RPC
2016-09-10 00:15:39 +00:00
// unexported on purpose, not sending this over the wire
from peer . ID
}
2017-12-16 12:12:23 +00:00
// Option configures a PubSub during NewFloodSub. A non-nil error aborts
// construction and is returned to the caller of NewFloodSub.
type Option func(*PubSub) error
2016-10-20 11:23:38 +00:00
// NewFloodSub returns a new FloodSub management object
2017-12-16 12:12:23 +00:00
func NewFloodSub ( ctx context . Context , h host . Host , opts ... Option ) ( * PubSub , error ) {
2016-09-10 00:15:39 +00:00
ps := & PubSub {
2018-01-13 12:31:34 +00:00
host : h ,
ctx : ctx ,
incoming : make ( chan * RPC , 32 ) ,
publish : make ( chan * Message ) ,
newPeers : make ( chan inet . Stream ) ,
peerDead : make ( chan peer . ID ) ,
cancelCh : make ( chan * Subscription ) ,
getPeers : make ( chan * listPeerReq ) ,
addSub : make ( chan * addSubReq ) ,
getTopics : make ( chan * topicReq ) ,
sendMsg : make ( chan sendReq ) ,
myTopics : make ( map [ string ] map [ * Subscription ] struct { } ) ,
topics : make ( map [ string ] map [ peer . ID ] struct { } ) ,
peers : make ( map [ peer . ID ] chan * RPC ) ,
seenMessages : timecache . NewTimeCache ( time . Second * 30 ) ,
counter : uint64 ( time . Now ( ) . UnixNano ( ) ) ,
2016-09-10 00:15:39 +00:00
}
2017-12-16 12:12:23 +00:00
for _ , opt := range opts {
err := opt ( ps )
if err != nil {
return nil , err
}
}
2016-09-10 00:15:39 +00:00
h . SetStreamHandler ( ID , ps . handleNewStream )
2016-09-12 20:22:16 +00:00
h . Network ( ) . Notify ( ( * PubSubNotif ) ( ps ) )
2016-09-10 00:15:39 +00:00
2016-09-10 23:03:53 +00:00
go ps . processLoop ( ctx )
2016-09-10 00:15:39 +00:00
2017-12-16 12:12:23 +00:00
return ps , nil
2016-09-10 00:15:39 +00:00
}
2016-10-20 11:23:38 +00:00
// processLoop handles all inputs arriving on the channels
2016-09-10 23:03:53 +00:00
func ( p * PubSub ) processLoop ( ctx context . Context ) {
2017-10-14 18:13:23 +00:00
defer func ( ) {
// Clean up go routines.
for _ , ch := range p . peers {
close ( ch )
}
p . peers = nil
p . topics = nil
} ( )
2016-09-10 00:15:39 +00:00
for {
select {
case s := <- p . newPeers :
pid := s . Conn ( ) . RemotePeer ( )
2016-07-27 07:35:41 +00:00
ch , ok := p . peers [ pid ]
2016-09-10 00:15:39 +00:00
if ok {
log . Error ( "already have connection to peer: " , pid )
2016-07-27 07:35:41 +00:00
close ( ch )
2016-09-10 00:15:39 +00:00
}
messages := make ( chan * RPC , 32 )
2016-09-10 23:03:53 +00:00
go p . handleSendingMessages ( ctx , s , messages )
2016-09-10 00:15:39 +00:00
messages <- p . getHelloPacket ( )
p . peers [ pid ] = messages
case pid := <- p . peerDead :
2016-07-27 07:35:41 +00:00
ch , ok := p . peers [ pid ]
if ok {
close ( ch )
}
2016-09-10 00:15:39 +00:00
delete ( p . peers , pid )
2016-09-12 20:22:16 +00:00
for _ , t := range p . topics {
delete ( t , pid )
}
2016-09-14 22:11:41 +00:00
case treq := <- p . getTopics :
var out [ ] string
2016-11-11 15:22:47 +00:00
for t := range p . myTopics {
out = append ( out , t )
2016-09-14 22:11:41 +00:00
}
treq . resp <- out
2016-10-19 23:01:06 +00:00
case sub := <- p . cancelCh :
p . handleRemoveSubscription ( sub )
2016-09-10 00:15:39 +00:00
case sub := <- p . addSub :
2016-10-19 23:01:06 +00:00
p . handleAddSubscription ( sub )
2016-10-18 18:13:12 +00:00
case preq := <- p . getPeers :
tmap , ok := p . topics [ preq . topic ]
if preq . topic != "" && ! ok {
preq . resp <- nil
continue
}
2016-09-15 01:07:30 +00:00
var peers [ ] peer . ID
for p := range p . peers {
2016-10-18 18:13:12 +00:00
if preq . topic != "" {
_ , ok := tmap [ p ]
if ! ok {
continue
}
}
2016-09-15 01:07:30 +00:00
peers = append ( peers , p )
}
2016-10-18 18:13:12 +00:00
preq . resp <- peers
2016-09-10 00:15:39 +00:00
case rpc := <- p . incoming :
err := p . handleIncomingRPC ( rpc )
if err != nil {
log . Error ( "handling RPC: " , err )
2016-09-11 20:56:07 +00:00
continue
2016-09-10 00:15:39 +00:00
}
2016-09-11 03:47:12 +00:00
case msg := <- p . publish :
2018-01-13 12:31:34 +00:00
subs := p . getSubscriptions ( msg )
p . pushMsg ( subs , p . host . ID ( ) , msg )
2017-11-22 17:40:45 +00:00
2017-11-08 19:00:52 +00:00
case req := <- p . sendMsg :
p . maybePublishMessage ( req . from , req . msg . Message )
2016-09-10 23:03:53 +00:00
case <- ctx . Done ( ) :
log . Info ( "pubsub processloop shutting down" )
return
2016-09-10 00:15:39 +00:00
}
}
}
2016-09-10 03:13:50 +00:00
2016-10-20 11:23:38 +00:00
// handleRemoveSubscription removes Subscription sub from bookeeping.
// If this was the last Subscription for a given topic, it will also announce
// that this node is not subscribing to this topic anymore.
// Only called from processLoop.
2016-10-19 23:01:06 +00:00
func ( p * PubSub ) handleRemoveSubscription ( sub * Subscription ) {
subs := p . myTopics [ sub . topic ]
if subs == nil {
return
2016-09-11 03:47:12 +00:00
}
2016-09-10 03:13:50 +00:00
2016-10-19 23:01:06 +00:00
sub . err = fmt . Errorf ( "subscription cancelled by calling sub.Cancel()" )
close ( sub . ch )
delete ( subs , sub )
2016-09-10 03:13:50 +00:00
2016-10-19 23:01:06 +00:00
if len ( subs ) == 0 {
2016-11-11 15:22:47 +00:00
delete ( p . myTopics , sub . topic )
2016-10-19 23:01:06 +00:00
p . announce ( sub . topic , false )
}
}
2016-09-11 03:47:12 +00:00
2016-10-20 11:23:38 +00:00
// handleAddSubscription adds a Subscription for a particular topic. If it is
// the first Subscription for the topic, it will announce that this node
// subscribes to the topic.
// Only called from processLoop.
2016-10-19 23:01:06 +00:00
func ( p * PubSub ) handleAddSubscription ( req * addSubReq ) {
2017-11-08 19:00:52 +00:00
sub := req . sub
subs := p . myTopics [ sub . topic ]
2016-10-19 23:01:06 +00:00
// announce we want this topic
if len ( subs ) == 0 {
2017-11-08 19:00:52 +00:00
p . announce ( sub . topic , true )
2016-10-19 23:01:06 +00:00
}
// make new if not there
if subs == nil {
2017-11-08 19:00:52 +00:00
p . myTopics [ sub . topic ] = make ( map [ * Subscription ] struct { } )
subs = p . myTopics [ sub . topic ]
2016-10-19 23:01:06 +00:00
}
2017-11-08 19:00:52 +00:00
sub . ch = make ( chan * Message , 32 )
sub . cancelCh = p . cancelCh
2016-10-19 23:01:06 +00:00
p . myTopics [ sub . topic ] [ sub ] = struct { } { }
req . resp <- sub
}
2016-10-20 11:23:38 +00:00
// announce announces whether or not this node is interested in a given topic
// Only called from processLoop.
2016-10-19 23:01:06 +00:00
func ( p * PubSub ) announce ( topic string , sub bool ) {
subopt := & pb . RPC_SubOpts {
Topicid : & topic ,
Subscribe : & sub ,
2016-09-11 03:47:12 +00:00
}
2016-09-11 20:56:07 +00:00
out := rpcWithSubs ( subopt )
2017-09-02 03:10:06 +00:00
for pid , peer := range p . peers {
select {
case peer <- out :
default :
log . Infof ( "dropping announce message to peer %s: queue full" , pid )
}
2016-09-11 03:47:12 +00:00
}
2016-09-10 03:13:50 +00:00
}
2016-09-10 00:15:39 +00:00
2016-10-20 11:23:38 +00:00
// notifySubs sends a given message to all corresponding subscribbers.
// Only called from processLoop.
2016-09-11 20:56:07 +00:00
func ( p * PubSub ) notifySubs ( msg * pb . Message ) {
for _ , topic := range msg . GetTopicIDs ( ) {
2016-10-19 23:01:06 +00:00
subs := p . myTopics [ topic ]
for f := range subs {
f . ch <- & Message { msg }
2016-09-11 20:56:07 +00:00
}
2016-09-10 00:15:39 +00:00
}
}
2016-10-20 11:23:38 +00:00
// seenMessage returns whether we already saw this message before
2016-09-11 03:47:12 +00:00
func ( p * PubSub ) seenMessage ( id string ) bool {
return p . seenMessages . Has ( id )
}
2016-10-20 11:23:38 +00:00
// markSeen marks a message as seen such that seenMessage returns `true' for the given id
2016-09-11 03:47:12 +00:00
func ( p * PubSub ) markSeen ( id string ) {
p . seenMessages . Add ( id )
}
2016-10-20 11:23:38 +00:00
// subscribedToMessage returns whether we are subscribed to one of the topics
// of a given message
2016-09-11 20:56:07 +00:00
func ( p * PubSub ) subscribedToMsg ( msg * pb . Message ) bool {
2017-09-02 01:16:41 +00:00
if len ( p . myTopics ) == 0 {
return false
}
2016-09-11 20:56:07 +00:00
for _ , t := range msg . GetTopicIDs ( ) {
if _ , ok := p . myTopics [ t ] ; ok {
return true
}
}
return false
}
2016-09-10 00:15:39 +00:00
func ( p * PubSub ) handleIncomingRPC ( rpc * RPC ) error {
2016-09-11 03:47:12 +00:00
for _ , subopt := range rpc . GetSubscriptions ( ) {
t := subopt . GetTopicid ( )
if subopt . GetSubscribe ( ) {
2016-09-10 00:15:39 +00:00
tmap , ok := p . topics [ t ]
if ! ok {
tmap = make ( map [ peer . ID ] struct { } )
p . topics [ t ] = tmap
}
tmap [ rpc . from ] = struct { } { }
2016-09-11 03:47:12 +00:00
} else {
2016-09-10 00:15:39 +00:00
tmap , ok := p . topics [ t ]
if ! ok {
2016-09-11 03:47:12 +00:00
continue
2016-09-10 00:15:39 +00:00
}
delete ( tmap , rpc . from )
}
2016-09-11 03:47:12 +00:00
}
2016-09-10 03:13:50 +00:00
2016-09-11 03:47:12 +00:00
for _ , pmsg := range rpc . GetPublish ( ) {
2016-09-11 20:56:07 +00:00
if ! p . subscribedToMsg ( pmsg ) {
log . Warning ( "received message we didn't subscribe to. Dropping." )
2016-09-11 03:47:12 +00:00
continue
2016-09-10 00:15:39 +00:00
}
2018-01-13 12:31:34 +00:00
msg := & Message { pmsg }
subs := p . getSubscriptions ( msg )
p . pushMsg ( subs , rpc . from , msg )
2016-09-11 20:56:07 +00:00
}
2018-01-13 12:31:34 +00:00
2016-09-11 20:56:07 +00:00
return nil
}
2016-09-10 00:15:39 +00:00
2016-10-20 11:23:38 +00:00
// msgID returns a unique ID of the passed Message
2016-07-27 07:35:41 +00:00
func msgID ( pmsg * pb . Message ) string {
return string ( pmsg . GetFrom ( ) ) + string ( pmsg . GetSeqno ( ) )
}
2018-01-13 12:31:34 +00:00
// pushMsg pushes a message to a number of subscriptions, performing validation
// as necessary
func ( p * PubSub ) pushMsg ( subs [ ] * Subscription , src peer . ID , msg * Message ) {
// we perform validation if _any_ of the subscriptions has a validator
// because the message is sent once for all topics
needval := false
for _ , sub := range subs {
if sub . validate != nil {
needval = true
break
}
}
2017-12-16 12:12:23 +00:00
2018-01-13 12:44:33 +00:00
if needval {
// validation is asynchronous
// XXX vyzo: do we want a global validation throttle here?
go p . validate ( subs , src , msg )
2018-01-13 12:31:34 +00:00
return
}
2017-11-22 17:40:45 +00:00
2018-01-13 12:44:33 +00:00
go func ( ) {
p . sendMsg <- sendReq {
from : src ,
msg : msg ,
}
} ( )
2018-01-13 12:31:34 +00:00
}
2017-12-16 12:12:23 +00:00
2018-01-13 12:31:34 +00:00
// validate performs validation and only sends the message if all validators succeed
func ( p * PubSub ) validate ( subs [ ] * Subscription , src peer . ID , msg * Message ) {
results := make ( [ ] chan bool , 0 , len ( subs ) )
throttle := false
2017-12-16 12:12:23 +00:00
2018-01-13 12:31:34 +00:00
loop :
for _ , sub := range subs {
if sub . validate == nil {
continue
}
rch := make ( chan bool , 1 )
results = append ( results , rch )
2017-11-23 13:39:14 +00:00
select {
2018-01-13 12:31:34 +00:00
case sub . validateThrottle <- struct { } { } :
go func ( sub * Subscription , msg * Message , rch chan bool ) {
rch <- sub . validateMsg ( p . ctx , msg )
<- sub . validateThrottle
} ( sub , msg , rch )
default :
log . Debugf ( "validation throttled for topic %s" , sub . topic )
throttle = true
break loop
}
}
if throttle {
log . Warningf ( "message validation throttled; dropping message from %s" , src )
return
}
for _ , rch := range results {
valid := <- rch
if ! valid {
log . Warningf ( "message validation failed; dropping message from %s" , src )
return
2017-11-08 19:00:52 +00:00
}
}
2018-01-13 12:31:34 +00:00
// all validators were successful, send the message
p . sendMsg <- sendReq {
from : src ,
msg : msg ,
}
2017-11-08 19:00:52 +00:00
}
2016-09-11 20:56:07 +00:00
func ( p * PubSub ) maybePublishMessage ( from peer . ID , pmsg * pb . Message ) {
2016-07-27 07:35:41 +00:00
id := msgID ( pmsg )
2016-09-11 20:56:07 +00:00
if p . seenMessage ( id ) {
return
}
p . markSeen ( id )
p . notifySubs ( pmsg )
err := p . publishMessage ( from , pmsg )
if err != nil {
log . Error ( "publish message: " , err )
2016-09-10 00:15:39 +00:00
}
}
2016-09-11 03:47:12 +00:00
func ( p * PubSub ) publishMessage ( from peer . ID , msg * pb . Message ) error {
2016-09-12 20:22:16 +00:00
tosend := make ( map [ peer . ID ] struct { } )
for _ , topic := range msg . GetTopicIDs ( ) {
tmap , ok := p . topics [ topic ]
if ! ok {
continue
}
2016-09-11 03:47:12 +00:00
2017-11-23 13:39:14 +00:00
for p := range tmap {
2016-09-12 20:22:16 +00:00
tosend [ p ] = struct { } { }
}
2016-09-10 00:15:39 +00:00
}
2016-09-12 20:22:16 +00:00
out := rpcWithMessages ( msg )
for pid := range tosend {
2016-09-11 03:47:12 +00:00
if pid == from || pid == peer . ID ( msg . GetFrom ( ) ) {
2016-09-10 00:15:39 +00:00
continue
}
mch , ok := p . peers [ pid ]
if ! ok {
continue
}
2017-08-30 02:11:58 +00:00
select {
case mch <- out :
default :
2017-08-30 02:42:33 +00:00
log . Infof ( "dropping message to peer %s: queue full" , pid )
2017-08-30 02:11:58 +00:00
// Drop it. The peer is too slow.
}
2016-09-10 00:15:39 +00:00
}
return nil
}
2017-11-08 19:00:52 +00:00
// getSubscriptions returns all local subscriptions that would receive the
// given message, i.e. the subscriptions of every topic of msg.
func (p *PubSub) getSubscriptions(msg *Message) []*Subscription {
	var matched []*Subscription
	for _, topic := range msg.GetTopicIDs() {
		// ranging over a missing (nil) entry is simply a no-op
		for sub := range p.myTopics[topic] {
			matched = append(matched, sub)
		}
	}
	return matched
}
2016-10-19 23:01:06 +00:00
type addSubReq struct {
2017-11-08 19:00:52 +00:00
sub * Subscription
resp chan * Subscription
}
2017-11-16 10:48:13 +00:00
type SubOpt func ( * Subscription ) error
2017-11-22 17:40:45 +00:00
type Validator func ( context . Context , * Message ) bool
2017-11-16 10:48:13 +00:00
2017-11-08 19:00:52 +00:00
// WithValidator is an option that can be supplied to Subscribe. The argument is a function that returns whether or not a given message should be propagated further.
2018-01-13 10:33:03 +00:00
func WithValidator ( validate Validator ) SubOpt {
2017-11-08 19:00:52 +00:00
return func ( sub * Subscription ) error {
sub . validate = validate
return nil
}
2017-11-23 18:12:59 +00:00
}
2017-11-08 19:00:52 +00:00
2017-11-23 18:12:59 +00:00
// WithValidatorTimeout is an option that can be supplied to Subscribe. The argument is a duration after which long-running validators are canceled.
2018-01-13 10:33:03 +00:00
func WithValidatorTimeout ( timeout time . Duration ) SubOpt {
2017-11-23 18:12:59 +00:00
return func ( sub * Subscription ) error {
sub . validateTimeout = timeout
return nil
}
2016-09-10 00:15:39 +00:00
}
2018-01-13 12:31:34 +00:00
// WithMaxConcurrency is an option that can be supplied to Subscribe. It sets
// the capacity of the Subscription's validation throttle to n, i.e. the
// number of validations that may hold a throttle slot at the same time.
//
// n must be positive. A negative capacity would panic when the throttle
// channel is created, and a capacity of zero would yield an unbuffered
// throttle that no validation could ever acquire, so every message would be
// dropped as throttled. Both cases are reported as an error instead.
func WithMaxConcurrency(n int) SubOpt {
	return func(sub *Subscription) error {
		if n <= 0 {
			return fmt.Errorf("invalid max concurrency %d: must be positive", n)
		}
		sub.validateThrottle = make(chan struct{}, n)
		return nil
	}
}
2016-10-20 11:23:38 +00:00
// Subscribe returns a new Subscription for the given topic
2017-11-16 10:48:13 +00:00
func ( p * PubSub ) Subscribe ( topic string , opts ... SubOpt ) ( * Subscription , error ) {
2016-11-11 23:47:53 +00:00
td := pb . TopicDescriptor { Name : & topic }
2016-09-10 00:15:39 +00:00
2017-11-08 19:00:52 +00:00
return p . SubscribeByTopicDescriptor ( & td , opts ... )
2016-09-10 00:15:39 +00:00
}
2016-11-11 15:22:47 +00:00
// SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor
2017-11-16 10:48:13 +00:00
func ( p * PubSub ) SubscribeByTopicDescriptor ( td * pb . TopicDescriptor , opts ... SubOpt ) ( * Subscription , error ) {
2016-11-11 15:22:47 +00:00
if td . GetAuth ( ) . GetMode ( ) != pb . TopicDescriptor_AuthOpts_NONE {
return nil , fmt . Errorf ( "auth mode not yet supported" )
}
if td . GetEnc ( ) . GetMode ( ) != pb . TopicDescriptor_EncOpts_NONE {
return nil , fmt . Errorf ( "encryption mode not yet supported" )
}
2017-11-08 19:00:52 +00:00
sub := & Subscription {
2017-11-23 18:12:59 +00:00
topic : td . GetName ( ) ,
validateTimeout : defaultValidateTimeout ,
2017-11-08 19:00:52 +00:00
}
for _ , opt := range opts {
err := opt ( sub )
if err != nil {
return nil , err
}
}
2018-01-13 12:31:34 +00:00
if sub . validate != nil && sub . validateThrottle == nil {
sub . validateThrottle = make ( chan struct { } , defaultMaxConcurrency )
}
2016-11-11 23:47:53 +00:00
out := make ( chan * Subscription , 1 )
p . addSub <- & addSubReq {
2017-11-08 19:00:52 +00:00
sub : sub ,
resp : out ,
2016-11-11 23:47:53 +00:00
}
return <- out , nil
2016-11-11 15:22:47 +00:00
}
2016-10-19 23:01:06 +00:00
// topicReq asks the process loop for the topics this node is subscribed to.
type topicReq struct {
	// resp receives the list of topic names.
	resp chan []string
}
2016-10-20 11:23:38 +00:00
// GetTopics returns the topics this node is subscribed to
2016-10-19 23:01:06 +00:00
func ( p * PubSub ) GetTopics ( ) [ ] string {
out := make ( chan [ ] string , 1 )
p . getTopics <- & topicReq { resp : out }
return <- out
2016-09-10 00:15:39 +00:00
}
2016-10-20 11:23:38 +00:00
// Publish publishes data under the given topic
2016-09-10 00:15:39 +00:00
func ( p * PubSub ) Publish ( topic string , data [ ] byte ) error {
2018-01-03 18:30:28 +00:00
seqno := make ( [ ] byte , 8 )
2018-01-03 09:25:20 +00:00
counter := atomic . AddUint64 ( & p . counter , 1 )
2018-01-03 18:30:28 +00:00
binary . BigEndian . PutUint64 ( seqno , counter )
2016-09-11 03:47:12 +00:00
p . publish <- & Message {
& pb . Message {
Data : data ,
TopicIDs : [ ] string { topic } ,
2017-01-09 17:52:07 +00:00
From : [ ] byte ( p . host . ID ( ) ) ,
2016-09-11 03:47:12 +00:00
Seqno : seqno ,
2016-09-10 00:15:39 +00:00
} ,
}
return nil
}
2016-09-15 01:07:30 +00:00
2016-10-18 18:13:12 +00:00
// listPeerReq asks the process loop for the connected peers, optionally
// restricted to the peers subscribed to a topic.
type listPeerReq struct {
	// resp receives the resulting peer list.
	resp chan []peer.ID
	// topic restricts the result to its subscribers; empty means all peers.
	topic string
}
2017-11-08 19:00:52 +00:00
// sendReq is a request to call maybePublishMessage. It is issued once a
// message needs no validation or has passed it.
type sendReq struct {
	// from is the peer the message was received from.
	from peer.ID
	// msg is the message to process.
	msg *Message
}
2016-10-20 11:23:38 +00:00
// ListPeers returns a list of peers we are connected to.
2016-10-18 18:13:12 +00:00
func ( p * PubSub ) ListPeers ( topic string ) [ ] peer . ID {
2016-09-15 01:07:30 +00:00
out := make ( chan [ ] peer . ID )
2016-10-18 18:13:12 +00:00
p . getPeers <- & listPeerReq {
resp : out ,
topic : topic ,
}
2016-09-15 01:07:30 +00:00
return <- out
}