@@ -199,7 +199,8 @@ def Destroy(self):
199
199
self .SetDestroy ()
200
200
if self ._backgroundThread is not None :
201
201
# make sure to stop subscriptions and close the websocket first
202
- self ._backgroundThread .RunCoroutine (self ._StopAllSubscriptions ()).result ()
202
+ with self ._subscriptionLock :
203
+ self ._backgroundThread .RunCoroutine (self ._StopAllSubscriptions (ControllerGraphClientException (_ ('Shutting down' )))).result ()
203
204
# next destroy the thread
204
205
self ._backgroundThread .Destroy ()
205
206
self ._backgroundThread = None
@@ -488,13 +489,14 @@ async def _ListenToWebSocket(self):
488
489
with self ._subscriptionLock :
489
490
await self ._StopAllSubscriptions (ControllerGraphClientException (_ ('Failed to listen to WebSocket: %s' ) % (e )))
490
491
491
- async def _StopAllSubscriptions (self , error : Optional [ControllerGraphClientException ] = None ):
492
+ async def _StopAllSubscriptions (self , error : Optional [ControllerGraphClientException ]):
493
+ """Needs to run under self._subscriptionLock
494
+ """
492
495
# close the websocket
493
496
await self ._CloseWebSocket ()
494
497
# send a message back to the callers using the callback function and drop all subscriptions
495
- if error is not None :
496
- for subscriptionId , subscription in self ._subscriptions .items ():
497
- subscription .GetSubscriptionCallbackFunction ()(error = error , response = None )
498
+ for subscriptionId , subscription in self ._subscriptions .items ():
499
+ subscription .GetSubscriptionCallbackFunction ()(error = error , response = None )
498
500
self ._subscriptions .clear ()
499
501
500
502
def SubscribeGraphAPI (self , query : str , callbackFunction : Callable [[Optional [str ], Optional [dict ]], None ], variables : Optional [dict ] = None ) -> Subscription :
0 commit comments