From d3ae5c94195fe4acc2133bc74d94a8f356c08f93 Mon Sep 17 00:00:00 2001 From: romanman Date: Sat, 1 Nov 2014 21:49:53 -0500 Subject: [PATCH] Infrastructure for channel management --- .../java/org/ethereum/net/eth/EthHandler.java | 1 + .../java/org/ethereum/net/p2p/P2pHandler.java | 1 + .../java/org/ethereum/net/server/Channel.java | 20 ++++++++++++ .../server/EthereumChannelInitializer.java | 5 +++ .../org/ethereum/net/server/PeerServer.java | 31 +++++++++++++++++-- .../java/org/ethereum/net/shh/ShhHandler.java | 1 + 6 files changed, 57 insertions(+), 2 deletions(-) diff --git a/ethereumj-core/src/main/java/org/ethereum/net/eth/EthHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/eth/EthHandler.java index 35581800..680b94ad 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/eth/EthHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/eth/EthHandler.java @@ -145,6 +145,7 @@ public class EthHandler extends SimpleChannelInboundHandler { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { logger.debug("handlerRemoved: kill timers in EthHandler"); + active = false; this.killTimers(); } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/p2p/P2pHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/p2p/P2pHandler.java index f5ab5500..5041f1be 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/p2p/P2pHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/p2p/P2pHandler.java @@ -132,6 +132,7 @@ public class P2pHandler extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + active = false; this.killTimers(); } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java b/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java index 07285351..be3b3845 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java @@ -1,5 +1,10 @@ package org.ethereum.net.server; +import org.ethereum.net.MessageQueue; +import org.ethereum.net.eth.EthHandler; +import org.ethereum.net.p2p.P2pHandler; +import org.ethereum.net.shh.ShhHandler; + /** * www.etherj.com * @@ -8,4 +13,19 @@ package org.ethereum.net.server; */ public class Channel { + + MessageQueue msgQueue; + P2pHandler p2pHandler; + EthHandler ethHandler; + ShhHandler shhHandler; + + + public Channel(MessageQueue msgQueue, P2pHandler p2pHandler, EthHandler ethHandler, ShhHandler shhHandler) { + this.msgQueue = msgQueue; + this.p2pHandler = p2pHandler; + this.ethHandler = ethHandler; + this.shhHandler = shhHandler; + } + + } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java b/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java index 67da1f2b..224aadd1 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java @@ -29,7 +29,11 @@ import static org.ethereum.config.SystemProperties.CONFIG; public class EthereumChannelInitializer extends ChannelInitializer { private static final Logger logger = LoggerFactory.getLogger("net"); + private PeerServer peerServer; + public EthereumChannelInitializer(PeerServer peerServer) { + this.peerServer = peerServer; + } public void initChannel(NioSocketChannel ch) throws Exception { @@ -63,6 +67,7 @@ public class EthereumChannelInitializer extends ChannelInitializer channels = Collections.synchronizedList(new ArrayList()); + public PeerServer() { } @@ -41,6 +48,23 @@ public class PeerServer { public void start(int port) { + + inactivesCollector.scheduleAtFixedRate(new TimerTask() { + public void run() { + + Iterator iter = channels.iterator(); + while(iter.hasNext()){ + Channel channel = iter.next(); + if(!channel.p2pHandler.isActive()){ + + iter.remove(); + logger.info("Channel removed: {}", channel.p2pHandler.getHandshakeHelloMessage()); + } + } + } + }, 2000, 5000); + + EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -59,7 +83,7 @@ public class PeerServer { b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONFIG.peerConnectionTimeout()); b.handler(new LoggingHandler()); - b.childHandler(new EthereumChannelInitializer()); + b.childHandler(new EthereumChannelInitializer(this)); // Start the client. logger.info("Listening for incoming connections, port: [{}] ", port); @@ -71,7 +95,7 @@ public class PeerServer { } catch (Exception e) { logger.debug("Exception: {} ({})", e.getMessage(), e.getClass().getName()); - throw new Error("Disconnnected"); + throw new Error("Server Disconnnected"); } finally { workerGroup.shutdownGracefully(); @@ -82,6 +106,9 @@ public class PeerServer { this.peerListener = peerListener; } + synchronized public void addChannel(Channel channel){ + channels.add(channel); + } } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/shh/ShhHandler.java b/ethereumj-core/src/main/java/org/ethereum/net/shh/ShhHandler.java index 5e0faea3..a21354a8 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/shh/ShhHandler.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/shh/ShhHandler.java @@ -63,6 +63,7 @@ public class ShhHandler extends SimpleChannelInboundHandler { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + active = false; logger.debug("handlerRemoved: ... "); }