Commit 57e057c8 authored by Martin Sustrik's avatar Martin Sustrik

1st version of Java poll added

parent ed8fe683
......@@ -24,29 +24,29 @@
#include "org_zmq_Context.h"
static void *fetch_socket (JNIEnv *env, jobject socket);
/** Handle to Java's Context::contextHandle. */
static jfieldID ctx_handle_fid = NULL;
/**
* Make sure we have a valid pointer to Java's Context::contextHandle.
*/
static void ensure_context (JNIEnv *env,
jobject obj)
static void ensure_context (JNIEnv *env, jobject obj)
{
if (ctx_handle_fid == NULL) {
jclass cls = env->GetObjectClass (obj);
assert (cls);
ctx_handle_fid = env->GetFieldID (cls, "contextHandle", "J");
assert (ctx_handle_fid);
env->DeleteLocalRef (cls);
assert (cls);
ctx_handle_fid = env->GetFieldID (cls, "contextHandle", "J");
assert (ctx_handle_fid);
env->DeleteLocalRef (cls);
}
}
/**
* Get the value of Java's Context::contextHandle.
*/
static void *get_context (JNIEnv *env,
jobject obj)
static void *get_context (JNIEnv *env, jobject obj)
{
ensure_context (env, obj);
void *s = (void*) env->GetLongField (obj, ctx_handle_fid);
......@@ -56,9 +56,7 @@ static void *get_context (JNIEnv *env,
/**
* Set the value of Java's Context::contextHandle.
*/
static void put_context (JNIEnv *env,
jobject obj,
void *s)
static void put_context (JNIEnv *env, jobject obj, void *s)
{
ensure_context (env, obj);
env->SetLongField (obj, ctx_handle_fid, (jlong) s);
......@@ -67,8 +65,7 @@ static void put_context (JNIEnv *env,
/**
* Raise an exception that includes 0MQ's error message.
*/
static void raise_exception (JNIEnv *env,
int err)
static void raise_exception (JNIEnv *env, int err)
{
// Get exception class.
jclass exception_class = env->FindClass ("java/lang/Exception");
......@@ -88,18 +85,15 @@ static void raise_exception (JNIEnv *env,
* Called to construct a Java Context object.
*/
JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env,
jobject obj,
jint app_threads,
jint io_threads,
jint flags)
jobject obj, jint app_threads, jint io_threads, jint flags)
{
void *c = get_context (env, obj);
assert (! c);
assert (!c);
c = zmq_init (app_threads, io_threads, flags);
put_context(env, obj, c);
put_context (env, obj, c);
if (c == NULL) {
if (!c) {
raise_exception (env, errno);
return;
}
......@@ -109,7 +103,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env,
* Called to destroy a Java Context object.
*/
JNIEXPORT void JNICALL Java_org_zmq_Context_finalize (JNIEnv *env,
jobject obj)
jobject obj)
{
void *c = get_context (env, obj);
assert (c);
......@@ -118,3 +112,107 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_finalize (JNIEnv *env,
put_context (env, obj, NULL);
assert (rc == 0);
}
JNIEXPORT jlong JNICALL Java_org_zmq_Context_poll (JNIEnv *env,
jobject obj,
jobjectArray socket_0mq,
jshortArray event_0mq,
jshortArray revent_0mq,
jlong timeout)
{
jsize ls_0mq = 0;
jsize le_0mq = 0;
jsize lr_0mq = 0;
if (socket_0mq)
ls_0mq = env->GetArrayLength (socket_0mq);
if (event_0mq)
le_0mq = env->GetArrayLength (event_0mq);
if (revent_0mq)
lr_0mq = env->GetArrayLength (revent_0mq);
if (ls_0mq != le_0mq || ls_0mq != lr_0mq)
return 0;
jsize ls = ls_0mq;
if (ls <= 0)
return 0;
zmq_pollitem_t *pitem = new zmq_pollitem_t [ls];
short pc = 0;
int rc = 0;
// Add 0MQ sockets.
if (ls_0mq > 0) {
jshort *e_0mq = env->GetShortArrayElements (event_0mq, 0);
if (e_0mq != NULL) {
for (int i = 0; i < ls_0mq; ++i) {
jobject s_0mq = env->GetObjectArrayElement (socket_0mq, i);
if (!s_0mq)
continue;
void *s = fetch_socket (env, s_0mq);
if (!s)
continue;
pitem [pc].socket = s;
pitem [pc].fd = 0;
pitem [pc].events = e_0mq [i];
pitem [pc].revents = 0;
++pc;
}
env->ReleaseShortArrayElements(event_0mq, e_0mq, 0);
}
}
if (pc == ls) {
pc = 0;
long tout = (long) timeout;
rc = zmq_poll (pitem, ls, tout);
int err = 0;
const char *msg = "";
if (rc < 0) {
err = errno;
msg = zmq_strerror (err);
}
}
// Set 0MQ results.
if (ls_0mq > 0) {
jshort *r_0mq = env->GetShortArrayElements (revent_0mq, 0);
if (r_0mq) {
for (int i = 0; i < ls_0mq; ++i) {
r_0mq [i] = pitem [pc].revents;
++pc;
}
env->ReleaseShortArrayElements(revent_0mq, r_0mq, 0);
}
}
delete [] pitem;
return rc;
}
/**
* Get the value of socketHandle for the specified Java Socket.
*/
static void *fetch_socket (JNIEnv *env, jobject socket)
{
static jmethodID get_socket_handle_mid = NULL;
if (get_socket_handle_mid == NULL) {
jclass cls = env->GetObjectClass (socket);
assert (cls);
get_socket_handle_mid = env->GetMethodID (cls,
"getSocketHandle", "()J");
env->DeleteLocalRef (cls);
assert (get_socket_handle_mid);
}
void *s = (void*) env->CallLongMethod (socket, get_socket_handle_mid);
if (env->ExceptionCheck ()) {
s = NULL;
}
assert (s);
return s;
}
......@@ -32,23 +32,21 @@ static jfieldID socket_handle_fid = NULL;
/**
* Make sure we have a valid pointer to Java's Socket::socketHandle.
*/
static void ensure_socket (JNIEnv *env,
jobject obj)
static void ensure_socket (JNIEnv *env, jobject obj)
{
if (socket_handle_fid == NULL) {
jclass cls = env->GetObjectClass (obj);
assert (cls);
socket_handle_fid = env->GetFieldID (cls, "socketHandle", "J");
assert (socket_handle_fid);
env->DeleteLocalRef (cls);
assert (cls);
socket_handle_fid = env->GetFieldID (cls, "socketHandle", "J");
assert (socket_handle_fid);
env->DeleteLocalRef (cls);
}
}
/**
* Get the value of Java's Socket::socketHandle.
*/
static void *get_socket (JNIEnv *env,
jobject obj)
static void *get_socket (JNIEnv *env, jobject obj)
{
ensure_socket (env, obj);
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
......@@ -58,42 +56,35 @@ static void *get_socket (JNIEnv *env,
/**
* Set the value of Java's Socket::socketHandle.
*/
static void put_socket (JNIEnv *env,
jobject obj,
void *s)
static void put_socket (JNIEnv *env, jobject obj, void *s)
{
ensure_socket (env, obj);
env->SetLongField (obj, socket_handle_fid, (jlong) s);
}
/**
* Get the value of contextHandle for the Java Context associated with
* this Java Socket object.
* Get the value of contextHandle for the specified Java Context.
*/
static void *fetch_context (JNIEnv *env,
jobject context)
static void *fetch_context (JNIEnv *env, jobject context)
{
static jmethodID get_context_handle_mid = NULL;
if (get_context_handle_mid == NULL) {
if (!get_context_handle_mid) {
jclass cls = env->GetObjectClass (context);
assert (cls);
get_context_handle_mid = env->GetMethodID (cls,
"getContextHandle",
"()J");
env->DeleteLocalRef (cls);
assert (get_context_handle_mid);
assert (cls);
get_context_handle_mid = env->GetMethodID (cls,
"getContextHandle", "()J");
env->DeleteLocalRef (cls);
assert (get_context_handle_mid);
}
void *zmq_ctx = (void*) env->CallLongMethod (context,
get_context_handle_mid);
void *c = (void*) env->CallLongMethod (context, get_context_handle_mid);
if (env->ExceptionCheck ()) {
zmq_ctx = NULL;
c = NULL;
}
assert (zmq_ctx);
return zmq_ctx;
assert (c);
return c;
}
/**
......@@ -119,20 +110,18 @@ static void raise_exception (JNIEnv *env, int err)
* Called to construct a Java Socket object.
*/
JNIEXPORT void JNICALL Java_org_zmq_Socket_construct (JNIEnv *env,
jobject obj,
jobject context,
jint type)
jobject obj, jobject context, jint type)
{
void *s = get_socket (env, obj);
assert (! s);
void *zmq_ctx = fetch_context (env, context);
s = zmq_socket (zmq_ctx, type);
void *c = fetch_context (env, context);
s = zmq_socket (c, type);
put_socket(env, obj, s);
if (s == NULL) {
raise_exception (env, errno);
return;
return;
}
}
......@@ -140,7 +129,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_construct (JNIEnv *env,
* Called to destroy a Java Socket object.
*/
JNIEXPORT void JNICALL Java_org_zmq_Socket_finalize (JNIEnv *env,
jobject obj)
jobject obj)
{
void *s = get_socket (env, obj);
assert (s);
......@@ -184,10 +173,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env,
* Called by Java's Socket::setsockopt(int option, String optval).
*/
JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__ILjava_lang_String_2 (
JNIEnv *env,
jobject obj,
jint option,
jstring optval)
JNIEnv *env, jobject obj, jint option, jstring optval)
{
switch (option) {
case ZMQ_IDENTITY:
......@@ -219,9 +205,8 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__ILjava_lang_String_2 (
/**
* Called by Java's Socket::bind(String addr).
*/
JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env,
jobject obj,
jstring addr)
JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env, jobject obj,
jstring addr)
{
void *s = get_socket (env, obj);
assert (s);
......@@ -248,8 +233,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env,
* Called by Java's Socket::connect(String addr).
*/
JNIEXPORT void JNICALL Java_org_zmq_Socket_connect (JNIEnv *env,
jobject obj,
jstring addr)
jobject obj, jstring addr)
{
void *s = get_socket (env, obj);
assert (s);
......@@ -276,9 +260,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_connect (JNIEnv *env,
* Called by Java's Socket::send(byte [] msg, long flags).
*/
JNIEXPORT jboolean JNICALL Java_org_zmq_Socket_send (JNIEnv *env,
jobject obj,
jbyteArray msg,
jlong flags)
jobject obj, jbyteArray msg, jlong flags)
{
void *s = get_socket (env, obj);
assert (s);
......@@ -316,8 +298,7 @@ JNIEXPORT jboolean JNICALL Java_org_zmq_Socket_send (JNIEnv *env,
/**
* Called by Java's Socket::flush().
*/
JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env,
jobject obj)
JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env, jobject obj)
{
void *s = get_socket (env, obj);
assert (s);
......@@ -334,8 +315,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env,
* Called by Java's Socket::recv(long flags).
*/
JNIEXPORT jbyteArray JNICALL Java_org_zmq_Socket_recv (JNIEnv *env,
jobject obj,
jlong flags)
jobject obj, jlong flags)
{
void *s = get_socket (env, obj);
assert (s);
......@@ -357,10 +337,9 @@ JNIEXPORT jbyteArray JNICALL Java_org_zmq_Socket_recv (JNIEnv *env,
jbyteArray data = env->NewByteArray (zmq_msg_size (&message));
assert (data);
env->SetByteArrayRegion (data,
0,
zmq_msg_size (&message),
(jbyte*) zmq_msg_data (&message));
env->SetByteArrayRegion (data, 0, zmq_msg_size (&message),
(jbyte*) zmq_msg_data (&message));
return data;
}
......@@ -26,6 +26,10 @@ public class Context {
public static final int POLL = 1;
public static final int POLLIN = 1;
public static final int POLLOUT = 2;
public static final int POLLERR = 4;
/**
* Class constructor.
*
......@@ -36,6 +40,20 @@ public class Context {
construct (appThreads, ioThreads, flags);
}
/**
* Issue a poll call on the specified 0MQ sockets.
* This function is experimental and may change in the future.
*
* @param socket an array of 0MQ Socket objects to poll.
* @param event an array of short values specifying what to poll for.
* @param revent an array of short values with the results.
* @param timeout the maximum timeout in microseconds.
*/
public native long poll (Socket[] socket,
short[] event,
short[] revent,
long timeout);
/** Initialize the JNI interface */
protected native void construct (int appThreads, int ioThreads, int flags);
......@@ -50,10 +68,9 @@ public class Context {
* @return the internal 0MQ context handle.
*/
private long getContextHandle () {
return contextHandle;
return contextHandle;
}
/** Opaque data used by JNI driver. */
private long contextHandle;
}
......@@ -50,7 +50,6 @@ public class Socket {
public static final int SNDBUF = 11;
public static final int RCVBUF = 12;
/**
* Class constructor.
*
......@@ -127,11 +126,9 @@ public class Socket {
* @return the internal 0MQ socket handle.
*/
private long getSocketHandle () {
return socketHandle;
return socketHandle;
}
/** Opaque data used by JNI driver. */
private long socketHandle;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment