Skip to content

fix(websockets): Initialise io_loop in multi-worker environments #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions bokeh_django/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,30 +214,33 @@ def application_context(self) -> ApplicationContext:
if self._application_context is None:
self._application_context = self.scope["url_route"]["kwargs"]["app_context"]

if self._application_context.io_loop is None:
raise RuntimeError("io_loop should already been set")
# Explicitly set io_loop here (likely running in multi-worker environment)
if self._application_context._loop is None:
self._application_context._loop = IOLoop.current()
log.debug("io_loop has been re-set")

return self._application_context

async def connect(self):
log.info('WebSocket connection opened')

subprotocols = self.scope["subprotocols"]
if len(subprotocols) != 2 or subprotocols[0] != 'bokeh':
self.close()
await self.close()
raise RuntimeError("Subprotocol header is not 'bokeh'")

token = subprotocols[1]
if token is None:
self.close()
await self.close()
raise RuntimeError("No token received in subprotocol header")

now = calendar.timegm(dt.datetime.utcnow().utctimetuple())
now = calendar.timegm(dt.datetime.now(dt.UTC).utctimetuple())
payload = get_token_payload(token)
if 'session_expiry' not in payload:
self.close()
await self.close()
raise RuntimeError("Session expiry has not been provided")
elif now >= payload['session_expiry']:
self.close()
await self.close()
raise RuntimeError("Token is expired.")
elif not check_token_signature(token,
signed=False,
Expand All @@ -252,7 +255,7 @@ def on_fully_opened(future):
# this isn't really an error (unless we have a
# bug), it just means a client disconnected
# immediately, most likely.
log.debug("Failed to fully open connlocksection %r", e)
log.debug("Failed to fully open connection %r", e)

future = self._async_open(token)

Expand All @@ -263,7 +266,9 @@ def on_fully_opened(future):
await self.accept("bokeh")

async def disconnect(self, close_code):
self.connection.session.destroy()
if hasattr(self, "connection"):
self.connection.session.destroy()
await super().disconnect(close_code)

async def receive(self, text_data) -> None:
fragment = text_data
Expand All @@ -277,8 +282,19 @@ async def receive(self, text_data) -> None:
async def _async_open(self, token: str) -> None:
try:
session_id = get_session_id(token)
await self.application_context.create_session_if_needed(session_id, self.request, token)
session = self.application_context.get_session(session_id)

# Ensure io_loop is set before creating session (likely running in multi-worker environment)
if self._application_context._loop is None:
self._application_context._loop = IOLoop.current()
log.debug("io_loop has been re-set")

# Try to create or get session
try:
session = await self.application_context.create_session_if_needed(session_id, self.request, token)

except Exception as e:
log.error("Error creating session: %s", e)
raise e

protocol = Protocol()
self.receiver = Receiver(protocol)
Expand All @@ -292,7 +308,7 @@ async def _async_open(self, token: str) -> None:

except Exception as e:
log.error("Could not create new server session, reason: %s", e)
self.close()
await self.close()
raise e

msg = self.connection.protocol.create('ACK')
Expand Down