2828
2929import org .ros2 .rcljava .RCLJava ;
3030import org .ros2 .rcljava .common .JNIUtils ;
31- import org .ros2 .rcljava .concurrent .RCLFuture ;
3231import org .ros2 .rcljava .consumers .Consumer ;
3332import org .ros2 .rcljava .interfaces .MessageDefinition ;
3433import org .ros2 .rcljava .interfaces .ServiceDefinition ;
@@ -53,7 +52,7 @@ public class ClientImpl<T extends ServiceDefinition> implements Client<T> {
5352 private final WeakReference <Node > nodeReference ;
5453 private long handle ;
5554 private final String serviceName ;
56- private Map <Long , Map .Entry <Consumer , RCLFuture >> pendingRequests ;
55+ private Map <Long , Map .Entry <Consumer , ResponseFuture >> pendingRequests ;
5756
5857 private final ServiceDefinition serviceDefinition ;
5958
@@ -67,43 +66,52 @@ public ClientImpl(
6766 this .handle = handle ;
6867 this .serviceName = serviceName ;
6968 this .serviceDefinition = serviceDefinition ;
70- this .pendingRequests = new HashMap <Long , Map .Entry <Consumer , RCLFuture >>();
69+ this .pendingRequests = new HashMap <Long , Map .Entry <Consumer , ResponseFuture >>();
7170 }
7271
7372 public ServiceDefinition getServiceDefinition () {
7473 return this .serviceDefinition ;
7574 }
7675
77- public final <U extends MessageDefinition , V extends MessageDefinition > Future <V >
76+ public final <U extends MessageDefinition , V extends MessageDefinition > ResponseFuture <V >
7877 asyncSendRequest (final U request ) {
7978 return asyncSendRequest (request , new Consumer <Future <V >>() {
8079 public void accept (Future <V > input ) {}
8180 });
8281 }
8382
84- public final <U extends MessageDefinition , V extends MessageDefinition > Future <V >
83+ public final <U extends MessageDefinition , V extends MessageDefinition > ResponseFuture <V >
8584 asyncSendRequest (final U request , final Consumer <Future <V >> callback ) {
8685 synchronized (pendingRequests ) {
8786 long sequenceNumber = nativeSendClientRequest (
8887 handle , request .getFromJavaConverterInstance (),
8988 request .getDestructorInstance (), request );
90- RCLFuture <V > future = new RCLFuture <V >();
89+ ResponseFuture <V > future = new ResponseFuture <V >(sequenceNumber );
9190
92- Map .Entry <Consumer , RCLFuture > entry =
93- new AbstractMap .SimpleEntry <Consumer , RCLFuture >(callback , future );
91+ Map .Entry <Consumer , ResponseFuture > entry =
92+ new AbstractMap .SimpleEntry <Consumer , ResponseFuture >(callback , future );
9493 pendingRequests .put (sequenceNumber , entry );
9594 return future ;
9695 }
9796 }
9897
98+ public final <V extends MessageDefinition > boolean
99+ removePendingRequest (ResponseFuture <V > future ) {
100+ synchronized (pendingRequests ) {
101+ Map .Entry <Consumer , ResponseFuture > entry = pendingRequests .remove (
102+ future .getRequestSequenceNumber ());
103+ return entry != null ;
104+ }
105+ }
106+
99107 public final <U extends MessageDefinition > void handleResponse (
100108 final RMWRequestId header , final U response ) {
101109 synchronized (pendingRequests ) {
102110 long sequenceNumber = header .sequenceNumber ;
103- Map .Entry <Consumer , RCLFuture > entry = pendingRequests .remove (sequenceNumber );
111+ Map .Entry <Consumer , ResponseFuture > entry = pendingRequests .remove (sequenceNumber );
104112 if (entry != null ) {
105113 Consumer <Future > callback = entry .getKey ();
106- RCLFuture <U > future = entry .getValue ();
114+ ResponseFuture <U > future = entry .getValue ();
107115 future .set (response );
108116 callback .accept (future );
109117 return ;
0 commit comments