Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetInit(
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeDisposeWaitSet(JNIEnv *, jclass, jlong);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetResize
* Signature: (JJ)V
Comment thread
ivanpauno marked this conversation as resolved.
Outdated
*/
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetResize(
JNIEnv *, jclass, jlong, jint, jint, jint, jint, jint, jint);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetClear
Expand Down
18 changes: 18 additions & 0 deletions rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeDisposeWaitSet(
}
}

JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetResize(
JNIEnv * env, jclass, jlong wait_set_handle, jint number_of_subscriptions,
jint number_of_guard_conditions, jint number_of_timers, jint number_of_clients,
jint number_of_services, jint number_of_events)
{
rcl_wait_set_t * wait_set = reinterpret_cast<rcl_wait_set_t *>(wait_set_handle);

rcl_ret_t ret = rcl_wait_set_resize(
wait_set, number_of_subscriptions, number_of_guard_conditions, number_of_timers,
number_of_clients, number_of_services, number_of_events);
if (ret != RCL_RET_OK) {
std::string msg = "Failed to resize wait set: " + std::string(rcl_get_error_string().str);
rcl_reset_error();
rcljava_throw_rclexception(env, ret, msg);
}
}

JNIEXPORT void JNICALL
Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetClear(
JNIEnv * env, jclass, jlong wait_set_handle)
Expand Down
189 changes: 91 additions & 98 deletions rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import javax.swing.Action;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,6 +77,20 @@ public class BaseExecutor {

private List<Map.Entry<Long, ActionServer>> actionServerHandles = new ArrayList<Map.Entry<Long, ActionServer>>();

private long waitSetHandle = 0;

public BaseExecutor() {
this.waitSetHandle = nativeGetZeroInitializedWaitSet();
long contextHandle = RCLJava.getDefaultContext().getHandle();
nativeWaitSetInit(
this.waitSetHandle, contextHandle, 0, 0,
0, 0, 0, 0);
}

public void dispose() {
nativeDisposeWaitSet(this.waitSetHandle);
}

protected void addNode(ComposableNode node) {
this.nodes.add(node);
}
Expand Down Expand Up @@ -225,38 +240,28 @@ protected void waitForWork(long timeout) {
}
}

int subscriptionsSize = 0;
int timersSize = 0;
int clientsSize = 0;
int servicesSize = 0;
int subscriptionsSize = this.subscriptionHandles.size();
int timersSize = this.timerHandles.size();
int clientsSize = this.clientHandles.size();
int servicesSize = this.serviceHandles.size();
int eventsSize = this.eventHandles.size();

