Skip to content
This repository was archived by the owner on Sep 19, 2021. It is now read-only.

Commit c61ff47

Browse files
committed
Finish Integration With Relay
Added a system called the Timeline which allows work to be set to run at specific points in time. This allows work to be queued. To allow the relay server data to merge in with the server, a work item is set to run every minute. When it finishs, it sets itself to run again in a minute. When data needs to be sent to the relay, a work item is added to the timeline so that the current connection can be closed before sending data to the relay. The timeline allowed for the Hub to be removed as work items to handle connections could be added to the timeline. Closes #4.
1 parent 29c811d commit c61ff47

4 files changed

Lines changed: 101 additions & 53 deletions

File tree

src/codeu/chat/RelayMain.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616

1717
import java.io.IOException;
1818

19-
import codeu.chat.common.Hub;
2019
import codeu.chat.relay.Server;
2120
import codeu.chat.relay.ServerFrontEnd;
2221
import codeu.chat.util.Logger;
22+
import codeu.chat.util.Timeline;
2323
import codeu.chat.util.connections.Connection;
2424
import codeu.chat.util.connections.ConnectionSource;
2525
import codeu.chat.util.connections.ServerConnectionSource;
@@ -56,39 +56,39 @@ public static void main(String[] args) {
5656
private static void startRelay(ConnectionSource source) {
5757

5858
final Server relay = new Server(1024, 16);
59-
6059
LOG.info("Relay object created.");
6160

62-
// TODO: Load team information
63-
6461
final ServerFrontEnd frontEnd = new ServerFrontEnd(relay);
65-
6662
LOG.info("Relay front end object created.");
6763

68-
LOG.info("Starting relay main loop...");
69-
70-
final Runnable hub = new Hub(source, new Hub.Handler() {
71-
72-
@Override
73-
public void handle(Connection connection) throws Exception {
74-
75-
frontEnd.handleConnection(connection);
64+
final Timeline timeline = new Timeline();
65+
LOG.info("Relay timeline created.");
7666

77-
}
78-
79-
@Override
80-
public void onException(Exception ex) {
67+
// TODO: Load team information
8168

82-
System.out.println("ERROR: front end failed to handle connection. Check log for details.");
83-
LOG.error(ex, "Exception handling connection.");
69+
LOG.info("Starting relay main loop...");
8470

71+
while (true) {
72+
try {
73+
74+
LOG.info("Established connection...");
75+
final Connection connection = source.connect();
76+
LOG.info("Connection established.");
77+
78+
timeline.scheduleNow(new Runnable() {
79+
@Override
80+
public void run() {
81+
try {
82+
frontEnd.handleConnection(connection);
83+
} catch (Exception ex) {
84+
LOG.error(ex, "Exception handling connection.");
85+
}
86+
}
87+
});
88+
89+
} catch (IOException ex) {
90+
LOG.error(ex, "Failed to establish connection.");
8591
}
86-
});
87-
88-
LOG.info("Starting hub...");
89-
90-
hub.run();
91-
92-
LOG.info("Hub exited.");
92+
}
9393
}
9494
}

src/codeu/chat/ServerMain.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import java.io.IOException;
1919

20-
import codeu.chat.common.Hub;
2120
import codeu.chat.common.Relay;
2221
import codeu.chat.common.Secret;
2322
import codeu.chat.common.Uuid;

src/codeu/chat/server/Server.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ public void run() {
8080

8181
}
8282

83-
// Do this all again in 60 seconds
84-
timeline.scheduleIn(60000, this);
83+
// Do this again in 5 seconds
84+
timeline.scheduleIn(5000, this);
8585
}
8686
});
8787
}

src/codeu/chat/util/Timeline.java

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
// code has been executed.
2727
public final class Timeline {
2828

29+
private final static Logger.Log LOG = Logger.newLog(Timeline.class);
30+
2931
private static final class Event implements Comparable<Event> {
3032

3133
public final long time;
@@ -49,29 +51,55 @@ public int compareTo(Event other) {
4951

5052
// This thread is used to track the time of events and moves events from the
5153
// "backlog" queue to the "todo" queue when it is time to execute. They are
52-
// seperated to allow this thread to be safely interrupted.
54+
// separated to allow this thread to be safely interrupted.
5355
private final Thread scheduler = new Thread() {
5456
@Override
5557
public void run() {
5658
while (running) {
57-
final long now = System.currentTimeMillis();
58-
final Event next = backlog.poll();
59+
60+
Event next;
61+
5962
try {
60-
if (next == null) {
61-
Thread.sleep(10000);
62-
} else if (next.time > now) {
63-
// Put it back (it's not time).
64-
while (!backlog.offer(next)) {
65-
// force this to go through
66-
}
67-
Thread.sleep(next.time - now);
63+
next = backlog.take();
64+
} catch (InterruptedException ex) {
65+
// Rather than try to handle the exception here, set "next"
66+
// to null and let the normal flow handle the case.
67+
next = null;
68+
}
69+
70+
long sleep = 0;
71+
72+
if (next != null) {
73+
74+
final long now = System.currentTimeMillis();
75+
76+
// Check which queue the event should be added to. If it
77+
// is time to execute, it should be added to the "todo"
78+
// queue. If it is not time, it should be added back to the
79+
// "backlog".
80+
// If the item is added back to the backlog, we know how long
81+
// it will be until it will be executed. That means we can sleep
82+
// until then.
83+
if (next.time <= now) {
84+
forceAdd(todo, next.callback);
85+
sleep = 0;
6886
} else {
69-
while (!todo.offer(next.callback)) {
70-
// force this to go through
71-
}
87+
// Put it back (it's not time).
88+
forceAdd(backlog, next);
89+
sleep = next.time - now;
90+
}
91+
}
92+
93+
if (sleep > 0) {
94+
try {
95+
Thread.sleep(sleep);
96+
} catch (InterruptedException ex) {
97+
// There are two cases this will happen:
98+
// 1. A new item was added and we are being woken to
99+
// check if we need to update the time.
100+
// 2. It is time to exit and we need to wake-up so that
101+
// we can check that "running" is "false".
72102
}
73-
} catch (InterruptedException ex) {
74-
// A new event was added - need to recheck all the times.
75103
}
76104
}
77105
}
@@ -87,8 +115,11 @@ public void run() {
87115
try {
88116
todo.take().run();
89117
} catch (Exception ex) {
90-
// Catch all exceptions here to stop any rouge action from
118+
// Catch all exceptions here to stop any rogue action from
91119
// take down the timeline.
120+
LOG.warning(
121+
"An exception was seen on the timeline (%s)",
122+
ex.toString());
92123
}
93124
}
94125
}
@@ -120,27 +151,45 @@ public void scheduleIn(long ms, Runnable callback) {
120151
// point in time.
121152
public void scheduleAt(long timeMs, Runnable callback) {
122153
final Event event = new Event(timeMs, callback);
123-
while (!backlog.offer(event)) {
124-
// force add
125-
}
154+
forceAdd(backlog, event);
126155
scheduler.interrupt(); // wake it up
127156
}
128157

158+
// STOP
159+
//
160+
// Tell the timeline to shutdown. This is a non-blocking call.
129161
public void stop() {
130162
running = false;
131-
forceStop(executor);
132-
forceStop(scheduler);
163+
164+
// Interrupt does not force a thread to exit. It signal's the
165+
// thead that it is time to stop execution. As the threads may
166+
// be sleeping, this will force them awake.
167+
executor.interrupt();
168+
scheduler.interrupt();
133169
}
134170

135-
private static void forceStop(Thread thread) {
171+
// JOIN
172+
//
173+
// Wait for the timeline to shutdown. This is a blocking call.
174+
public void join() {
175+
forceJoin(executor);
176+
forceJoin(scheduler);
177+
}
178+
179+
private static void forceJoin(Thread thread) {
136180
while (true) {
137181
try {
138-
thread.interrupt();
139182
thread.join();
140183
break;
141184
} catch (InterruptedException ex) {
142185
// Do nothing - allow this to try again.
143186
}
144187
}
145188
}
189+
190+
private static <T> void forceAdd(BlockingQueue<T> queue, T value) {
191+
while (!queue.offer(value)) {
192+
// try again...
193+
}
194+
}
146195
}

0 commit comments

Comments
 (0)