Skip to content

Support for physical and logical replication #752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from

Conversation

petrosagg
Copy link
Contributor

@petrosagg petrosagg commented Mar 30, 2021

This PR adds the replication mode options to the Config and a copy_both_simple method to the Client to support queries that enter the CopyBoth mode. At the time of writing the only query that uses the CopyBoth mode is replication but more could be added in the future.

copy_both_simple returns a CopyBothDuplex type that implements both Stream and Sink. Users can split the two halves if they need to using futures::StreamExt::split.

Similarly to binary copies, this PR also provides two helper types that can wrap the CopyBothDuplex type and offer a higher level view of the data to the user.

  • ReplicationStream: Decodes the physical replication protocol according to the specification. It provides raw byte access to the contents of XLogData messages.
  • LogicalReplicationStream: Internally wraps ReplicationStream and further decodes the contents of the XLogData messages according to the logical replication specification

Finally, this PR includes an integration test that sets up a replication connection, starts a CopyBoth stream for logical replication and tests that the correct data is sent and received.

@petrosagg
Copy link
Contributor Author

@jeff-davis here is the new PR! Any comments are very much appreciated :)

@petrosagg
Copy link
Contributor Author

@sfackler the tests are failing because the changes(that are included in this PR) in the Dockerfile used by the CI are not picked up and the pre-existing image from the registry is used instead. How should we proceed here?

@petrosagg
Copy link
Contributor Author

Thank you for the super quick review! I'm going to try and address your comments today :)

@petrosagg
Copy link
Contributor Author

@sfackler I have addressed all your comments, I think it looks much better now! Here is a summary of what I changed:

  • ReplicationClient is gone, the normal client can be used to open a replication connection by passing the connection string option
  • I added a copy_both_simple method on the client that returns a type that is both a Stream and a Sink
  • DecodingPlugin and Parse traits are gone. Instead, two utility types are provided: ReplicationStream and LogicalReplicationStream that users can use to wrap the raw CopyBothDuplex stream returned by the client. This is the same pattern used for the binary copy stuff
  • pin_project_lite is left intact

I've re-authored all the commits to be reviewable on their own and the PR is now ~1400 lines versus ~2250 before :)

@petrosagg petrosagg requested a review from sfackler April 8, 2021 09:57
petrosagg added a commit to petrosagg/materialize that referenced this pull request Apr 12, 2021
Needed until sfackler/rust-postgres#752 is merged and released

Signed-off-by: Petros Angelatos <[email protected]>
Copy link

@vitaly-burovoy vitaly-burovoy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good request!
I hoped @sfackler permits ReplicationClient because it is known at the connection stage there can be only replication flow and simple queries. Still think it is wise to have a special struct for it.

Several nitpickings are below:

petrosagg added a commit to petrosagg/materialize that referenced this pull request Apr 14, 2021
Needed until sfackler/rust-postgres#752 is merged and released

Signed-off-by: Petros Angelatos <[email protected]>
jeff-davis and others added 7 commits April 14, 2021 15:27
Signed-off-by: Petros Angelatos <[email protected]>
This can be optionally used with a CopyBoth stream to decode the
replication protocol

Signed-off-by: Petros Angelatos <[email protected]>
@petrosagg
Copy link
Contributor Author

@sfackler I've reworded the description above to aid with reviewing. I hope this is more in line with what you had in mind :)

Comment on lines +19 to +24
/// A type which deserializes the postgres replication protocol. This type can be used with
/// both physical and logical replication to get access to the byte content of each replication
/// message.
///
/// The replication *must* be explicitly completed via the `finish` method.
pub struct ReplicationStream {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused by the last line of the description. What finish method?

@jeff-davis
Copy link
Contributor

jeff-davis commented May 19, 2021

In my PR, I had support for the client sending a CopyDone before the server is done sending. This is important, because in most replication modes, the server will send forever, so the only other way to stop it is to disconnect.

For instance, see 5728fe61.

It seems that this PR does not support the client terminating the replication stream, is that correct? Can it be made to do so?

Also, my PR used PinnedDrop so that merely dropping a stream would safely close it, rather than needing to call an explicit finish.

@jeff-davis
Copy link
Contributor

I think the smallest useful PR we can make here is:

