use a nonblocking DatagramChannel for receiving data

This commit is contained in:
Andy Prock 2015-09-28 13:11:10 -07:00
parent 409d235416
commit 79e9283d01
4 changed files with 132 additions and 93 deletions

View File

@ -14,97 +14,107 @@ import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
/** /**
* This is a specialized AsyncTask that receives data from a socket in the background, and * This is a specialized AsyncTask that receives data from a socket in the background, and
* notifies it's listener when data is received. * notifies it's listener when data is received. This is not threadsafe, the listener
* should handle synchronicity.
*/ */
public class UdpReceiverTask extends AsyncTask<Void, UdpReceiverTask.ReceivedPacket, Void> { public class UdpReceiverTask extends AsyncTask<Void, Void, Void> {
private static final String TAG = "UdpReceiverTask"; private static final String TAG = "UdpReceiverTask";
private static final int MAX_UDP_DATAGRAM_LEN = 1024; private static final int MAX_UDP_DATAGRAM_LEN = 1024;
private DatagramSocket mSocket; private DatagramChannel mChannel;
private WeakReference<OnDataReceivedListener> mReceiverListener; private WeakReference<OnDataReceivedListener> mReceiverListener;
/** /**
* An {@link AsyncTask} that blocks to receive data from a socket. * An {@link AsyncTask} that blocks to receive data from a socket.
* Received data is sent via the {@link OnDataReceivedListener} * Received data is sent via the {@link OnDataReceivedListener}
*/ */
public UdpReceiverTask(DatagramSocket socket, UdpReceiverTask.OnDataReceivedListener public UdpReceiverTask(DatagramChannel channel, UdpReceiverTask.OnDataReceivedListener
receiverListener) { receiverListener) {
this.mSocket = socket; this.mChannel = channel;
this.mReceiverListener = new WeakReference<>(receiverListener); this.mReceiverListener = new WeakReference<>(receiverListener);
} }
/**
* Returns the UdpReceiverTask's DatagramChannel.
*/
public DatagramChannel getChannel() {
return mChannel;
}
/** /**
* An infinite loop to block and read data from the socket. * An infinite loop to block and read data from the socket.
*/ */
@Override @Override
protected Void doInBackground(Void... a) { protected Void doInBackground(Void... a) {
OnDataReceivedListener receiverListener = mReceiverListener.get(); OnDataReceivedListener receiverListener = mReceiverListener.get();
Selector selector = null;
try { try {
byte[] lMsg = new byte[MAX_UDP_DATAGRAM_LEN]; selector = Selector.open();
DatagramPacket dp = new DatagramPacket(lMsg, lMsg.length); mChannel.register(selector, SelectionKey.OP_READ);
while(!isCancelled()){ } catch (ClosedChannelException cce) {
mSocket.receive(dp); if (receiverListener != null) {
publishProgress(new ReceivedPacket(Base64.encodeToString(lMsg, Base64.NO_WRAP), receiverListener.didReceiveError(cce.getMessage());
dp.getAddress().getHostAddress(), dp.getPort()));
} }
} catch (IOException ioe) { } catch (IOException ioe) {
if (receiverListener != null) { if (receiverListener != null) {
receiverListener.didReceiveError(ioe.getMessage()); receiverListener.didReceiveError(ioe.getMessage());
} }
} catch (RuntimeException rte) { }
if (receiverListener != null) {
receiverListener.didReceiveRuntimeException(rte); final ByteBuffer packet = ByteBuffer.allocate(MAX_UDP_DATAGRAM_LEN);
while(!isCancelled()){
try {
if(selector.selectNow() >= 1){
final InetSocketAddress address = (InetSocketAddress) mChannel.receive(packet);
String base64Data = Base64.encodeToString(packet.array(), Base64.NO_WRAP);
receiverListener.didReceiveData(base64Data, address.getHostName(), address.getPort());
packet.clear();
}
} catch (IOException ioe) {
if (receiverListener != null) {
receiverListener.didReceiveError(ioe.getMessage());
}
this.cancel(false);
} catch (RuntimeException rte) {
if (receiverListener != null) {
receiverListener.didReceiveRuntimeException(rte);
}
this.cancel(false);
} }
} }
return null; return null;
} }
/**
* Send data out to the listener.
* @param {@link ReceivedPacket} packet marshalled data
*/
@Override
protected void onProgressUpdate(ReceivedPacket... packet) {
OnDataReceivedListener receiverListener = mReceiverListener.get();
if (receiverListener != null) {
receiverListener.didReceiveData(packet[0].base64String, packet[0].address,
packet[0].port);
}
}
/** /**
* Close if cancelled. * Close if cancelled.
*/ */
@Override @Override
protected void onCancelled(){ protected void onCancelled() {
if (mSocket != null){ OnDataReceivedListener receiverListener = mReceiverListener.get();
mSocket.close();
if (mChannel != null && mChannel.isOpen()){
try {
mChannel.close();
} catch (IOException ioe) {
if (receiverListener != null) {
receiverListener.didReceiveError(ioe.getMessage());
}
}
} }
} }
/**
* Internal class used to marshall packet data as a progress update.
* base64String the data encoded as a base64 string
* address the address of the sender
* port the port number of the sender
*/
class ReceivedPacket {
String base64String;
String address;
int port;
ReceivedPacket(String base64String, String address, int port) {
this.base64String = base64String;
this.address = address;
this.port = port;
}
}
/** /**
* Listener interface for receive events. * Listener interface for receive events.
*/ */

View File

@ -11,29 +11,30 @@ import android.os.AsyncTask;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.net.DatagramPacket; import java.net.SocketAddress;
import java.net.DatagramSocket; import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
/** /**
* Specialized AsyncTask that transmits data in the background, and notifies listeners of the result. * Specialized AsyncTask that transmits data in the background, and notifies listeners of the result.
*/ */
public class UdpSenderTask extends AsyncTask<DatagramPacket, Void, Void> { public class UdpSenderTask extends AsyncTask<UdpSenderTask.SenderPacket, Void, Void> {
private static final String TAG = "UdpSenderTask"; private static final String TAG = "UdpSenderTask";
private DatagramSocket mSocket; private DatagramChannel mChannel;
private WeakReference<OnDataSentListener> mListener; private WeakReference<OnDataSentListener> mListener;
public UdpSenderTask(DatagramSocket socket, OnDataSentListener listener) { public UdpSenderTask(DatagramChannel channel, OnDataSentListener listener) {
this.mSocket = socket; this.mChannel = channel;
this.mListener = new WeakReference<>(listener); this.mListener = new WeakReference<>(listener);
} }
@Override @Override
protected Void doInBackground(DatagramPacket... params) { protected Void doInBackground(SenderPacket... params) {
OnDataSentListener listener = mListener.get(); OnDataSentListener listener = mListener.get();
try { try {
mSocket.send(params[0]); mChannel.send(params[0].data, params[0].socketAddress);
if (listener != null) { if (listener != null) {
listener.onDataSent(this); listener.onDataSent(this);
} }
@ -50,6 +51,14 @@ public class UdpSenderTask extends AsyncTask<DatagramPacket, Void, Void> {
return null; return null;
} }
/**
* Simple class to marshall outgoing data across to this AsyncTask
*/
public static class SenderPacket {
SocketAddress socketAddress;
ByteBuffer data;
}
/** /**
* Callbacks for data send events. * Callbacks for data send events.
*/ */

View File

@ -11,33 +11,35 @@ import android.os.AsyncTask;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.util.Base64; import android.util.Base64;
import com.facebook.common.logging.FLog;
import com.facebook.react.bridge.Callback; import com.facebook.react.bridge.Callback;
import java.io.IOException; import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static com.tradle.react.UdpSenderTask.OnDataSentListener;
/** /**
* Client class that wraps a sender and a receiver for UDP data. * Client class that wraps a sender and a receiver for UDP data.
*/ */
public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedListener, UdpSenderTask.OnDataSentListener { public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedListener, OnDataSentListener {
private final OnDataReceivedListener mReceiverListener; private final OnDataReceivedListener mReceiverListener;
private final OnRuntimeExceptionListener mExceptionListener; private final OnRuntimeExceptionListener mExceptionListener;
private final boolean mReuseAddress; private final boolean mReuseAddress;
private final Map<UdpSenderTask, Callback> mPendingSends;
private DatagramChannel mChannel;
private DatagramSocket mSocket;
private UdpReceiverTask mReceiverTask; private UdpReceiverTask mReceiverTask;
private final Map<UdpSenderTask, Callback> mPendingSends;
private DatagramChannel mSenderChannel;
private UdpSocketClient(Builder builder) { private UdpSocketClient(Builder builder) {
this.mReceiverListener = builder.receiverListener; this.mReceiverListener = builder.receiverListener;
this.mExceptionListener = builder.exceptionListener; this.mExceptionListener = builder.exceptionListener;
@ -58,20 +60,20 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
* binding. * binding.
*/ */
public void bind(Integer port, @Nullable String address) throws IOException { public void bind(Integer port, @Nullable String address) throws IOException {
mChannel = DatagramChannel.open(); DatagramChannel receiverChannel = DatagramChannel.open();
mChannel.configureBlocking(true); receiverChannel.configureBlocking(false);
mSocket = mChannel.socket(); mReceiverTask = new UdpReceiverTask(receiverChannel, this);
mReceiverTask = new UdpReceiverTask(mSocket, this);
SocketAddress socketAddress = null; SocketAddress socketAddress;
if (address != null) { if (address != null) {
socketAddress = new InetSocketAddress(address, port); socketAddress = new InetSocketAddress(address, port);
} else { } else {
socketAddress = new InetSocketAddress(port); socketAddress = new InetSocketAddress(port);
} }
mSocket.setReuseAddress(mReuseAddress); DatagramSocket socket = receiverChannel.socket();
mSocket.bind(socketAddress); socket.setReuseAddress(mReuseAddress);
socket.bind(socketAddress);
// begin listening for data in the background // begin listening for data in the background
mReceiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR); mReceiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
@ -86,16 +88,25 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
* @param callback callback for results * @param callback callback for results
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public void send(String base64String, Integer port, String address, @Nullable Callback callback) throws UnknownHostException { public void send(String base64String, Integer port, String address, @Nullable Callback callback) throws UnknownHostException, IOException {
if (null == mSenderChannel || !mSenderChannel.isOpen()) {
mSenderChannel = DatagramChannel.open();
mSenderChannel.configureBlocking(true);
}
byte[] data = Base64.decode(base64String, Base64.NO_WRAP); byte[] data = Base64.decode(base64String, Base64.NO_WRAP);
DatagramPacket packet = new DatagramPacket(data, data.length,
InetAddress.getByName(address), port); UdpSenderTask task = new UdpSenderTask(mSenderChannel, this);
UdpSenderTask task = new UdpSenderTask(mSocket, this); UdpSenderTask.SenderPacket packet = new UdpSenderTask.SenderPacket();
packet.data = ByteBuffer.wrap(data);
packet.socketAddress = new InetSocketAddress(address, port);
if (callback != null) { if (callback != null) {
synchronized (mPendingSends) { synchronized (mPendingSends) {
mPendingSends.put(task, callback); mPendingSends.put(task, callback);
} }
} }
task.execute(packet); task.execute(packet);
} }
@ -103,8 +114,11 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
* Sets the socket to enable broadcasts. * Sets the socket to enable broadcasts.
*/ */
public void setBroadcast(boolean flag) throws SocketException { public void setBroadcast(boolean flag) throws SocketException {
if (mSocket != null) { if (mReceiverTask != null) {
mSocket.setBroadcast(flag); DatagramChannel channel = mReceiverTask.getChannel();
if (channel != null) {
channel.socket().setBroadcast(flag);
}
} }
} }
@ -113,9 +127,12 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
*/ */
public void close() throws IOException { public void close() throws IOException {
if (mReceiverTask != null && !mReceiverTask.isCancelled()) { if (mReceiverTask != null && !mReceiverTask.isCancelled()) {
mReceiverTask.cancel(false); // stop the receiving task, and close the channel
} else if (mChannel.isOpen()) { mReceiverTask.cancel(true);
mChannel.close(); }
if (mSenderChannel != null && mSenderChannel.isOpen()) {
mSenderChannel.close();
} }
} }

View File

@ -70,10 +70,10 @@ public final class UdpSockets extends ReactContextBaseJavaModule
mClients.clear(); mClients.clear();
} }
}.execute().get(); }.execute().get();
} catch (InterruptedException e) { } catch (InterruptedException ioe) {
FLog.e(TAG, "onCatalystInstanceDestroy", e); FLog.e(TAG, "onCatalystInstanceDestroy", ioe);
} catch (ExecutionException e) { } catch (ExecutionException ee) {
FLog.e(TAG, "onCatalystInstanceDestroy", e); FLog.e(TAG, "onCatalystInstanceDestroy", ee);
} }
} }
@ -140,15 +140,15 @@ public final class UdpSockets extends ReactContextBaseJavaModule
result.putInt("port", port); result.putInt("port", port);
callback.invoke(null, result); callback.invoke(null, result);
} catch (SocketException exception) { } catch (SocketException se) {
// Socket is already bound or a problem occurred during binding // Socket is already bound or a problem occurred during binding
callback.invoke(UdpErrorUtil.getError(null, exception.getMessage())); callback.invoke(UdpErrorUtil.getError(null, se.getMessage()));
} catch (IllegalArgumentException exception) { } catch (IllegalArgumentException iae) {
// SocketAddress is not supported // SocketAddress is not supported
callback.invoke(UdpErrorUtil.getError(null, exception.getMessage())); callback.invoke(UdpErrorUtil.getError(null, iae.getMessage()));
} catch (IOException exception) { } catch (IOException ioe) {
// an exception occurred // an exception occurred
callback.invoke(UdpErrorUtil.getError(null, exception.getMessage())); callback.invoke(UdpErrorUtil.getError(null, ioe.getMessage()));
} }
} }
}.execute(); }.execute();
@ -170,8 +170,11 @@ public final class UdpSockets extends ReactContextBaseJavaModule
try { try {
client.send(base64String, port, address, callback); client.send(base64String, port, address, callback);
} catch (UnknownHostException e) { } catch (UnknownHostException uhe) {
callback.invoke(UdpErrorUtil.getError(null, e.getMessage())); callback.invoke(UdpErrorUtil.getError(null, uhe.getMessage()));
} catch (IOException ioe) {
// an exception occurred
callback.invoke(UdpErrorUtil.getError(null, ioe.getMessage()));
} }
} }
}.execute(); }.execute();
@ -193,8 +196,8 @@ public final class UdpSockets extends ReactContextBaseJavaModule
try { try {
client.close(); client.close();
callback.invoke(); callback.invoke();
} catch (IOException e) { } catch (IOException ioe) {
callback.invoke(UdpErrorUtil.getError(null, e.getMessage())); callback.invoke(UdpErrorUtil.getError(null, ioe.getMessage()));
} }
mClients.remove(cId); mClients.remove(cId);