Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ThreadSelectorEventLoop on Windows with ProactorEventLoop #820

Closed
wants to merge 16 commits into from

Conversation

davidbrochart
Copy link
Contributor

Changes

This PR allows anyio.wait_socket_readable(sock) to work on Windows when using the asyncio backend with a proactor event loop, by using Tornado's SelectorThread, as discussed with @minrk in zeromq/pyzmq#1827.

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) added which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

@davidbrochart davidbrochart force-pushed the selector-thread branch 3 times, most recently from c5ff14d to 6719d69 Compare November 9, 2024 15:18
@agronholm
Copy link
Owner

To me this seems like overkill. Why would we have to shoehorn these into the async event loop?

@agronholm
Copy link
Owner

What I'm saying is that a simpler solution would be better, where we just spawn a thread on demand that runs select(...) on all the sockets that need to wait for read/write. Or am I missing something?

@davidbrochart
Copy link
Contributor Author

But that would mean one thread per socket, right? While with this solution there is only one thread per event loop.

@agronholm
Copy link
Owner

But that would mean one thread per socket, right? While with this solution there is only one thread per event loop.

No, I never said that. There would be just one thread which would select() on multiple sockets.

@davidbrochart
Copy link
Contributor Author

OK, well there might be a better way than this PR, which is trying to bring Tornado's implementation almost untouched.

@agronholm
Copy link
Owner

I think Tornado had different needs, hence their more elaborate solution.

@minrk
Copy link

minrk commented Nov 11, 2024

Why would we have to shoehorn these into the async event loop?

I don't really understand what you mean by shoehorning into the loop. The selector is not being added to the loop itself, it is a single Thread running select which implements the add_reader interface of the EventLoop API in a threadsafe way, to call select in a thread and run callbacks in the event loop when select returns. It is an object with a reference to the event loop and a weakref registry to store one of these threads per event loop (only once add_reader is called).

However you want to do it, I think it is appropriate to implement it optimistically as if add_reader can be assumed, because asyncio maintainers agree that it should be, even on Proactor, even though nobody has had the time to implement it yet (I still think it was a mistake to allow Proactor to become the default without this fundamental API). That way you can remove the workaround when you drop support for Pythons that didn't support it without changing ~anything. That is I think the main thing the tornado approach accomplishes because you only need to replace loop.add_reader with get_selector(loop).add_reader, which does nothing but return loop.add_reader everywhere but Windows + Proactor.

where we just spawn a thread on demand that runs select(...) on all the sockets that need to wait for read/write. Or am I missing something?

It is indeed very simple if you have a single thread per socket, but if you want to use a shared thread instead of thread-per-socket, you need to handle the fact that add_reader and friends may be called when select is already blocking in a background thread, which means needing:

  1. a threadsafe way to update the sockets for select, and
  2. a mechanism to wake the thread when the socket list is updated

which is where most of the logic in the tornado implementation resides. You could reimplement it, but it seems to me like it's going to need to be quite similar to tornado's implementation if it's going to work. But maybe you have an idea I'm missing about how to handle waking the select thread in a simpler way?

@davidbrochart
Copy link
Contributor Author

I don't understand that test_wait_socket_readable fails on trio now.

@davidbrochart davidbrochart force-pushed the selector-thread branch 2 times, most recently from 3cccf1c to 5589633 Compare November 11, 2024 17:04
@davidbrochart
Copy link
Contributor Author

And now I don't understand that the test won't be skipped for Windows/Trio :)

@agronholm
Copy link
Owner

And now I don't understand that the test won't be skipped for Windows/Trio :)

Have you tried using pytest.mark.skipif to skip the test?

@davidbrochart
Copy link
Contributor Author

It seems that pytest.mark.skipif doesn't support fixtures, and we need the anyio_backend_name fixture. I don't see any other way than using pytest.skip inside the test.

sock.connect(("127.0.0.1", port))
sock.sendall(b"Hello, world")

with move_on_after(0.1):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a timeout this strict? It's bound to cause flakiness on slow/busy systems.
Also, wouldn't fail_after eliminate the need to set a flag?

Suggested change
with move_on_after(0.1):
with fail_after(5):

On another note, I think this should be on the same line as with conn:, given that it's the only place where the timeout can work.

sock.bind(("127.0.0.1", 0))
port = sock.getsockname()[1]
sock.listen()
thread = Thread(target=client, args=(port,), daemon=True)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be daemon, given how it will either succeed or fail instantly. Given the continuous effort on CPython's side to get rid of daemon threads, I'm also looking for alternatives.

Also, the thread needs to be joined uncoditionally, even if the test fails. With this code, it won't be joined if the test fails.

@agronholm
Copy link
Owner

It seems that pytest.mark.skipif doesn't support fixtures, and we need the anyio_backend_name fixture. I don't see any other way than using pytest.skip inside the test.

Do you have a Windows machine to run this locally on? Setting a breakpoint on the first line would probably shed some light on this.

@davidbrochart
Copy link
Contributor Author

Do you have a Windows machine to run this locally on?

No, but with platform.system() == "Linux" I see that the test is skipped on my Linux machine for trio, so I really have no clue.

@agronholm
Copy link
Owner

Do you have a Windows machine to run this locally on?

No, but with platform.system() == "Linux" I see that the test is skipped on my Linux machine for trio, so I really have no clue.

I have a Windows machine here, maybe I could take a look.

@agronholm
Copy link
Owner

Something weird is going on here. If I only run the test on Trio, it's skipped as expected. I would say that the exception is some sort of fallout from the asyncio version of the test.

@agronholm
Copy link
Owner

Specifically, it has to do with the thread that's spawned when wait_socket_readable() is called, not so much the thread that's spawned in the test code directly – that looks totally fine.

@agronholm
Copy link
Owner

