Skip to content

Commit 3fbbd83

Browse files
committed
Add support for non-blocking HTTP handlers
1 parent a4460ae commit 3fbbd83

File tree

2 files changed

+48
-8
lines changed

2 files changed

+48
-8
lines changed

python/mujinasync/asynchttp.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,26 @@
11
# -*- coding: utf-8 -*-
22

3+
from typing import Optional, Any
4+
35
from .asynctcp import TcpServer, TcpConnection, TcpClient
46

57
import logging
68
log = logging.getLogger(__name__)
79

10+
class HttpResponseProcessing(Exception):
11+
# Exception class that user logic can throw during HandleHttpRequest to indicate that they have received this
12+
# request, but are not yet ready to return the response (e.g, calling a slow external resource), and the
13+
# connection should not block for a response.
14+
# If this is thrown, then the active request will be stashed on the connection and the http server will
15+
# continue processing any other active requests
16+
pass
17+
818

919
class HttpConnection(TcpConnection):
10-
pass
20+
# If this connection has already parsed a request, and that request has been pushed to the caller but backpressured,
21+
# then it will be stashed on the connection so that it can be re-submitted each spin until the user logic is ready
22+
# to return a response for it.
23+
pendingRequest: Optional["HttpRequest"] = None
1124

1225
class HttpRequest(object):
1326
method = 'GET'
@@ -19,6 +32,10 @@ class HttpRequest(object):
1932

2033
response = None
2134

35+
# For user programs that make use of non-blocking handlers
36+
# Any information necessary to track the progress of the request can be stored here
37+
userState: Optional[Any] = None
38+
2239
def __repr__(self):
2340
return '<%s(%s)>' % (self.__class__.__name__, ', '.join([
2441
('%s=%r' % (key, getattr(self, key)))
@@ -73,19 +90,34 @@ def _HandleHttpReceiveOnce(self, connection):
7390
response = None
7491
try:
7592
response = self._HandleHttpRequest(connection, request)
93+
except HttpResponseProcessing:
94+
# If the current request would block, then cache it and re-submit it next loop to check if it is complete.
95+
# Send no response on the connection here - the request is still processing
96+
connection.pendingRequest = request
97+
connection.hasPendingWork = True
98+
return
7699
except Exception as e:
77100
log.exception('caught exception while handling http request %r: %s', request, e)
78-
finally:
79-
if response is None:
80-
response = HttpResponse(request, statusCode=500, statusText='Internal Server Error')
81-
log.verbose('sending http response: %r', response)
82-
self._SendHttpResponse(connection, request, response)
101+
102+
if response is None:
103+
response = HttpResponse(request, statusCode=500, statusText='Internal Server Error')
104+
105+
log.verbose('sending http response: %r', response)
106+
self._SendHttpResponse(connection, request, response)
107+
83108
return True # handled one request, try next one
84109

85110
def _HandleHttpRequest(self, connection, request):
86111
return self._CallApi('HandleHttpRequest', request=request, connection=connection, server=self)
87112

88113
def _TryReceiveHttpRequest(self, connection):
114+
# If this connection already has a pending request, pop it off and return it
115+
if connection.pendingRequest:
116+
request = connection.pendingRequest
117+
connection.pendingRequest = None
118+
connection.hasPendingWork = False
119+
return request
120+
89121
bufferData = connection.receiveBuffer.readView.tobytes()
90122
if b'\r\n\r\n' not in bufferData:
91123
if len(bufferData) > 10240:

python/mujinasync/asynctcp.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ class TcpConnection(object):
7070
closeType = None # Immediate, AfterSend
7171
sendBuffer = None # buffer to hold data waiting to be sent
7272
receiveBuffer = None # buffer to hold data received before consumption
73+
hasPendingWork: bool # Should this socket be submitted as a 'readable' socket even if no new data is received?
7374

7475
def __init__(self, connectionSocket, remoteAddress):
7576
self.connectionSocket = connectionSocket
7677
self.remoteAddress = remoteAddress
7778
self.closeType = None
7879
self.sendBuffer = TcpBuffer()
7980
self.receiveBuffer = TcpBuffer()
81+
self.hasPendingWork = False
8082

8183
def __repr__(self):
8284
return '<%s(%s)>' % (self.__class__.__name__, ', '.join([
@@ -295,7 +297,7 @@ def SpinOnce(self, timeout=0):
295297
:param timeout: in seconds, pass in 0 to not wait for socket events, otherwise, will wait up to specified timeout
296298
"""
297299
newConnections = [] # list of tuple (serverClient, connection)
298-
300+
299301
# construct a list of connections to select on
300302
rsockets = []
301303
wsockets = []
@@ -330,7 +332,7 @@ def SpinOnce(self, timeout=0):
330332
client._connections.append(connection)
331333
newConnections.append((client, connection))
332334
timeout = 0 # force no wait at select later since we have a new connection to report right away
333-
335+
334336
# pool all the sockets
335337
socketConnections = {}
336338
for serverClient in self._servers + self._clients:
@@ -436,6 +438,12 @@ def SpinOnce(self, timeout=0):
436438
connection.connectionSocket = None
437439
serverClient._connections.remove(connection)
438440

441+
# Handle server sockets that are processing non-blocking work
442+
for server in self._servers:
443+
for connection in server._connections:
444+
if connection.hasPendingWork:
445+
receivedConnections.append((server, connection))
446+
439447
# let user code run at the very end
440448
for serverClient, connection in newConnections:
441449
serverClient._HandleTcpConnect(connection)

0 commit comments

Comments
 (0)