for (ComposableNode node : this.nodes) {
subscriptionsSize += node.getNode().getSubscriptions().size();
timersSize += node.getNode().getTimers().size();
clientsSize += node.getNode().getClients().size();
servicesSize += node.getNode().getServices().size();

for (ActionServer actionServer : node.getNode().getActionServers()) {
subscriptionsSize += actionServer.getNumberOfSubscriptions();
timersSize += actionServer.getNumberOfTimers();
clientsSize += actionServer.getNumberOfClients();
servicesSize += actionServer.getNumberOfServices();
}
for (Map.Entry<Long, ActionServer> entry : this.actionServerHandles) {
ActionServer actionServer = entry.getValue();
subscriptionsSize += actionServer.getNumberOfSubscriptions();
timersSize += actionServer.getNumberOfTimers();
clientsSize += actionServer.getNumberOfClients();
servicesSize += actionServer.getNumberOfServices();
}

if (subscriptionsSize == 0 && timersSize == 0 && clientsSize == 0 && servicesSize == 0) {
return;
}

long waitSetHandle = nativeGetZeroInitializedWaitSet();
long contextHandle = RCLJava.getDefaultContext().getHandle();
nativeWaitSetInit(
waitSetHandle, contextHandle, subscriptionsSize, 0,
long waitSetHandle = this.waitSetHandle;
nativeWaitSetResize(
waitSetHandle, subscriptionsSize, 0,
timersSize, clientsSize, servicesSize, eventsSize);

nativeWaitSetClear(waitSetHandle);

for (Map.Entry<Long, Subscription> entry : this.subscriptionHandles) {
nativeWaitSetAddSubscription(waitSetHandle, entry.getKey());
}
Expand All @@ -282,93 +287,70 @@ protected void waitForWork(long timeout) {
}

nativeWait(waitSetHandle, timeout);

for (int i = 0; i < this.subscriptionHandles.size(); ++i) {
if (!nativeWaitSetSubscriptionIsReady(waitSetHandle, i)) {
this.subscriptionHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.timerHandles.size(); ++i) {
if (!nativeWaitSetTimerIsReady(waitSetHandle, i)) {
this.timerHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.serviceHandles.size(); ++i) {
if (!nativeWaitSetServiceIsReady(waitSetHandle, i)) {
this.serviceHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.clientHandles.size(); ++i) {
if (!nativeWaitSetClientIsReady(waitSetHandle, i)) {
this.clientHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.eventHandles.size(); ++i) {
if (!nativeWaitSetEventIsReady(waitSetHandle, i)) {
this.eventHandles.get(i).setValue(null);
}
}

for (Map.Entry<Long, ActionServer> entry : this.actionServerHandles) {
if (!entry.getValue().isReady(waitSetHandle)) {
entry.setValue(null);
}
}

Iterator<Map.Entry<Long, Subscription>> subscriptionIterator =
this.subscriptionHandles.iterator();
while (subscriptionIterator.hasNext()) {
Map.Entry<Long, Subscription> entry = subscriptionIterator.next();
if (entry.getValue() == null) {
subscriptionIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Subscription>> it = this.subscriptionHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetSubscriptionIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, Timer>> timerIterator = this.timerHandles.iterator();
while (timerIterator.hasNext()) {
Map.Entry<Long, Timer> entry = timerIterator.next();
if (entry.getValue() == null) {
timerIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Timer>> it = this.timerHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetTimerIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, Service>> serviceIterator = this.serviceHandles.iterator();
while (serviceIterator.hasNext()) {
Map.Entry<Long, Service> entry = serviceIterator.next();
if (entry.getValue() == null) {
serviceIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Service>> it = this.serviceHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetServiceIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, Client>> clientIterator = this.clientHandles.iterator();
while (clientIterator.hasNext()) {
Map.Entry<Long, Client> entry = clientIterator.next();
if (entry.getValue() == null) {
clientIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Client>> it = this.clientHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetClientIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, EventHandler>> eventIterator = this.eventHandles.iterator();
while (eventIterator.hasNext()) {
Map.Entry<Long, EventHandler> entry = eventIterator.next();
if (entry.getValue() == null) {
eventIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, EventHandler>> it = this.eventHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetEventIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, ActionServer>> actionServerIterator = this.actionServerHandles.iterator();
while (actionServerIterator.hasNext()) {
Map.Entry<Long, ActionServer> entry = actionServerIterator.next();
if (entry.getValue() == null) {
actionServerIterator.remove();
{
Iterator<Map.Entry<Long, ActionServer>> it = this.actionServerHandles.iterator();
while (it.hasNext()) {
Map.Entry<Long, ActionServer> entry = it.next();
if (!entry.getValue().isReady(waitSetHandle)) {
it.remove();
}
}
}

nativeDisposeWaitSet(waitSetHandle);
}

protected AnyExecutable getNextExecutable() {
Expand Down Expand Up @@ -463,6 +445,9 @@ public void spinUntilComplete(Future future, long maxDurationNs) {
anyExecutable = getNextExecutable();
}
}
if (!RCLJava.ok()) {
this.dispose();
}
}

private void spinSomeImpl(long maxDurationNs, boolean exhaustive) {
Expand All @@ -483,6 +468,9 @@ private void spinSomeImpl(long maxDurationNs, boolean exhaustive) {
workAvailable = false;
}
}
if (!RCLJava.ok()) {
this.dispose();
}
}

protected void spinSome(long maxDurationNs) {
Expand Down Expand Up @@ -513,6 +501,11 @@ private static native void nativeWaitSetInit(
long waitSetHandle, long contextHandle, int numberOfSubscriptions,
int numberOfGuardConditions, int numberOfTimers, int numberOfClients,
int numberOfServices, int numberOfEvents);

private static native void nativeWaitSetResize(
long waitSetHandle, int numberOfSubscriptions,
int numberOfGuardConditions, int numberOfTimers, int numberOfClients,
int numberOfServices, int numberOfEvents);

private static native void nativeWaitSetClear(long waitSetHandle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,8 @@ private void run() {
this.spinOnce();
}
}
if (!RCLJava.ok()) {
this.baseExecutor.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@ public void spin() {
while (RCLJava.ok()) {
this.spinOnce();
}
if (!RCLJava.ok()) {
this.baseExecutor.dispose();
}
}
}