  1. The image updates for pg_hba.conf so replication can be tested.
  2. Enable specifying the replication parameter when connecting.
  3. Handle CopyBoth. I believe this includes the ability for the client to safely send a CopyDone message while the server is still streaming data. For that, I think we need something like the unpipelined_send that I introduced in 2d66934.
  4. I'd like it if the interface could be made safe from protocol desyncs by mutably borrowing the client connection when returning a CopyBoth stream, like I did in Replication support (#116) #696, and safely shutting down the copyboth stream when the object is dropped (which I believe requires PinnedDrop).

I think the logical replication stuff should be in a separate PR. And a lot of the helper functions for building replication commands and parsing the responses can be in a separate PR as well (or perhaps a separate crate, as @sfackler indicated).

@petrosagg
Copy link
Contributor Author

@jeff-davis I think that makes sense. I'll work on splitting this PR in one that just adds copy_both support and another that has the replication decoding.

Regarding your concerns about terminating the stream from the client, I think you're right and that's an omission from my part. I'll take a look into that too. My initial plan for the implementation was to use the same mechanics that are used during a copy_in operation which doesn't use PinnedDrop either.

@petrosagg
Copy link
Contributor Author

closing in favour of #778. Once that is merged we can address the replication protocol itself in a separate PR

@petrosagg petrosagg closed this May 25, 2021
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 15, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 16, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Apr 21, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
funbringer added a commit to neondatabase/rust-postgres that referenced this pull request Dec 16, 2022
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
pimeys pushed a commit to grafbase/rust-postgres that referenced this pull request Oct 12, 2023
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
pimeys pushed a commit to grafbase/rust-postgres that referenced this pull request Nov 27, 2023
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
conradludgate pushed a commit to neondatabase/rust-postgres that referenced this pull request Nov 28, 2024
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
conradludgate pushed a commit to neondatabase/rust-postgres that referenced this pull request Dec 4, 2024
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
conradludgate pushed a commit to neondatabase/rust-postgres that referenced this pull request Dec 4, 2024
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
conradludgate pushed a commit to neondatabase/rust-postgres that referenced this pull request Jan 29, 2025
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
conradludgate pushed a commit to neondatabase/rust-postgres that referenced this pull request Jan 29, 2025
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
conradludgate pushed a commit to neondatabase/rust-postgres that referenced this pull request Jan 29, 2025
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
arpad-m pushed a commit to neondatabase/rust-postgres that referenced this pull request Jan 29, 2025
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
arpad-m pushed a commit to neondatabase/rust-postgres that referenced this pull request Jan 30, 2025
This patch was implemented by Petros Angelatos and Jeff Davis
to support physical and logical replication in rust-postgres
(see sfackler#752).

The original PR never made it to the upstream, but we
(Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.

* Make simple_query::encode() pub(crate).

* decoding logic for replication protocol

* Connection string config for replication.

* add copy_both_simple method

* helper ReplicationStream type for replication protocol

This can be optionally used with a CopyBoth stream to decode the
replication protocol

* decoding logic for logical replication protocol

* helper LogicalReplicationStream type to decode logical replication

* add postgres replication integration test

* add simple query versions of copy operations

* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
antiguru added a commit to antiguru/materialize that referenced this pull request Feb 28, 2025
Signed-off-by: Moritz Hoffmann <[email protected]>

# Conflicts:
#	src/compute/src/logging/timely.rs

diff --git c/Cargo.lock i/Cargo.lock
index b331fe40a0..eb1a1d5b1e 100644
--- c/Cargo.lock
+++ i/Cargo.lock
@@ -2390,21 +2390,23 @@ dependencies = [

 [[package]]
 name = "differential-dataflow"
-version = "0.13.6"
+version = "0.13.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fc8d6ede5efde0ebabf8f48cdd8d5eadda4ad6c033cf703d58cacfd6be009635"
+checksum = "922c18a0f94e29defaef228ecd65589880c16f3f3462a33258f869119f039443"
 dependencies = [
  "columnar",
+ "columnation",
  "fnv",
+ "paste",
  "serde",
  "timely",
 ]

 [[package]]
 name = "differential-dogs3"
-version = "0.1.6"
+version = "0.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ea29c7d272a8b0e95694aa6cbfef7a2a76bedb42a284602cdfec5972ab7d1641"
+checksum = "680def2e24f4d035de6ce128d4820f5533d9afe872d85e87b5c8ed84946c8118"
 dependencies = [
  "differential-dataflow",
  "serde",
@@ -2827,17 +2829,6 @@ dependencies = [
  "rustc_version",
 ]

-[[package]]
-name = "flatcontainer"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ff185ea156496de196dfd189038982f480515ea3338f1ff0a4fbff1e52ea0a6"
-dependencies = [
- "cfg-if",
- "paste",
- "serde",
-]
-
 [[package]]
 name = "flate2"
 version = "1.0.32"
@@ -5545,6 +5536,7 @@ dependencies = [
  "datadriven",
  "dec",
  "derivative",
+ "differential-dataflow",
  "encoding",
  "enum-iterator",
  "fallible-iterator",
@@ -6108,7 +6100,6 @@ dependencies = [
  "ctor",
  "derivative",
  "either",
- "flatcontainer",
  "futures",
  "hibitset",
  "http 1.2.0",
@@ -6594,7 +6585,6 @@ dependencies = [
  "dec",
  "differential-dataflow",
  "enum-kinds",
- "flatcontainer",
  "hex",
  "insta",
  "itertools 0.12.1",
@@ -10586,12 +10576,14 @@ dependencies = [

 [[package]]
 name = "timely"
-version = "0.18.1"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2"
+version = "0.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a714a3fed9aeacf63d9c5c574523c18972b788fa0414011b590af73acad30b09"
 dependencies = [
  "bincode",
  "byteorder",
  "columnar",
+ "columnation",
  "crossbeam-channel",
  "getopts",
  "serde",
@@ -10605,12 +10597,14 @@ dependencies = [
 [[package]]
 name = "timely_bytes"
 version = "0.13.0"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "46e1275de95b4a2713f0850c458d3a550dc323fffda65ce3e075f62545e0484b"

 [[package]]
 name = "timely_communication"
-version = "0.17.1"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f3cdbfc7739e6a8ed95cd591ec0e862f294c681796c6121e6b3fa1ab946473e1"
 dependencies = [
  "byteorder",
  "columnar",
@@ -10624,18 +10618,15 @@ dependencies = [

 [[package]]
 name = "timely_container"
-version = "0.14.0"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2"
-dependencies = [
- "columnation",
- "flatcontainer",
- "serde",
-]
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c951c468b95e2070be7f48a9d8350b6e8e5ecb23e0d13fd7f6155893bb1297d"

 [[package]]
 name = "timely_logging"
-version = "0.13.2"
-source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10d46d6e2fbf5831ff8345f92723e46f778ce157cf8b74448fdcaac9efb9b9a2"
 dependencies = [
  "timely_container",
 ]
diff --git c/Cargo.toml i/Cargo.toml
index 7b32b4bd12..939050b69d 100644
--- c/Cargo.toml
+++ i/Cargo.toml
@@ -293,9 +293,6 @@ incremental = true
 # merged), after which point it becomes impossible to build that historical
 # version of Materialize.
 [patch.crates-io]
-timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
-
-
 # Waiting on https://github.com/sfackler/rust-postgres/pull/752.
 postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
 tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
diff --git c/src/adapter-types/Cargo.toml i/src/adapter-types/Cargo.toml
index e9557931c0..9940ecd84e 100644
--- c/src/adapter-types/Cargo.toml
+++ i/src/adapter-types/Cargo.toml
@@ -15,7 +15,7 @@ mz-ore = { path = "../ore" }
 mz-repr = { path = "../repr" }
 mz-storage-types = { path = "../storage-types" }
 serde = "1.0.218"
-timely = "0.18.1"
+timely = "0.19.0"
 workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

 [package.metadata.cargo-udeps.ignore]
diff --git c/src/adapter/Cargo.toml i/src/adapter/Cargo.toml
index 0be9de7ff9..6da297c267 100644
--- c/src/adapter/Cargo.toml
+++ i/src/adapter/Cargo.toml
@@ -18,7 +18,7 @@ bytesize = "1.3.0"
 chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 dec = "0.4.8"
 derivative = "2.2.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 enum-kinds = "0.5.1"
 fail = { version = "0.5.1", features = ["failpoints"] }
 futures = "0.3.25"
@@ -82,7 +82,7 @@ serde_plain = "1.0.1"
 sha2 = "0.10.6"
 smallvec = { version = "1.10.0", features = ["union"] }
 static_assertions = "1.1"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["rt", "time"] }
 tokio-postgres = { version = "0.7.8" }
 tracing = "0.1.37"
diff --git c/src/catalog/Cargo.toml i/src/catalog/Cargo.toml
index b92fc0e838..ebab2fa4af 100644
--- c/src/catalog/Cargo.toml
+++ i/src/catalog/Cargo.toml
@@ -18,7 +18,7 @@ bytesize = "1.3.0"
 chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 clap = { version = "4.5.23", features = ["derive"] }
 derivative = "2.2.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 ipnet = "2.11.0"
 itertools = "0.12.1"
@@ -60,7 +60,7 @@ serde_plain = "1.0.1"
 static_assertions = "1.1"
 sha2 = "0.10.6"
 thiserror = "2.0.11"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0" }
 tracing = "0.1.37"
 uuid = "1.2.2"
diff --git c/src/cluster/Cargo.toml i/src/cluster/Cargo.toml
index 8451aff17a..cd3f4f0308 100644
--- c/src/cluster/Cargo.toml
+++ i/src/cluster/Cargo.toml
@@ -13,14 +13,14 @@ workspace = true
 anyhow = "1.0.95"
 async-trait = "0.1.83"
 crossbeam-channel = "0.5.14"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
-lgalloc = "0.4.0"
+lgalloc = "0.5.0"
 mz-cluster-client = { path = "../cluster-client" }
 mz-ore = { path = "../ore", features = ["async", "process", "tracing"] }
 mz-service = { path = "../service" }
 regex = "1.10.6"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
 tracing = "0.1.37"
 workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
diff --git c/src/compute-client/Cargo.toml i/src/compute-client/Cargo.toml
index 38ee801651..b3851756d1 100644
--- c/src/compute-client/Cargo.toml
+++ i/src/compute-client/Cargo.toml
@@ -15,7 +15,7 @@ async-trait = "0.1.83"
 bytesize = "1.3.0"
 crossbeam-channel = "0.5.14"
 derivative = "2.2.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 http = "1.2.0"
 mz-build-info = { path = "../build-info" }
@@ -43,7 +43,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = "1.0.125"
 thiserror = "2.0.11"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = "1.38.0"
 tokio-stream = "0.1.17"
 tonic = "0.12.1"
diff --git c/src/compute-types/Cargo.toml i/src/compute-types/Cargo.toml
index da3ea9d6ef..b53444124f 100644
--- c/src/compute-types/Cargo.toml
+++ i/src/compute-types/Cargo.toml
@@ -12,7 +12,7 @@ workspace = true
 [dependencies]
 columnar = "0.3.0"
 columnation = "0.1.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 itertools = "0.12.1"
 mz-dyncfg = { path = "../dyncfg" }
 mz-expr = { path = "../expr" }
@@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
 proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
 prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tracing = "0.1.37"
 workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

diff --git c/src/compute/Cargo.toml i/src/compute/Cargo.toml
index af3bd06e88..1a0172e839 100644
--- c/src/compute/Cargo.toml
+++ i/src/compute/Cargo.toml
@@ -16,8 +16,8 @@ bytesize = "1.3.0"
 columnar = "0.3.0"
 crossbeam-channel = "0.5.14"
 dec = { version = "0.4.8", features = ["serde"] }
-differential-dataflow = "0.13.6"
-differential-dogs3 = "0.1.6"
+differential-dataflow = "0.13.7"
+differential-dogs3 = "0.1.7"
 futures = "0.3.25"
 itertools = "0.12.1"
 lgalloc = "0.5"
@@ -28,7 +28,7 @@ mz-dyncfg = { path = "../dyncfg" }
 mz-dyncfgs = { path = "../dyncfgs" }
 mz-expr = { path = "../expr" }
 mz-metrics = { path = "../metrics" }
-mz-ore = { path = "../ore", features = ["async", "flatcontainer", "process", "tracing"] }
+mz-ore = { path = "../ore", features = ["async", "process", "tracing"] }
 mz-persist-client = { path = "../persist-client" }
 mz-persist-types = { path = "../persist-types" }
 mz-repr = { path = "../repr" }
@@ -41,7 +41,7 @@ prometheus = { version = "0.13.3", default-features = false }
 scopeguard = "1.1.0"
 serde = { version = "1.0.218", features = ["derive"] }
 smallvec = { version = "1.10.0", features = ["serde", "union"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["serde", "v4"] }
diff --git c/src/compute/src/compute_state.rs i/src/compute/src/compute_state.rs
index e3b5ef3fb0..168e1ce2f6 100644
--- c/src/compute/src/compute_state.rs
+++ i/src/compute/src/compute_state.rs
@@ -16,9 +16,9 @@ use std::time::{Duration, Instant};

 use bytesize::ByteSize;
 use differential_dataflow::lattice::Lattice;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Cursor, TraceReader};
 use differential_dataflow::Hashable;
+use differential_dataflow::IntoOwned;
 use mz_compute_client::logging::LoggingConfig;
 use mz_compute_client::protocol::command::{
     ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
diff --git c/src/compute/src/extensions/arrange.rs i/src/compute/src/extensions/arrange.rs
index 555bc69a02..a169258311 100644
--- c/src/compute/src/extensions/arrange.rs
+++ i/src/compute/src/extensions/arrange.rs
@@ -9,13 +9,13 @@

 use std::rc::Rc;

+use differential_dataflow::containers::Columnation;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::arrange_core;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
 use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
 use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
-use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
 use timely::dataflow::operators::Operator;
 use timely::dataflow::{Scope, ScopeParent, StreamCore};
@@ -457,79 +457,3 @@ where
         })
     }
 }
-
-mod flatcontainer {
-    use differential_dataflow::difference::Semigroup;
-    use differential_dataflow::lattice::Lattice;
-    use differential_dataflow::operators::arrange::Arranged;
-    use differential_dataflow::trace::TraceReader;
-    use mz_ore::flatcontainer::MzRegionPreference;
-    use timely::container::flatcontainer::{IntoOwned, Push, Region, ReserveItems};
-    use timely::dataflow::Scope;
-    use timely::progress::Timestamp;
-    use timely::PartialOrder;
-
-    use crate::extensions::arrange::{log_arrangement_size_inner, ArrangementSize};
-    use crate::typedefs::{FlatKeyValAgent, FlatKeyValSpine};
-
-    impl<G, K, V, T, R> ArrangementSize for Arranged<G, FlatKeyValAgent<K, V, T, R>>
-    where
-        Self: Clone,
-        G: Scope<Timestamp = T::Owned>,
-        G::Timestamp: Lattice + Ord + MzRegionPreference,
-        K: Region
-            + Clone
-            + Push<<K as Region>::Owned>
-            + for<'a> Push<<K as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<K as Region>::ReadItem<'a>>
-            + 'static,
-        V: Region
-            + Clone
-            + Push<<V as Region>::Owned>
-            + for<'a> Push<<V as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<V as Region>::ReadItem<'a>>
-            + 'static,
-        T: Region
-            + Clone
-            + Push<<T as Region>::Owned>
-            + for<'a> Push<<T as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<T as Region>::ReadItem<'a>>
-            + 'static,
-        R: Region
-            + Clone
-            + Push<<R as Region>::Owned>
-            + for<'a> Push<&'a <R as Region>::Owned>
-            + for<'a> Push<<R as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<R as Region>::ReadItem<'a>>
-            + 'static,
-        K::Owned: Clone + Ord,
-        V::Owned: Clone + Ord,
-        T::Owned: Lattice + for<'a> PartialOrder<<T as Region>::ReadItem<'a>> + Timestamp,
-        R::Owned:
-            Default + Ord + Semigroup + for<'a> Semigroup<<R as Region>::ReadItem<'a>> + 'static,
-        for<'a> <K as Region>::ReadItem<'a>: Copy + Ord,
-        for<'a> <V as Region>::ReadItem<'a>: Copy + Ord,
-        for<'a> <T as Region>::ReadItem<'a>: Copy + IntoOwned<'a> + Ord + PartialOrder<T::Owned>,
-        for<'a> <R as Region>::ReadItem<'a>: Copy + IntoOwned<'a, Owned = R::Owned> + Ord,
-    {
-        fn log_arrangement_size(self) -> Self {
-            log_arrangement_size_inner::<_, FlatKeyValSpine<K, V, T, R>, _>(self, |trace| {
-                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
-                let mut callback = |siz, cap| {
-                    size += siz;
-                    capacity += cap;
-                    allocations += usize::from(cap > 0);
-                };
-                trace.map_batches(|batch| {
-                    batch.storage.keys.heap_size(&mut callback);
-                    batch.storage.keys_offs.heap_size(&mut callback);
-                    batch.storage.vals.heap_size(&mut callback);
-                    batch.storage.vals_offs.heap_size(&mut callback);
-                    batch.storage.times.heap_size(&mut callback);
-                    batch.storage.diffs.heap_size(&mut callback);
-                });
-                (size, capacity, allocations)
-            })
-        }
-    }
-}
diff --git c/src/compute/src/extensions/reduce.rs i/src/compute/src/extensions/reduce.rs
index 7d21e7437d..460fe46519 100644
--- c/src/compute/src/extensions/reduce.rs
+++ i/src/compute/src/extensions/reduce.rs
@@ -16,9 +16,9 @@
 use differential_dataflow::difference::{Abelian, Semigroup};
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
 use differential_dataflow::Data;
+use differential_dataflow::IntoOwned;
 use timely::container::PushInto;
 use timely::dataflow::Scope;
 use timely::Container;
diff --git c/src/compute/src/logging/timely.rs i/src/compute/src/logging/timely.rs
index 671a54a52c..f8d852fc55 100644
--- c/src/compute/src/logging/timely.rs
+++ i/src/compute/src/logging/timely.rs
@@ -14,6 +14,7 @@ use std::rc::Rc;
 use std::time::Duration;

 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::{Columnation, CopyRegion};
 use mz_compute_client::logging::LoggingConfig;
 use mz_ore::cast::CastFrom;
 use mz_repr::{Datum, Diff, Timestamp};
@@ -21,7 +22,7 @@ use mz_timely_util::containers::{
     columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder,
 };
 use mz_timely_util::replay::MzReplay;
-use timely::container::columnation::{Columnation, CopyRegion};
+use timely::communication::Allocate;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::channels::pushers::buffer::Session;
 use timely::dataflow::channels::pushers::{Counter, Tee};
diff --git c/src/compute/src/render.rs i/src/compute/src/render.rs
index 3fde6605ae..9a9316f808 100644
--- c/src/compute/src/render.rs
+++ i/src/compute/src/render.rs
@@ -111,11 +111,12 @@ use std::sync::Arc;
 use std::task::Poll;

 use columnar::Columnar;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::dynamic::pointstamp::PointStamp;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::TraceReader;
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection, Data};
 use futures::channel::oneshot;
 use futures::FutureExt;
@@ -134,7 +135,6 @@ use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::operator::{CollectionExt, StreamExt};
 use timely::communication::Allocate;
-use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::to_stream::ToStream;
 use timely::dataflow::operators::{probe, BranchWhen, Operator, Probe};
diff --git c/src/compute/src/render/context.rs i/src/compute/src/render/context.rs
index 20befab8de..c622d2a3bc 100644
--- c/src/compute/src/render/context.rs
+++ i/src/compute/src/render/context.rs
@@ -16,10 +16,11 @@ use std::sync::mpsc;

 use columnar::Columnar;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection, Data};
 use mz_compute_types::dataflows::DataflowDescription;
 use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
@@ -32,7 +33,6 @@ use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
 use mz_timely_util::operator::{CollectionExt, StreamExt};
-use timely::container::columnation::Columnation;
 use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::operators::generic::OutputHandleCore;
diff --git c/src/compute/src/render/errors.rs i/src/compute/src/render/errors.rs
index 887015ef67..f70cfd2506 100644
--- c/src/compute/src/render/errors.rs
+++ i/src/compute/src/render/errors.rs
@@ -11,9 +11,9 @@

 use std::hash::Hash;

+use differential_dataflow::containers::Columnation;
 use differential_dataflow::ExchangeData;
 use mz_repr::Row;
-use timely::container::columnation::Columnation;

 use crate::render::context::ShutdownToken;

diff --git c/src/compute/src/render/join/delta_join.rs i/src/compute/src/render/join/delta_join.rs
index ceef6cd23a..cc482ba003 100644
--- c/src/compute/src/render/join/delta_join.rs
+++ i/src/compute/src/render/join/delta_join.rs
@@ -18,8 +18,8 @@ use std::collections::{BTreeMap, BTreeSet};
 use columnar::Columnar;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection};
 use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
 use mz_compute_types::plan::join::JoinClosure;
diff --git c/src/compute/src/render/join/linear_join.rs i/src/compute/src/render/join/linear_join.rs
index 2f6ddfd9c9..1756ecdcff 100644
--- c/src/compute/src/render/join/linear_join.rs
+++ i/src/compute/src/render/join/linear_join.rs
@@ -15,6 +15,7 @@ use std::time::{Duration, Instant};

 use columnar::Columnar;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::Arranged;
 use differential_dataflow::trace::TraceReader;
@@ -28,7 +29,6 @@ use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
 use mz_timely_util::operator::{CollectionExt, StreamExt};
-use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::operators::OkErr;
 use timely::dataflow::scopes::Child;
diff --git c/src/compute/src/render/join/mz_join_core.rs i/src/compute/src/render/join/mz_join_core.rs
index 3848256fd5..935f95257c 100644
--- c/src/compute/src/render/join/mz_join_core.rs
+++ i/src/compute/src/render/join/mz_join_core.rs
@@ -44,9 +44,9 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
 use differential_dataflow::difference::Multiply;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
 use differential_dataflow::Data;
+use differential_dataflow::IntoOwned;
 use mz_repr::Diff;
 use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
 use timely::dataflow::channels::pact::Pipeline;
diff --git c/src/compute/src/render/reduce.rs i/src/compute/src/render/reduce.rs
index 335b40c185..a0eb7a4275 100644
--- c/src/compute/src/render/reduce.rs
+++ i/src/compute/src/render/reduce.rs
@@ -17,12 +17,13 @@ use std::sync::LazyLock;
 use dec::OrderedDecimal;
 use differential_dataflow::collection::AsCollection;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::{Columnation, CopyRegion};
 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
 use differential_dataflow::hashable::Hashable;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{Collection, Diff as _};
 use mz_compute_types::plan::reduce::{
     reduction_type, AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan,
@@ -37,7 +38,6 @@ use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::operator::CollectionExt;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::{Columnation, CopyRegion};
 use timely::container::{CapacityContainerBuilder, PushInto};
 use timely::dataflow::Scope;
 use timely::progress::timestamp::Refines;
@@ -2308,12 +2308,12 @@ mod monoids {
     // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
     // assumes this.

+    use differential_dataflow::containers::{Columnation, Region};
     use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
     use mz_expr::AggregateFunc;
     use mz_ore::soft_panic_or_log;
     use mz_repr::{Datum, Diff, Row};
     use serde::{Deserialize, Serialize};
-    use timely::container::columnation::{Columnation, Region};

     /// A monoid containing a single-datum row.
     #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
diff --git c/src/compute/src/render/threshold.rs i/src/compute/src/render/threshold.rs
index cd22d5529b..1a31367806 100644
--- c/src/compute/src/render/threshold.rs
+++ i/src/compute/src/render/threshold.rs
@@ -11,15 +11,15 @@
 //!
 //! Consult [ThresholdPlan] documentation for details.

+use differential_dataflow::containers::Columnation;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
 use differential_dataflow::Data;
+use differential_dataflow::IntoOwned;
 use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
 use mz_expr::MirScalarExpr;
 use mz_repr::Diff;
-use timely::container::columnation::Columnation;
 use timely::container::PushInto;
 use timely::dataflow::Scope;
 use timely::progress::timestamp::Refines;
diff --git c/src/compute/src/render/top_k.rs i/src/compute/src/render/top_k.rs
index c4e2904c1d..d345b6cd48 100644
--- c/src/compute/src/render/top_k.rs
+++ i/src/compute/src/render/top_k.rs
@@ -12,11 +12,12 @@
 //! Consult [TopKPlan] documentation for details.

 use columnar::Columnar;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::hashable::Hashable;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection};
 use mz_compute_types::plan::top_k::{
     BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
@@ -31,7 +32,6 @@ use mz_timely_util::operator::CollectionExt;
 use std::cell::RefCell;
 use std::collections::BTreeMap;
 use std::rc::Rc;
-use timely::container::columnation::Columnation;
 use timely::container::{CapacityContainerBuilder, PushInto};
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::Operator;
@@ -841,11 +841,11 @@ pub mod monoids {
     use std::hash::{Hash, Hasher};
     use std::rc::Rc;

+    use differential_dataflow::containers::{Columnation, Region};
     use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
     use mz_expr::ColumnOrder;
     use mz_repr::{DatumVec, Diff, Row};
     use serde::{Deserialize, Serialize};
-    use timely::container::columnation::{Columnation, Region};

     /// A monoid containing a row and an ordering.
     #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash, Default)]
diff --git c/src/compute/src/row_spine.rs i/src/compute/src/row_spine.rs
index d66479d78f..40816866e0 100644
--- c/src/compute/src/row_spine.rs
+++ i/src/compute/src/row_spine.rs
@@ -20,6 +20,7 @@ use differential_dataflow::trace::implementations::OffsetList;
 mod spines {
     use std::rc::Rc;

+    use differential_dataflow::containers::{Columnation, TimelyStack};
     use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
     use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
     use differential_dataflow::trace::implementations::spine_fueled::Spine;
@@ -27,7 +28,6 @@ mod spines {
     use differential_dataflow::trace::implementations::Update;
     use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
     use mz_repr::Row;
-    use timely::container::columnation::{Columnation, TimelyStack};

     use crate::row_spine::{DatumContainer, OffsetOptimized};
     use crate::typedefs::{KeyBatcher, KeyValBatcher};
@@ -99,7 +99,7 @@ mod spines {

 /// A `Row`-specialized container using dictionary compression.
 mod container {
-    use differential_dataflow::trace::cursor::IntoOwned;
+    use differential_dataflow::IntoOwned;
     use std::cmp::Ordering;

     use differential_dataflow::trace::implementations::BatchContainer;
diff --git c/src/compute/src/sink/correction_v2.rs i/src/compute/src/sink/correction_v2.rs
index 5f0e77b962..6b0ec40af2 100644
--- c/src/compute/src/sink/correction_v2.rs
+++ i/src/compute/src/sink/correction_v2.rs
@@ -129,10 +129,10 @@ use std::collections::{BinaryHeap, VecDeque};
 use std::fmt;
 use std::rc::Rc;

+use differential_dataflow::containers::{Columnation, TimelyStack};
 use differential_dataflow::trace::implementations::BatchContainer;
 use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
 use mz_repr::{Diff, Timestamp};
-use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::SizableContainer;
 use timely::progress::Antichain;
 use timely::{Container, PartialOrder};
diff --git c/src/compute/src/typedefs.rs i/src/compute/src/typedefs.rs
index 6325edd515..c824ea85a1 100644
--- c/src/compute/src/typedefs.rs
+++ i/src/compute/src/typedefs.rs
@@ -15,21 +15,14 @@ use differential_dataflow::operators::arrange::Arranged;
 use differential_dataflow::operators::arrange::TraceAgent;
 use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
 use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
-use differential_dataflow::trace::implementations::ord_neu::{
-    FlatValBatcher, FlatValBuilder, FlatValSpine, OrdValBatch,
-};
 use differential_dataflow::trace::wrappers::enter::TraceEnter;
 use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
-use mz_ore::flatcontainer::MzRegionPreference;
 use mz_repr::Diff;
 use mz_storage_types::errors::DataflowError;
-use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
 use timely::dataflow::ScopeParent;

 use crate::row_spine::RowValBuilder;
-use crate::typedefs::spines::{
-    ColKeyBatcher, ColKeyBuilder, ColValBatcher, ColValBuilder, MzFlatLayout,
-};
+use crate::typedefs::spines::{ColKeyBatcher, ColKeyBuilder, ColValBatcher, ColValBuilder};

 pub use crate::row_spine::{RowRowSpine, RowSpine, RowValBatcher, RowValSpine};
 pub use crate::typedefs::spines::{ColKeySpine, ColValSpine};
@@ -37,17 +30,13 @@ pub use crate::typedefs::spines::{ColKeySpine, ColValSpine};
 pub(crate) mod spines {
     use std::rc::Rc;

-    use differential_dataflow::difference::Semigroup;
-    use differential_dataflow::lattice::Lattice;
+    use differential_dataflow::containers::{Columnation, TimelyStack};
     use differential_dataflow::trace::implementations::ord_neu::{
         OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder,
     };
     use differential_dataflow::trace::implementations::spine_fueled::Spine;
     use differential_dataflow::trace::implementations::{Layout, Update};
     use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
-    use timely::container::columnation::{Columnation, TimelyStack};
-    use timely::container::flatcontainer::{FlatStack, Push, Region};
-    use timely::progress::Timestamp;

     use crate::row_spine::OffsetOptimized;
     use crate::typedefs::{KeyBatcher, KeyValBatcher};
@@ -83,67 +72,6 @@ pub(crate) mod spines {
         type DiffContainer = TimelyStack<U::Diff>;
         type OffsetContainer = OffsetOptimized;
     }
-
-    /// A layout based on flat container stacks
-    pub struct MzFlatLayout<K, V, T, R> {
-        phantom: std::marker::PhantomData<(K, V, T, R)>,
-    }
-
-    impl<K, V, T, R> Update for MzFlatLayout<K, V, T, R>
-    where
-        K: Region,
-        V: Region,
-        T: Region,
-        R: Region,
-        K::Owned: Ord + Clone + 'static,
-        V::Owned: Ord + Clone + 'static,
-        T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
-        R::Owned: Ord + Semigroup + 'static,
-    {
-        type Key = K::Owned;
-        type Val = V::Owned;
-        type Time = T::Owned;
-        type Diff = R::Owned;
-    }
-
-    /// Layout implementation for [`MzFlatLayout`]. Mostly equivalent to differential's
-    /// flat layout but with a different opinion for the offset container. Here, we use
-    /// [`OffsetOptimized`] instead of an offset list. If differential should gain access
-    /// to the optimized variant, we might be able to remove this implementation.
-    impl<K, V, T, R> Layout for MzFlatLayout<K, V, T, R>
-    where
-        K: Region
-            + Push<<K as Region>::Owned>
-            + for<'a> Push<<K as Region>::ReadItem<'a>>
-            + 'static,
-        V: Region
-            + Push<<V as Region>::Owned>
-            + for<'a> Push<<V as Region>::ReadItem<'a>>
-            + 'static,
-        T: Region
-            + Push<<T as Region>::Owned>
-            + for<'a> Push<<T as Region>::ReadItem<'a>>
-            + 'static,
-        R: Region
-            + Push<<R as Region>::Owned>
-            + for<'a> Push<<R as Region>::ReadItem<'a>>
-            + 'static,
-        K::Owned: Ord + Clone + 'static,
-        V::Owned: Ord + Clone + 'static,
-        T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
-        R::Owned: Ord + Semigroup + 'static,
-        for<'a> K::ReadItem<'a>: Copy + Ord,
-        for<'a> V::ReadItem<'a>: Copy + Ord,
-        for<'a> T::ReadItem<'a>: Copy + Ord,
-        for<'a> R::ReadItem<'a>: Copy + Ord,
-    {
-        type Target = Self;
-        type KeyContainer = FlatStack<K>;
-        type ValContainer = FlatStack<V>;
-        type TimeContainer = FlatStack<T>;
-        type DiffContainer = FlatStack<R>;
-        type OffsetContainer = OffsetOptimized;
-    }
 }

 // Spines are data structures that collect and maintain updates.
@@ -193,33 +121,3 @@ pub type RowErrBuilder<T, R> = RowValBuilder<DataflowError, T, R>;
 pub type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
 pub type KeyValBatcher<K, V, T, D> =
     MergeBatcher<Vec<((K, V), T, D)>, ColumnationChunker<((K, V), T, D)>, ColMerger<(K, V), T, D>>;
-
-pub type FlatKeyValBatch<K, V, T, R> = OrdValBatch<MzFlatLayout<K, V, T, R>>;
-pub type FlatKeyValSpine<K, V, T, R> = FlatValSpine<MzFlatLayout<K, V, T, R>>;
-pub type FlatKeyValSpineDefault<K, V, T, R> = FlatKeyValSpine<
-    <K as MzRegionPreference>::Region,
-    <V as MzRegionPreference>::Region,
-    <T as MzRegionPreference>::Region,
-    <R as MzRegionPreference>::Region,
->;
-pub type FlatKeyValBatcher<K, V, T, R, C> =
-    FlatValBatcher<TupleABCRegion<TupleABRegion<K, V>, T, R>, C>;
-pub type FlatKeyValBatcherDefault<K, V, T, R, C> = FlatKeyValBatcher<
-    <K as MzRegionPreference>::Region,
-    <V as MzRegionPreference>::Region,
-    <T as MzRegionPreference>::Region,
-    <R as MzRegionPreference>::Region,
-    C,
->;
-pub type FlatKeyValBuilder<K, V, T, R> =
-    FlatValBuilder<MzFlatLayout<K, V, T, R>, TupleABCRegion<TupleABRegion<K, V>, T, R>>;
-pub type FlatKeyValBuilderDefault<K, V, T, R> = FlatKeyValBuilder<
-    <K as MzRegionPreference>::Region,
-    <V as MzRegionPreference>::Region,
-    <T as MzRegionPreference>::Region,
-    <R as MzRegionPreference>::Region,
->;
-
-pub type FlatKeyValAgent<K, V, T, R> = TraceAgent<FlatKeyValSpine<K, V, T, R>>;
-pub type FlatKeyValEnter<K, V, T, R, TEnter> =
-    TraceEnter<TraceFrontier<FlatKeyValAgent<K, V, T, R>>, TEnter>;
diff --git c/src/controller/Cargo.toml i/src/controller/Cargo.toml
index a4425e0901..6c76c7c55c 100644
--- c/src/controller/Cargo.toml
+++ i/src/controller/Cargo.toml
@@ -32,7 +32,7 @@ mz-txn-wal = { path = "../txn-wal" }
 regex = "1.10.6"
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = "1.0.125"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = "1.38.0"
 tracing = "0.1.37"
 uuid = { version = "1.7.0" }
diff --git c/src/durable-cache/Cargo.toml i/src/durable-cache/Cargo.toml
index 0ce084ad7a..753858e65c 100644
--- c/src/durable-cache/Cargo.toml
+++ i/src/durable-cache/Cargo.toml
@@ -12,7 +12,7 @@ workspace = true
 [dependencies]
 async-trait = "0.1.83"
 bytes = { version = "1.3.0" }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 itertools = { version = "0.12.1" }
 mz-ore = { path = "../ore", features = ["process"] }
@@ -23,7 +23,7 @@ mz-timely-util = { path = "../timely-util" }
 prometheus = { version = "0.13.3", default-features = false }
 prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive", "rc"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", default-features = false, features = ["rt", "rt-multi-thread"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["v4"] }
diff --git c/src/environmentd/Cargo.toml i/src/environmentd/Cargo.toml
index 9d4e946a62..fe73371663 100644
--- c/src/environmentd/Cargo.toml
+++ i/src/environmentd/Cargo.toml
@@ -145,7 +145,7 @@ reqwest = { version = "0.11.13", features = ["blocking"] }
 serde_json = "1.0.125"
 serde_urlencoded = "0.7.1"
 similar-asserts = "1.4"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }

 [build-dependencies]
diff --git c/src/expr/Cargo.toml i/src/expr/Cargo.toml
index 91ec768698..b58fbadc85 100644
--- c/src/expr/Cargo.toml
+++ i/src/expr/Cargo.toml
@@ -26,6 +26,7 @@ chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 chrono-tz = { version = "0.8.1", features = ["serde", "case-insensitive"] }
 crc32fast = "1.4.2"
 csv = "1.3.1"
+differential-dataflow = "0.13.7"
 dec = "0.4.8"
 derivative = "2.2.0"
 encoding = "0.2.0"
@@ -59,7 +60,7 @@ serde_json = "1.0.125"
 sha1 = "0.10.5"
 sha2 = "0.10.6"
 subtle = "2.4.1"
-timely = "0.18.1"
+timely = "0.19.0"
 tracing = "0.1.37"
 uncased = "0.9.7"
 uuid = { version = "1.7.0", features = ["v5"] }
diff --git c/src/expr/src/relation.rs i/src/expr/src/relation.rs
index 9ee748b83b..28e396773b 100644
--- c/src/expr/src/relation.rs
+++ i/src/expr/src/relation.rs
@@ -18,6 +18,7 @@ use std::num::NonZeroU64;
 use std::time::Instant;

 use bytesize::ByteSize;
+use differential_dataflow::containers::{Columnation, CopyRegion};
 use itertools::Itertools;
 use mz_lowertest::MzReflect;
 use mz_ore::cast::CastFrom;
@@ -42,7 +43,6 @@ use proptest::prelude::{any, Arbitrary, BoxedStrategy};
 use proptest::strategy::{Strategy, Union};
 use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::{Columnation, CopyRegion};

 use crate::explain::{HumanizedExpr, HumanizerMode};
 use crate::relation::func::{AggregateFunc, LagLeadType, TableFunc};
diff --git c/src/interchange/Cargo.toml i/src/interchange/Cargo.toml
index b0194f126e..0d357cb370 100644
--- c/src/interchange/Cargo.toml
+++ i/src/interchange/Cargo.toml
@@ -20,7 +20,7 @@ byteorder = "1.4.3"
 bytes = "1.3.0"
 chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 clap = { version = "4.5.23", features = ["derive"] }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 itertools = "0.12.1"
 maplit = "1.0.2"
 mz-avro = { path = "../avro", features = ["snappy"] }
@@ -33,7 +33,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 prost-reflect = "0.14.6"
 seahash = "4"
 serde_json = "1.0.125"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["macros", "net", "rt", "rt-multi-thread", "time"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["serde"] }
diff --git c/src/interchange/src/envelopes.rs i/src/interchange/src/envelopes.rs
index 87d4053e62..dc685b3347 100644
--- c/src/interchange/src/envelopes.rs
+++ i/src/interchange/src/envelopes.rs
@@ -13,8 +13,8 @@ use std::sync::LazyLock;

 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, BatchReader, Cursor, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection};
 use itertools::{EitherOrBoth, Itertools};
 use maplit::btreemap;
diff --git c/src/ore/BUILD.bazel i/src/ore/BUILD.bazel
index dc637499cc..62d4551a7b 100644
--- c/src/ore/BUILD.bazel
+++ i/src/ore/BUILD.bazel
@@ -39,7 +39,6 @@ rust_library(
         "ctor",
         "default",
         "derivative",
-        "flatcontainer",
         "futures",
         "hibitset",
         "http",
@@ -120,7 +119,6 @@ rust_test(
         "ctor",
         "default",
         "derivative",
-        "flatcontainer",
         "futures",
         "hibitset",
         "http",
diff --git c/src/ore/Cargo.toml i/src/ore/Cargo.toml
index 8e67f6c332..224adec8be 100644
--- c/src/ore/Cargo.toml
+++ i/src/ore/Cargo.toml
@@ -29,7 +29,6 @@ compact_bytes = { version = "0.1.3", optional = true }
 ctor = { version = "0.1.26", optional = true }
 derivative = { version = "2.2.0", optional = true }
 either = "1.8.0"
-flatcontainer = { version = "0.5.0", optional = true }
 futures = { version = "0.3.25", optional = true }
 hibitset = { version = "0.6.4", optional = true }
 itertools = "0.12.1"
diff --git c/src/ore/src/flatcontainer.rs i/src/ore/src/flatcontainer.rs
deleted file mode 100644
index d74b84ac08..0000000000
--- c/src/ore/src/flatcontainer.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-// Copyright Materialize, Inc. and contributors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License in the LICENSE file at the
-// root of this repository, or online at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Flat container utilities
-
-use flatcontainer::{Push, Region, ReserveItems};
-
-/// Associate a type with a flat container region.
-pub trait MzRegionPreference: 'static {
-    /// The owned type of the container.
-    type Owned;
-    /// A region that can hold `Self`.
-    type Region: for<'a> Region<Owned = Self::Owned>
-        + Push<Self::Owned>
-        + for<'a> Push<<Self::Region as Region>::ReadItem<'a>>
-        + for<'a> ReserveItems<<Self::Region as Region>::ReadItem<'a>>;
-}
-
-/// Opinion indicating that the contents of a collection should be stored in an
-/// [`OwnedRegion`](flatcontainer::OwnedRegion). This is most useful to force types to a region
-/// that doesn't copy individual elements to a nested region, like the
-/// [`SliceRegion`](flatcontainer::SliceRegion) does.
-#[derive(Debug)]
-pub struct OwnedRegionOpinion<T>(std::marker::PhantomData<T>);
-
-mod tuple {
-    use flatcontainer::impls::tuple::*;
-    use paste::paste;
-
-    use crate::flatcontainer::MzRegionPreference;
-
-    macro_rules! tuple_flatcontainer {
-        ($($name:ident)+) => (
-            paste! {
-                impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) {
-                    type Owned = ($($name::Owned,)*);
-                    type Region = [<Tuple $($name)* Region >]<$($name::Region,)*>;
-                }
-            }
-        )
-    }
-
-    tuple_flatcontainer!(A);
-    tuple_flatcontainer!(A B);
-    tuple_flatcontainer!(A B C);
-    tuple_flatcontainer!(A B C D);
-    tuple_flatcontainer!(A B C D E);
-}
-
-mod copy {
-    use flatcontainer::MirrorRegion;
-
-    use crate::flatcontainer::MzRegionPreference;
-
-    macro_rules! implement_for {
-        ($index_type:ty) => {
-            impl MzRegionPreference for $index_type {
-                type Owned = Self;
-                type Region = MirrorRegion<Self>;
-            }
-        };
-    }
-
-    implement_for!(());
-    implement_for!(bool);
-    implement_for!(char);
-
-    implement_for!(u8);
-    implement_for!(u16);
-    implement_for!(u32);
-    implement_for!(u64);
-    implement_for!(u128);
-    implement_for!(usize);
-
-    implement_for!(i8);
-    implement_for!(i16);
-    implement_for!(i32);
-    implement_for!(i64);
-    implement_for!(i128);
-    implement_for!(isize);
-
-    implement_for!(f32);
-    implement_for!(f64);
-
-    implement_for!(std::num::Wrapping<i8>);
-    implement_for!(std::num::Wrapping<i16>);
-    implement_for!(std::num::Wrapping<i32>);
-    implement_for!(std::num::Wrapping<i64>);
-    implement_for!(std::num::Wrapping<i128>);
-    implement_for!(std::num::Wrapping<isize>);
-
-    implement_for!(std::time::Duration);
-}
-
-mod vec {
-    use flatcontainer::OwnedRegion;
-
-    use crate::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
-
-    impl<T: Clone + 'static> MzRegionPreference for OwnedRegionOpinion<Vec<T>> {
-        type Owned = Vec<T>;
-        type Region = OwnedRegion<T>;
-    }
-}
-
-impl<T: MzRegionPreference> MzRegionPreference for Option<T> {
-    type Owned = <flatcontainer::OptionRegion<T::Region> as Region>::Owned;
-    type Region = flatcontainer::OptionRegion<T::Region>;
-}
diff --git c/src/ore/src/lib.rs i/src/ore/src/lib.rs
index b9e5cbf98b..0b8a1decb2 100644
--- c/src/ore/src/lib.rs
+++ i/src/ore/src/lib.rs
@@ -39,8 +39,6 @@ pub mod cli;
 pub mod collections;
 pub mod env;
 pub mod error;
-#[cfg(feature = "flatcontainer")]
-pub mod flatcontainer;
 pub mod fmt;
 #[cfg_attr(nightly_doc_features, doc(cfg(feature = "async")))]
 #[cfg(feature = "async")]
diff --git c/src/persist-cli/Cargo.toml i/src/persist-cli/Cargo.toml
index 504c30f1b2..68f71b41af 100644
--- c/src/persist-cli/Cargo.toml
+++ i/src/persist-cli/Cargo.toml
@@ -23,7 +23,7 @@ async-trait = "0.1.83"
 axum = "0.7.5"
 bytes = { version = "1.3.0", features = ["serde"] }
 clap = { version = "4.5.23", features = ["derive", "env"] }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 humantime = "2.1.0"
 mz-http-util = { path = "../http-util" }
@@ -40,7 +40,7 @@ num_enum = "0.7.3"
 prometheus = { version = "0.13.3", default-features = false }
 serde = { version = "1.0.218", features = ["derive", "rc"] }
 serde_json = "1.0.125"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] }
 tracing = "0.1.37"
 url = "2.3.1"
diff --git c/src/persist-client/Cargo.toml i/src/persist-client/Cargo.toml
index 233a370125..ac9229ae19 100644
--- c/src/persist-client/Cargo.toml
+++ i/src/persist-client/Cargo.toml
@@ -35,7 +35,7 @@ async-stream = "0.3.3"
 async-trait = "0.1.83"
 bytes = { version = "1.3.0", features = ["serde"] }
 clap = { version = "4.5.23", features = ["derive"] }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 futures-util = "0.3"
 h2 = "0.3.13"
@@ -59,7 +59,7 @@ sentry-tracing = "0.29.1"
 semver = { version = "1.0.16", features = ["serde"] }
 serde = { version = "1.0.218", features = ["derive", "rc"] }
 serde_json = "1.0.125"
-timely = "0.18.1"
+timely = "0.19.0"
 thiserror = "2.0.11"
 tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] }
 tokio-metrics = "0.4.0"
diff --git c/src/persist-types/Cargo.toml i/src/persist-types/Cargo.toml
index 8837067526..19f7c1bf54 100644
--- c/src/persist-types/Cargo.toml
+++ i/src/persist-types/Cargo.toml
@@ -26,7 +26,7 @@ proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
 prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = { version = "1.0.125" }
-timely = "0.18.1"
+timely = "0.19.0"
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["v4"] }
 workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
diff --git c/src/persist/Cargo.toml i/src/persist/Cargo.toml
index b26bb13677..fe8eb42b01 100644
--- c/src/persist/Cargo.toml
+++ i/src/persist/Cargo.toml
@@ -37,7 +37,7 @@ azure_core = "0.21.0"
 base64 = "0.13.1"
 bytes = "1.3.0"
 deadpool-postgres = "0.10.3"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 fail = { version = "0.5.1", features = ["failpoints"] }
 futures-util = "0.3.31"
 md-5 = "0.10.5"
@@ -59,7 +59,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 rand = { version = "0.8.5", features = ["small_rng"] }
 reqwest = { version = "0.12", features = ["blocking", "json", "default-tls", "charset", "http2"], default-features = false }
 serde = { version = "1.0.218", features = ["derive"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] }
 tokio-postgres = { version = "0.7.8" }
 tracing = "0.1.37"
diff --git c/src/repr/Cargo.toml i/src/repr/Cargo.toml
index 1450fec64c..2c9433e2d8 100644
--- c/src/repr/Cargo.toml
+++ i/src/repr/Cargo.toml
@@ -36,15 +36,13 @@ columnation = "0.1.0"
 chrono = { version = "0.4.39", default-features = false, features = ["serde", "std"] }
 compact_bytes = "0.1.3"
 dec = "0.4.8"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 enum-kinds = "0.5.1"
-flatcontainer = "0.5.0"
 hex = "0.4.3"
 itertools = "0.12.1"
 mz-lowertest = { path = "../lowertest", default-features = false }
 mz-ore = { path = "../ore", features = [
     "bytes",
-    "flatcontainer",
     "id_gen",
     "smallvec",
     "region",
@@ -68,7 +66,7 @@ serde_json = { version = "1.0.125", features = ["arbitrary_precision", "preserve
 smallvec = { version = "1.10.0", features = ["serde", "union"] }
 static_assertions = "1.1"
 strsim = "0.11.1"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio-postgres = { version = "0.7.8" }
 tracing-core = "0.1.30"
 url = { version = "2.3.1", features = ["serde"] }
diff --git c/src/repr/src/timestamp.rs i/src/repr/src/timestamp.rs
index e00efa747f..cf0c5c6a5b 100644
--- c/src/repr/src/timestamp.rs
+++ i/src/repr/src/timestamp.rs
@@ -470,17 +470,11 @@ impl columnation::Columnation for Timestamp {
     type InnerRegion = columnation::CopyRegion<Timestamp>;
 }

-mod flatcontainer {
-    use flatcontainer::{IntoOwned, MirrorRegion};
-    use mz_ore::flatcontainer::MzRegionPreference;
+mod differential {
+    use differential_dataflow::IntoOwned;

     use crate::Timestamp;

-    impl MzRegionPreference for Timestamp {
-        type Owned = Self;
-        type Region = MirrorRegion<Timestamp>;
-    }
-
     impl<'a> IntoOwned<'a> for Timestamp {
         type Owned = Self;

diff --git c/src/service/Cargo.toml i/src/service/Cargo.toml
index 315fff7985..87b702c26a 100644
--- c/src/service/Cargo.toml
+++ i/src/service/Cargo.toml
@@ -35,7 +35,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 semver = "1.0.16"
 serde = { version = "1.0.218", features = ["derive"] }
 sysinfo = "0.29.11"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = "1.38.0"
 tokio-stream = "0.1.17"
 tonic = "0.12.1"
diff --git c/src/sql/src/session/vars.rs i/src/sql/src/session/vars.rs
index 021350779e..fd2011778e 100644
--- c/src/sql/src/session/vars.rs
+++ i/src/sql/src/session/vars.rs
@@ -2238,6 +2238,15 @@ impl SystemVars {
         name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
     }

+    /// Returns whether the named variable is an initial compute configuration parameter
+    /// (things that go in `TimelyConfig` and cannot be changed at runtime).
+    pub fn is_initial_compute_config_var(&self, name: &str) -> bool {
+        name == ARRANGEMENT_EXERT_PROPORTIONALITY.name()
+            || name == ENABLE_TIMELY_ZERO_COPY.name()
+            || name == ENABLE_TIMELY_ZERO_COPY_LGALLOC.name()
+            || name == TIMELY_ZERO_COPY_LIMIT.name()
+    }
+
     /// Returns whether the named variable is a metrics configuration parameter
     pub fn is_metrics_config_var(&self, name: &str) -> bool {
         self.is_dyncfg_var(name)
diff --git c/src/storage-client/Cargo.toml i/src/storage-client/Cargo.toml
index 267d72aaa4..de83f1b3c3 100644
--- c/src/storage-client/Cargo.toml
+++ i/src/storage-client/Cargo.toml
@@ -13,7 +13,7 @@ workspace = true
 anyhow = "1.0.95"
 async-trait = "0.1.83"
 chrono = { version = "0.4.39", default-features = false, features = ["std"] }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 http = "1.2.0"
 itertools = { version = "0.12.1" }
@@ -46,7 +46,7 @@ serde = { version = "1.0.218", features = ["derive"] }
 serde_json = { version = "1.0.125" }
 smallvec = { version = "1.10.0", features = ["serde", "union"] }
 static_assertions = "1.1"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = [
     "fs",
     "rt",
diff --git c/src/storage-controller/Cargo.toml i/src/storage-controller/Cargo.toml
index dabcfe6dca..d23979fbda 100644
--- c/src/storage-controller/Cargo.toml
+++ i/src/storage-controller/Cargo.toml
@@ -15,7 +15,7 @@ async-trait = "0.1.83"
 bytes = "1.3.0"
 chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 derivative = "2.2.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 itertools = { version = "0.12.1" }
 mz-build-info = { path = "../build-info" }
@@ -38,7 +38,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
 prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = { version = "1.0.125" }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] }
 tokio-postgres = { version = "0.7.8", features = ["serde"] }
 tokio-stream = "0.1.17"
diff --git c/src/storage-operators/Cargo.toml i/src/storage-operators/Cargo.toml
index cccfe02b74..e6fc8907d0 100644
--- c/src/storage-operators/Cargo.toml
+++ i/src/storage-operators/Cargo.toml
@@ -20,7 +20,7 @@ bytes = "1.3.0"
 bytesize = "1.3.0"
 csv-async = { version = "1.3.0", features = ["tokio"] }
 derivative = "2.2.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 glob = "0.3.2"
 http = "1.2.0"
@@ -47,7 +47,7 @@ reqwest = { version = "0.11.13", features = ["stream"] }
 sentry = { version = "0.29.1" }
 serde = { version = "1.0.218", features = ["derive"] }
 smallvec = { version = "1.10.0", features = ["union"] }
-timely = "0.18.1"
+timely = "0.19.0"
 thiserror = "2.0.11"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] }
 tokio-stream = "0.1.17"
diff --git c/src/storage-operators/src/s3_oneshot_sink.rs i/src/storage-operators/src/s3_oneshot_sink.rs
index 67f60776d8..84d6f34578 100644
--- c/src/storage-operators/src/s3_oneshot_sink.rs
+++ i/src/storage-operators/src/s3_oneshot_sink.rs
@@ -16,6 +16,7 @@ use std::rc::Rc;

 use anyhow::anyhow;
 use aws_types::sdk_config::SdkConfig;
+use differential_dataflow::containers::TimelyStack;
 use differential_dataflow::Hashable;
 use futures::StreamExt;
 use mz_ore::cast::CastFrom;
@@ -31,7 +32,6 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
 use mz_timely_util::builder_async::{
     Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
 };
-use timely::container::columnation::TimelyStack;
 use timely::dataflow::channels::pact::{Exchange, Pipeline};
 use timely::dataflow::operators::Broadcast;
 use timely::dataflow::{Scope, Stream};
diff --git c/src/storage-types/Cargo.toml i/src/storage-types/Cargo.toml
index f0591fa771..07e85ef878 100644
--- c/src/storage-types/Cargo.toml
+++ i/src/storage-types/Cargo.toml
@@ -25,7 +25,7 @@ bytes = "1.3.0"
 columnation = "0.1.0"
 dec = "0.4.8"
 derivative = "2.2.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 hex = "0.4.3"
 http = "1.2.0"
 itertools = { version = "0.12.1" }
@@ -62,7 +62,7 @@ regex = "1.10.6"
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = { version = "1.0.125", features = ["preserve_order"] }
 thiserror = "2.0.11"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] }
 tokio-postgres = { version = "0.7.8", features = ["serde"] }
 tracing = "0.1.37"
diff --git c/src/storage-types/src/errors.rs i/src/storage-types/src/errors.rs
index ea50e3b6e2..fa5eeabe5f 100644
--- c/src/storage-types/src/errors.rs
+++ i/src/storage-types/src/errors.rs
@@ -394,8 +394,8 @@ impl Error for DataflowError {}

 mod boxed_str {

-    use timely::container::columnation::Region;
-    use timely::container::columnation::StableRegion;
+    use differential_dataflow::containers::Region;
+    use differential_dataflow::containers::StableRegion;

     /// Region allocation for `String` data.
     ///
@@ -447,11 +447,11 @@ mod boxed_str {
 mod columnation {
     use std::iter::once;

+    use differential_dataflow::containers::{Columnation, Region, StableRegion};
     use mz_expr::EvalError;
     use mz_repr::adt::range::InvalidRangeError;
     use mz_repr::strconv::ParseError;
     use mz_repr::Row;
-    use timely::container::columnation::{Columnation, Region, StableRegion};

     use crate::errors::boxed_str::BoxStrStack;
     use crate::errors::{
@@ -907,8 +907,8 @@ mod columnation {

     #[cfg(test)]
     mod tests {
+        use differential_dataflow::containers::TimelyStack;
         use proptest::prelude::*;
-        use timely::container::columnation::TimelyStack;

         use super::*;

diff --git c/src/storage/Cargo.toml i/src/storage/Cargo.toml
index 6719c12e72..8023503296 100644
--- c/src/storage/Cargo.toml
+++ i/src/storage/Cargo.toml
@@ -26,7 +26,7 @@ columnation = "0.1.0"
 crossbeam-channel = "0.5.14"
 csv-core = { version = "0.1.10" }
 dec = "0.4.8"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 fail = { version = "0.5.1", features = ["failpoints"] }
 futures = "0.3.25"
 indexmap = { version = "2.0.0", default-features = false, features = ["std"] }
@@ -77,7 +77,7 @@ serde = { version = "1.0.218", features = ["derive"] }
 serde_json = { version = "1.0.125" }
 serde_bytes = { version = "0.11.15" }
 sha2 = "0.10.6"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util"] }
 tokio-postgres = { version = "0.7.8", features = ["serde"] }
 tokio-stream = "0.1.17"
diff --git c/src/storage/src/source/mysql.rs i/src/storage/src/source/mysql.rs
index 45ad1ed639..4f04cf7846 100644
--- c/src/storage/src/source/mysql.rs
+++ i/src/storage/src/source/mysql.rs
@@ -56,6 +56,7 @@ use std::fmt;
 use std::io;
 use std::rc::Rc;

+use differential_dataflow::containers::TimelyStack;
 use differential_dataflow::AsCollection;
 use itertools::Itertools;
 use mz_mysql_util::quote_identifier;
@@ -66,7 +67,6 @@ use mz_storage_types::errors::{DataflowError, SourceError};
 use mz_storage_types::sources::SourceExport;
 use mz_timely_util::containers::stack::AccountedStackBuilder;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::TimelyStack;
 use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pushers::Tee;
 use timely::dataflow::operators::core::Partition;
diff --git c/src/storage/src/source/types.rs i/src/storage/src/source/types.rs
index 43e1081a7a..f305db7865 100644
--- c/src/storage/src/source/types.rs
+++ i/src/storage/src/source/types.rs
@@ -20,6 +20,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{ready, Context, Poll};

+use differential_dataflow::containers::TimelyStack;
 use differential_dataflow::Collection;
 use mz_repr::{Diff, GlobalId, Row};
 use mz_storage_types::errors::{DataflowError, DecodeError};
@@ -27,7 +28,6 @@ use mz_storage_types::sources::SourceTimestamp;
 use mz_timely_util::builder_async::PressOnDropButton;
 use pin_project::pin_project;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::TimelyStack;
 use timely::dataflow::{Scope, ScopeParent, Stream};
 use timely::progress::Antichain;
 use tokio::sync::Semaphore;
diff --git c/src/timely-util/Cargo.toml i/src/timely-util/Cargo.toml
index 597bfac89c..a4d501f103 100644
--- c/src/timely-util/Cargo.toml
+++ i/src/timely-util/Cargo.toml
@@ -15,7 +15,7 @@ bincode = "1.3.3"
 bytemuck = "1.21.0"
 columnar = "0.3.0"
 columnation = "0.1.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 either = "1"
 futures-util = "0.3.31"
 lgalloc = "0.5"
@@ -23,7 +23,7 @@ mz-ore = { path = "../ore", features = ["async", "process", "tracing", "test"] }
 num-traits = "0.2"
 proptest = { version = "1.6.0", default-features = false, features = ["std"] }
 serde = { version = "1.0.218", features = ["derive"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "time"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["serde", "v4"] }
diff --git c/src/timely-util/src/builder_async.rs i/src/timely-util/src/builder_async.rs
index d6fd79bf4e..9d2d139858 100644
--- c/src/timely-util/src/builder_async.rs
+++ i/src/timely-util/src/builder_async.rs
@@ -24,10 +24,10 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::task::{ready, Context, Poll, Waker};

+use differential_dataflow::containers::{Columnation, TimelyStack};
 use futures_util::task::ArcWake;
 use futures_util::Stream;
 use timely::communication::{Pull, Push};
-use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
 use timely::dataflow::channels::pact::ParallelizationContract;
 use timely::dataflow::channels::pushers::Tee;
diff --git c/src/timely-util/src/containers.rs i/src/timely-util/src/containers.rs
index 5fb43223be..d52390d1fa 100644
--- c/src/timely-util/src/containers.rs
+++ i/src/timely-util/src/containers.rs
@@ -18,9 +18,9 @@
 use std::hash::Hash;

 use columnar::Columnar;
+use differential_dataflow::containers::TimelyStack;
 use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
 use differential_dataflow::Hashable;
-use timely::container::columnation::TimelyStack;

 pub mod stack;

diff --git c/src/timely-util/src/containers/stack.rs i/src/timely-util/src/containers/stack.rs
index 0112853afc..c7c17e714d 100644
--- c/src/timely-util/src/containers/stack.rs
+++ i/src/timely-util/src/containers/stack.rs
@@ -18,7 +18,7 @@

 use std::cell::Cell;

-use timely::container::columnation::{Columnation, TimelyStack};
+use differential_dataflow::containers::{Columnation, TimelyStack};
 use timely::container::{ContainerBuilder, PushInto};

 /// A Stacked container builder that keep track of container memory usage.
diff --git c/src/timely-util/src/operator.rs i/src/timely-util/src/operator.rs
index 8763e95937..3f1c9d6ab6 100644
--- c/src/timely-util/src/operator.rs
+++ i/src/timely-util/src/operator.rs
@@ -21,12 +21,12 @@ use std::marker::PhantomData;
 use std::rc::Weak;

 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::{Columnation, TimelyStack};
 use differential_dataflow::difference::{Multiply, Semigroup};
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::logging::DifferentialEventBuilder;
 use differential_dataflow::trace::{Batcher, Builder, Description};
 use differential_dataflow::{AsCollection, Collection, Hashable};
-use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::{ContainerBuilder, PushInto};
 use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
 use timely::dataflow::channels::pushers::Tee;
diff --git c/src/timely-util/src/order.rs i/src/timely-util/src/order.rs
index b37179fcd3..01655568ff 100644
--- c/src/timely-util/src/order.rs
+++ i/src/timely-util/src/order.rs
@@ -19,8 +19,8 @@ use std::cmp::Ordering;
 use std::fmt::{self, Debug};
 use std::hash::Hash;

+use differential_dataflow::containers::CopyRegion;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::CopyRegion;
 use timely::order::Product;
 use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
 use timely::progress::Antichain;
diff --git c/src/transform/Cargo.toml i/src/transform/Cargo.toml
index 20fc3be596..0f572a6ce3 100644
--- c/src/transform/Cargo.toml
+++ i/src/transform/Cargo.toml
@@ -10,7 +10,7 @@ publish = false
 workspace = true

 [dependencies]
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 enum-kinds = "0.5.1"
 itertools = "0.12.1"
 mz-compute-types = { path = "../compute-types" }
diff --git c/src/txn-wal/Cargo.toml i/src/txn-wal/Cargo.toml
index 0425548217..7b8b0b8966 100644
--- c/src/txn-wal/Cargo.toml
+++ i/src/txn-wal/Cargo.toml
@@ -12,7 +12,7 @@ workspace = true
 [dependencies]
 async-trait = "0.1.83"
 bytes = { version = "1.3.0" }
-di…
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants