Merge pull request #866 from fjl/p2p-last-minute

Last minute p2p fixes
This commit is contained in:
Jeffrey Wilcke 2015-05-06 14:49:52 -07:00
commit a0cb1945ae
6 changed files with 24 additions and 26 deletions

View File

@ -31,8 +31,8 @@ var (
// Timeouts // Timeouts
const ( const (
respTimeout = 300 * time.Millisecond respTimeout = 500 * time.Millisecond
sendTimeout = 300 * time.Millisecond sendTimeout = 500 * time.Millisecond
expiration = 20 * time.Second expiration = 20 * time.Second
refreshInterval = 1 * time.Hour refreshInterval = 1 * time.Hour

View File

@ -91,7 +91,8 @@ func Map(m Interface, c chan struct{}, protocol string, extport, intport int, na
}() }()
glog.V(logger.Debug).Infof("add mapping: %s %d -> %d (%s) using %s\n", protocol, extport, intport, name, m) glog.V(logger.Debug).Infof("add mapping: %s %d -> %d (%s) using %s\n", protocol, extport, intport, name, m)
if err := m.AddMapping(protocol, intport, extport, name, mapTimeout); err != nil { if err := m.AddMapping(protocol, intport, extport, name, mapTimeout); err != nil {
glog.V(logger.Error).Infof("mapping error: %v\n", err) glog.V(logger.Warn).Infof("network port %d could not be mapped: %v\n", intport, err)
glog.V(logger.Debug).Infof("mapping with %v returned %v\n", m, err)
} }
for { for {
select { select {
@ -102,7 +103,8 @@ func Map(m Interface, c chan struct{}, protocol string, extport, intport int, na
case <-refresh.C: case <-refresh.C:
glog.V(logger.Detail).Infof("refresh mapping: %s %d -> %d (%s) using %s\n", protocol, extport, intport, name, m) glog.V(logger.Detail).Infof("refresh mapping: %s %d -> %d (%s) using %s\n", protocol, extport, intport, name, m)
if err := m.AddMapping(protocol, intport, extport, name, mapTimeout); err != nil { if err := m.AddMapping(protocol, intport, extport, name, mapTimeout); err != nil {
glog.V(logger.Error).Infof("mapping error: %v\n", err) glog.V(logger.Warn).Infof("network port %d could not be mapped: %v\n", intport, err)
glog.V(logger.Debug).Infof("mapping with %v returned %v\n", m, err)
} }
refresh.Reset(mapUpdateInterval) refresh.Reset(mapUpdateInterval)
} }
@ -225,7 +227,7 @@ func (n *autodisc) wait() error {
return nil return nil
} }
if found = <-n.done; found == nil { if found = <-n.done; found == nil {
return errors.New("no devices discovered") return errors.New("no UPnP or NAT-PMP router discovered")
} }
n.mu.Lock() n.mu.Lock()
n.found = found n.found = found

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
@ -34,10 +35,6 @@ const (
// Peer represents a connected remote node. // Peer represents a connected remote node.
type Peer struct { type Peer struct {
// Peers have all the log methods.
// Use them to display messages related to the peer.
*logger.Logger
conn net.Conn conn net.Conn
rw *conn rw *conn
running map[string]*protoRW running map[string]*protoRW
@ -99,10 +96,8 @@ func (p *Peer) String() string {
} }
func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer { func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer {
logtag := fmt.Sprintf("Peer %.8x %v", conn.ID[:], fd.RemoteAddr())
protomap := matchProtocols(protocols, conn.Caps, conn) protomap := matchProtocols(protocols, conn.Caps, conn)
p := &Peer{ p := &Peer{
Logger: logger.NewLogger(logtag),
conn: fd, conn: fd,
rw: conn, rw: conn,
running: protomap, running: protomap,
@ -130,7 +125,7 @@ func (p *Peer) run() DiscReason {
} else { } else {
// Note: We rely on protocols to abort if there is a write // Note: We rely on protocols to abort if there is a write
// error. It might be more robust to handle them here as well. // error. It might be more robust to handle them here as well.
p.DebugDetailf("Read error: %v\n", err) glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err)
reason = DiscNetworkError reason = DiscNetworkError
} }
case err := <-p.protoErr: case err := <-p.protoErr:
@ -141,7 +136,7 @@ func (p *Peer) run() DiscReason {
close(p.closed) close(p.closed)
p.politeDisconnect(reason) p.politeDisconnect(reason)
p.wg.Wait() p.wg.Wait()
p.Debugf("Disconnected: %v\n", reason) glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason)
return reason return reason
} }
@ -195,7 +190,7 @@ func (p *Peer) handle(msg Msg) error {
// This is the last message. We don't need to discard or // This is the last message. We don't need to discard or
// check errors because, the connection will be closed after it. // check errors because, the connection will be closed after it.
rlp.Decode(msg.Payload, &reason) rlp.Decode(msg.Payload, &reason)
p.Debugf("Disconnect requested: %v\n", reason[0]) glog.V(logger.Debug).Infof("%v: Disconnect Requested: %v\n", p, reason[0])
return DiscRequested return DiscRequested
case msg.Code < baseProtocolLength: case msg.Code < baseProtocolLength:
// ignore other base protocol messages // ignore other base protocol messages
@ -239,14 +234,14 @@ func (p *Peer) startProtocols() {
for _, proto := range p.running { for _, proto := range p.running {
proto := proto proto := proto
proto.closed = p.closed proto.closed = p.closed
p.DebugDetailf("Starting protocol %s/%d\n", proto.Name, proto.Version) glog.V(logger.Detail).Infof("%v: Starting protocol %s/%d\n", p, proto.Name, proto.Version)
go func() { go func() {
err := proto.Run(p, proto) err := proto.Run(p, proto)
if err == nil { if err == nil {
p.DebugDetailf("Protocol %s/%d returned\n", proto.Name, proto.Version) glog.V(logger.Detail).Infof("%v: Protocol %s/%d returned\n", p, proto.Name, proto.Version)
err = errors.New("protocol returned") err = errors.New("protocol returned")
} else { } else if err != io.EOF {
p.DebugDetailf("Protocol %s/%d error: %v\n", proto.Name, proto.Version, err) glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: \n", p, proto.Name, proto.Version, err)
} }
p.protoErr <- err p.protoErr <- err
p.wg.Done() p.wg.Done()

View File

@ -428,7 +428,7 @@ func (srv *Server) dialLoop() {
case <-refresh.C: case <-refresh.C:
// Grab some nodes to connect to if we're not at capacity. // Grab some nodes to connect to if we're not at capacity.
srv.lock.RLock() srv.lock.RLock()
needpeers := len(srv.peers) < srv.MaxPeers needpeers := len(srv.peers) < srv.MaxPeers/2
srv.lock.RUnlock() srv.lock.RUnlock()
if needpeers { if needpeers {
go func() { go func() {

View File

@ -5,6 +5,8 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
@ -36,13 +38,13 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer {
// into the network. // into the network.
func (self *peer) start() { func (self *peer) start() {
go self.update() go self.update()
self.peer.Debugln("whisper started") glog.V(logger.Debug).Infof("%v: whisper started", self.peer)
} }
// stop terminates the peer updater, stopping message forwarding to it. // stop terminates the peer updater, stopping message forwarding to it.
func (self *peer) stop() { func (self *peer) stop() {
close(self.quit) close(self.quit)
self.peer.Debugln("whisper stopped") glog.V(logger.Debug).Infof("%v: whisper stopped", self.peer)
} }
// handshake sends the protocol initiation status message to the remote peer and // handshake sends the protocol initiation status message to the remote peer and
@ -94,7 +96,7 @@ func (self *peer) update() {
case <-transmit.C: case <-transmit.C:
if err := self.broadcast(); err != nil { if err := self.broadcast(); err != nil {
self.peer.Infoln("broadcast failed:", err) glog.V(logger.Info).Infof("%v: broadcast failed: %v", self.peer, err)
return return
} }
@ -152,7 +154,6 @@ func (self *peer) broadcast() error {
if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { if err := p2p.Send(self.ws, messagesCode, transmit); err != nil {
return err return err
} }
self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)") glog.V(logger.Detail).Infoln(self.peer, "broadcasted", len(transmit), "message(s)")
return nil return nil
} }

View File

@ -197,14 +197,14 @@ func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
} }
var envelopes []*Envelope var envelopes []*Envelope
if err := packet.Decode(&envelopes); err != nil { if err := packet.Decode(&envelopes); err != nil {
peer.Infof("failed to decode enveloped: %v", err) glog.V(logger.Info).Infof("%v: failed to decode envelope: %v", peer, err)
continue continue
} }
// Inject all envelopes into the internal pool // Inject all envelopes into the internal pool
for _, envelope := range envelopes { for _, envelope := range envelopes {
if err := self.add(envelope); err != nil { if err := self.add(envelope); err != nil {
// TODO Punish peer here. Invalid envelope. // TODO Punish peer here. Invalid envelope.
peer.Debugf("failed to pool envelope: %f", err) glog.V(logger.Debug).Infof("%v: failed to pool envelope: %v", peer, err)
} }
whisperPeer.mark(envelope) whisperPeer.mark(envelope)
} }