Cubed for larger-than-memory workloads on a single machine #492

Open

@TomNicholas

One thing that's really obvious at SciPy this year is that people absolutely love DuckDB. The reason is that although DuckDB doesn't offer multi-machine parallelism, so it can never scale horizontally, it can nevertheless operate on larger-than-memory workloads, and is extremely reliable at just chugging through them until they complete. That confidence is all people really need for 90% of real-world workloads. It seems like a lot of users are happy to lose the horizontal scaling in favour of the simplicity and reliability of no longer trying to run a distributed system.

This has got me thinking about whether we could write an executor for Cubed that is single-machine but can handle larger-than-memory workloads. It would parallelize to fully utilize that machine but only that machine.


I've also been reading this blog post on Memory Management in DuckDB, and basically all they seem to do is:
a) have a preset memory usage limit (defaulting to 80% of machine RAM),
b) spill to disk when necessary,
c) try to minimize spilling to disk by efficient caching,
d) fully utilize the parallelism available to them on that one machine to get vertical scaling.
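For concreteness, point (a) amounts to something like the following (a minimal sketch of computing such a budget ourselves; `psutil` and the 0.8 fraction are just illustrative here, not anything DuckDB or Cubed actually exposes):

```python
import psutil

# Illustrative only: a DuckDB-style default memory budget of 80% of physical RAM.
MEM_FRACTION = 0.8
memory_budget = int(psutil.virtual_memory().total * MEM_FRACTION)
print(f"memory budget: {memory_budget / 2**30:.1f} GiB")
```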

I feel like in Cubed's model we have all the information we need to take a similar approach. We have parallel single-machine executors already (synchronous and asynchronous), but (please correct me if I've misunderstood @tomwhite) neither of these executors would work on larger-than-memory workloads, because they currently would naively try to launch more cubed tasks than could fit in memory (since Cubed's allowed_mem is a per-task limit).

(I'm assuming this because I notice neither single-machine executor actually uses the cubed.Spec object for anything, so whilst they would raise if you tried to run a workload where a single task would use > allowed_mem (because cubed wouldn't even let you execute in that case), they can't possibly have a model of exactly how many tasks they can safely run simultaneously before running out of RAM.)
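To make that gap concrete, here's the distinction in a few lines (a sketch assuming the documented `cubed.Spec(work_dir=..., allowed_mem=...)`; the arithmetic below is hypothetical, not an existing Cubed API):

```python
import cubed
import psutil

# allowed_mem is a *per-task* budget...
spec = cubed.Spec(work_dir="tmp", allowed_mem="2GB")

# ...so to stay within the machine's RAM, an executor would also need to bound
# how many of those tasks run at once, something like:
total_ram = psutil.virtual_memory().total
per_task_mem = 2 * 1024**3  # the same 2 GB, in bytes
max_concurrent_tasks = max(1, total_ram // per_task_mem)
```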

So what if we wrote an executor which:

  • Looks to see how much RAM your machine has,
  • For each stage in the plan:
    • Calculates how many tasks it can safely run in parallel using the multiprocessing code we have (as safe_num_tasks = total_machine_RAM / projected_mem_per_task; see the rough sketch after this list),
    • Runs that many tasks at once,
      • If that's all of the tasks needed to complete that stage then don't bother writing the results to storage, just keep them in memory ready for the next stage,
      • If that's not all of the tasks needed then run them in batches, spilling results to disk via Zarr as normal. This is what would allow running on larger-than-memory workloads (I think it's basically the same idea as dask's P2P rechunking).
      • (Maybe have some way of only spilling the minimum number of tasks necessary to disk and keeping the rest in memory)
    • Now move on to the next stage of the plan.
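A very rough sketch of that loop is below. Everything here is a hypothetical stand-in rather than Cubed's actual plan/executor API: `stage.projected_mem_per_task`, `stage.tasks`, `keep_in_memory` and `spill_to_zarr` are made-up names, and tasks are assumed to be zero-argument callables.

```python
from concurrent.futures import ProcessPoolExecutor

import psutil


def run_in_parallel(tasks, max_workers):
    # Hypothetical helper: run zero-argument callables across processes.
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for task in tasks]
        return [f.result() for f in futures]


def execute_plan(stages, mem_fraction=0.8):
    """Run each stage in batches sized to fit within the machine's RAM budget."""
    budget = int(psutil.virtual_memory().total * mem_fraction)

    for stage in stages:
        # How many tasks can safely run at once on this machine?
        safe_num_tasks = max(1, budget // stage.projected_mem_per_task)
        tasks = list(stage.tasks)

        if len(tasks) <= safe_num_tasks:
            # The whole stage fits: keep results in memory for the next stage.
            stage.keep_in_memory(run_in_parallel(tasks, safe_num_tasks))
        else:
            # Larger than memory: run in batches, spilling each batch to Zarr.
            for i in range(0, len(tasks), safe_num_tasks):
                batch = tasks[i : i + safe_num_tasks]
                stage.spill_to_zarr(run_in_parallel(batch, safe_num_tasks))
```

The only real decision in the sketch is the branch: keep a stage's results in memory when the whole stage fits within the budget, otherwise fall back to batched execution with spilling to Zarr, which is what would make larger-than-memory workloads possible.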

Maybe I'm completely misunderstanding something about how sharing memory across processes works, but wouldn't this strategy basically have the same characteristics as the DuckDB approach?

cc @rabernat @jakirkham
