Group: A set of tasks that should run synchronously #7300


Open
evyatark2 opened this issue May 3, 2025 · 3 comments
Labels
A-tokio (Area: The main tokio crate), C-feature-request (Category: A feature request)

Comments


evyatark2 commented May 3, 2025

The Problem

When using a multi-threaded runtime, it is impossible to group a set of spawned tasks that should run synchronously with respect to each other.

Consider a video game server. Multiplayer games are often partitioned into so-called "maps", each with some internal state that must be broadcast to the other clients on the same map whenever a state change occurs.

An optimized game server will uphold the invariant "if two clients are on the same map, their handlers run synchronously". This way, shared map data that is frequently accessed and mutated doesn't need to be guarded by a mutex; in Rust terms, it can be an Rc<RefCell<T>> instead of an Arc<Mutex<T>>.

Note that by "their handlers will run synchronously" I don't mean that a map is bound to a single thread for its entire lifetime; it means that when a client makes a request, it is guaranteed that no other client handler on the same map will run in parallel at the same time. I will get back to this point when Tokio's work-stealing model comes up.

Currently, the best solution I could come up with is keeping a global HashMap<u32, Sender<T>> that maps map IDs to channels for communicating with their respective LocalSets (a sketch follows the list):

  • Each map is driven by tokio::select!ing over the map's inbound channel and a futures::FuturesUnordered that contains the TcpStreams of all clients currently inside the map.
  • A new LocalSet and channel are created from the main Tokio context when a client wants to join a map that does not yet exist.
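
A minimal sketch of one way to wire this up, assuming a dedicated thread per map that drives a current-thread runtime and a LocalSet (the MapMsg type and all names here are hypothetical, not a real API):

use std::collections::HashMap;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::LocalSet;

// Hypothetical per-map message type.
enum MapMsg {
    Join(tokio::net::TcpStream),
}

// Create a map on its own thread and record the channel used to reach it.
fn create_map(maps: &mut HashMap<u32, UnboundedSender<MapMsg>>, map_id: u32) {
    let (tx, mut rx) = unbounded_channel::<MapMsg>();
    maps.insert(map_id, tx);
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        // Everything spawned on this LocalSet stays on this thread, so map
        // state can be Rc<RefCell<T>> rather than Arc<Mutex<T>>.
        LocalSet::new().block_on(&rt, async move {
            while let Some(MapMsg::Join(stream)) = rx.recv().await {
                tokio::task::spawn_local(async move {
                    let _ = stream; // drive this client's connection here
                });
            }
        });
    });
}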

The main drawback of this method is that it doesn't take advantage of Tokio's work-stealing model. When a map is first created, all the client handlers for the map's clients will run on the same thread that called spawn_local. This can be especially bad if some popular maps (maps with many clients) happen, by chance, to be created on the same thread: that thread then handles a large number of clients while other threads are starved because they only handle maps with a small number of clients.

Proposed solution

I propose introducing a new kind of task that shall be called a "SyncGroup".

  • A SyncGroup can be spawned in any context (including inside a tokio::spawn, unlike LocalSet) with tokio::spawn_group().
  • Unlike tokio::spawn() which takes a Future, tokio::spawn_group() takes a function of the type FnOnce(SyncGroup) -> Future. This SyncGroup handle can be used to spawn tasks that run synchronously with respect to each other:
tokio::spawn_group(|g| async move {
    let rc = Rc::new(1);
    let rc_clone = rc.clone();
    g.spawn(async move {
        do_async(rc_clone).await;
    });
    g.spawn(async move {
        do_async2(rc).await;
    });
});
  • Like tokio::spawn(), a SyncGroup begins executing as soon as it is spawned, without needing an .await on its JoinHandle to keep driving it.

Implementation details

On Linux, epoll has a nice property: an epoll file descriptor can itself be polled for readability, and it becomes readable when any of the fds registered in it becomes ready. A SyncGroup can be implemented using a single epoll fd; the Waker provided to the future returned by the closure passed to tokio::spawn_group() registers the task's internal fd in this epoll. A commented example:

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    tokio::spawn_group(|g| async move {
        loop {
            let _request = rx.recv().await; // When this await suspends, the SyncGroup g uses
                                            // epoll_ctl(EPOLL_CTL_ADD) on its internal epoll.
            g.spawn(async move {
                handle_client().await; // This does the same thing.
            });
        }
    });
}

The SyncGroup's epoll is installed into the main Tokio context, which listens for its readability. When it becomes readable, a free thread is chosen and loops over all pending jobs in the epoll.
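
For illustration, a minimal Linux-only sketch of the nested-epoll property this relies on, using the libc crate (error handling elided):

use std::os::unix::io::RawFd;

fn main() {
    unsafe {
        let outer: RawFd = libc::epoll_create1(0);
        let inner: RawFd = libc::epoll_create1(0);

        // Register the inner epoll fd with the outer one; the outer epoll
        // reports `inner` as readable whenever any fd registered in `inner`
        // becomes ready.
        let mut ev = libc::epoll_event {
            events: libc::EPOLLIN as u32,
            u64: inner as u64,
        };
        libc::epoll_ctl(outer, libc::EPOLL_CTL_ADD, inner, &mut ev);

        // A SyncGroup would add its tasks' fds to `inner` and hand `outer`
        // to the runtime's I/O driver to wait on.
        libc::close(inner);
        libc::close(outer);
    }
}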

I can't speak for other platforms, as I don't have much experience with them.

Safety

Note that even though a SyncGroup can be sent between threads, tasks spawned within the group should not need to be Send, since we can guarantee that all the tasks in the group run synchronously.

Nonetheless, the task spawned by the SyncGroup itself should be Send, so as to prevent non-Send types from entering the context from outside:

use std::rc::Rc;

#[tokio::main]
async fn main() {
    let rc = Rc::new(1);
    tokio::spawn_group(|g| async move {
        println!("{}", rc); // Compile error: Rc is `!Send`
        let inner_rc = Rc::new(2);
        g.spawn(async move {
            println!("{}", inner_rc); // Okay
        });
    });
}

Remarks

I hope I have been clear about the problem and the solution I'm proposing.
If there are any questions, feel free to ask.
I can go ahead and implement this for Linux if there are no objections. I would also appreciate suggestions for better names for the various objects and functions, or even a different approach to how the API should be exposed.

evyatark2 added labels A-tokio, C-feature-request on May 3, 2025

Darksonn commented May 4, 2025

Honestly, it sounds like you should just use a Mutex around each map. Everything that needs to access a certain map takes the lock, and the mutex ensures that they take turns to access the map.
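
(A sketch of the suggested arrangement, with a hypothetical MapState type:)

use std::sync::Arc;
use tokio::sync::Mutex;

struct MapState { /* hot, shared map data */ }

// Each map has its own lock; handlers for the same map take turns,
// while handlers for different maps proceed in parallel.
async fn handle_request(map: Arc<Mutex<MapState>>) {
    let mut state = map.lock().await;
    // read/mutate *state while holding the lock
    let _ = &mut *state;
}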

evyatark2 (Author) commented

> Honestly, it sounds like you should just use a Mutex around each map. Everything that needs to access a certain map takes the lock, and the mutex ensures that they take turns to access the map.

That is what I am trying to avoid: map data is very "hot", and since it is accessed and mutated all the time, the underlying mutex implementation has a high chance of calling futex() to block the thread until the mutex is unlocked.

But I did learn about LocalKey, which I had missed before; it might solve this issue for my needs.

Using it wouldn't be as ergonomic as my proposed solution: instead of leveraging async fns and the abstraction capabilities they provide, I need to use a tokio::select!() on a futures::FuturesUnordered (roughly the shape sketched below), but at least it should work, and I can use Rc<RefCell<T>>s instead of mutexes.
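
(Roughly the shape I mean, with hypothetical types:)

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc::UnboundedReceiver;

// The map loop: accept new clients from the inbound channel while driving
// every connected client's handler on this same thread.
async fn run_map(mut inbound: UnboundedReceiver<tokio::net::TcpStream>) {
    let mut clients = FuturesUnordered::new();
    loop {
        tokio::select! {
            Some(stream) = inbound.recv() => {
                clients.push(async move {
                    let _ = stream; // handle this client's requests
                });
            }
            Some(()) = clients.next() => {
                // a client handler finished (client disconnected)
            }
            else => break, // channel closed and no clients left
        }
    }
}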


Darksonn commented May 5, 2025

Then you should probably use a single task that has ownership of the map and performs all access to the map (sketched below).
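
(A minimal sketch of that pattern, with hypothetical names:)

use tokio::sync::mpsc;

enum MapCmd { Join(u32), Leave(u32) }

struct MapState { players: Vec<u32> }

// One task owns the state; everyone else sends commands over the channel,
// so all accesses are serialized without a lock.
fn spawn_map_task() -> mpsc::UnboundedSender<MapCmd> {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        let mut state = MapState { players: Vec::new() };
        while let Some(cmd) = rx.recv().await {
            match cmd {
                MapCmd::Join(id) => state.players.push(id),
                MapCmd::Leave(id) => state.players.retain(|&p| p != id),
            }
        }
    });
    tx
}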

We're probably not going to add anything like this. I don't think it solves the problems you mention.

> Note that even though a SyncGroup can be sent between threads, tasks spawned within the group should not need to be Send, since we can guarantee that all the tasks in the group run synchronously.

That is not what Send means. If a type isn't Send, then it can never be accessed from any thread other than the one it was created on - ever. We can't move non-Send tasks across threads under any circumstances.