I checked that the ResourceWarning about the unclosed socket had nothing to do with any of the three sockets created during the test.

"""Ensure asyncio selector methods (add_reader, etc.) are available.
Running select in a thread and defining these methods on the running event loop.
Originally in tornado.platform.asyncio.
Redistributed under license Apache-2.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll need to include the copyright statement and notice

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if we end up going with their implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how this plays with AnyIO's MIT license? Would it be an issue to vendor this code?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the Apache 2 license supports re-distribution under a different license

@graingert
Copy link
Collaborator

graingert commented Nov 22, 2024

I think what @agronholm is looking for is something that uses the existing thread shutdown approach

https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2417

Or maybe both should use the shutdown_asyncgens/patch loop.close hack that tornado uses

@davidbrochart
Copy link
Contributor Author

I think what @agronholm is looking for is something that uses the existing thread shutdown approach

It's already here:

find_root_task().add_done_callback(_at_loop_close_callback)

But maybe you'd like to get rid of the loop patch?

# patch loop.close to also close the selector thread
loop_close = asyncio_loop.close
def _close_selector_and_loop() -> None:
# restore original before calling selector.close,
# which in turn calls eventloop.close!
asyncio_loop.close = loop_close # type: ignore[method-assign]
_selectors.pop(asyncio_loop, None)
selector_thread.close()
asyncio_loop.close()
asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign]

@agronholm
Copy link
Owner

I think what @agronholm is looking for is something that uses the existing thread shutdown approach

It's already here:

find_root_task().add_done_callback(_at_loop_close_callback)

But maybe you'd like to get rid of the loop patch?

# patch loop.close to also close the selector thread
loop_close = asyncio_loop.close
def _close_selector_and_loop() -> None:
# restore original before calling selector.close,
# which in turn calls eventloop.close!
asyncio_loop.close = loop_close # type: ignore[method-assign]
_selectors.pop(asyncio_loop, None)
selector_thread.close()
asyncio_loop.close()
asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign]

If we could hook it into the life cycle of the root task, that would the ideal solution from the AnyIO PoV I think.

@graingert
Copy link
Collaborator

graingert commented Nov 25, 2024

Do we also want to close the selector thread after 5 seconds of not waiting for any sockets? Might not be worth bothering in this PR but maybe in a subsequent one?

@agronholm
Copy link
Owner

I think I'd like to take a stab at using the selectors API for this to potentially simplify the implementation (and to not have to think about the licensing stuff).

@graingert
Copy link
Collaborator

graingert commented Nov 28, 2024

Perhaps the way to go is to create an absolutely paired down copy of trio guest mode that only supports wait_readable/wait_writable, then we can use the CFFI IOCP trick instead of selectors, and only run the thread when waiting for IO

@agronholm
Copy link
Owner

Perhaps the way to go is to create an absolutely paired down copy of trio guest mode that only supports wait_readable/wait_writable, then we can use the CFFI IOCP trick instead of selectors, and only run the thread when waiting for IO

Isn't the entire point of using the IOCP trick to not have to spawn threads?

@graingert
Copy link
Collaborator

graingert commented Nov 28, 2024

I think yes it does help with that, but it also lets you wait on more sockets because select on windows is limited (and less efficient in other ways)

What's particularly helpful here is you can register/unregister an FD in the guest loop's IOCP without using a threading.Condition etc

@agronholm
Copy link
Owner

I think yes it does help with that, but it also lets you wait on more sockets because select on windows is limited (and less efficient in other ways)

Ah, yeah, I totally forgot about that. OTOH, the most common cases where these low-level functions are needed are libraries like libzmq and libpq5 (postgresql), and it's unlikely that users will run into the limitations of Windows select() then.

What's particularly helpful here is you can register/unregister an FD in the guest loop's IOCP without using a threading.Condition etc

Does the selectors API use that?

@graingert
Copy link
Collaborator

What's particularly helpful here is you can register/unregister an FD in the guest loop's IOCP without using a threading.Condition etc

Does the selectors API use that?

I don't know what you're asking? I'm assuming it's not possible to change what select is waiting on from outside the call to select, you need to wake it up and give it a new set of fds

@agronholm
Copy link
Owner

What's particularly helpful here is you can register/unregister an FD in the guest loop's IOCP without using a threading.Condition etc

Does the selectors API use that?

I don't know what you're asking? I'm assuming it's not possible to change what select is waiting on from outside the call to select, you need to wake it up and give it a new set of fds

Ah, that's what you were getting at. And you're saying that the IOCP trick does not require the running system call to be interrupted in order to add/remove fds to it (I haven't looked at it at all, just asking)?

@graingert
Copy link
Collaborator

What's particularly helpful here is you can register/unregister an FD in the guest loop's IOCP without using a threading.Condition etc

Does the selectors API use that?

I don't know what you're asking? I'm assuming it's not possible to change what select is waiting on from outside the call to select, you need to wake it up and give it a new set of fds

Ah, that's what you were getting at. And you're saying that the IOCP trick does not require the running system call to be interrupted in order to add/remove fds to it (I haven't looked at it at all, just asking)?

Yep! Thanks for bearing with me

@davidbrochart
Copy link
Contributor Author

Also, guest mode could be useful to integrate AnyIO with other event loops, right?

@agronholm
Copy link
Owner

Also, guest mode could be useful to integrate AnyIO with other event loops, right?

I don't think I'm following.

@davidbrochart
Copy link
Contributor Author

Just in general, like in Trio.

@graingert
Copy link
Collaborator

Also, guest mode could be useful to integrate AnyIO with other event loops, right?

This is the wrong sort of guest mode - it's making a pared down trio IO manager the guest to asyncio, it's not going to let asyncio operate as a guest on anything else

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants