Support for physical and logical replication #752
@jeff-davis here is the new PR! Any comments are very much appreciated :)
@sfackler the tests are failing because the Dockerfile changes included in this PR are not picked up by the CI; the pre-existing image from the registry is used instead. How should we proceed here?
Thank you for the super quick review! I'm going to try to address your comments today :)
@sfackler I have addressed all your comments, and I think it looks much better now! Here is a summary of what I changed:
I've re-authored all the commits to be reviewable on their own, and the PR is now ~1400 lines versus ~2250 before :)
Needed until sfackler/rust-postgres#752 is merged and released
Signed-off-by: Petros Angelatos <[email protected]>
A good request!
I had hoped @sfackler would permit a ReplicationClient, because it is known at the connection stage that the connection can carry only the replication flow and simple queries. I still think it is wise to have a dedicated struct for it.
A few nitpicks are below:
This can be optionally used with a CopyBoth stream to decode the replication protocol
Signed-off-by: Petros Angelatos <[email protected]>
@sfackler I've reworded the description above to aid with reviewing. I hope this is more in line with what you had in mind :)
/// A type which deserializes the postgres replication protocol. This type can be used with
/// both physical and logical replication to get access to the byte content of each replication
/// message.
///
/// The replication *must* be explicitly completed via the `finish` method.
pub struct ReplicationStream {
I'm a little confused by the last line of the description. What finish method?
In my PR, I had support for the client sending a CopyDone before the server is done sending. This is important, because in most replication modes, the server will send forever, so the only other way to stop it is to disconnect. For instance, see 5728fe61. It seems that this PR does not support the client terminating the replication stream, is that correct? Can it be made to do so? Also, my PR used PinnedDrop so that merely dropping a stream would safely close it, rather than needing to call an explicit finish.
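The drop-to-close behaviour described above can be sketched with the standard library alone. This is only an illustration, not the code from either PR: `ReplicationGuard` and `copy_done_frame` are hypothetical names, a plain `Drop` impl stands in for pin-project's `PinnedDrop`, and an mpsc channel stands in for the connection's outgoing message queue. The one protocol detail relied on is that a CopyDone message is the tag byte `c` followed by a big-endian Int32 length of 4.

```rust
use std::sync::mpsc::{channel, Sender};

/// Encodes a CopyDone message: tag byte 'c' followed by an Int32 length.
/// The length counts itself, so it is always 4.
fn copy_done_frame() -> [u8; 5] {
    let mut frame = [0u8; 5];
    frame[0] = b'c';
    frame[1..].copy_from_slice(&4i32.to_be_bytes());
    frame
}

/// Hypothetical guard around a replication stream. The `outgoing` channel is
/// an assumption standing in for whatever queue feeds the real connection.
struct ReplicationGuard {
    outgoing: Sender<Vec<u8>>,
    finished: bool,
}

impl ReplicationGuard {
    fn new(outgoing: Sender<Vec<u8>>) -> Self {
        ReplicationGuard { outgoing, finished: false }
    }

    /// Explicitly ends the stream. The guard is consumed, and the `finished`
    /// flag keeps the subsequent drop from sending CopyDone a second time.
    fn finish(mut self) {
        self.send_copy_done();
    }

    fn send_copy_done(&mut self) {
        if !self.finished {
            self.finished = true;
            // A send error means the receiving side is already gone, in which
            // case there is nothing left to terminate.
            let _ = self.outgoing.send(copy_done_frame().to_vec());
        }
    }
}

impl Drop for ReplicationGuard {
    /// Dropping the guard without calling `finish` still queues CopyDone.
    fn drop(&mut self) {
        self.send_copy_done();
    }
}
```

With this shape, both `guard.finish()` and simply letting the guard go out of scope queue exactly one CopyDone frame, so the client can stop a server that would otherwise send forever without disconnecting.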
I think the smallest useful PR we can make here is:
I think the logical replication stuff should be in a separate PR. And a lot of the helper functions for building replication commands and parsing the responses can be in a separate PR as well (or perhaps a separate crate, as @sfackler indicated).
@jeff-davis I think that makes sense. I'll work on splitting this PR into one that just adds
Regarding your concerns about terminating the stream from the client, I think you're right, and that's an omission on my part. I'll take a look into that too. My initial plan for the implementation was to use the same mechanics that are used during a
Closing in favour of #778. Once that is merged, we can address the replication protocol itself in a separate PR.
This patch was implemented by Petros Angelatos and Jeff Davis to support physical and logical replication in rust-postgres (see sfackler#752). The original PR never made it to the upstream, but we (Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.
* Make simple_query::encode() pub(crate).
* decoding logic for replication protocol
* Connection string config for replication.
* add copy_both_simple method
* helper ReplicationStream type for replication protocol
  This can be optionally used with a CopyBoth stream to decode the replication protocol
* decoding logic for logical replication protocol
* helper LogicalReplicationStream type to decode logical replication
* add postgres replication integration test
* add simple query versions of copy operations
* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
This patch was implemented by Petros Angelatos and Jeff Davis to support physical and logical replication in rust-postgres (see sfackler#752). The original PR never made it to the upstream, but we (Neon) still use it in our own fork of rust-postgres.

The following commits were squashed together:

* Image configuration updates.
* Make simple_query::encode() pub(crate).
* decoding logic for replication protocol
* Connection string config for replication.
* add copy_both_simple method
* helper ReplicationStream type for replication protocol
  This can be optionally used with a CopyBoth stream to decode the replication protocol
* decoding logic for logical replication protocol
* helper LogicalReplicationStream type to decode logical replication
* add postgres replication integration test
* add simple query versions of copy operations
* replication: use SystemTime for timestamps at API boundary

Co-authored-by: Petros Angelatos <[email protected]>
Co-authored-by: Jeff Davis <[email protected]>
Co-authored-by: Dmitry Ivanov <[email protected]>
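To make the "decoding logic for replication protocol" and "use SystemTime for timestamps at API boundary" items concrete, here is a minimal sketch of decoding one XLogData message. It is not the code from the squashed commits: the `XLogData` struct, `parse_xlog_data`, and `pg_micros_to_system_time` are hypothetical names chosen for illustration. The layout assumed is the one in the PostgreSQL streaming replication protocol: tag byte `w`, then three big-endian Int64 fields (WAL start, WAL end, server clock in microseconds since 2000-01-01 UTC), then the WAL bytes.

```rust
use std::convert::TryInto;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Seconds between the Unix epoch (1970-01-01) and the PostgreSQL epoch (2000-01-01).
const PG_EPOCH_OFFSET_SECS: u64 = 946_684_800;

/// Hypothetical decoded form of an XLogData ('w') replication message.
#[derive(Debug, PartialEq)]
struct XLogData<'a> {
    wal_start: u64,
    wal_end: u64,
    timestamp: SystemTime,
    data: &'a [u8],
}

/// Converts microseconds since 2000-01-01 UTC into a `SystemTime`.
fn pg_micros_to_system_time(micros: i64) -> SystemTime {
    let pg_epoch = UNIX_EPOCH + Duration::from_secs(PG_EPOCH_OFFSET_SECS);
    if micros >= 0 {
        pg_epoch + Duration::from_micros(micros as u64)
    } else {
        pg_epoch - Duration::from_micros(micros.unsigned_abs())
    }
}

/// Reads a big-endian 8-byte field starting at `at`.
/// Callers must have checked that `buf` is long enough.
fn read_8(buf: &[u8], at: usize) -> [u8; 8] {
    buf[at..at + 8].try_into().unwrap()
}

/// Parses the payload of a CopyData message as XLogData.
/// Returns `None` if the tag is not 'w' or the 25-byte header is incomplete.
fn parse_xlog_data(buf: &[u8]) -> Option<XLogData<'_>> {
    if buf.len() < 25 || buf[0] != b'w' {
        return None;
    }
    Some(XLogData {
        wal_start: u64::from_be_bytes(read_8(buf, 1)),
        wal_end: u64::from_be_bytes(read_8(buf, 9)),
        timestamp: pg_micros_to_system_time(i64::from_be_bytes(read_8(buf, 17))),
        data: &buf[25..],
    })
}
```

Converting at the parsing boundary means callers only ever see `SystemTime` and never need to know about the 2000-01-01 epoch used on the wire.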
Signed-off-by: Moritz Hoffmann <[email protected]> # Conflicts: # src/compute/src/logging/timely.rs diff --git c/Cargo.lock i/Cargo.lock index b331fe40a0..eb1a1d5b1e 100644 --- c/Cargo.lock +++ i/Cargo.lock @@ -2390,21 +2390,23 @@ dependencies = [ [[package]] name = "differential-dataflow" -version = "0.13.6" +version = "0.13.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc8d6ede5efde0ebabf8f48cdd8d5eadda4ad6c033cf703d58cacfd6be009635" +checksum = "922c18a0f94e29defaef228ecd65589880c16f3f3462a33258f869119f039443" dependencies = [ "columnar", + "columnation", "fnv", + "paste", "serde", "timely", ] [[package]] name = "differential-dogs3" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea29c7d272a8b0e95694aa6cbfef7a2a76bedb42a284602cdfec5972ab7d1641" +checksum = "680def2e24f4d035de6ce128d4820f5533d9afe872d85e87b5c8ed84946c8118" dependencies = [ "differential-dataflow", "serde", @@ -2827,17 +2829,6 @@ dependencies = [ "rustc_version", ] -[[package]] -name = "flatcontainer" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff185ea156496de196dfd189038982f480515ea3338f1ff0a4fbff1e52ea0a6" -dependencies = [ - "cfg-if", - "paste", - "serde", -] - [[package]] name = "flate2" version = "1.0.32" @@ -5545,6 +5536,7 @@ dependencies = [ "datadriven", "dec", "derivative", + "differential-dataflow", "encoding", "enum-iterator", "fallible-iterator", @@ -6108,7 +6100,6 @@ dependencies = [ "ctor", "derivative", "either", - "flatcontainer", "futures", "hibitset", "http 1.2.0", @@ -6594,7 +6585,6 @@ dependencies = [ "dec", "differential-dataflow", "enum-kinds", - "flatcontainer", "hex", "insta", "itertools 0.12.1", @@ -10586,12 +10576,14 @@ dependencies = [ [[package]] name = "timely" -version = "0.18.1" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2" +version = "0.19.0" +source 
= "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a714a3fed9aeacf63d9c5c574523c18972b788fa0414011b590af73acad30b09" dependencies = [ "bincode", "byteorder", "columnar", + "columnation", "crossbeam-channel", "getopts", "serde", @@ -10605,12 +10597,14 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.13.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46e1275de95b4a2713f0850c458d3a550dc323fffda65ce3e075f62545e0484b" [[package]] name = "timely_communication" -version = "0.17.1" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cdbfc7739e6a8ed95cd591ec0e862f294c681796c6121e6b3fa1ab946473e1" dependencies = [ "byteorder", "columnar", @@ -10624,18 +10618,15 @@ dependencies = [ [[package]] name = "timely_container" -version = "0.14.0" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2" -dependencies = [ - "columnation", - "flatcontainer", - "serde", -] +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c951c468b95e2070be7f48a9d8350b6e8e5ecb23e0d13fd7f6155893bb1297d" [[package]] name = "timely_logging" -version = "0.13.2" -source = "git+https://github.com/TimelyDataflow/timely-dataflow#f1ad08bed16c5683ad52e57597f2846733f7bff2" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d46d6e2fbf5831ff8345f92723e46f778ce157cf8b74448fdcaac9efb9b9a2" dependencies = [ "timely_container", ] diff --git c/Cargo.toml i/Cargo.toml index 7b32b4bd12..939050b69d 100644 --- c/Cargo.toml +++ i/Cargo.toml @@ -293,9 +293,6 @@ incremental = true # merged), after which point it becomes impossible to build that 
historical # version of Materialize. [patch.crates-io] -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } - - # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git c/src/adapter-types/Cargo.toml i/src/adapter-types/Cargo.toml index e9557931c0..9940ecd84e 100644 --- c/src/adapter-types/Cargo.toml +++ i/src/adapter-types/Cargo.toml @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" } mz-repr = { path = "../repr" } mz-storage-types = { path = "../storage-types" } serde = "1.0.218" -timely = "0.18.1" +timely = "0.19.0" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } [package.metadata.cargo-udeps.ignore] diff --git c/src/adapter/Cargo.toml i/src/adapter/Cargo.toml index 0be9de7ff9..6da297c267 100644 --- c/src/adapter/Cargo.toml +++ i/src/adapter/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.3.0" chrono = { version = "0.4.39", default-features = false, features = ["std"] } dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" @@ -82,7 +82,7 @@ serde_plain = "1.0.1" sha2 = "0.10.6" smallvec = { version = "1.10.0", features = ["union"] } static_assertions = "1.1" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", features = ["rt", "time"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git c/src/catalog/Cargo.toml i/src/catalog/Cargo.toml index b92fc0e838..ebab2fa4af 100644 --- c/src/catalog/Cargo.toml +++ i/src/catalog/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.3.0" chrono = { version = "0.4.39", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } derivative = "2.2.0" -differential-dataflow = "0.13.6" +differential-dataflow = 
"0.13.7" futures = "0.3.25" ipnet = "2.11.0" itertools = "0.12.1" @@ -60,7 +60,7 @@ serde_plain = "1.0.1" static_assertions = "1.1" sha2 = "0.10.6" thiserror = "2.0.11" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0" } tracing = "0.1.37" uuid = "1.2.2" diff --git c/src/cluster/Cargo.toml i/src/cluster/Cargo.toml index 8451aff17a..cd3f4f0308 100644 --- c/src/cluster/Cargo.toml +++ i/src/cluster/Cargo.toml @@ -13,14 +13,14 @@ workspace = true anyhow = "1.0.95" async-trait = "0.1.83" crossbeam-channel = "0.5.14" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" -lgalloc = "0.4.0" +lgalloc = "0.5.0" mz-cluster-client = { path = "../cluster-client" } mz-ore = { path = "../ore", features = ["async", "process", "tracing"] } mz-service = { path = "../service" } regex = "1.10.6" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git c/src/compute-client/Cargo.toml i/src/compute-client/Cargo.toml index 38ee801651..b3851756d1 100644 --- c/src/compute-client/Cargo.toml +++ i/src/compute-client/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytesize = "1.3.0" crossbeam-channel = "0.5.14" derivative = "2.2.0" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" http = "1.2.0" mz-build-info = { path = "../build-info" } @@ -43,7 +43,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] } serde = { version = "1.0.218", features = ["derive"] } serde_json = "1.0.125" thiserror = "2.0.11" -timely = "0.18.1" +timely = "0.19.0" tokio = "1.38.0" tokio-stream = "0.1.17" tonic = "0.12.1" diff --git c/src/compute-types/Cargo.toml i/src/compute-types/Cargo.toml index da3ea9d6ef..b53444124f 100644 --- c/src/compute-types/Cargo.toml +++ i/src/compute-types/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] 
 columnar = "0.3.0"
 columnation = "0.1.0"
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 itertools = "0.12.1"
 mz-dyncfg = { path = "../dyncfg" }
 mz-expr = { path = "../expr" }
@@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
 proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
 prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tracing = "0.1.37"
 workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
 
diff --git c/src/compute/Cargo.toml i/src/compute/Cargo.toml
index af3bd06e88..1a0172e839 100644
--- c/src/compute/Cargo.toml
+++ i/src/compute/Cargo.toml
@@ -16,8 +16,8 @@ bytesize = "1.3.0"
 columnar = "0.3.0"
 crossbeam-channel = "0.5.14"
 dec = { version = "0.4.8", features = ["serde"] }
-differential-dataflow = "0.13.6"
-differential-dogs3 = "0.1.6"
+differential-dataflow = "0.13.7"
+differential-dogs3 = "0.1.7"
 futures = "0.3.25"
 itertools = "0.12.1"
 lgalloc = "0.5"
@@ -28,7 +28,7 @@ mz-dyncfg = { path = "../dyncfg" }
 mz-dyncfgs = { path = "../dyncfgs" }
 mz-expr = { path = "../expr" }
 mz-metrics = { path = "../metrics" }
-mz-ore = { path = "../ore", features = ["async", "flatcontainer", "process", "tracing"] }
+mz-ore = { path = "../ore", features = ["async", "process", "tracing"] }
 mz-persist-client = { path = "../persist-client" }
 mz-persist-types = { path = "../persist-types" }
 mz-repr = { path = "../repr" }
@@ -41,7 +41,7 @@ prometheus = { version = "0.13.3", default-features = false }
 scopeguard = "1.1.0"
 serde = { version = "1.0.218", features = ["derive"] }
 smallvec = { version = "1.10.0", features = ["serde", "union"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["serde", "v4"] }
diff --git c/src/compute/src/compute_state.rs i/src/compute/src/compute_state.rs
index e3b5ef3fb0..168e1ce2f6 100644
--- c/src/compute/src/compute_state.rs
+++ i/src/compute/src/compute_state.rs
@@ -16,9 +16,9 @@ use std::time::{Duration, Instant};
 
 use bytesize::ByteSize;
 use differential_dataflow::lattice::Lattice;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Cursor, TraceReader};
 use differential_dataflow::Hashable;
+use differential_dataflow::IntoOwned;
 use mz_compute_client::logging::LoggingConfig;
 use mz_compute_client::protocol::command::{
     ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
diff --git c/src/compute/src/extensions/arrange.rs i/src/compute/src/extensions/arrange.rs
index 555bc69a02..a169258311 100644
--- c/src/compute/src/extensions/arrange.rs
+++ i/src/compute/src/extensions/arrange.rs
@@ -9,13 +9,13 @@
 
 use std::rc::Rc;
 
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::arrange_core;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
 use differential_dataflow::trace::{Batch, Batcher, Builder, Trace, TraceReader};
 use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
-use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
 use timely::dataflow::operators::Operator;
 use timely::dataflow::{Scope, ScopeParent, StreamCore};
@@ -457,79 +457,3 @@ where
         })
     }
 }
-
-mod flatcontainer {
-    use differential_dataflow::difference::Semigroup;
-    use differential_dataflow::lattice::Lattice;
-    use differential_dataflow::operators::arrange::Arranged;
-    use differential_dataflow::trace::TraceReader;
-    use mz_ore::flatcontainer::MzRegionPreference;
-    use timely::container::flatcontainer::{IntoOwned, Push, Region, ReserveItems};
-    use timely::dataflow::Scope;
-    use timely::progress::Timestamp;
-    use timely::PartialOrder;
-
-    use crate::extensions::arrange::{log_arrangement_size_inner, ArrangementSize};
-    use crate::typedefs::{FlatKeyValAgent, FlatKeyValSpine};
-
-    impl<G, K, V, T, R> ArrangementSize for Arranged<G, FlatKeyValAgent<K, V, T, R>>
-    where
-        Self: Clone,
-        G: Scope<Timestamp = T::Owned>,
-        G::Timestamp: Lattice + Ord + MzRegionPreference,
-        K: Region
-            + Clone
-            + Push<<K as Region>::Owned>
-            + for<'a> Push<<K as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<K as Region>::ReadItem<'a>>
-            + 'static,
-        V: Region
-            + Clone
-            + Push<<V as Region>::Owned>
-            + for<'a> Push<<V as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<V as Region>::ReadItem<'a>>
-            + 'static,
-        T: Region
-            + Clone
-            + Push<<T as Region>::Owned>
-            + for<'a> Push<<T as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<T as Region>::ReadItem<'a>>
-            + 'static,
-        R: Region
-            + Clone
-            + Push<<R as Region>::Owned>
-            + for<'a> Push<&'a <R as Region>::Owned>
-            + for<'a> Push<<R as Region>::ReadItem<'a>>
-            + for<'a> ReserveItems<<R as Region>::ReadItem<'a>>
-            + 'static,
-        K::Owned: Clone + Ord,
-        V::Owned: Clone + Ord,
-        T::Owned: Lattice + for<'a> PartialOrder<<T as Region>::ReadItem<'a>> + Timestamp,
-        R::Owned:
-            Default + Ord + Semigroup + for<'a> Semigroup<<R as Region>::ReadItem<'a>> + 'static,
-        for<'a> <K as Region>::ReadItem<'a>: Copy + Ord,
-        for<'a> <V as Region>::ReadItem<'a>: Copy + Ord,
-        for<'a> <T as Region>::ReadItem<'a>: Copy + IntoOwned<'a> + Ord + PartialOrder<T::Owned>,
-        for<'a> <R as Region>::ReadItem<'a>: Copy + IntoOwned<'a, Owned = R::Owned> + Ord,
-    {
-        fn log_arrangement_size(self) -> Self {
-            log_arrangement_size_inner::<_, FlatKeyValSpine<K, V, T, R>, _>(self, |trace| {
-                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
-                let mut callback = |siz, cap| {
-                    size += siz;
-                    capacity += cap;
-                    allocations += usize::from(cap > 0);
-                };
-                trace.map_batches(|batch| {
-                    batch.storage.keys.heap_size(&mut callback);
-                    batch.storage.keys_offs.heap_size(&mut callback);
-                    batch.storage.vals.heap_size(&mut callback);
-                    batch.storage.vals_offs.heap_size(&mut callback);
-                    batch.storage.times.heap_size(&mut callback);
-                    batch.storage.diffs.heap_size(&mut callback);
-                });
-                (size, capacity, allocations)
-            })
-        }
-    }
-}
diff --git c/src/compute/src/extensions/reduce.rs i/src/compute/src/extensions/reduce.rs
index 7d21e7437d..460fe46519 100644
--- c/src/compute/src/extensions/reduce.rs
+++ i/src/compute/src/extensions/reduce.rs
@@ -16,9 +16,9 @@
 use differential_dataflow::difference::{Abelian, Semigroup};
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
 use differential_dataflow::Data;
+use differential_dataflow::IntoOwned;
 use timely::container::PushInto;
 use timely::dataflow::Scope;
 use timely::Container;
diff --git c/src/compute/src/logging/timely.rs i/src/compute/src/logging/timely.rs
index 671a54a52c..f8d852fc55 100644
--- c/src/compute/src/logging/timely.rs
+++ i/src/compute/src/logging/timely.rs
@@ -14,6 +14,7 @@ use std::rc::Rc;
 use std::time::Duration;
 
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::{Columnation, CopyRegion};
 use mz_compute_client::logging::LoggingConfig;
 use mz_ore::cast::CastFrom;
 use mz_repr::{Datum, Diff, Timestamp};
@@ -21,7 +22,7 @@ use mz_timely_util::containers::{
     columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder,
 };
 use mz_timely_util::replay::MzReplay;
-use timely::container::columnation::{Columnation, CopyRegion};
+use timely::communication::Allocate;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::channels::pushers::buffer::Session;
 use timely::dataflow::channels::pushers::{Counter, Tee};
diff --git c/src/compute/src/render.rs i/src/compute/src/render.rs
index 3fde6605ae..9a9316f808 100644
--- c/src/compute/src/render.rs
+++ i/src/compute/src/render.rs
@@ -111,11 +111,12 @@ use std::sync::Arc;
 use std::task::Poll;
 
 use columnar::Columnar;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::dynamic::pointstamp::PointStamp;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::TraceReader;
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection, Data};
 use futures::channel::oneshot;
 use futures::FutureExt;
@@ -134,7 +135,6 @@ use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::operator::{CollectionExt, StreamExt};
 use timely::communication::Allocate;
-use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::to_stream::ToStream;
 use timely::dataflow::operators::{probe, BranchWhen, Operator, Probe};
diff --git c/src/compute/src/render/context.rs i/src/compute/src/render/context.rs
index 20befab8de..c622d2a3bc 100644
--- c/src/compute/src/render/context.rs
+++ i/src/compute/src/render/context.rs
@@ -16,10 +16,11 @@ use std::sync::mpsc;
 
 use columnar::Columnar;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection, Data};
 use mz_compute_types::dataflows::DataflowDescription;
 use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
@@ -32,7 +33,6 @@ use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
 use mz_timely_util::operator::{CollectionExt, StreamExt};
-use timely::container::columnation::Columnation;
 use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::operators::generic::OutputHandleCore;
diff --git c/src/compute/src/render/errors.rs i/src/compute/src/render/errors.rs
index 887015ef67..f70cfd2506 100644
--- c/src/compute/src/render/errors.rs
+++ i/src/compute/src/render/errors.rs
@@ -11,9 +11,9 @@
 
 use std::hash::Hash;
 
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::ExchangeData;
 use mz_repr::Row;
-use timely::container::columnation::Columnation;
 
 use crate::render::context::ShutdownToken;
 
diff --git c/src/compute/src/render/join/delta_join.rs i/src/compute/src/render/join/delta_join.rs
index ceef6cd23a..cc482ba003 100644
--- c/src/compute/src/render/join/delta_join.rs
+++ i/src/compute/src/render/join/delta_join.rs
@@ -18,8 +18,8 @@ use std::collections::{BTreeMap, BTreeSet};
 use columnar::Columnar;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection};
 use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
 use mz_compute_types::plan::join::JoinClosure;
diff --git c/src/compute/src/render/join/linear_join.rs i/src/compute/src/render/join/linear_join.rs
index 2f6ddfd9c9..1756ecdcff 100644
--- c/src/compute/src/render/join/linear_join.rs
+++ i/src/compute/src/render/join/linear_join.rs
@@ -15,6 +15,7 @@ use std::time::{Duration, Instant};
 
 use columnar::Columnar;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::Arranged;
 use differential_dataflow::trace::TraceReader;
@@ -28,7 +29,6 @@ use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
 use mz_timely_util::operator::{CollectionExt, StreamExt};
-use timely::container::columnation::Columnation;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::operators::OkErr;
 use timely::dataflow::scopes::Child;
diff --git c/src/compute/src/render/join/mz_join_core.rs i/src/compute/src/render/join/mz_join_core.rs
index 3848256fd5..935f95257c 100644
--- c/src/compute/src/render/join/mz_join_core.rs
+++ i/src/compute/src/render/join/mz_join_core.rs
@@ -44,9 +44,9 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
 use differential_dataflow::difference::Multiply;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::arrangement::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
 use differential_dataflow::Data;
+use differential_dataflow::IntoOwned;
 use mz_repr::Diff;
 use timely::container::{CapacityContainerBuilder, PushInto, SizableContainer};
 use timely::dataflow::channels::pact::Pipeline;
diff --git c/src/compute/src/render/reduce.rs i/src/compute/src/render/reduce.rs
index 335b40c185..a0eb7a4275 100644
--- c/src/compute/src/render/reduce.rs
+++ i/src/compute/src/render/reduce.rs
@@ -17,12 +17,13 @@ use std::sync::LazyLock;
 use dec::OrderedDecimal;
 use differential_dataflow::collection::AsCollection;
 use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
+use differential_dataflow::containers::{Columnation, CopyRegion};
 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
 use differential_dataflow::hashable::Hashable;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{Collection, Diff as _};
 use mz_compute_types::plan::reduce::{
     reduction_type, AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, KeyValPlan,
@@ -37,7 +38,6 @@ use mz_repr::{Datum, DatumList, DatumVec, Diff, Row, RowArena, SharedRow};
 use mz_storage_types::errors::DataflowError;
 use mz_timely_util::operator::CollectionExt;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::{Columnation, CopyRegion};
 use timely::container::{CapacityContainerBuilder, PushInto};
 use timely::dataflow::Scope;
 use timely::progress::timestamp::Refines;
@@ -2308,12 +2308,12 @@ mod monoids {
     // add a new enum variant here), because other code (e.g., `HierarchicalOneByOneAggr`)
     // assumes this.
 
+    use differential_dataflow::containers::{Columnation, Region};
     use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
     use mz_expr::AggregateFunc;
     use mz_ore::soft_panic_or_log;
     use mz_repr::{Datum, Diff, Row};
     use serde::{Deserialize, Serialize};
-    use timely::container::columnation::{Columnation, Region};
 
     /// A monoid containing a single-datum row.
     #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
diff --git c/src/compute/src/render/threshold.rs i/src/compute/src/render/threshold.rs
index cd22d5529b..1a31367806 100644
--- c/src/compute/src/render/threshold.rs
+++ i/src/compute/src/render/threshold.rs
@@ -11,15 +11,15 @@
 //!
 //! Consult [ThresholdPlan] documentation for details.
 
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
 use differential_dataflow::Data;
+use differential_dataflow::IntoOwned;
 use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
 use mz_expr::MirScalarExpr;
 use mz_repr::Diff;
-use timely::container::columnation::Columnation;
 use timely::container::PushInto;
 use timely::dataflow::Scope;
 use timely::progress::timestamp::Refines;
diff --git c/src/compute/src/render/top_k.rs i/src/compute/src/render/top_k.rs
index c4e2904c1d..d345b6cd48 100644
--- c/src/compute/src/render/top_k.rs
+++ i/src/compute/src/render/top_k.rs
@@ -12,11 +12,12 @@
 //! Consult [TopKPlan] documentation for details.
 
 use columnar::Columnar;
+use differential_dataflow::containers::Columnation;
 use differential_dataflow::hashable::Hashable;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection};
 use mz_compute_types::plan::top_k::{
     BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
@@ -31,7 +32,6 @@ use mz_timely_util::operator::CollectionExt;
 use std::cell::RefCell;
 use std::collections::BTreeMap;
 use std::rc::Rc;
-use timely::container::columnation::Columnation;
 use timely::container::{CapacityContainerBuilder, PushInto};
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::Operator;
@@ -841,11 +841,11 @@ pub mod monoids {
     use std::hash::{Hash, Hasher};
     use std::rc::Rc;
 
+    use differential_dataflow::containers::{Columnation, Region};
     use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
     use mz_expr::ColumnOrder;
     use mz_repr::{DatumVec, Diff, Row};
     use serde::{Deserialize, Serialize};
-    use timely::container::columnation::{Columnation, Region};
 
     /// A monoid containing a row and an ordering.
     #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash, Default)]
diff --git c/src/compute/src/row_spine.rs i/src/compute/src/row_spine.rs
index d66479d78f..40816866e0 100644
--- c/src/compute/src/row_spine.rs
+++ i/src/compute/src/row_spine.rs
@@ -20,6 +20,7 @@ use differential_dataflow::trace::implementations::OffsetList;
 mod spines {
     use std::rc::Rc;
 
+    use differential_dataflow::containers::{Columnation, TimelyStack};
     use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
     use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
     use differential_dataflow::trace::implementations::spine_fueled::Spine;
@@ -27,7 +28,6 @@ mod spines {
     use differential_dataflow::trace::implementations::Update;
     use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
     use mz_repr::Row;
-    use timely::container::columnation::{Columnation, TimelyStack};
 
     use crate::row_spine::{DatumContainer, OffsetOptimized};
     use crate::typedefs::{KeyBatcher, KeyValBatcher};
@@ -99,7 +99,7 @@ mod spines {
 
 /// A `Row`-specialized container using dictionary compression.
 mod container {
-    use differential_dataflow::trace::cursor::IntoOwned;
+    use differential_dataflow::IntoOwned;
     use std::cmp::Ordering;
 
     use differential_dataflow::trace::implementations::BatchContainer;
diff --git c/src/compute/src/sink/correction_v2.rs i/src/compute/src/sink/correction_v2.rs
index 5f0e77b962..6b0ec40af2 100644
--- c/src/compute/src/sink/correction_v2.rs
+++ i/src/compute/src/sink/correction_v2.rs
@@ -129,10 +129,10 @@ use std::collections::{BinaryHeap, VecDeque};
 use std::fmt;
 use std::rc::Rc;
 
+use differential_dataflow::containers::{Columnation, TimelyStack};
 use differential_dataflow::trace::implementations::BatchContainer;
 use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
 use mz_repr::{Diff, Timestamp};
-use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::SizableContainer;
 use timely::progress::Antichain;
 use timely::{Container, PartialOrder};
diff --git c/src/compute/src/typedefs.rs i/src/compute/src/typedefs.rs
index 6325edd515..c824ea85a1 100644
--- c/src/compute/src/typedefs.rs
+++ i/src/compute/src/typedefs.rs
@@ -15,21 +15,14 @@ use differential_dataflow::operators::arrange::Arranged;
 use differential_dataflow::operators::arrange::TraceAgent;
 use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
 use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
-use differential_dataflow::trace::implementations::ord_neu::{
-    FlatValBatcher, FlatValBuilder, FlatValSpine, OrdValBatch,
-};
 use differential_dataflow::trace::wrappers::enter::TraceEnter;
 use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
-use mz_ore::flatcontainer::MzRegionPreference;
 use mz_repr::Diff;
 use mz_storage_types::errors::DataflowError;
-use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
 use timely::dataflow::ScopeParent;
 
 use crate::row_spine::RowValBuilder;
-use crate::typedefs::spines::{
-    ColKeyBatcher, ColKeyBuilder, ColValBatcher, ColValBuilder, MzFlatLayout,
-};
+use crate::typedefs::spines::{ColKeyBatcher, ColKeyBuilder, ColValBatcher, ColValBuilder};
 
 pub use crate::row_spine::{RowRowSpine, RowSpine, RowValBatcher, RowValSpine};
 pub use crate::typedefs::spines::{ColKeySpine, ColValSpine};
@@ -37,17 +30,13 @@ pub use crate::typedefs::spines::{ColKeySpine, ColValSpine};
 pub(crate) mod spines {
     use std::rc::Rc;
 
-    use differential_dataflow::difference::Semigroup;
-    use differential_dataflow::lattice::Lattice;
+    use differential_dataflow::containers::{Columnation, TimelyStack};
     use differential_dataflow::trace::implementations::ord_neu::{
         OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder,
     };
     use differential_dataflow::trace::implementations::spine_fueled::Spine;
     use differential_dataflow::trace::implementations::{Layout, Update};
     use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
-    use timely::container::columnation::{Columnation, TimelyStack};
-    use timely::container::flatcontainer::{FlatStack, Push, Region};
-    use timely::progress::Timestamp;
 
     use crate::row_spine::OffsetOptimized;
     use crate::typedefs::{KeyBatcher, KeyValBatcher};
@@ -83,67 +72,6 @@ pub(crate) mod spines {
         type DiffContainer = TimelyStack<U::Diff>;
         type OffsetContainer = OffsetOptimized;
     }
-
-    /// A layout based on flat container stacks
-    pub struct MzFlatLayout<K, V, T, R> {
-        phantom: std::marker::PhantomData<(K, V, T, R)>,
-    }
-
-    impl<K, V, T, R> Update for MzFlatLayout<K, V, T, R>
-    where
-        K: Region,
-        V: Region,
-        T: Region,
-        R: Region,
-        K::Owned: Ord + Clone + 'static,
-        V::Owned: Ord + Clone + 'static,
-        T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
-        R::Owned: Ord + Semigroup + 'static,
-    {
-        type Key = K::Owned;
-        type Val = V::Owned;
-        type Time = T::Owned;
-        type Diff = R::Owned;
-    }
-
-    /// Layout implementation for [`MzFlatLayout`]. Mostly equivalent to differential's
-    /// flat layout but with a different opinion for the offset container. Here, we use
-    /// [`OffsetOptimized`] instead of an offset list. If differential should gain access
-    /// to the optimized variant, we might be able to remove this implementation.
-    impl<K, V, T, R> Layout for MzFlatLayout<K, V, T, R>
-    where
-        K: Region
-            + Push<<K as Region>::Owned>
-            + for<'a> Push<<K as Region>::ReadItem<'a>>
-            + 'static,
-        V: Region
-            + Push<<V as Region>::Owned>
-            + for<'a> Push<<V as Region>::ReadItem<'a>>
-            + 'static,
-        T: Region
-            + Push<<T as Region>::Owned>
-            + for<'a> Push<<T as Region>::ReadItem<'a>>
-            + 'static,
-        R: Region
-            + Push<<R as Region>::Owned>
-            + for<'a> Push<<R as Region>::ReadItem<'a>>
-            + 'static,
-        K::Owned: Ord + Clone + 'static,
-        V::Owned: Ord + Clone + 'static,
-        T::Owned: Ord + Clone + Lattice + Timestamp + 'static,
-        R::Owned: Ord + Semigroup + 'static,
-        for<'a> K::ReadItem<'a>: Copy + Ord,
-        for<'a> V::ReadItem<'a>: Copy + Ord,
-        for<'a> T::ReadItem<'a>: Copy + Ord,
-        for<'a> R::ReadItem<'a>: Copy + Ord,
-    {
-        type Target = Self;
-        type KeyContainer = FlatStack<K>;
-        type ValContainer = FlatStack<V>;
-        type TimeContainer = FlatStack<T>;
-        type DiffContainer = FlatStack<R>;
-        type OffsetContainer = OffsetOptimized;
-    }
 }
 
 // Spines are data structures that collect and maintain updates.
@@ -193,33 +121,3 @@ pub type RowErrBuilder<T, R> = RowValBuilder<DataflowError, T, R>;
 pub type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
 pub type KeyValBatcher<K, V, T, D> =
     MergeBatcher<Vec<((K, V), T, D)>, ColumnationChunker<((K, V), T, D)>, ColMerger<(K, V), T, D>>;
-
-pub type FlatKeyValBatch<K, V, T, R> = OrdValBatch<MzFlatLayout<K, V, T, R>>;
-pub type FlatKeyValSpine<K, V, T, R> = FlatValSpine<MzFlatLayout<K, V, T, R>>;
-pub type FlatKeyValSpineDefault<K, V, T, R> = FlatKeyValSpine<
-    <K as MzRegionPreference>::Region,
-    <V as MzRegionPreference>::Region,
-    <T as MzRegionPreference>::Region,
-    <R as MzRegionPreference>::Region,
->;
-pub type FlatKeyValBatcher<K, V, T, R, C> =
-    FlatValBatcher<TupleABCRegion<TupleABRegion<K, V>, T, R>, C>;
-pub type FlatKeyValBatcherDefault<K, V, T, R, C> = FlatKeyValBatcher<
-    <K as MzRegionPreference>::Region,
-    <V as MzRegionPreference>::Region,
-    <T as MzRegionPreference>::Region,
-    <R as MzRegionPreference>::Region,
-    C,
->;
-pub type FlatKeyValBuilder<K, V, T, R> =
-    FlatValBuilder<MzFlatLayout<K, V, T, R>, TupleABCRegion<TupleABRegion<K, V>, T, R>>;
-pub type FlatKeyValBuilderDefault<K, V, T, R> = FlatKeyValBuilder<
-    <K as MzRegionPreference>::Region,
-    <V as MzRegionPreference>::Region,
-    <T as MzRegionPreference>::Region,
-    <R as MzRegionPreference>::Region,
->;
-
-pub type FlatKeyValAgent<K, V, T, R> = TraceAgent<FlatKeyValSpine<K, V, T, R>>;
-pub type FlatKeyValEnter<K, V, T, R, TEnter> =
-    TraceEnter<TraceFrontier<FlatKeyValAgent<K, V, T, R>>, TEnter>;
diff --git c/src/controller/Cargo.toml i/src/controller/Cargo.toml
index a4425e0901..6c76c7c55c 100644
--- c/src/controller/Cargo.toml
+++ i/src/controller/Cargo.toml
@@ -32,7 +32,7 @@ mz-txn-wal = { path = "../txn-wal" }
 regex = "1.10.6"
 serde = { version = "1.0.218", features = ["derive"] }
 serde_json = "1.0.125"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = "1.38.0"
 tracing = "0.1.37"
 uuid = { version = "1.7.0" }
diff --git c/src/durable-cache/Cargo.toml i/src/durable-cache/Cargo.toml
index 0ce084ad7a..753858e65c 100644
--- c/src/durable-cache/Cargo.toml
+++ i/src/durable-cache/Cargo.toml
@@ -12,7 +12,7 @@ workspace = true
 [dependencies]
 async-trait = "0.1.83"
 bytes = { version = "1.3.0" }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 futures = "0.3.25"
 itertools = { version = "0.12.1" }
 mz-ore = { path = "../ore", features = ["process"] }
@@ -23,7 +23,7 @@ mz-timely-util = { path = "../timely-util" }
 prometheus = { version = "0.13.3", default-features = false }
 prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 serde = { version = "1.0.218", features = ["derive", "rc"] }
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", default-features = false, features = ["rt", "rt-multi-thread"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["v4"] }
diff --git c/src/environmentd/Cargo.toml i/src/environmentd/Cargo.toml
index 9d4e946a62..fe73371663 100644
--- c/src/environmentd/Cargo.toml
+++ i/src/environmentd/Cargo.toml
@@ -145,7 +145,7 @@ reqwest = { version = "0.11.13", features = ["blocking"] }
 serde_json = "1.0.125"
 serde_urlencoded = "0.7.1"
 similar-asserts = "1.4"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
 
 [build-dependencies]
diff --git c/src/expr/Cargo.toml i/src/expr/Cargo.toml
index 91ec768698..b58fbadc85 100644
--- c/src/expr/Cargo.toml
+++ i/src/expr/Cargo.toml
@@ -26,6 +26,7 @@ chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 chrono-tz = { version = "0.8.1", features = ["serde", "case-insensitive"] }
 crc32fast = "1.4.2"
 csv = "1.3.1"
+differential-dataflow = "0.13.7"
 dec = "0.4.8"
 derivative = "2.2.0"
 encoding = "0.2.0"
@@ -59,7 +60,7 @@ serde_json = "1.0.125"
 sha1 = "0.10.5"
 sha2 = "0.10.6"
 subtle = "2.4.1"
-timely = "0.18.1"
+timely = "0.19.0"
 tracing = "0.1.37"
 uncased = "0.9.7"
 uuid = { version = "1.7.0", features = ["v5"] }
diff --git c/src/expr/src/relation.rs i/src/expr/src/relation.rs
index 9ee748b83b..28e396773b 100644
--- c/src/expr/src/relation.rs
+++ i/src/expr/src/relation.rs
@@ -18,6 +18,7 @@ use std::num::NonZeroU64;
 use std::time::Instant;
 
 use bytesize::ByteSize;
+use differential_dataflow::containers::{Columnation, CopyRegion};
 use itertools::Itertools;
 use mz_lowertest::MzReflect;
 use mz_ore::cast::CastFrom;
@@ -42,7 +43,6 @@ use proptest::prelude::{any, Arbitrary, BoxedStrategy};
 use proptest::strategy::{Strategy, Union};
 use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
-use timely::container::columnation::{Columnation, CopyRegion};
 
 use crate::explain::{HumanizedExpr, HumanizerMode};
 use crate::relation::func::{AggregateFunc, LagLeadType, TableFunc};
diff --git c/src/interchange/Cargo.toml i/src/interchange/Cargo.toml
index b0194f126e..0d357cb370 100644
--- c/src/interchange/Cargo.toml
+++ i/src/interchange/Cargo.toml
@@ -20,7 +20,7 @@ byteorder = "1.4.3"
 bytes = "1.3.0"
 chrono = { version = "0.4.39", default-features = false, features = ["std"] }
 clap = { version = "4.5.23", features = ["derive"] }
-differential-dataflow = "0.13.6"
+differential-dataflow = "0.13.7"
 itertools = "0.12.1"
 maplit = "1.0.2"
 mz-avro = { path = "../avro", features = ["snappy"] }
@@ -33,7 +33,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] }
 prost-reflect = "0.14.6"
 seahash = "4"
 serde_json = "1.0.125"
-timely = "0.18.1"
+timely = "0.19.0"
 tokio = { version = "1.38.0", features = ["macros", "net", "rt", "rt-multi-thread", "time"] }
 tracing = "0.1.37"
 uuid = { version = "1.7.0", features = ["serde"] }
diff --git c/src/interchange/src/envelopes.rs i/src/interchange/src/envelopes.rs
index 87d4053e62..dc685b3347 100644
--- c/src/interchange/src/envelopes.rs
+++ i/src/interchange/src/envelopes.rs
@@ -13,8 +13,8 @@ use std::sync::LazyLock;
 
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::Arranged;
-use differential_dataflow::trace::cursor::IntoOwned;
 use differential_dataflow::trace::{Batch, BatchReader, Cursor, TraceReader};
+use differential_dataflow::IntoOwned;
 use differential_dataflow::{AsCollection, Collection};
 use itertools::{EitherOrBoth, Itertools};
 use maplit::btreemap;
diff --git c/src/ore/BUILD.bazel i/src/ore/BUILD.bazel
index dc637499cc..62d4551a7b 100644
--- c/src/ore/BUILD.bazel
+++ i/src/ore/BUILD.bazel
@@ -39,7 +39,6 @@ rust_library(
         "ctor",
         "default",
         "derivative",
-        "flatcontainer",
         "futures",
         "hibitset",
         "http",
@@ -120,7 +119,6 @@ rust_test(
         "ctor",
         "default",
         "derivative",
-        "flatcontainer",
         "futures",
         "hibitset",
         "http",
diff --git c/src/ore/Cargo.toml i/src/ore/Cargo.toml
index 8e67f6c332..224adec8be 100644
--- c/src/ore/Cargo.toml
+++ i/src/ore/Cargo.toml
@@ -29,7 +29,6 @@ compact_bytes = { version = "0.1.3", optional = true }
 ctor = { version = "0.1.26", optional = true }
 derivative = { version = "2.2.0", optional = true }
 either = "1.8.0"
-flatcontainer = { version = "0.5.0", optional = true }
 futures = { version = "0.3.25", optional = true }
 hibitset = { version = "0.6.4", optional = true }
 itertools = "0.12.1"
diff --git c/src/ore/src/flatcontainer.rs i/src/ore/src/flatcontainer.rs
deleted file mode 100644
index d74b84ac08..0000000000
--- c/src/ore/src/flatcontainer.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-// Copyright Materialize, Inc. and contributors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License in the LICENSE file at the
-// root of this repository, or online at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and -// limitations under the License. - -//! Flat container utilities - -use flatcontainer::{Push, Region, ReserveItems}; - -/// Associate a type with a flat container region. -pub trait MzRegionPreference: 'static { - /// The owned type of the container. - type Owned; - /// A region that can hold `Self`. - type Region: for<'a> Region<Owned = Self::Owned> - + Push<Self::Owned> - + for<'a> Push<<Self::Region as Region>::ReadItem<'a>> - + for<'a> ReserveItems<<Self::Region as Region>::ReadItem<'a>>; -} - -/// Opinion indicating that the contents of a collection should be stored in an -/// [`OwnedRegion`](flatcontainer::OwnedRegion). This is most useful to force types to a region -/// that doesn't copy individual elements to a nested region, like the -/// [`SliceRegion`](flatcontainer::SliceRegion) does. -#[derive(Debug)] -pub struct OwnedRegionOpinion<T>(std::marker::PhantomData<T>); - -mod tuple { - use flatcontainer::impls::tuple::*; - use paste::paste; - - use crate::flatcontainer::MzRegionPreference; - - macro_rules! tuple_flatcontainer { - ($($name:ident)+) => ( - paste! { - impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) { - type Owned = ($($name::Owned,)*); - type Region = [<Tuple $($name)* Region >]<$($name::Region,)*>; - } - } - ) - } - - tuple_flatcontainer!(A); - tuple_flatcontainer!(A B); - tuple_flatcontainer!(A B C); - tuple_flatcontainer!(A B C D); - tuple_flatcontainer!(A B C D E); -} - -mod copy { - use flatcontainer::MirrorRegion; - - use crate::flatcontainer::MzRegionPreference; - - macro_rules! 
implement_for { - ($index_type:ty) => { - impl MzRegionPreference for $index_type { - type Owned = Self; - type Region = MirrorRegion<Self>; - } - }; - } - - implement_for!(()); - implement_for!(bool); - implement_for!(char); - - implement_for!(u8); - implement_for!(u16); - implement_for!(u32); - implement_for!(u64); - implement_for!(u128); - implement_for!(usize); - - implement_for!(i8); - implement_for!(i16); - implement_for!(i32); - implement_for!(i64); - implement_for!(i128); - implement_for!(isize); - - implement_for!(f32); - implement_for!(f64); - - implement_for!(std::num::Wrapping<i8>); - implement_for!(std::num::Wrapping<i16>); - implement_for!(std::num::Wrapping<i32>); - implement_for!(std::num::Wrapping<i64>); - implement_for!(std::num::Wrapping<i128>); - implement_for!(std::num::Wrapping<isize>); - - implement_for!(std::time::Duration); -} - -mod vec { - use flatcontainer::OwnedRegion; - - use crate::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; - - impl<T: Clone + 'static> MzRegionPreference for OwnedRegionOpinion<Vec<T>> { - type Owned = Vec<T>; - type Region = OwnedRegion<T>; - } -} - -impl<T: MzRegionPreference> MzRegionPreference for Option<T> { - type Owned = <flatcontainer::OptionRegion<T::Region> as Region>::Owned; - type Region = flatcontainer::OptionRegion<T::Region>; -} diff --git c/src/ore/src/lib.rs i/src/ore/src/lib.rs index b9e5cbf98b..0b8a1decb2 100644 --- c/src/ore/src/lib.rs +++ i/src/ore/src/lib.rs @@ -39,8 +39,6 @@ pub mod cli; pub mod collections; pub mod env; pub mod error; -#[cfg(feature = "flatcontainer")] -pub mod flatcontainer; pub mod fmt; #[cfg_attr(nightly_doc_features, doc(cfg(feature = "async")))] #[cfg(feature = "async")] diff --git c/src/persist-cli/Cargo.toml i/src/persist-cli/Cargo.toml index 504c30f1b2..68f71b41af 100644 --- c/src/persist-cli/Cargo.toml +++ i/src/persist-cli/Cargo.toml @@ -23,7 +23,7 @@ async-trait = "0.1.83" axum = "0.7.5" bytes = { version = "1.3.0", features = ["serde"] } clap = { 
version = "4.5.23", features = ["derive", "env"] } -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" humantime = "2.1.0" mz-http-util = { path = "../http-util" } @@ -40,7 +40,7 @@ num_enum = "0.7.3" prometheus = { version = "0.13.3", default-features = false } serde = { version = "1.0.218", features = ["derive", "rc"] } serde_json = "1.0.125" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] } tracing = "0.1.37" url = "2.3.1" diff --git c/src/persist-client/Cargo.toml i/src/persist-client/Cargo.toml index 233a370125..ac9229ae19 100644 --- c/src/persist-client/Cargo.toml +++ i/src/persist-client/Cargo.toml @@ -35,7 +35,7 @@ async-stream = "0.3.3" async-trait = "0.1.83" bytes = { version = "1.3.0", features = ["serde"] } clap = { version = "4.5.23", features = ["derive"] } -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" futures-util = "0.3" h2 = "0.3.13" @@ -59,7 +59,7 @@ sentry-tracing = "0.29.1" semver = { version = "1.0.16", features = ["serde"] } serde = { version = "1.0.218", features = ["derive", "rc"] } serde_json = "1.0.125" -timely = "0.18.1" +timely = "0.19.0" thiserror = "2.0.11" tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] } tokio-metrics = "0.4.0" diff --git c/src/persist-types/Cargo.toml i/src/persist-types/Cargo.toml index 8837067526..19f7c1bf54 100644 --- c/src/persist-types/Cargo.toml +++ i/src/persist-types/Cargo.toml @@ -26,7 +26,7 @@ proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.4", features = ["no-recursion-limit"] } serde = { version = "1.0.218", features = ["derive"] } serde_json = { version = "1.0.125" } -timely = "0.18.1" +timely = "0.19.0" tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } workspace-hack = { version = "0.0.0", 
path = "../workspace-hack", optional = true } diff --git c/src/persist/Cargo.toml i/src/persist/Cargo.toml index b26bb13677..fe8eb42b01 100644 --- c/src/persist/Cargo.toml +++ i/src/persist/Cargo.toml @@ -37,7 +37,7 @@ azure_core = "0.21.0" base64 = "0.13.1" bytes = "1.3.0" deadpool-postgres = "0.10.3" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" fail = { version = "0.5.1", features = ["failpoints"] } futures-util = "0.3.31" md-5 = "0.10.5" @@ -59,7 +59,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] } rand = { version = "0.8.5", features = ["small_rng"] } reqwest = { version = "0.12", features = ["blocking", "json", "default-tls", "charset", "http2"], default-features = false } serde = { version = "1.0.218", features = ["derive"] } -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git c/src/repr/Cargo.toml i/src/repr/Cargo.toml index 1450fec64c..2c9433e2d8 100644 --- c/src/repr/Cargo.toml +++ i/src/repr/Cargo.toml @@ -36,15 +36,13 @@ columnation = "0.1.0" chrono = { version = "0.4.39", default-features = false, features = ["serde", "std"] } compact_bytes = "0.1.3" dec = "0.4.8" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" enum-kinds = "0.5.1" -flatcontainer = "0.5.0" hex = "0.4.3" itertools = "0.12.1" mz-lowertest = { path = "../lowertest", default-features = false } mz-ore = { path = "../ore", features = [ "bytes", - "flatcontainer", "id_gen", "smallvec", "region", @@ -68,7 +66,7 @@ serde_json = { version = "1.0.125", features = ["arbitrary_precision", "preserve smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" strsim = "0.11.1" -timely = "0.18.1" +timely = "0.19.0" tokio-postgres = { version = "0.7.8" } tracing-core = "0.1.30" url = { version = "2.3.1", features = ["serde"] } diff --git 
c/src/repr/src/timestamp.rs i/src/repr/src/timestamp.rs index e00efa747f..cf0c5c6a5b 100644 --- c/src/repr/src/timestamp.rs +++ i/src/repr/src/timestamp.rs @@ -470,17 +470,11 @@ impl columnation::Columnation for Timestamp { type InnerRegion = columnation::CopyRegion<Timestamp>; } -mod flatcontainer { - use flatcontainer::{IntoOwned, MirrorRegion}; - use mz_ore::flatcontainer::MzRegionPreference; +mod differential { + use differential_dataflow::IntoOwned; use crate::Timestamp; - impl MzRegionPreference for Timestamp { - type Owned = Self; - type Region = MirrorRegion<Timestamp>; - } - impl<'a> IntoOwned<'a> for Timestamp { type Owned = Self; diff --git c/src/service/Cargo.toml i/src/service/Cargo.toml index 315fff7985..87b702c26a 100644 --- c/src/service/Cargo.toml +++ i/src/service/Cargo.toml @@ -35,7 +35,7 @@ prost = { version = "0.13.4", features = ["no-recursion-limit"] } semver = "1.0.16" serde = { version = "1.0.218", features = ["derive"] } sysinfo = "0.29.11" -timely = "0.18.1" +timely = "0.19.0" tokio = "1.38.0" tokio-stream = "0.1.17" tonic = "0.12.1" diff --git c/src/sql/src/session/vars.rs i/src/sql/src/session/vars.rs index 021350779e..fd2011778e 100644 --- c/src/sql/src/session/vars.rs +++ i/src/sql/src/session/vars.rs @@ -2238,6 +2238,15 @@ impl SystemVars { name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name) } + /// Returns whether the named variable is an initial compute configuration parameter + /// (things that go in `TimelyConfig` and cannot be changed at runtime). 
+ pub fn is_initial_compute_config_var(&self, name: &str) -> bool { + name == ARRANGEMENT_EXERT_PROPORTIONALITY.name() + || name == ENABLE_TIMELY_ZERO_COPY.name() + || name == ENABLE_TIMELY_ZERO_COPY_LGALLOC.name() + || name == TIMELY_ZERO_COPY_LIMIT.name() + } + /// Returns whether the named variable is a metrics configuration parameter pub fn is_metrics_config_var(&self, name: &str) -> bool { self.is_dyncfg_var(name) diff --git c/src/storage-client/Cargo.toml i/src/storage-client/Cargo.toml index 267d72aaa4..de83f1b3c3 100644 --- c/src/storage-client/Cargo.toml +++ i/src/storage-client/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.95" async-trait = "0.1.83" chrono = { version = "0.4.39", default-features = false, features = ["std"] } -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" http = "1.2.0" itertools = { version = "0.12.1" } @@ -46,7 +46,7 @@ serde = { version = "1.0.218", features = ["derive"] } serde_json = { version = "1.0.125" } smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", features = [ "fs", "rt", diff --git c/src/storage-controller/Cargo.toml i/src/storage-controller/Cargo.toml index dabcfe6dca..d23979fbda 100644 --- c/src/storage-controller/Cargo.toml +++ i/src/storage-controller/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytes = "1.3.0" chrono = { version = "0.4.39", default-features = false, features = ["std"] } derivative = "2.2.0" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" itertools = { version = "0.12.1" } mz-build-info = { path = "../build-info" } @@ -38,7 +38,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } prost = { version = "0.13.4", features = ["no-recursion-limit"] } serde = { version = "1.0.218", features = ["derive"] } serde_json = { version = "1.0.125" } -timely = "0.18.1" +timely = 
"0.19.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tokio-stream = "0.1.17" diff --git c/src/storage-operators/Cargo.toml i/src/storage-operators/Cargo.toml index cccfe02b74..e6fc8907d0 100644 --- c/src/storage-operators/Cargo.toml +++ i/src/storage-operators/Cargo.toml @@ -20,7 +20,7 @@ bytes = "1.3.0" bytesize = "1.3.0" csv-async = { version = "1.3.0", features = ["tokio"] } derivative = "2.2.0" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" futures = "0.3.25" glob = "0.3.2" http = "1.2.0" @@ -47,7 +47,7 @@ reqwest = { version = "0.11.13", features = ["stream"] } sentry = { version = "0.29.1" } serde = { version = "1.0.218", features = ["derive"] } smallvec = { version = "1.10.0", features = ["union"] } -timely = "0.18.1" +timely = "0.19.0" thiserror = "2.0.11" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-stream = "0.1.17" diff --git c/src/storage-operators/src/s3_oneshot_sink.rs i/src/storage-operators/src/s3_oneshot_sink.rs index 67f60776d8..84d6f34578 100644 --- c/src/storage-operators/src/s3_oneshot_sink.rs +++ i/src/storage-operators/src/s3_oneshot_sink.rs @@ -16,6 +16,7 @@ use std::rc::Rc; use anyhow::anyhow; use aws_types::sdk_config::SdkConfig; +use differential_dataflow::containers::TimelyStack; use differential_dataflow::Hashable; use futures::StreamExt; use mz_ore::cast::CastFrom; @@ -31,7 +32,6 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; -use timely::container::columnation::TimelyStack; use timely::dataflow::channels::pact::{Exchange, Pipeline}; use timely::dataflow::operators::Broadcast; use timely::dataflow::{Scope, Stream}; diff --git c/src/storage-types/Cargo.toml i/src/storage-types/Cargo.toml index f0591fa771..07e85ef878 100644 --- 
c/src/storage-types/Cargo.toml +++ i/src/storage-types/Cargo.toml @@ -25,7 +25,7 @@ bytes = "1.3.0" columnation = "0.1.0" dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" hex = "0.4.3" http = "1.2.0" itertools = { version = "0.12.1" } @@ -62,7 +62,7 @@ regex = "1.10.6" serde = { version = "1.0.218", features = ["derive"] } serde_json = { version = "1.0.125", features = ["preserve_order"] } thiserror = "2.0.11" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tracing = "0.1.37" diff --git c/src/storage-types/src/errors.rs i/src/storage-types/src/errors.rs index ea50e3b6e2..fa5eeabe5f 100644 --- c/src/storage-types/src/errors.rs +++ i/src/storage-types/src/errors.rs @@ -394,8 +394,8 @@ impl Error for DataflowError {} mod boxed_str { - use timely::container::columnation::Region; - use timely::container::columnation::StableRegion; + use differential_dataflow::containers::Region; + use differential_dataflow::containers::StableRegion; /// Region allocation for `String` data. 
/// @@ -447,11 +447,11 @@ mod boxed_str { mod columnation { use std::iter::once; + use differential_dataflow::containers::{Columnation, Region, StableRegion}; use mz_expr::EvalError; use mz_repr::adt::range::InvalidRangeError; use mz_repr::strconv::ParseError; use mz_repr::Row; - use timely::container::columnation::{Columnation, Region, StableRegion}; use crate::errors::boxed_str::BoxStrStack; use crate::errors::{ @@ -907,8 +907,8 @@ mod columnation { #[cfg(test)] mod tests { + use differential_dataflow::containers::TimelyStack; use proptest::prelude::*; - use timely::container::columnation::TimelyStack; use super::*; diff --git c/src/storage/Cargo.toml i/src/storage/Cargo.toml index 6719c12e72..8023503296 100644 --- c/src/storage/Cargo.toml +++ i/src/storage/Cargo.toml @@ -26,7 +26,7 @@ columnation = "0.1.0" crossbeam-channel = "0.5.14" csv-core = { version = "0.1.10" } dec = "0.4.8" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" indexmap = { version = "2.0.0", default-features = false, features = ["std"] } @@ -77,7 +77,7 @@ serde = { version = "1.0.218", features = ["derive"] } serde_json = { version = "1.0.125" } serde_bytes = { version = "0.11.15" } sha2 = "0.10.6" -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tokio-stream = "0.1.17" diff --git c/src/storage/src/source/mysql.rs i/src/storage/src/source/mysql.rs index 45ad1ed639..4f04cf7846 100644 --- c/src/storage/src/source/mysql.rs +++ i/src/storage/src/source/mysql.rs @@ -56,6 +56,7 @@ use std::fmt; use std::io; use std::rc::Rc; +use differential_dataflow::containers::TimelyStack; use differential_dataflow::AsCollection; use itertools::Itertools; use mz_mysql_util::quote_identifier; @@ -66,7 +67,6 @@ use mz_storage_types::errors::{DataflowError, SourceError}; use 
mz_storage_types::sources::SourceExport; use mz_timely_util::containers::stack::AccountedStackBuilder; use serde::{Deserialize, Serialize}; -use timely::container::columnation::TimelyStack; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pushers::Tee; use timely::dataflow::operators::core::Partition; diff --git c/src/storage/src/source/types.rs i/src/storage/src/source/types.rs index 43e1081a7a..f305db7865 100644 --- c/src/storage/src/source/types.rs +++ i/src/storage/src/source/types.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; +use differential_dataflow::containers::TimelyStack; use differential_dataflow::Collection; use mz_repr::{Diff, GlobalId, Row}; use mz_storage_types::errors::{DataflowError, DecodeError}; @@ -27,7 +28,6 @@ use mz_storage_types::sources::SourceTimestamp; use mz_timely_util::builder_async::PressOnDropButton; use pin_project::pin_project; use serde::{Deserialize, Serialize}; -use timely::container::columnation::TimelyStack; use timely::dataflow::{Scope, ScopeParent, Stream}; use timely::progress::Antichain; use tokio::sync::Semaphore; diff --git c/src/timely-util/Cargo.toml i/src/timely-util/Cargo.toml index 597bfac89c..a4d501f103 100644 --- c/src/timely-util/Cargo.toml +++ i/src/timely-util/Cargo.toml @@ -15,7 +15,7 @@ bincode = "1.3.3" bytemuck = "1.21.0" columnar = "0.3.0" columnation = "0.1.0" -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" either = "1" futures-util = "0.3.31" lgalloc = "0.5" @@ -23,7 +23,7 @@ mz-ore = { path = "../ore", features = ["async", "process", "tracing", "test"] } num-traits = "0.2" proptest = { version = "1.6.0", default-features = false, features = ["std"] } serde = { version = "1.0.218", features = ["derive"] } -timely = "0.18.1" +timely = "0.19.0" tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "time"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } 
diff --git c/src/timely-util/src/builder_async.rs i/src/timely-util/src/builder_async.rs index d6fd79bf4e..9d2d139858 100644 --- c/src/timely-util/src/builder_async.rs +++ i/src/timely-util/src/builder_async.rs @@ -24,10 +24,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{ready, Context, Poll, Waker}; +use differential_dataflow::containers::{Columnation, TimelyStack}; use futures_util::task::ArcWake; use futures_util::Stream; use timely::communication::{Pull, Push}; -use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto}; use timely::dataflow::channels::pact::ParallelizationContract; use timely::dataflow::channels::pushers::Tee; diff --git c/src/timely-util/src/containers.rs i/src/timely-util/src/containers.rs index 5fb43223be..d52390d1fa 100644 --- c/src/timely-util/src/containers.rs +++ i/src/timely-util/src/containers.rs @@ -18,9 +18,9 @@ use std::hash::Hash; use columnar::Columnar; +use differential_dataflow::containers::TimelyStack; use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; use differential_dataflow::Hashable; -use timely::container::columnation::TimelyStack; pub mod stack; diff --git c/src/timely-util/src/containers/stack.rs i/src/timely-util/src/containers/stack.rs index 0112853afc..c7c17e714d 100644 --- c/src/timely-util/src/containers/stack.rs +++ i/src/timely-util/src/containers/stack.rs @@ -18,7 +18,7 @@ use std::cell::Cell; -use timely::container::columnation::{Columnation, TimelyStack}; +use differential_dataflow::containers::{Columnation, TimelyStack}; use timely::container::{ContainerBuilder, PushInto}; /// A Stacked container builder that keep track of container memory usage. 
diff --git c/src/timely-util/src/operator.rs i/src/timely-util/src/operator.rs index 8763e95937..3f1c9d6ab6 100644 --- c/src/timely-util/src/operator.rs +++ i/src/timely-util/src/operator.rs @@ -21,12 +21,12 @@ use std::marker::PhantomData; use std::rc::Weak; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; +use differential_dataflow::containers::{Columnation, TimelyStack}; use differential_dataflow::difference::{Multiply, Semigroup}; use differential_dataflow::lattice::Lattice; use differential_dataflow::logging::DifferentialEventBuilder; use differential_dataflow::trace::{Batcher, Builder, Description}; use differential_dataflow::{AsCollection, Collection, Hashable}; -use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline}; use timely::dataflow::channels::pushers::Tee; diff --git c/src/timely-util/src/order.rs i/src/timely-util/src/order.rs index b37179fcd3..01655568ff 100644 --- c/src/timely-util/src/order.rs +++ i/src/timely-util/src/order.rs @@ -19,8 +19,8 @@ use std::cmp::Ordering; use std::fmt::{self, Debug}; use std::hash::Hash; +use differential_dataflow::containers::CopyRegion; use serde::{Deserialize, Serialize}; -use timely::container::columnation::CopyRegion; use timely::order::Product; use timely::progress::timestamp::{PathSummary, Refines, Timestamp}; use timely::progress::Antichain; diff --git c/src/transform/Cargo.toml i/src/transform/Cargo.toml index 20fc3be596..0f572a6ce3 100644 --- c/src/transform/Cargo.toml +++ i/src/transform/Cargo.toml @@ -10,7 +10,7 @@ publish = false workspace = true [dependencies] -differential-dataflow = "0.13.6" +differential-dataflow = "0.13.7" enum-kinds = "0.5.1" itertools = "0.12.1" mz-compute-types = { path = "../compute-types" } diff --git c/src/txn-wal/Cargo.toml i/src/txn-wal/Cargo.toml index 0425548217..7b8b0b8966 100644 --- 
c/src/txn-wal/Cargo.toml +++ i/src/txn-wal/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] async-trait = "0.1.83" bytes = { version = "1.3.0" } -di…
This PR adds the replication mode options to the `Config` and a `copy_both_simple` method to the `Client` to support queries that enter the CopyBoth mode. At the time of writing, the only query that uses the CopyBoth mode is replication, but more could be added in the future.

`copy_both_simple` returns a `CopyBothDuplex` type that implements both `Stream` and `Sink`. Users can split the two halves if they need to using `futures::StreamExt::split`.

Similarly to binary copies, this PR also provides two helper types that can wrap the `CopyBothDuplex` type and offer a higher level view of the data to the user.

- `ReplicationStream`: Decodes the physical replication protocol according to the specification. It provides raw byte access to the contents of XLogData messages.
- `LogicalReplicationStream`: Internally wraps `ReplicationStream` and further decodes the contents of the XLogData messages according to the logical replication specification.

Finally, this PR includes an integration test that sets up a replication connection, starts a CopyBoth stream for logical replication and tests that the correct data is sent and received.