2929import java .util .logging .Level ;
3030
3131class ChannelExecutor implements Runnable {
32+ //@ApiStatus.Experimental // Temporary option for testing performance effect of using yield on Windows
33+ private static final boolean USE_YIELD_ON_WINDOWS = Boolean .getBoolean ("TimeBase.network.executor.windows.useYield" );
34+
3235 private static volatile ChannelExecutor INSTANCE ;
3336
3437 private static ChannelExecutor createInstance (AffinityConfig affinityConfig ) {
@@ -37,6 +40,12 @@ private static ChannelExecutor createInstance(AffinityConfig affinityConfig) {
3740 return executor ;
3841 }
3942
43+ @ SuppressWarnings ("SameParameterValue" )
44+ //@VisibleForTesting
45+ static ChannelExecutor createNonSharedTestInstance (AffinityConfig affinityConfig ) {
46+ return createInstance (affinityConfig );
47+ }
48+
4049 public static ChannelExecutor getInstance (AffinityConfig affinityConfig ) {
4150 // "Double checked lock" (via volatile)
4251 if (INSTANCE == null ) {
@@ -51,11 +60,11 @@ public static ChannelExecutor getInstance(AffinityConfig affinityConfig) {
5160
5261 private final QuickList <Entry > channels = new QuickList <>();
5362 private boolean stopped = false ;
54- private final CPUEater cpuEater ;
63+ private final CPUEater cpuEater ; // Used only for Windows
5564 private final int idleTime ;
5665 private final Thread thread ;
5766
58- private static ChannelExecutor create (AffinityConfig affinityConfig ) {
67+ static ChannelExecutor create (AffinityConfig affinityConfig ) {
5968 ThreadFactory factory = new AffinityThreadFactoryBuilder (affinityConfig )
6069 .setNameFormat ("ChannelExecutor Thread" )
6170 .setDaemon (true )
@@ -66,7 +75,7 @@ private static ChannelExecutor create(AffinityConfig affinityConfig) {
6675
6776 private ChannelExecutor (ThreadFactory factory ) {
6877 idleTime = VSProtocol .getIdleTime ();
69- cpuEater = new CPUEater (idleTime );
78+ cpuEater = Util . IS_WINDOWS_OS ? new CPUEater (idleTime ) : null ;
7079
7180 this .thread = factory .newThread (this );
7281 }
@@ -81,65 +90,124 @@ public void shutdown () {
8190 }
8291
8392 public void addChannel (VSChannel channel ) {
93+ assert channel != null ;
94+
95+ if (channel == null )
96+ return ;
97+
8498 synchronized (channels ) {
8599 channels .linkLast (new Entry (channel ));
86100 }
87101
88102 wakeup ();
89103 }
90104
105+ public void removeChannel (VSChannel channel ) {
106+ assert channel != null ;
107+
108+
109+ synchronized (channels ) {
110+ Entry entry = channels .getFirst ();
111+
112+ while (entry != null ) {
113+ if (entry .channel .equals (channel )) {
114+ remove (entry );
115+ return ;
116+ } else {
117+ entry = entry .next ();
118+ }
119+ }
120+ }
121+
122+ wakeup ();
123+ }
124+
91125 @ Override
92126 public void run () {
93127 assert Thread .currentThread () == this .thread ;
94128
95129 while (!stopped ) {
96130 Entry entry ;
131+ boolean isEmpty ;
97132
133+ long bytesSent = 0 ;
98134 synchronized (channels ) {
99135 entry = channels .getFirst ();
100- }
101-
102- if (entry == null ) {
103- LockSupport .park ();
104-
105- if (Thread .interrupted ()) {
106- if (stopped )
107- break ;
108- }
109- }
136+ isEmpty = entry == null ;
110137
111- synchronized (channels ) {
112- entry = channels .getFirst ();
113138 while (entry != null ) {
114-
115139 VSChannel channel = entry .channel ;
116140 try {
117- if (channel != null && channel .getNoDelay () && channel .getState () == VSChannelState .Connected ) {
118- VSOutputStream out = channel .getOutputStream ();
119- out .flushAvailable ();
120-
121- entry = entry .next ();
122- } else if (channel != null ) {
123- if (channel .getState () == VSChannelState .Removed || channel .getState () == VSChannelState .Closed )
141+ switch (channel .getState ()) {
142+ case Connected : {
143+ if (channel .getNoDelay ()) {
144+ // Flush
145+ VSOutputStream out = channel .getOutputStream ();
146+ // We do not want to send all at once, we will, re-try send shortly
147+ bytesSent += out .flushAvailable (false );
148+ }
149+ break ;
150+ }
151+ case Removed :
152+ case Closed : {
124153 entry = remove (entry );
154+ continue ;
155+ }
125156 }
126157 } catch (ChannelClosedException e ) {
127158 // ignore
128159 entry = remove (entry );
160+ continue ;
129161 } catch (IOException e ) {
130162 VSProtocol .LOGGER .log (Level .WARNING , "Exception while flushing data" , e );
131163 }
164+
165+ // Move to the next channel
166+ entry = entry .next ();
167+ }
168+ }
169+
170+ if (isEmpty ) {
171+ // No channels to process => Wait for channels to be added.
172+ LockSupport .park ();
173+
174+ if (Thread .interrupted ()) {
175+ if (stopped ) {
176+ break ;
177+ }
178+ }
179+ } else {
180+ // Do not wait if we have sent any data.
181+ // It's very likely that it's time to send more because the sending time is relatively long.
182+ if (bytesSent == 0 ) {
183+ // Wait till next time to flush channels
184+ idleWait ();
132185 }
133186 }
187+ }
188+ }
134189
135- if (!Util .IS_WINDOWS_OS ) {
136- LockSupport .parkNanos (idleTime );
190+ private void idleWait () {
191+ if (!Util .IS_WINDOWS_OS ) {
192+ if (USE_YIELD_ON_WINDOWS ) {
193+ waitWithYield (idleTime );
137194 } else {
138- if (TimeKeeper .getMode () == TimeKeeper .Mode .HIGH_RESOLUTION_SYNC_BACK )
139- TimeKeeper .parkNanos (idleTime );
140- else
141- cpuEater .run ();
195+ LockSupport .parkNanos (idleTime );
142196 }
197+ } else {
198+ if (TimeKeeper .getMode () == TimeKeeper .Mode .HIGH_RESOLUTION_SYNC_BACK ) {
199+ TimeKeeper .parkNanos (idleTime );
200+ } else {
201+ cpuEater .run ();
202+ }
203+ }
204+ }
205+
206+ private static void waitWithYield (int idleTime ) {
207+ long start = System .nanoTime ();
208+ long end = start + idleTime ;
209+ while (System .nanoTime () < end ) {
210+ Thread .yield ();
143211 }
144212 }
145213
@@ -150,7 +218,7 @@ private Entry remove(Entry entry) {
150218 }
151219
152220 private static class Entry extends QuickList .Entry <Entry > {
153- VSChannel channel ;
221+ final VSChannel channel ;
154222
155223 private Entry (VSChannel channel ) {
156224 this .channel = channel ;
@@ -162,7 +230,7 @@ private static class CPUEater {
162230 private final long cycles ;
163231
164232 private final MemoryDataOutput out = new MemoryDataOutput ();
165- private final double value = 345.56787899 ;
233+ private static final double value = 345.56787899 ;
166234
167235 private CPUEater (long nanos ) {
168236 this .avgCostOfNanoTimeCall = nanoTimeCost ();
0 commit comments