From 849408dda60fe32d7abb78d103b09ca0bc7b5a60 Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 9 Jan 2014 23:15:51 +0100 Subject: [PATCH] Peer handling --- peer.go | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ server.go | 20 ++++++++++-- 2 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 peer.go diff --git a/peer.go b/peer.go new file mode 100644 index 000000000..0c8d38772 --- /dev/null +++ b/peer.go @@ -0,0 +1,93 @@ +package main + +import ( + "net" + "errors" + "log" +) + +type InMsg struct { + msgType string // Specifies how the encoded data should be interpreted + data []byte // RLP encoded data +} + +func ReadMessage(conn net.Conn) (*InMsg, error) { + buff := make([]byte, 4069) + + // Wait for a message from this peer + n, err := conn.Read(buff) + if err != nil { + return nil, err + } else if n == 0 { + return nil, errors.New("Empty message received") + } + + // Read the header (MAX n) + decoder := NewRlpDecoder(buff[:n]) + t := decoder.Get(0).AsString() + if t == "" { + return nil, errors.New("Data contained no data type") + } + + return &InMsg{msgType: t, data: decoder.Get(1).AsBytes()}, nil +} + +type OutMsg struct { + data []byte +} + +type Peer struct { + server *Server + conn net.Conn + outputQueue chan OutMsg + quit chan bool +} + +func NewPeer(conn net.Conn, server *Server) *Peer { + return &Peer{ + outputQueue: make(chan OutMsg, 1), // Buffered chan of 1 is enough + quit: make(chan bool), + + server: server, + conn: conn, + } +} + +// Outputs any RLP encoded data to the peer +func (p *Peer) QueueMessage(data []byte) { + p.outputQueue <- OutMsg{data: data} +} + +func (p *Peer) HandleOutbound() { +out: + for { + switch { + case <- p.quit: + break out + } + } +} + +func (p *Peer) HandleInbound() { + defer p.conn.Close() + +out: + for { + msg, err := ReadMessage(p.conn) + if err != nil { + log.Println(err) + + break out + } + + log.Println(msg) + } + + // Notify the out handler we're quiting + p.quit <- true +} + +func (p *Peer) Start() { + go p.HandleOutbound() + go p.HandleInbound() +} diff --git a/server.go b/server.go index d7718a8a6..feaf61076 100644 --- a/server.go +++ b/server.go @@ -2,7 +2,8 @@ package main import ( "container/list" - "time" + "net" + "log" ) var Db *LDBDatabase @@ -36,12 +37,27 @@ func NewServer() (*Server, error) { return server, nil } +func (s *Server) AddPeer(conn net.Conn) { + s.peers.PushBack(NewPeer(conn, s)) +} + // Start the server func (s *Server) Start() { // For now this function just blocks the main thread + ln, err := net.Listen("tcp", ":12345") + if err != nil { + log.Fatal(err) + } + go func() { for { - time.Sleep( time.Second ) + conn, err := ln.Accept() + if err != nil { + log.Println(err) + continue + } + + go s.AddPeer(conn) } }() }