implement addMembership/dropMembership for android

this enables support for protocols such as ssdp, which require multicasting
bump to version 1.1.1
This commit is contained in:
Andy Prock 2015-11-18 15:11:03 -08:00
parent 0349336d6c
commit 398935bb67
9 changed files with 213 additions and 104 deletions

View File

@ -247,12 +247,20 @@ UdpSocket.prototype.setMulticastLoopback = function(flag, callback) {
// nothing yet // nothing yet
} }
UdpSocket.prototype.addMembership = function(multicastAddress, multicastInterface, callback) { UdpSocket.prototype.addMembership = function(multicastAddress) {
// nothing yet if (this._state !== STATE.BOUND) {
throw new Error('you must bind before addMembership()')
}
Sockets.addMembership(this._id, multicastAddress);
} }
UdpSocket.prototype.dropMembership = function(multicastAddress, multicastInterface, callback) { UdpSocket.prototype.dropMembership = function(multicastAddress) {
// nothing yet if (this._state !== STATE.BOUND) {
throw new Error('you must bind before addMembership()')
}
Sockets.dropMembership(this._id, multicastAddress);
} }
UdpSocket.prototype.ref = function() { UdpSocket.prototype.ref = function() {

View File

@ -2,6 +2,7 @@
<manifest xmlns:android="http://schemas.android.com/apk/res/android" <manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="com.tradle.react" > package="com.tradle.react" >
<uses-permission android:name="android.permission.INTERNET"/> <uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<uses-permission android:name="android.permission.ACCESS_WIFI_STATE"/> <uses-permission android:name="android.permission.ACCESS_WIFI_STATE"/>
<uses-permission android:name="android.permission.CHANGE_WIFI_MULTICAST_STATE"/>
</manifest> </manifest>

View File

@ -5,22 +5,16 @@
* Created by Andy Prock on 9/24/15. * Created by Andy Prock on 9/24/15.
*/ */
package com.tradle.react; package com.tradle.react;
import android.os.AsyncTask; import android.os.AsyncTask;
import android.util.Base64; import android.util.Base64;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetSocketAddress; import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
/** /**
* This is a specialized AsyncTask that receives data from a socket in the background, and * This is a specialized AsyncTask that receives data from a socket in the background, and
@ -31,24 +25,24 @@ public class UdpReceiverTask extends AsyncTask<Void, Void, Void> {
private static final String TAG = "UdpReceiverTask"; private static final String TAG = "UdpReceiverTask";
private static final int MAX_UDP_DATAGRAM_LEN = 1024; private static final int MAX_UDP_DATAGRAM_LEN = 1024;
private DatagramChannel mChannel; private DatagramSocket mSocket;
private WeakReference<OnDataReceivedListener> mReceiverListener; private WeakReference<OnDataReceivedListener> mReceiverListener;
/** /**
* An {@link AsyncTask} that blocks to receive data from a socket. * An {@link AsyncTask} that blocks to receive data from a socket.
* Received data is sent via the {@link OnDataReceivedListener} * Received data is sent via the {@link OnDataReceivedListener}
*/ */
public UdpReceiverTask(DatagramChannel channel, UdpReceiverTask.OnDataReceivedListener public UdpReceiverTask(DatagramSocket socket, UdpReceiverTask.OnDataReceivedListener
receiverListener) { receiverListener) {
this.mChannel = channel; this.mSocket = socket;
this.mReceiverListener = new WeakReference<>(receiverListener); this.mReceiverListener = new WeakReference<>(receiverListener);
} }
/** /**
* Returns the UdpReceiverTask's DatagramChannel. * Returns the UdpReceiverTask's DatagramChannel.
*/ */
public DatagramChannel getChannel() { public DatagramSocket getSocket() {
return mChannel; return mSocket;
} }
/** /**
@ -58,29 +52,17 @@ public class UdpReceiverTask extends AsyncTask<Void, Void, Void> {
protected Void doInBackground(Void... a) { protected Void doInBackground(Void... a) {
OnDataReceivedListener receiverListener = mReceiverListener.get(); OnDataReceivedListener receiverListener = mReceiverListener.get();
Selector selector = null; final byte[] buffer = new byte[MAX_UDP_DATAGRAM_LEN];
try { DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
selector = Selector.open();
mChannel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException cce) {
if (receiverListener != null) {
receiverListener.didReceiveError(cce.getMessage());
}
} catch (IOException ioe) {
if (receiverListener != null) {
receiverListener.didReceiveError(ioe.getMessage());
}
}
final ByteBuffer packet = ByteBuffer.allocate(MAX_UDP_DATAGRAM_LEN); while (!isCancelled()) {
while(!isCancelled()){
try { try {
if(selector.selectNow() >= 1){ mSocket.receive(packet);
final InetSocketAddress address = (InetSocketAddress) mChannel.receive(packet);
String base64Data = Base64.encodeToString(packet.array(), Base64.NO_WRAP); final InetAddress address = packet.getAddress();
receiverListener.didReceiveData(base64Data, address.getHostName(), address.getPort()); final String base64Data = Base64.encodeToString(packet.getData(), packet.getOffset(),
packet.clear(); packet.getLength(), Base64.NO_WRAP);
} receiverListener.didReceiveData(base64Data, address.getHostName(), packet.getPort());
} catch (IOException ioe) { } catch (IOException ioe) {
if (receiverListener != null) { if (receiverListener != null) {
receiverListener.didReceiveError(ioe.getMessage()); receiverListener.didReceiveError(ioe.getMessage());
@ -102,17 +84,7 @@ public class UdpReceiverTask extends AsyncTask<Void, Void, Void> {
*/ */
@Override @Override
protected void onCancelled() { protected void onCancelled() {
OnDataReceivedListener receiverListener = mReceiverListener.get(); // mSocket.close();
if (mChannel != null && mChannel.isOpen()){
try {
mChannel.close();
} catch (IOException ioe) {
if (receiverListener != null) {
receiverListener.didReceiveError(ioe.getMessage());
}
}
}
} }
/** /**

View File

@ -11,9 +11,9 @@ import android.os.AsyncTask;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
/** /**
* Specialized AsyncTask that transmits data in the background, and notifies listeners of the result. * Specialized AsyncTask that transmits data in the background, and notifies listeners of the result.
@ -21,11 +21,11 @@ import java.nio.channels.DatagramChannel;
public class UdpSenderTask extends AsyncTask<UdpSenderTask.SenderPacket, Void, Void> { public class UdpSenderTask extends AsyncTask<UdpSenderTask.SenderPacket, Void, Void> {
private static final String TAG = "UdpSenderTask"; private static final String TAG = "UdpSenderTask";
private DatagramChannel mChannel; private DatagramSocket mSocket;
private WeakReference<OnDataSentListener> mListener; private WeakReference<OnDataSentListener> mListener;
public UdpSenderTask(DatagramChannel channel, OnDataSentListener listener) { public UdpSenderTask(DatagramSocket socket, OnDataSentListener listener) {
this.mChannel = channel; this.mSocket = socket;
this.mListener = new WeakReference<>(listener); this.mListener = new WeakReference<>(listener);
} }
@ -34,7 +34,9 @@ public class UdpSenderTask extends AsyncTask<UdpSenderTask.SenderPacket, Void, V
OnDataSentListener listener = mListener.get(); OnDataSentListener listener = mListener.get();
try { try {
mChannel.send(params[0].data, params[0].socketAddress); SenderPacket packet = params[0];
mSocket.send(new DatagramPacket(packet.data, packet.data.length, packet.socketAddress));
if (listener != null) { if (listener != null) {
listener.onDataSent(this); listener.onDataSent(this);
} }
@ -56,7 +58,7 @@ public class UdpSenderTask extends AsyncTask<UdpSenderTask.SenderPacket, Void, V
*/ */
public static class SenderPacket { public static class SenderPacket {
SocketAddress socketAddress; SocketAddress socketAddress;
ByteBuffer data; byte[] data;
} }
/** /**

View File

@ -11,17 +11,16 @@ import android.os.AsyncTask;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import android.util.Base64; import android.util.Base64;
import com.facebook.common.logging.FLog;
import com.facebook.react.bridge.Callback; import com.facebook.react.bridge.Callback;
import java.io.IOException; import java.io.IOException;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -38,7 +37,7 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
private UdpReceiverTask mReceiverTask; private UdpReceiverTask mReceiverTask;
private final Map<UdpSenderTask, Callback> mPendingSends; private final Map<UdpSenderTask, Callback> mPendingSends;
private DatagramChannel mSenderChannel; private DatagramSocket mSocket;
private UdpSocketClient(Builder builder) { private UdpSocketClient(Builder builder) {
this.mReceiverListener = builder.receiverListener; this.mReceiverListener = builder.receiverListener;
@ -47,6 +46,14 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
this.mPendingSends = new ConcurrentHashMap<>(); this.mPendingSends = new ConcurrentHashMap<>();
} }
/**
* Checks to see if client is receiving multicast packets.
* @return boolean true if receiving multicast packets, else false.
*/
public boolean isMulticast() {
return (mReceiverTask != null && mReceiverTask.getSocket() instanceof MulticastSocket);
}
/** /**
* Binds to a specific port or address. A random port is used if the address is {@code null}. * Binds to a specific port or address. A random port is used if the address is {@code null}.
* *
@ -60,9 +67,9 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
* binding. * binding.
*/ */
public void bind(Integer port, @Nullable String address) throws IOException { public void bind(Integer port, @Nullable String address) throws IOException {
DatagramChannel receiverChannel = DatagramChannel.open(); mSocket = new DatagramSocket(null);
receiverChannel.configureBlocking(false);
mReceiverTask = new UdpReceiverTask(receiverChannel, this); mReceiverTask = new UdpReceiverTask(mSocket, this);
SocketAddress socketAddress; SocketAddress socketAddress;
if (address != null) { if (address != null) {
@ -71,14 +78,52 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
socketAddress = new InetSocketAddress(port); socketAddress = new InetSocketAddress(port);
} }
DatagramSocket socket = receiverChannel.socket(); mSocket.setReuseAddress(mReuseAddress);
socket.setReuseAddress(mReuseAddress); mSocket.bind(socketAddress);
socket.bind(socketAddress);
// begin listening for data in the background // begin listening for data in the background
mReceiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR); mReceiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
} }
/**
* Adds this socket to the specified multicast group. Rebuilds the receiver task with a
* MulticastSocket.
*
* @param address the multicast group to join
* @throws UnknownHostException
* @throws IOException
*/
public void addMembership(String address) throws UnknownHostException, IOException {
final int port = mReceiverTask.getSocket().getLocalPort();
mReceiverTask.cancel(true);
final MulticastSocket socket = new MulticastSocket(port);
socket.joinGroup(InetAddress.getByName(address));
mReceiverTask = new UdpReceiverTask(socket, this);
// begin listening for data in the background
mReceiverTask.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
}
/**
* Removes this socket from the specified multicast group.
*
* @param address the multicast group to leave
* @throws UnknownHostException
* @throws IOException
*/
public void dropMembership(String address) throws UnknownHostException, IOException {
if (mReceiverTask != null && mReceiverTask.getSocket() instanceof MulticastSocket) {
if (!mReceiverTask.isCancelled()) {
mReceiverTask.cancel(true);
}
final MulticastSocket socket = (MulticastSocket) mReceiverTask.getSocket();
socket.leaveGroup(InetAddress.getByName(address));
}
}
/** /**
* Creates a UdpSenderTask, and transmits udp data in the background. * Creates a UdpSenderTask, and transmits udp data in the background.
* *
@ -87,18 +132,18 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
* @param address destination address * @param address destination address
* @param callback callback for results * @param callback callback for results
* @throws UnknownHostException * @throws UnknownHostException
* @throws IOException
*/ */
public void send(String base64String, Integer port, String address, @Nullable Callback callback) throws UnknownHostException, IOException { public void send(String base64String, Integer port, String address, @Nullable Callback callback) throws UnknownHostException, IOException {
if (null == mSenderChannel || !mSenderChannel.isOpen()) { if (null == mSocket || !mSocket.isBound()) {
mSenderChannel = DatagramChannel.open(); return;
mSenderChannel.configureBlocking(true);
} }
byte[] data = Base64.decode(base64String, Base64.NO_WRAP); byte[] data = Base64.decode(base64String, Base64.NO_WRAP);
UdpSenderTask task = new UdpSenderTask(mSenderChannel, this); UdpSenderTask task = new UdpSenderTask(mSocket, this);
UdpSenderTask.SenderPacket packet = new UdpSenderTask.SenderPacket(); UdpSenderTask.SenderPacket packet = new UdpSenderTask.SenderPacket();
packet.data = ByteBuffer.wrap(data); packet.data = data;
packet.socketAddress = new InetSocketAddress(address, port); packet.socketAddress = new InetSocketAddress(address, port);
if (callback != null) { if (callback != null) {
@ -115,9 +160,9 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
*/ */
public void setBroadcast(boolean flag) throws SocketException { public void setBroadcast(boolean flag) throws SocketException {
if (mReceiverTask != null) { if (mReceiverTask != null) {
DatagramChannel channel = mReceiverTask.getChannel(); DatagramSocket socket = mReceiverTask.getSocket();
if (channel != null) { if (socket != null) {
channel.socket().setBroadcast(flag); socket.setBroadcast(flag);
} }
} }
} }
@ -129,10 +174,14 @@ public final class UdpSocketClient implements UdpReceiverTask.OnDataReceivedList
if (mReceiverTask != null && !mReceiverTask.isCancelled()) { if (mReceiverTask != null && !mReceiverTask.isCancelled()) {
// stop the receiving task, and close the channel // stop the receiving task, and close the channel
mReceiverTask.cancel(true); mReceiverTask.cancel(true);
if (!mReceiverTask.getSocket().isClosed()) {
mReceiverTask.getSocket().close();
}
} }
if (mSenderChannel != null && mSenderChannel.isOpen()) { if (mSocket != null && !mSocket.isClosed()) {
mSenderChannel.close(); mSocket.close();
mSocket = null;
} }
} }

View File

@ -5,27 +5,29 @@
* Created by Andy Prock on 9/24/15. * Created by Andy Prock on 9/24/15.
*/ */
package com.tradle.react; package com.tradle.react;
import android.support.annotation.Nullable; import android.content.Context;
import android.util.SparseArray; import android.net.wifi.WifiManager;
import android.support.annotation.Nullable;
import android.util.SparseArray;
import com.facebook.common.logging.FLog; import com.facebook.common.logging.FLog;
import com.facebook.react.bridge.Arguments; import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.Callback; import com.facebook.react.bridge.Callback;
import com.facebook.react.bridge.GuardedAsyncTask; import com.facebook.react.bridge.GuardedAsyncTask;
import com.facebook.react.bridge.ReactApplicationContext; import com.facebook.react.bridge.ReactApplicationContext;
import com.facebook.react.bridge.ReactContext; import com.facebook.react.bridge.ReactContext;
import com.facebook.react.bridge.ReactContextBaseJavaModule; import com.facebook.react.bridge.ReactContextBaseJavaModule;
import com.facebook.react.bridge.ReactMethod; import com.facebook.react.bridge.ReactMethod;
import com.facebook.react.bridge.ReadableMap; import com.facebook.react.bridge.ReadableMap;
import com.facebook.react.bridge.WritableMap; import com.facebook.react.bridge.WritableMap;
import com.facebook.react.modules.core.DeviceEventManagerModule; import com.facebook.react.modules.core.DeviceEventManagerModule;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
/** /**
* The NativeModule in charge of storing active {@link UdpSocketClient}s, and acting as an api layer. * The NativeModule in charge of storing active {@link UdpSocketClient}s, and acting as an api layer.
@ -33,6 +35,7 @@ import java.util.concurrent.ExecutionException;
public final class UdpSockets extends ReactContextBaseJavaModule public final class UdpSockets extends ReactContextBaseJavaModule
implements UdpSocketClient.OnDataReceivedListener, UdpSocketClient.OnRuntimeExceptionListener { implements UdpSocketClient.OnDataReceivedListener, UdpSocketClient.OnRuntimeExceptionListener {
private static final String TAG = "UdpSockets"; private static final String TAG = "UdpSockets";
private WifiManager.MulticastLock mMulticastLock;
private SparseArray<UdpSocketClient> mClients = new SparseArray<>(); private SparseArray<UdpSocketClient> mClients = new SparseArray<>();
private boolean mShuttingDown = false; private boolean mShuttingDown = false;
@ -154,6 +157,65 @@ public final class UdpSockets extends ReactContextBaseJavaModule
}.execute(); }.execute();
} }
/**
* Joins a multi-cast group
*/
@ReactMethod
public void addMembership(final Integer cId, final String multicastAddress) {
new GuardedAsyncTask<Void, Void>(getReactApplicationContext()) {
@Override
protected void doInBackgroundGuarded(Void... params) {
UdpSocketClient client = findClient(cId, null);
if (client == null) {
return;
}
if (mMulticastLock == null) {
WifiManager wifiMgr = (WifiManager) getReactApplicationContext()
.getSystemService(Context.WIFI_SERVICE);
mMulticastLock = wifiMgr.createMulticastLock("react-native-udp");
mMulticastLock.setReferenceCounted(true);
}
if (!client.isMulticast()) {
// acquire the multi-cast lock, IF this is the
// first addMembership call for this client.
mMulticastLock.acquire();
}
try {
client.addMembership(multicastAddress);
} catch (IOException ioe) {
// an exception occurred
FLog.e(TAG, "addMembership", ioe);
}
}
}.execute();
}
/**
* Leaves a multi-cast group
*/
@ReactMethod
public void dropMembership(final Integer cId, final String multicastAddress) {
new GuardedAsyncTask<Void, Void>(getReactApplicationContext()) {
@Override
protected void doInBackgroundGuarded(Void... params) {
UdpSocketClient client = findClient(cId, null);
if (client == null) {
return;
}
try {
client.dropMembership(multicastAddress);
} catch (IOException ioe) {
// an exception occurred
FLog.e(TAG, "dropMembership", ioe);
}
}
}.execute();
}
/** /**
* Sends udp data via the {@link UdpSocketClient} * Sends udp data via the {@link UdpSocketClient}
*/ */
@ -193,6 +255,11 @@ public final class UdpSockets extends ReactContextBaseJavaModule
return; return;
} }
if (client.isMulticast() && mMulticastLock.isHeld()) {
// drop the multi-cast lock if this is a multi-cast client
mMulticastLock.release();
}
try { try {
client.close(); client.close();
callback.invoke(); callback.invoke();

View File

@ -1,6 +1,6 @@
Pod::Spec.new do |s| Pod::Spec.new do |s|
s.name = 'ReactUdp' s.name = 'ReactUdp'
s.version = '0.1.0' s.version = '1.1.1'
s.summary = 'node\'s dgram API in React Native.' s.summary = 'node\'s dgram API in React Native.'
s.description = <<-DESC s.description = <<-DESC
Enables accessing udp sockets in React Native. Enables accessing udp sockets in React Native.

View File

@ -114,6 +114,16 @@ RCT_EXPORT_METHOD(setBroadcast:(nonnull NSNumber*)cId
callback(@[[NSNull null]]); callback(@[[NSNull null]]);
} }
RCT_EXPORT_METHOD(addMembership:(nonnull NSNumber*)cId
multicastAddress:(NSString *)address) {
/* nop */
}
RCT_EXPORT_METHOD(dropMembership:(nonnull NSNumber*)cId
multicastAddress:(NSString *)address) {
/* nop */
}
- (void) onData:(UdpSocketClient*) client data:(NSData *)data host:(NSString *)host port:(uint16_t)port - (void) onData:(UdpSocketClient*) client data:(NSData *)data host:(NSString *)host port:(uint16_t)port
{ {
NSMutableDictionary* _clients = [UdpSockets clients]; NSMutableDictionary* _clients = [UdpSockets clients];

View File

@ -1,6 +1,6 @@
{ {
"name": "react-native-udp", "name": "react-native-udp",
"version": "1.1.0", "version": "1.1.1",
"description": "node's dgram API for react-native", "description": "node's dgram API for react-native",
"main": "UdpSockets.js", "main": "UdpSockets.js",
"scripts": { "scripts": {