Revised sync mode WebClient/RTMClient to address concurrency issues #662
Codecov Report
@@ Coverage Diff @@
## master #662 +/- ##
==========================================
- Coverage 86.19% 85.28% -0.92%
==========================================
Files 17 17
Lines 2413 2568 +155
Branches 198 237 +39
==========================================
+ Hits 2080 2190 +110
- Misses 262 284 +22
- Partials 71 94 +23
| callback, rtm_client=self, web_client=web_client, data=data | ||
| ) | ||
| while future.running(): |
| @RTMClient.run_on(event="message") | ||
| # even though run_async=False, handlers for RTM events can be a coroutine | ||
| async def send_reply(**payload): | ||
| def send_reply(**payload): |
Coroutines no longer work as handlers when run_async=False. I think this behavior is much more reasonable.
| self.web_client = WebClient( | ||
| token=self.bot_token, | ||
| run_async=False, | ||
| loop=asyncio.new_event_loop(), # TODO: this doesn't work without this |
This is unnecessary, as run_async=False no longer uses an event loop internally.
| # This doesn't work | ||
| # Fixed in 2.6.0: This doesn't work |
WebClient with run_async=False is now thread-safe.
| ): | ||
| self.token = token.strip() | ||
| self.run_async = run_async | ||
| self.thread_pool_executor = ThreadPoolExecutor( |
This is not critical, just a minor improvement.
| import slack.version as ver | ||
| def get_user_agent(): |
Extracted for reuse in UrllibWebClient. The function needs to live outside BaseClient to avoid circular import issues.
| # Using this is no longer recommended - just keep this for backward-compatibility | ||
| return self._event_loop.run_until_complete(future) | ||
| else: | ||
| return self._sync_send(api_url=api_url, req_args=req_args) |
This is the new code path.
| "Use WebClient with run_async=False and use_sync_aiohttp=False." | ||
| ) | ||
| raise e.SlackRequestError(msg) | ||
| response = self._client._sync_request( |
As this method is not a coroutine, the sync client is also used for run_async=True clients. With run_async=True it works anyway, but we can revisit this to make it completely non-blocking in the future.
| self.default_headers = default_headers | ||
| self.web_client = web_client | ||
| def api_call( |
It's also possible to use this method for any API call. As described in slack_response.py, the pagination iterator doesn't work when using this class directly. To use that feature, developers should use WebClient with run_async=False. I gave up on supporting that interaction in this class because of circular import issues with BaseClient.
| "status_code": 200, | ||
| } | ||
| coro.return_value = SlackResponse(**data) | ||
| corofunc = Mock(name="mock_rtm_response", side_effect=asyncio.coroutine(coro)) |
I removed some existing mock utilities that depended on asyncio. That dependency made it difficult to detect potential concurrency issues when run_async=False.
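As a rough illustration of mocking without asyncio, the sketch below uses a plain `unittest.mock.Mock` that returns its value directly. The payload dict is a hypothetical stand-in for a real `SlackResponse`, and this is not the test utility actually used in the pull request.

```python
from unittest.mock import Mock

# Hypothetical stand-in for the payload a mocked API call would return.
data = {"ok": True, "status_code": 200}

# A plain Mock returns the value directly; no coroutine wrapper or
# event loop is involved, so the test exercises the sync code path only.
mock_rtm_response = Mock(name="mock_rtm_response", return_value=data)

result = mock_rtm_response()
```

Because nothing here touches an event loop, a test built this way would not hide problems that only appear when the sync path runs without asyncio.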
I've merged a fix for #650 in this pull request.
| self.fail("Raising an error here was expected") | ||
| except Exception as e: | ||
| self.assertEqual(str(e), "The server responded with: {'ok': False, 'error': 'invalid_auth'}") | ||
| self.assertEqual( |
| ) | ||
| else: | ||
| self._execute_in_thread(callback, data) | ||
| await self._execute_in_thread( |
Looks good at a high level 👍
@stevengill and I did a collaborative review, and I'm submitting it for both of us. We didn't spend any time looking through the tests, but we did look at all of the implementation.
@seratch this looks like a great step towards resolving many of our concurrency issues. So excited to see this land! There are a few questions and comments in here that I think would be important to address before we land/release this change.
| if inspect.iscoroutinefunction(callback): | ||
| if self.run_async or inspect.iscoroutinefunction(callback): | ||
| await callback( |
If I understand this correctly, when run_async=True but the callback is not a coroutine, the callback will be invoked with the await keyword. This seems to be a problem (ref):
This means that synchronous and asynchronous functions/callables are different types - you can't just mix and match them. Try to await a sync function and you'll see Python complain, forget to await an async function and you'll get back a coroutine object rather than the result you wanted.
Maybe we should change the or to an and, and also add another case for the situation I described above that throws an explicit error. Or maybe this isn't necessary because we expect developers to understand the runtime error (Python "complaining", as the author above put it) and know how to deal with it. IMHO, having our own explicit error with a readable description would be easier to debug.
Another solution could be to call callback without await whenever we detect that it's not a coroutine, no matter what run_async is set to. What do you think about this?
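To make the concern concrete, here is a minimal standalone sketch (not the RTMClient code; `dispatch`, `naive_dispatch`, and the callbacks are hypothetical names) showing that always awaiting fails for a plain function, while branching on `inspect.iscoroutinefunction` handles both kinds of callback.

```python
import asyncio
import inspect


def sync_callback(**payload):
    return "sync:" + payload["data"]


async def async_callback(**payload):
    return "async:" + payload["data"]


async def dispatch(callback, **payload):
    # Await only real coroutine functions; call plain functions directly.
    if inspect.iscoroutinefunction(callback):
        return await callback(**payload)
    return callback(**payload)


async def naive_dispatch(callback, **payload):
    # Always awaiting breaks for plain functions, because their
    # return value (a str here) is not awaitable.
    return await callback(**payload)


results = [
    asyncio.run(dispatch(sync_callback, data="hello")),
    asyncio.run(dispatch(async_callback, data="hello")),
]

try:
    asyncio.run(naive_dispatch(sync_callback, data="hello"))
    naive_error = None
except TypeError as exc:
    naive_error = exc
```

The alternative raised in this thread, throwing an explicit error for a non-coroutine callback, would replace the final `return callback(**payload)` with a `raise`.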
This is a great catch. In a future major release, we may be able to clearly say "if you go with run_async=True, all callbacks must be coroutines", but a minor release is not the right time for that. Also, we don't need this change to resolve the existing concurrency issues.
I will just revert this change.
With this change reverted, when run_async=False the callback will still be invoked as a coroutine (using await). That will also lead to problems. It seems like the way to fix this would be to detect when run_async=True and !inspect.iscoroutinefunction(callback), and throw an error.
Thanks, helping developers notice a mistake seems like a nice addition. I've added some tests and an error in 227f949.
| ) | ||
| else: | ||
| self._execute_in_thread(callback, data) | ||
| await self._execute_in_thread( |
From my understanding, we always block on the return of the callback when run_async=False. If that is the case, why are we using the ThreadPoolExecutor to invoke the callback on another thread? It seems that the executor will only have one worker/thread at any given time (since we always block on its completion within _execute_in_thread()). The same behavior could be accomplished by simply running the callback on the current thread, right?
Is there something about the consistency of _dispatch_event() that changes when we don't await on anything (by calling callback() synchronously on the same thread)?
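The point about blocking on a single worker can be sketched with the standard library alone. This is a simplified illustration under the assumption that the caller waits for the result immediately; `callback` is a hypothetical handler, not the RTMClient internals.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback(data):
    # Report which thread ran the callback, along with its result.
    return threading.current_thread().name, data.upper()


# Submitting and immediately blocking on result() leaves the caller idle
# while the single worker thread runs the callback...
with ThreadPoolExecutor(max_workers=1) as executor:
    pooled_thread, pooled_result = executor.submit(callback, "hello").result()

# ...so a direct call on the current thread produces the same result.
direct_thread, direct_result = callback("hello")
```

The only observable difference in this sketch is the thread the callback runs on, which matters only if the callback relies on thread-local state.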
| # If you see the following errors with #stop() method calls, call `RTMClient#async_stop()` instead | ||
| # | ||
| # /python3.8/asyncio/base_events.py:641: |
This is unrelated to the code review suggestions. The tests passed, but I had overlooked this warning in two test cases.
| if ( | ||
| self.auto_reconnect | ||
| and not self._stopped | ||
| and error_code != "invalid_auth" # "invalid_auth" is unrecoverable |
@aoberoi @stevengill Thanks for your insightful review. I've updated this pull request and now it's ready for review again.
* #530 Fixed by changing _execute_in_thread to be a coroutine
* #569 Resolved by removing a blocking loop (while future.running())
* #645 WebClient(run_async=False) no longer depends on asyncio by default
* #633 WebClient(run_async=False) doesn't internally depend on aiohttp
* #631 When run_async=True, RTM listener can be a normal function and WebClient is free from the event loop
* #630 WebClient no longer depends on aiohttp when run_async=False
* #497 Fixed when run_async=False / can be closed as we don't support run_async=True for this use case (in Flask)

* Get rid of thread pool executor as we no longer need threads internally
* Add async_stop() method for safer termination of RTMClient for the cases having unexpected exceptions in callbacks
* Revert the behavior of run_async=True to allow using non-async methods
* Simplify the Retry-After header value extraction code

* Merge UrllibWebClient's functionalities into BaseClient to avoid unnecessary complexity such as circular import issues
* Call show_2020_01_deprecation() only once
* Test if values are dict and they're empty
* Rename _sync_request to _request_for_pagination to be clearer
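For the #530 and #569 items, the difference between spinning on `while future.running()` and awaiting can be sketched as follows. This is an illustration only: `execute_in_thread`, `slow_callback`, and `heartbeat` are hypothetical names, and the list above notes that the thread pool executor was later removed altogether.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_callback(data):
    time.sleep(0.05)
    return data * 2


async def execute_in_thread(executor, callback, data):
    # Hypothetical coroutine version: awaiting hands control back to the
    # event loop instead of spinning on `while future.running()`.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, callback, data)


async def main():
    ticks = 0

    async def heartbeat():
        # Stands in for other work on the loop, such as WebSocket keep-alives.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    beat = asyncio.ensure_future(heartbeat())
    with ThreadPoolExecutor(max_workers=1) as executor:
        result = await execute_in_thread(executor, slow_callback, 21)
    beat.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
```

With a busy loop in place of the `await`, the heartbeat task would not get a chance to run until the callback finished.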
I've rebased this branch on the latest master branch. It's ready to merge once I get reviewers' approvals.
aoberoi left a comment
Just one comment that needs attention here: #662 (comment).
I don't think it's critical, but probably worth looking at once more. Approved!
Summary
WebClient and RTMClient with run_async=False have been having many issues such as #497 #530 #569 #630 #631 #633 #645. This pull request fixes these issues by revising the internals of WebClient and RTMClient when run_async=False.

The revised WebClient never relies on aiohttp when run_async=False (the default). In that case, the API client simply sends HTTP requests using the Python standard library (urllib). If a user would like to fall back to the previous behavior of using aiohttp in a blocking way, that is still possible by setting use_sync_aiohttp=True in addition to run_async=False. But I strongly recommend switching to the new one.

RTMClient still tightly depends on asyncio for WebSocket management. Some error handling issues #558 #611 #522 are still unfixed. I'll address those separately.

SlackResponse to always use UrllibWebClient.

As I mentioned above, #558 #611 #522 are outside the scope of this pull request. They may be fixed in forthcoming pull requests.
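The urllib-based approach described in the summary can be sketched roughly as below. This is not the code from the pull request: `build_api_request` is a hypothetical helper, the token is a placeholder, and the request is only constructed, not sent.

```python
import json
import urllib.request


def build_api_request(api_url, token, params):
    # Build (but do not send) a blocking POST request using only the
    # standard library; no aiohttp or event loop is involved.
    body = json.dumps(params).encode("utf-8")
    headers = {
        "Content-Type": "application/json;charset=utf-8",
        "Authorization": "Bearer " + token,
    }
    return urllib.request.Request(api_url, data=body, headers=headers, method="POST")


request = build_api_request(
    "https://slack.com/api/chat.postMessage",
    "xoxb-placeholder-token",  # placeholder, not a real token
    {"channel": "#general", "text": "hello"},
)
# Sending would be a plain blocking call: urllib.request.urlopen(request)
```

Because `urlopen` simply blocks the calling thread, a client built this way can be called from several threads without sharing an event loop.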
Requirements (place an x in each [ ])