from_thread.run(_sync)? fails in any thread running Trio #2534

Open
gschaffner opened this issue Jan 18, 2023 · 8 comments

@gschaffner
Member

hi! from_thread.run(_sync)? uses a heuristic to prevent the user from accidentally deadlocking their event loop by writing code like https://github.com/python-trio/trio/pull/1574/files#r435039316.

the heuristic is currently much more aggressive than is required to prevent this deadlock, however. it has the side-effect1 of preventing one from calling from_thread.run(_sync)? in any thread running trio, even though such calls are almost always safe and deadlock-free.

it would be useful to be able to call from_thread.run(_sync)? from trio threads2, but the current heuristic blindly prevents this. i'd like to propose relaxing the check so that from_thread.run(_sync)? can be called from trio threads. (one use-case of this is agronholm/anyio#525.)
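
for concreteness, here is a minimal sketch of a call that the heuristic rejects even though it targets a different loop and therefore cannot deadlock the loop it is made from (this assumes Trio ~0.22, where from_thread.run(_sync)? accepts an explicit trio_token=):

import threading

import trio


async def main():
    main_token = trio.lowlevel.current_trio_token()
    done = threading.Event()

    def second_trio_thread():
        async def in_second_loop():
            # this call targets the *main* loop, not the loop running in this
            # thread, so it cannot deadlock this thread's loop. the current
            # heuristic raises RuntimeError anyway.
            trio.from_thread.run_sync(
                print, "hello from the other loop", trio_token=main_token
            )

        try:
            trio.run(in_second_loop)
        except RuntimeError as exc:
            print("rejected:", exc)
        finally:
            done.set()

    threading.Thread(target=second_trio_thread).start()
    # keep the main loop alive (and able to service the cross-call) until the
    # second thread finishes.
    await trio.to_thread.run_sync(done.wait)


if __name__ == "__main__":
    trio.run(main)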

Footnotes

  1. citing the RuntimeErrors reported in from_thread.run(_sync)?'s docstrings, this would appear to be an unintended side-effect of the current heuristic.

  2. for example in order to call Nursery.start_soon in a thread-safe manner.

@oremanj
Member

oremanj commented Jan 18, 2023

I agree that the check is more zealous than it strictly needs to be. I don't think the idea of multiple Trio threads was contemplated when trio.from_thread was written, and even if you loosen the check, from_thread is still not a good fit for Trio-to-Trio calls (because it will block the originating thread until the target thread finishes doing whatever you asked it to do). I think a different interface that is designed for cross-Trio-thread calls (with the entrypoint functions being async instead of synchronous) would be clearer; what do you think?

I'm curious what you're doing that requires multiple Trio threads anyway -- we've considered the possibility but hadn't come across anyone with a concrete use case!

@smurfix
Contributor

smurfix commented Jan 18, 2023

Isn't it fairly trivial to cross call from one Trio thread to the other?
await trio.to_thread.run_sync(functools.partial(trio.from_thread.run, fn, *args, trio_token=other_task.token))

If you don't want the overhead, well, creating something like trio._threads.cross_thread.run(fn, *args) should be fairly simple.

However IMHO relaxing the rules about not calling any trio.from_thread.* function from within Trio isn't the right way to do this.

@gschaffner
Member Author

gschaffner commented Jan 25, 2023

I'm curious what you're doing that requires multiple Trio threads anyway -- we've considered the possibility but hadn't come across anyone with a concrete use case! @oremanj

tl;dr tl;dr

a use-case for sync cross-calling between multiple threads running Trio event loops is agronholm/anyio#341.

tl;dr

a (short-lived) sync function can call a (short-lived) async function by running it in a temporary trio.run thread. the motivation/use-cases for this overlap a lot with those for greenback; this is useful in many of the cases where one might otherwise consider using greenback. it does not bring with it the concerns around greenback being a pretty low-level hack (albeit a very cool one!) and a cause of scheduling race bugs. synchronous cross-calling support extends the applicability of this approach to cover even more of the cases that greenback can handle* and that we've encountered.

*: or rather, cases that greenback could handle if it didn't introduce scheduling race bugs. sync cross-calling allows this method to handle real cases we've encountered that greenback does not handle.

  • for example, i am working on a bidirectional RPC library. the API used by asynchronous clients is like

    async with Connection(ip_addr_or_hostname, ...) as remote_server:
        # a `Connection` wraps a `Nursery` where the connection's multiplexer runs in the
        # background.
        foo_ret = await remote_server.foo()
        something_local()
        bar_ret = await remote_server.bar()
        ...

    if Trio supported synchronous cross-calling between multiple Trio event loops (i.e. trio.from_thread.run(_sync)?(...) with a trio_token other than trio.lowlevel.current_trio_token()), we could also have an analogous API for synchronous clients:

    with SyncConnection(ip_addr_or_hostname, ...) as remote_server:
        # a `SyncConnection` wraps a `Connection` and its multiplexer, etc. that run in
        # a background thread scoped to this `with` block.
        #
        # under the hood, the main thread uses `trio.from_thread.run(_sync)?` to
        # communicate with the `SyncConnection`'s thread.
        foo_ret = remote_server.foo()
        something_local()
        bar_ret = remote_server.bar()
        ...

    this API is implemented and currently works seamlessly when called by sync code that isn't under an event loop, but it fails when called by sync code under an event loop because Trio currently blocks perfectly-safe cross-calls.

    (aside: if this example is screaming "sans I/O!" to you, see this footnote1.)

to my knowledge there is not a better way to accomplish this than sync cross-calling support. i'd be very happy to be wrong here though.

it's currently possible for Trio users to roll their own sync cross-calling support using only public trio.lowlevel API. this consists of just copying the implementation of from_thread.run(_sync)? but relaxing the overly-strict deadlock check. supporting sync cross-calling natively in Trio would let Trio users take advantage of convenient cross-calling without having to roll their own implementation on top of lowlevel and concurrent.futures. the change to Trio required to allow synchronous cross-calling would be very small and non-invasive, i believe (see details in #2535).
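
to illustrate, rolling your own amounts to something like the following sketch (the helper name is made up, and the "am i calling from a Trio thread?" check is simply omitted; this is not Trio's implementation):

import concurrent.futures

import trio


def cross_thread_run_sync(fn, *args, target_token: trio.lowlevel.TrioToken):
    # run fn(*args) inside the Trio loop identified by target_token, blocking
    # the calling thread until it finishes. TrioToken.run_sync_soon is
    # documented as thread-safe, so this can be called from any thread,
    # including one running its own (different) Trio loop.
    future: concurrent.futures.Future = concurrent.futures.Future()

    def in_target_loop():
        try:
            future.set_result(fn(*args))
        except BaseException as exc:
            future.set_exception(exc)

    target_token.run_sync_soon(in_target_loop)
    return future.result()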

other people have been using AnyIO's API for this use-case for some time, but it seems they did not realize that it only works on the asyncio backend and not on Trio (reference) (AnyIO issue).

the problem

often, we want to run some async code in a blocking way within a sync function. for example, suppose we have a (short-lived) sync function (call it foo) that needs to call some (short-lived) async code (call it bar) in a blocking way. bar might make some network requests concurrently and make use of the responses. bar could do anything, so long as bar is:

  • an "independent" coroutine, i.e. bar doesn't share synchronization primitives with code external to it, so it's safe to trio.run(bar).

  • short-lived.

  • either necessarily async, or time-consuming/difficult to convert to a sync function. (in the example of concurrent network requests, even though a client library could also provide a sync interface via a shared sans I/O protocol implementation, bar needs to make the requests concurrently.)
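
for example, a concrete bar satisfying these constraints might look like the following sketch (pure Trio; the host names are made up for illustration):

import trio


async def bar():
    # "independent": shares no synchronization primitives with outside code,
    # so trio.run(bar) is safe. short-lived. necessarily async, because the
    # two connections are opened concurrently.
    streams = {}

    async def connect(name, host):
        streams[name] = await trio.open_tcp_stream(host, 443)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(connect, "a", "server-a.example.org")
        nursery.start_soon(connect, "b", "server-b.example.org")

    for stream in streams.values():
        await stream.aclose()
    return sorted(streams)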

foo is synchronous because it's something that we need to do in sync code often.

calls to foo have to fall into one of two cases:

  1. foo's caller is not running somewhere under an event loop.

    in this case, foo can just call bar_ret = trio.run(bar).

  2. foo's caller is running somewhere under an event loop. (note that foo's caller may still be a synchronous function.)

    in this case, foo can't do trio.run(bar) since the thread is already running a Trio loop.

(note: foo is a short-lived sync function. so, like any other sync code that is guaranteed to exit quickly, calling foo under an event loop should be okay—it will block the event loop only briefly, not for any problematic period of time.)

bar_ret = trio.run(bar) works fine in case 1, but not in case 2. the answer for case 2 is of course that foo should be asynchronous if it's meant to be (at least sometimes!) called under asynchronous code. but changing foo to be async often isn't enough by itself, since the (case 2) callers of foo (if synchronous), their callers (if synchronous), and so on also need to be changed to async. these changes can end up affecting a large swath of API and, as you note in greenback's readme,

getting there in one big step is not always feasible, especially if some of these layers are in libraries that you don't control.

why not use greenback?

so—greenback, while being a cool hack that one should ideally not depend on, can let us get around this while only having to touch the top and the bottom of the sync call stack, right?

def foo():
    async def bar():
        ...

    ...
    try:
        trio.lowlevel.current_task()
    except RuntimeError:
        bar_ret = trio.run(bar)
    else:
        bar_ret = greenback.await_(bar())
    ...

unfortunately not, because (in case 2) foo is now effectively calling bar asynchronously. the problem is that sync functions are sometimes written in a way that takes advantage of the property that, because they are sync, the event loop can't run other tasks during their execution. greenback.await_ violates this fundamental property and can introduce a scheduling race that can cause things to break. here is an example:

import greenback

import trio


def foo():
    async def bar():
        await trio.lowlevel.checkpoint()

    ...
    try:
        trio.lowlevel.current_task()
    except RuntimeError:
        bar_ret = trio.run(bar)
    else:
        bar_ret = greenback.await_(bar())
    ...


class A:
    def sync_fn(self, callback):
        # suppose that `sync_fn` takes advantage of the fact that because it is sync,
        # the event loop it's under can't run coroutines during its execution. if the
        # event loop _could_ run something else during it, state could be changed by
        # external code at a time when `sync_fn` does not expect that type of change.
        # such unexpected change can cause `sync_fn` to misbehave or cause it to leave
        # an object in an invalid state that will cause problems in the future.
        #
        # in real code, `A.sync_fn` can be any sync function that can call a function
        # that uses `greenback.await_`. so `A.sync_fn` could be in third-party code,
        # where it would be time-consuming to check for this bug. upstream might also,
        # _very reasonably_ not want to accept a patch that complicates their
        # implementation of `sync_fn` in a way that is only required because someone is
        # using a hack that violates the fundamental property that sync functions can't
        # `await`.
        #
        # (note that `callback` often would not be passed explicitly. more frequently,
        # it's a method that a third-party class asks you to subclass and implement.)

        ...
        print(f"sync_fn about to call {callback.__name__}.")
        callback()
        print(
            f"{callback.__name__} finished. no coroutines should have run during sync "
            f"function {callback.__name__}!"
        )
        ...


async def unrelated_coro():
    # `unrelated_coro` can interact with `A`, etc. when `sync_fn` is in the middle of
    # running and doesn't expect it to be possible...
    print("unrelated_coro ran something.")
    ...


async def main():
    await greenback.ensure_portal()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(unrelated_coro)
        A().sync_fn(foo)


if __name__ == "__main__":
    trio.run(main)

if you run the above a couple of times, you should see both scheduling orders occur:

sync_fn about to call foo.
foo finished. no coroutines should have run during sync function foo!
unrelated_coro ran something.

as well as the problematic

sync_fn about to call foo.
unrelated_coro ran something.
foo finished. no coroutines should have run during sync function foo!

(aside: it might be possible for greenback to prevent this problem—i'm not sure. could greenback.await_(bar()) allow execution of only bar() and its subtree of tasks? even if this is possible, it probably only prevents the problem if bar() is a top-level coroutine that you could (asyncio | trio).run. if bar depends on other coroutines to e.g. release locks, then greenback.await_ could deadlock trying to acquire a lock that is not strictly "in bar()'s subtree". so this would have to be opt-in, and probably could not work on asyncio, where there is no notion of a task tree.)

use a temporary Trio thread, not greenback

so, ditching greenback and going back to the initial problem, why not just let foo run bar in a temporary Trio thread? bar is an independent (i.e. trio.run()-able) coroutine. it's fine if bar runs under a different event loop.

if we do this, there's no greenback violating fundamental assumptions, and no greenback to have as a somewhat-concerning dependency at all. and we still get to avoid the heavy time-cost of async virality in this situation. yes, we do have to start a thread, but for our use-cases this small delay isn't a problem. (a persistent thread pool can also be used to speed this up if one wants.)

the most basic implementation of this is something like

from concurrent.futures import ThreadPoolExecutor

import trio


def sync_to_thread_run(async_fn):
    with ThreadPoolExecutor(1) as executor:
        future = executor.submit(trio.run, async_fn)
        return future.result()


def foo():
    async def bar():
        ...

    ...
    bar_ret = sync_to_thread_run(bar)
    ...

this works fine on Trio 0.22.0. note that there is no cross-calling between threads running Trio happening in this example. the main thread (which may be running Trio) starts the temporary Trio event loop but doesn't actually interact with it; it just waits for it.
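
(as an aside on the thread-startup cost mentioned earlier: a persistent single-worker pool, sketched below with a made-up name, amortizes that cost across calls, at the price of serializing them.)

from concurrent.futures import ThreadPoolExecutor

import trio

_loop_thread = ThreadPoolExecutor(max_workers=1)


def sync_to_thread_run_pooled(async_fn):
    # same semantics as sync_to_thread_run above, but reuses one worker thread
    # instead of starting a fresh thread per call. each call still gets its
    # own trio.run, i.e. its own short-lived event loop.
    return _loop_thread.submit(trio.run, async_fn).result()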

thread-safety

the main hiccup that we need to be careful of is making sure that bar is thread-safe. if bar has to call thread-unsafe API, our approach of running it in a temporary thread obviously doesn't work.

one approach to dealing with this is to split bar up. if bar's thread-unsafe calls are sync and between thread-safe async code, i.e. bar can be written in a form like

async def bar():
    b = await thread_safe1()
    c = thread_unsafe(b)
    d = await thread_safe2()
    return ...

then we can rewrite as

def foo():
    ...
    b = sync_to_thread_run(thread_safe1)
    c = thread_unsafe(b)
    d = sync_to_thread_run(thread_safe2)
    bar_ret = ...
    ...

nurseries, streams, and other objects tied to a particular event loop

a problem with this is that it's starting a new Trio event loop for each sync_to_thread_run call. this means that we can't use this technique if the thread_safeN coroutines require sharing async objects (e.g. nurseries, streams, etc.). for example, we can't use sync_to_thread_run to rewrite

async def bar():
    async with Connection(ip_addr_or_hostname, ...) as remote_server:
        b = await remote_server.some_remote_function(a)
        c = thread_unsafe(b)  # this line doesn't require the ACM
        d = await remote_server.some_remote_function(c)
        return ...

we could try to rewrite this using sync_to_thread_run, but it would look like

def foo():
    async def bar_part1():
        async with Connection(ip_addr_or_hostname, ...) as remote_server:
            return await remote_server.some_remote_function(a)

    async def bar_part2():
        async with Connection(ip_addr_or_hostname, ...) as remote_server:
            return await remote_server.some_remote_function(c)

    ...
    b = sync_to_thread_run(bar_part1)
    c = thread_unsafe(b)
    d = sync_to_thread_run(bar_part2)
    bar_ret = ...
    ...

here, we can't re-use a single temporary Trio loop for both calls (since synchronous cross-calling is not supported), so we have to establish a connection to the server twice (slow). more importantly, since we can't re-use the same TCP stream for both calls, we lose our original guarantee that both requests actually go to the same server! this is a problem.

if we could synchronously cross-call between threads running Trio, though, we could do what we need. a convenient way to do this is

from contextlib import asynccontextmanager
from typing import AsyncContextManager, AsyncGenerator, TypeVar

import anyio
from anyio.from_thread import BlockingPortal

T = TypeVar("T")


@asynccontextmanager
async def create_blocking_portal_in(
    acm: AsyncContextManager[T], /
) -> AsyncGenerator[tuple[T, BlockingPortal], None]:
    """
    Async context manager that runs a blocking portal in an async context manager.
    """
    async with acm as ret_acm, anyio.from_thread.BlockingPortal() as blocking_portal:
        yield ret_acm, blocking_portal

def foo():
    ...
    with (
        anyio.from_thread.start_blocking_portal(backend="trio") as thread_portal,
        thread_portal.wrap_async_context_manager(
            create_blocking_portal_in(Connection(ip_addr_or_hostname, ...))
        ) as (remote_server, portal)
    ):
        # `portal` is a `BlockingPortal` that allows us to run code inside of the
        # `async with Connection(...)` block in our temporary Trio event loop.
        b = portal.call(remote_server.some_remote_function, a)
        c = thread_unsafe(b)
        d = portal.call(remote_server.some_remote_function, c)
        bar_ret = ...
        ...
    ...

this has the main thread start a thread that runs Trio. initially the temporary Trio thread just runs a task that sleeps forever waiting for further instructions. the main thread tells the temporary Trio loop to __aenter__ a Connection, and then to __aenter__ a BlockingPortal inside that async with Connection(...) block (related: #323). this BlockingPortal sleeps waiting for instructions. the main thread can then call coroutines in the async with Connection(...) block. during each of these instructions from the main thread to the temporary Trio loop, the main thread blocks waiting.

that example is a bit complicated. a simpler example to illustrate the usefulness of cross-calling is perhaps

def foo():
    async def bar():
        # we have to write this helper function `bar`, which makes things a bit hard to
        # read.
        b = await something1()
        # b is some sort of async object tied to a particular event loop, e.g. a stream.
        # so these next coroutines have to run under the same event loop.
        c = await b.something2()
        d = await c.something3()
        ...

    ...
    bar_ret = sync_to_thread_run(bar)
    ...

# vs.

def foo():
    ...
    with anyio.from_thread.start_blocking_portal(backend="trio") as portal:
        # this is a lot more natural.
        b = portal.call(something1)
        # b is some sort of async object tied to a particular event loop, e.g. a stream.
        # so these next coroutines have to run under the same event loop.
        c = portal.call(b.something2)
        # sync cross-calling support also lets us do thread-unsafe operations in the
        # parent thread between async calls.
        thread_unsafe(b)
        d = portal.call(c.something3)
        ...
    ...

under the hood, all of these instructions from the main thread to the temporary Trio event loop use trio.from_thread.run_sync(Nursery.start_soon, ..., trio_token=token_of_temp_trio_thread).
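
a rough pure-Trio equivalent of that arrangement looks something like the sketch below (names like BackgroundTrio are made up; this assumes Trio ~0.22, where from_thread.run(_sync)? accepts trio_token=, and it is only a sketch of the idea, not AnyIO's implementation):

import threading

import trio


class BackgroundTrio:
    # runs a Trio loop in a background thread and lets the calling thread
    # cross-call into it via that loop's TrioToken.

    def __enter__(self):
        self._ready = threading.Event()
        self._thread = threading.Thread(target=trio.run, args=(self._main,))
        self._thread.start()
        self._ready.wait()
        return self

    async def _main(self):
        self.token = trio.lowlevel.current_trio_token()
        self._stop = trio.Event()
        self._ready.set()
        await self._stop.wait()  # idle until told to shut down

    def call(self, async_fn, *args):
        # blocks the calling thread until async_fn finishes in the background
        # loop. this is the call that currently raises if the calling thread
        # is itself running Trio (case 2).
        return trio.from_thread.run(async_fn, *args, trio_token=self.token)

    def __exit__(self, *exc):
        trio.from_thread.run_sync(self._stop.set, trio_token=self.token)
        self._thread.join()


with BackgroundTrio() as bg:
    bg.call(trio.sleep, 0.1)  # runs in the background loop; this thread blocks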

this works in case 1 but it does not currently work in case 2 (AnyIO issue) because Trio currently prevents synchronous cross-calling across Trio threads. note that the cross-calling has to be synchronous for this use-case because, as discussed above, if foo wasn't synchronous we wouldn't have had problems (see greenback readme) that led us to want to cross-call in the first place.

as noted in agronholm/anyio#525, it is also possible for a workaround for this to be implemented in AnyIO. my (n = 1!) opinion is that implementing this in Trio, by loosening the deadlock check so that it only fires when a deadlock is certain, would be better, since it allows pure-Trio (not AnyIO) users to take advantage of this too.

it's also perhaps worth noting that other people before me have used a second event-loop thread (via the same AnyIO API) to solve the same problem (reference), but i think they didn't notice that it currently works on asyncio but not on Trio.

Footnotes

  1. a sans I/O protocol implementation is not enough to do what we need to do here because the RPC is bidirectional; a client needs to have a listener running in the background. clients also need concurrency to do happy eyeballs.

@gschaffner
Member Author

I think a different interface that is designed for cross-Trio-thread calls (with the entrypoint functions being async instead of synchronous) would be clearer; what do you think? @oremanj

async cross-calling unfortunately wouldn't be useful for this use-case; it needs to be sync (see above).

i do think that async cross-calling would be a cool interface to have, especially if a GIL-free Python implementation ever becomes mainstream and running multiple Trio threads becomes useful for performance. with a mandatory GIL still in place, though, i can't currently think of a use-case for it.

@gschaffner
Member Author

Isn't it fairly trivial to cross call from one Trio thread to the other? await trio.to_thread.run_sync(functools.partial(trio.from_thread.run, fn, *args, trio_token=other_task.token)) @smurfix

sorry—by "thread running Trio"/"Trio thread" i meant "thread running a Trio event loop", not "thread started by trio.to_thread". i shouldn't have used "Trio thread" interchangeably with "thread running Trio". sorry about that.

creating something like trio._threads.cross_thread.run(fn, *args) should be fairly simple. @smurfix

yes, but if the hypothetical trio._threads.cross_thread.run(_sync)? were a sync function, its implementation would be essentially identical to the from_thread.run(_sync)? implementation, just with a relaxed deadlock check that only triggers when a deadlock is certain. if it were an async function, it wouldn't be applicable to the use-case i'm after (see above).
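
for illustration, such a relaxed check could look something like this sketch (hypothetical code, not Trio's; #2535 has the actual proposed change):

import trio


def _check_would_deadlock(target_token: trio.lowlevel.TrioToken) -> None:
    # only refuse the one case that is guaranteed to deadlock: the call
    # targets the very loop that is running in the calling thread.
    try:
        current_token = trio.lowlevel.current_trio_token()
    except RuntimeError:
        return  # not called from a Trio thread at all: always fine
    if current_token is target_token:
        raise RuntimeError(
            "this call would deadlock the Trio event loop running in this thread"
        )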

@gschaffner
Member Author

after some more digging, this issue is a duplicate of #2191. apologies for not searching the tracker thoroughly enough before opening this. i will update the corresponding PR's newsfragment to reference the lower issue number.

i'll leave this duplicate open for now since there is ongoing discussion here, but let me know if discussion should move to #2191 instead.

@oremanj
Member

oremanj commented Feb 3, 2023

@gschaffner thank you for the extremely detailed response! I'm convinced :-)

Would you also benefit from trio.run(..., allow_nested=True)? That would create an inner Trio event loop that blocks the outer one until it's complete, with no need for a thread. We don't allow that currently, but there's no principled reason for the restriction, beyond the fact that a nested trio.run() can be confusing (some people would expect the outer event loop to continue running) and that no one's wanted it yet. It sounds like it might solve your problem without the need for an extra thread.

I agree that greenback is somewhat dangerous in that it can violate "no task switches can occur in a synchronous function" assumptions. I don't think it is particularly hazardous otherwise, unless you consider greenlets in general to be hazardous; the async support in SQLAlchemy uses the same technique, and I use greenback in production at my day job.

@gschaffner
Member Author

gschaffner commented Feb 9, 2023

@oremanj awesome! i wasn't sure if the detailed message would be helpful or a nuisance and i'm glad it's being helpful.

trio.run(..., allow_nested=True) would not be very useful for us at the time of writing because we usually need asyncio support as well (via AnyIO), and AnyIO couldn't support allow_nested=True since asyncio does not support it1.

ignoring asyncio support, though, our main use-case for sync cross-calling is something that trio.run(..., allow_nested=True) can't do: namely, running a nursery in the background and cross-calling into it to provide an API like the SyncConnection example:

with SyncConnection(ip_addr_or_hostname, ...) as remote_server:
    # a `SyncConnection` wraps a `Connection` and its nursery, multiplexer, etc. that run in
    # a background thread scoped to this `with` block.
    foo_ret = remote_server.foo()  # this calls a coro in the bg thread.
    something()
    bar_ret = remote_server.bar()  # this calls a coro in the bg thread.
    ...

i believe trio.run(..., allow_nested=True) can't work for this use-case because it would require users to refactor each with SyncConnection block into a function that they trio.run. this can work in some cases, such as the code block above2, but it does not work when one needs a class whose A.__enter__/__exit__ wrap SyncConnection.__enter__/__exit__. such wrapping might be necessary in order to, for example, guarantee that all the RPCs made by an A instance go to the same server. if a reconnect occurred between RPCs, it could reconnect to a different server with different state!

that said, allowing nested event loops would clearly be quite useful in only-Trio (not AnyIO) code:

def foo():
    # foo() works in a fully sync context per usual, but it also no longer breaks when
    # called under a running event loop!
    bar_ret = trio.run(bar, allow_nested=True)
    ...

and i'd happily use this feature in the situations where we use Trio over AnyIO.

regarding greenback violating that assumption, do you think a note should be added to the "what won't work" section of the greenback readme? most of the time, greenback's violation of that assumption is not problematic, but i have had it cause a bug in real code once, and it was a huge pain to debug given that it violates the should-be-safe-to-assume "1st law of async/await" and is flaky to reproduce (it depends on scheduling order).

Footnotes

  1. AnyIO could add a dependency on nest_asyncio, which patches asyncio with this support, but this is probably not a great idea [ref].

    asyncio has historically taken a hard stance against adding nested event loop support [ref] but it does look possible that it might get support in the future [ref, ref].

  2. although it's syntactically more painful for users, especially in a REPL (and even if you do use %autoawait trio in IPython!).
