Skip to content
This repository was archived by the owner on Jun 7, 2021. It is now read-only.

Commit 24466e3

Browse files
committed
Catch closed streams when returning generators
1 parent 11cf50b commit 24466e3

1 file changed

Lines changed: 18 additions & 6 deletions

File tree

webthing/server.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
import sys
77
import typing
88
import types
9+
import logging
910
import asyncio
1011
import tornado.concurrent
1112
import tornado.gen
1213
import tornado.httpserver
1314
import tornado.ioloop
1415
import tornado.web
1516
import tornado.websocket
17+
from tornado.iostream import StreamClosedError
1618

1719
from .errors import PropertyError
1820
from .subscriber import Subscriber
@@ -59,6 +61,12 @@ def options(self, *args, **kwargs):
5961
"""Handle an OPTIONS request."""
6062
self.set_status(204)
6163

64+
async def write_and_flush(self, data):
65+
# Write data to memory
66+
self.write(data)
67+
# Write data to network
68+
await self.flush()
69+
6270
async def represent_response(
6371
self, data, content_type: str = "application/json", headers: dict = None
6472
):
@@ -78,16 +86,20 @@ async def represent_response(
7886

7987
if isinstance(data, typing.AsyncGenerator):
8088
async for frame in data:
81-
# Write data to memory
82-
self.write(frame)
8389
# Write data to network
84-
await self.flush()
90+
try:
91+
await self.write_and_flush(frame)
92+
except StreamClosedError:
93+
logging.warning("Stream is closed")
94+
break
8595
elif isinstance(data, types.GeneratorType):
8696
for frame in data:
87-
# Write data to memory
88-
self.write(frame)
8997
# Write data to network
90-
await self.flush()
98+
try:
99+
await self.write_and_flush(frame)
100+
except StreamClosedError:
101+
logging.warning("Stream is closed")
102+
break
91103
# If the response data is not a generator
92104
else:
93105
# If the response contentType is JSON, format data first

0 commit comments

Comments
 (0)