propagate errors from datastore ops

This commit is contained in:
vyzo 2020-11-10 16:06:39 +02:00
parent 49cca12207
commit 9ef1f9e49e
1 changed files with 74 additions and 52 deletions

View File

@ -40,7 +40,7 @@ const (
// NewBasicConnectionGater creates a new connection gater. // NewBasicConnectionGater creates a new connection gater.
// The ds argument is an (optional, can be nil) datastore to persist the connection gater // The ds argument is an (optional, can be nil) datastore to persist the connection gater
// filters // filters
func NewBasicConnectionGater(ds datastore.Datastore) *BasicConnectionGater { func NewBasicConnectionGater(ds datastore.Datastore) (*BasicConnectionGater, error) {
cg := &BasicConnectionGater{ cg := &BasicConnectionGater{
blockedPeers: make(map[peer.ID]struct{}), blockedPeers: make(map[peer.ID]struct{}),
blockedAddrs: make(map[string]struct{}), blockedAddrs: make(map[string]struct{}),
@ -49,24 +49,27 @@ func NewBasicConnectionGater(ds datastore.Datastore) *BasicConnectionGater {
if ds != nil { if ds != nil {
cg.ds = namespace.Wrap(ds, datastore.NewKey(ns)) cg.ds = namespace.Wrap(ds, datastore.NewKey(ns))
cg.loadRules() err := cg.loadRules()
if err != nil {
return nil, err
}
} }
return cg return cg, nil
} }
func (cg *BasicConnectionGater) loadRules() { func (cg *BasicConnectionGater) loadRules() error {
// load blocked peers // load blocked peers
res, err := cg.ds.Query(query.Query{Prefix: keyPeer}) res, err := cg.ds.Query(query.Query{Prefix: keyPeer})
if err != nil { if err != nil {
log.Errorf("error querying datastore for blocked peers: %s", err) log.Errorf("error querying datastore for blocked peers: %s", err)
return return err
} }
for r := range res.Next() { for r := range res.Next() {
if r.Error != nil { if r.Error != nil {
log.Errorf("query result error: %s", r.Error) log.Errorf("query result error: %s", r.Error)
return return err
} }
p := peer.ID(r.Entry.Value) p := peer.ID(r.Entry.Value)
@ -77,13 +80,13 @@ func (cg *BasicConnectionGater) loadRules() {
res, err = cg.ds.Query(query.Query{Prefix: keyAddr}) res, err = cg.ds.Query(query.Query{Prefix: keyAddr})
if err != nil { if err != nil {
log.Errorf("error querying datastore for blocked addrs: %s", err) log.Errorf("error querying datastore for blocked addrs: %s", err)
return return err
} }
for r := range res.Next() { for r := range res.Next() {
if r.Error != nil { if r.Error != nil {
log.Errorf("query result error: %s", r.Error) log.Errorf("query result error: %s", r.Error)
return return err
} }
ip := net.IP(r.Entry.Value) ip := net.IP(r.Entry.Value)
@ -94,53 +97,60 @@ func (cg *BasicConnectionGater) loadRules() {
res, err = cg.ds.Query(query.Query{Prefix: keySubnet}) res, err = cg.ds.Query(query.Query{Prefix: keySubnet})
if err != nil { if err != nil {
log.Errorf("error querying datastore for blocked subnets: %s", err) log.Errorf("error querying datastore for blocked subnets: %s", err)
return return err
} }
for r := range res.Next() { for r := range res.Next() {
if r.Error != nil { if r.Error != nil {
log.Errorf("query result error: %s", r.Error) log.Errorf("query result error: %s", r.Error)
return return err
} }
ipnetStr := string(r.Entry.Value) ipnetStr := string(r.Entry.Value)
_, ipnet, err := net.ParseCIDR(ipnetStr) _, ipnet, err := net.ParseCIDR(ipnetStr)
if err != nil { if err != nil {
log.Errorf("error parsing CIDR subnet: %s", err) log.Errorf("error parsing CIDR subnet: %s", err)
return return err
} }
cg.blockedSubnets[ipnetStr] = ipnet cg.blockedSubnets[ipnetStr] = ipnet
} }
return nil
} }
// BlockPeer adds a peer to the set of blocked peers // BlockPeer adds a peer to the set of blocked peers
func (cg *BasicConnectionGater) BlockPeer(p peer.ID) { func (cg *BasicConnectionGater) BlockPeer(p peer.ID) error {
cg.Lock()
defer cg.Unlock()
cg.blockedPeers[p] = struct{}{}
if cg.ds != nil { if cg.ds != nil {
err := cg.ds.Put(datastore.NewKey(keyPeer+p.String()), []byte(p)) err := cg.ds.Put(datastore.NewKey(keyPeer+p.String()), []byte(p))
if err != nil { if err != nil {
log.Errorf("error writing blocked peer to datastore: %s", err) log.Errorf("error writing blocked peer to datastore: %s", err)
return err
} }
} }
cg.Lock()
defer cg.Unlock()
cg.blockedPeers[p] = struct{}{}
return nil
} }
// UnblockPeer removes a peer from the set of blocked peers // UnblockPeer removes a peer from the set of blocked peers
func (cg *BasicConnectionGater) UnblockPeer(p peer.ID) { func (cg *BasicConnectionGater) UnblockPeer(p peer.ID) error {
if cg.ds != nil {
err := cg.ds.Delete(datastore.NewKey(keyPeer + p.String()))
if err != nil {
log.Errorf("error deleting blocked peer from datastore: %s", err)
return err
}
}
cg.Lock() cg.Lock()
defer cg.Unlock() defer cg.Unlock()
delete(cg.blockedPeers, p) delete(cg.blockedPeers, p)
if cg.ds != nil { return nil
err := cg.ds.Delete(datastore.NewKey(keyPeer + p.String()))
if err != nil {
log.Errorf("error deleting blocked peer from datastore: %s", err)
}
}
} }
// ListBlockedPeers return a list of blocked peers // ListBlockedPeers return a list of blocked peers
@ -157,33 +167,39 @@ func (cg *BasicConnectionGater) ListBlockedPeers() []peer.ID {
} }
// BlockAddr adds an IP address to the set of blocked addresses // BlockAddr adds an IP address to the set of blocked addresses
func (cg *BasicConnectionGater) BlockAddr(ip net.IP) { func (cg *BasicConnectionGater) BlockAddr(ip net.IP) error {
if cg.ds != nil {
err := cg.ds.Put(datastore.NewKey(keyAddr+ip.String()), []byte(ip))
if err != nil {
log.Errorf("error writing blocked addr to datastore: %s", err)
return err
}
}
cg.Lock() cg.Lock()
defer cg.Unlock() defer cg.Unlock()
cg.blockedAddrs[ip.String()] = struct{}{} cg.blockedAddrs[ip.String()] = struct{}{}
if cg.ds != nil { return nil
err := cg.ds.Put(datastore.NewKey(keyAddr+ip.String()), []byte(ip))
if err != nil {
log.Errorf("error writing blocked addr to datastore: %s", err)
}
}
} }
// UnblockAddr removes an IP address from the set of blocked addresses // UnblockAddr removes an IP address from the set of blocked addresses
func (cg *BasicConnectionGater) UnblockAddr(ip net.IP) { func (cg *BasicConnectionGater) UnblockAddr(ip net.IP) error {
if cg.ds != nil {
err := cg.ds.Delete(datastore.NewKey(keyAddr + ip.String()))
if err != nil {
log.Errorf("error deleting blocked addr from datastore: %s", err)
return err
}
}
cg.Lock() cg.Lock()
defer cg.Unlock() defer cg.Unlock()
delete(cg.blockedAddrs, ip.String()) delete(cg.blockedAddrs, ip.String())
if cg.ds != nil { return nil
err := cg.ds.Delete(datastore.NewKey(keyAddr + ip.String()))
if err != nil {
log.Errorf("error deleting blocked addr from datastore: %s", err)
}
}
} }
// ListBlockedAddrs return a list of blocked IP addresses // ListBlockedAddrs return a list of blocked IP addresses
@ -201,33 +217,39 @@ func (cg *BasicConnectionGater) ListBlockedAddrs() []net.IP {
} }
// BlockSubnet adds an IP subnet to the set of blocked addresses // BlockSubnet adds an IP subnet to the set of blocked addresses
func (cg *BasicConnectionGater) BlockSubnet(ipnet *net.IPNet) { func (cg *BasicConnectionGater) BlockSubnet(ipnet *net.IPNet) error {
if cg.ds != nil {
err := cg.ds.Put(datastore.NewKey(keySubnet+ipnet.String()), []byte(ipnet.String()))
if err != nil {
log.Errorf("error writing blocked addr to datastore: %s", err)
return err
}
}
cg.Lock() cg.Lock()
defer cg.Unlock() defer cg.Unlock()
cg.blockedSubnets[ipnet.String()] = ipnet cg.blockedSubnets[ipnet.String()] = ipnet
if cg.ds != nil { return nil
err := cg.ds.Put(datastore.NewKey(keySubnet+ipnet.String()), []byte(ipnet.String()))
if err != nil {
log.Errorf("error writing blocked addr to datastore: %s", err)
}
}
} }
// UnblockSubnet removes an IP address from the set of blocked addresses // UnblockSubnet removes an IP address from the set of blocked addresses
func (cg *BasicConnectionGater) UnblockSubnet(ipnet *net.IPNet) { func (cg *BasicConnectionGater) UnblockSubnet(ipnet *net.IPNet) error {
if cg.ds != nil {
err := cg.ds.Delete(datastore.NewKey(keySubnet + ipnet.String()))
if err != nil {
log.Errorf("error deleting blocked subnet from datastore: %s", err)
return err
}
}
cg.Lock() cg.Lock()
defer cg.Unlock() defer cg.Unlock()
delete(cg.blockedSubnets, ipnet.String()) delete(cg.blockedSubnets, ipnet.String())
if cg.ds != nil { return nil
err := cg.ds.Delete(datastore.NewKey(keySubnet + ipnet.String()))
if err != nil {
log.Errorf("error deleting blocked subnet from datastore: %s", err)
}
}
} }
// ListBlockedSubnets return a list of blocked IP subnets // ListBlockedSubnets return a list of blocked IP subnets