Infrastructure for channel management

This commit is contained in:
romanman 2014-11-01 21:49:53 -05:00
parent 04c7a5c032
commit d3ae5c9419
6 changed files with 57 additions and 2 deletions

View File

@ -145,6 +145,7 @@ public class EthHandler extends SimpleChannelInboundHandler<EthMessage> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
logger.debug("handlerRemoved: kill timers in EthHandler");
active = false;
this.killTimers();
}

View File

@ -132,6 +132,7 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
active = false;
this.killTimers();
}

View File

@ -1,5 +1,10 @@
package org.ethereum.net.server;
import org.ethereum.net.MessageQueue;
import org.ethereum.net.eth.EthHandler;
import org.ethereum.net.p2p.P2pHandler;
import org.ethereum.net.shh.ShhHandler;
/**
* www.etherj.com
*
@ -8,4 +13,19 @@ package org.ethereum.net.server;
*/
public class Channel {
MessageQueue msgQueue;
P2pHandler p2pHandler;
EthHandler ethHandler;
ShhHandler shhHandler;
public Channel(MessageQueue msgQueue, P2pHandler p2pHandler, EthHandler ethHandler, ShhHandler shhHandler) {
this.msgQueue = msgQueue;
this.p2pHandler = p2pHandler;
this.ethHandler = ethHandler;
this.shhHandler = shhHandler;
}
}

View File

@ -29,7 +29,11 @@ import static org.ethereum.config.SystemProperties.CONFIG;
public class EthereumChannelInitializer extends ChannelInitializer<NioSocketChannel> {
private static final Logger logger = LoggerFactory.getLogger("net");
private PeerServer peerServer;
public EthereumChannelInitializer(PeerServer peerServer) {
this.peerServer = peerServer;
}
public void initChannel(NioSocketChannel ch) throws Exception {
@ -63,6 +67,7 @@ public class EthereumChannelInitializer extends ChannelInitializer<NioSocketChan
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(32368));
ch.config().setOption(ChannelOption.SO_RCVBUF, 32368);
peerServer.addChannel(new Channel(msgQueue, p2pHandler, ethHandler, shhHandler));
// todo: check if have or not active peer if not set this one
}

View File

@ -9,9 +9,12 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.ethereum.net.PeerListener;
import org.ethereum.net.p2p.HelloMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.ethereum.config.SystemProperties.CONFIG;
/**
@ -29,8 +32,12 @@ public class PeerServer {
private PeerListener peerListener;
Timer inactivesCollector = new Timer("inactivesCollector");
private boolean peerDiscoveryMode = false;
List<Channel> channels = Collections.synchronizedList(new ArrayList<Channel>());
public PeerServer() {
}
@ -41,6 +48,23 @@ public class PeerServer {
public void start(int port) {
inactivesCollector.scheduleAtFixedRate(new TimerTask() {
public void run() {
Iterator<Channel> iter = channels.iterator();
while(iter.hasNext()){
Channel channel = iter.next();
if(!channel.p2pHandler.isActive()){
iter.remove();
logger.info("Channel removed: {}", channel.p2pHandler.getHandshakeHelloMessage());
}
}
}
}, 2000, 5000);
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
@ -59,7 +83,7 @@ public class PeerServer {
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONFIG.peerConnectionTimeout());
b.handler(new LoggingHandler());
b.childHandler(new EthereumChannelInitializer());
b.childHandler(new EthereumChannelInitializer(this));
// Start the client.
logger.info("Listening for incoming connections, port: [{}] ", port);
@ -71,7 +95,7 @@ public class PeerServer {
} catch (Exception e) {
logger.debug("Exception: {} ({})", e.getMessage(), e.getClass().getName());
throw new Error("Disconnnected");
throw new Error("Server Disconnnected");
} finally {
workerGroup.shutdownGracefully();
@ -82,6 +106,9 @@ public class PeerServer {
this.peerListener = peerListener;
}
synchronized public void addChannel(Channel channel){
channels.add(channel);
}
}

View File

@ -63,6 +63,7 @@ public class ShhHandler extends SimpleChannelInboundHandler<ShhMessage> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
active = false;
logger.debug("handlerRemoved: ... ");
}