[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003


Open · wants to merge 15 commits into base: master

Conversation

@aakash-db (Contributor) commented May 23, 2025

What changes were proposed in this pull request?

This PR introduces the DataflowGraph, a container for Declarative Pipelines datasets and flows, as described in the Declarative Pipelines SPIP. It also adds functionality for

  • Constructing a graph by registering a set of graph elements in succession (GraphRegistrationContext)
  • "Resolving" a graph, which means resolving each of the flows within a graph. Resolving a flow means:
    • Validating that its plan can be successfully analyzed
    • Determining the schema of the data it will produce
    • Determining what upstream datasets within the graph it depends on

It also introduces various secondary changes:

  • Changes to SparkBuild to support declarative pipelines.
  • Updates to the pom.xml for the module.
  • New error conditions
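
For illustration, here is a minimal, self-contained Scala sketch of the register-then-resolve flow described above. All names (ToyGraphRegistrationContext, ToyTable, ToyFlow) are hypothetical stand-ins rather than the classes introduced by this PR, and "resolution" is simplified to dependency checking only.

// Toy model only: register graph elements in succession, then "resolve" the
// graph by checking that each flow's inputs refer to known datasets and
// recording its upstream dependencies. Hypothetical API, not the PR's.
import scala.collection.mutable

final case class ToyTable(name: String)
final case class ToyFlow(name: String, inputs: Set[String], destination: String)

final class ToyGraphRegistrationContext {
  private val tables = mutable.ArrayBuffer.empty[ToyTable]
  private val flows  = mutable.ArrayBuffer.empty[ToyFlow]

  def registerTable(t: ToyTable): Unit = tables += t
  def registerFlow(f: ToyFlow): Unit   = flows += f

  // "Resolve" the graph: fail with errors, or return each flow's upstream dependencies.
  def resolve(): Either[Seq[String], Map[String, Set[String]]] = {
    val known  = tables.map(_.name).toSet ++ flows.map(_.destination)
    val errors = flows.toSeq.flatMap { f =>
      (f.inputs -- known).toSeq.map(m => s"Flow ${f.name} reads unknown dataset $m")
    }
    if (errors.nonEmpty) Left(errors) else Right(flows.map(f => f.name -> f.inputs).toMap)
  }
}

// Example usage:
//   val ctx = new ToyGraphRegistrationContext
//   ctx.registerTable(ToyTable("bronze"))
//   ctx.registerFlow(ToyFlow("silver_flow", Set("bronze"), destination = "silver"))
//   ctx.resolve()  // Right(Map("silver_flow" -> Set("bronze")))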

Why are the changes needed?

In order to implement Declarative Pipelines.

Does this PR introduce any user-facing change?

No changes to existing behavior.

How was this patch tested?

New test suites:

  • ConnectValidPipelineSuite – test cases where the graph can be successfully resolved
  • ConnectInvalidPipelineSuite – test cases where the graph fails to be resolved

Was this patch authored or co-authored using generative AI tooling?

No

@sryza sryza changed the title [SPARK-52283][CONNECT] SDP DataflowGraph creation and resolution [SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution May 23, 2025
@sryza sryza self-requested a review May 23, 2025 21:01
@sryza sryza self-assigned this May 23, 2025
@jonmio jonmio left a comment

flushing some comments

@@ -2025,6 +2031,18 @@
],
"sqlState" : "42613"
},
"INCOMPATIBLE_BATCH_VIEW_READ": {
"message": [
"View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."

What is the purpose of this conf and do we really need it?

* @param upstreamNodes Upstream nodes for the node
* @return
*/
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {

Nit: Document return. I'm especially curious why this is a Seq and when processNode would return more than one element

Contributor Author

Right now, it's mostly just for flexibility - in case one node maps to several in the future.
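
To illustrate the flexibility mentioned here, a hypothetical one-to-many rewrite could look like the following (toy types only; not the PR's actual GraphElement hierarchy or current behavior):

sealed trait ToyElement
final case class ToyView(name: String) extends ToyElement
final case class ToyNode(name: String) extends ToyElement

object ToyGraphProcessing {
  // The Seq return type leaves room for one element to expand into several later.
  def processNode(node: ToyElement, upstreamNodes: Seq[ToyElement]): Seq[ToyElement] =
    node match {
      case ToyView(n) => Seq(ToyNode(s"$n-rewritten"), ToyNode(s"$n-backing")) // hypothetical future 1-to-many rewrite
      case other      => Seq(other)                                            // current behavior: pass-through
    }
}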

@apache apache deleted a comment from aakash-db May 27, 2025
@aakash-db aakash-db requested a review from sryza May 27, 2025 22:40
@aakash-db aakash-db changed the title [SPARK-52283][CONNECT] Declarative Pipelines DataflowGraph creation and resolution [SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution May 27, 2025
@aakash-db aakash-db changed the title [SPARK-52283] Declarative Pipelines DataflowGraph creation and resolution [SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution May 27, 2025
@sryza sryza requested a review from cloud-fan May 27, 2025 22:53
val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

/** Returns a [[Table]] given its identifier */
lazy val table: Map[TableIdentifier, Table] =
Contributor

TableIdentifier only supports 3-level namespace. Shall we use Seq[String] to better support DS v2, which can have an arbitrary level of namespace?

Contributor

Seq[String] is a bit hard to use here. We can switch to the DS v2 API after we create an encapsulation class to match TableIdentifier.
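
A hypothetical sketch of the kind of encapsulation class suggested here, supporting an arbitrary-depth namespace as in DS v2 (not part of this PR):

// Hypothetical, not in this PR: a TableIdentifier-like wrapper that supports
// an arbitrary number of namespace levels instead of just catalog/database.
final case class GraphIdentifier(namespace: Seq[String], name: String) {
  // Back-tick-quoted multi-part name.
  def quoted: String = (namespace :+ name).map(p => s"`$p`").mkString(".")
}

// GraphIdentifier(Seq("cat", "schema_a", "schema_b"), "events").quoted
//   == "`cat`.`schema_a`.`schema_b`.`events`"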

f.inputs.toSeq
.map(availableResolvedInputs(_))
.filter {
// Input is a flow implies that the upstream table is a View.
Contributor

I find it hard to understand this comment. We are resolving a flow, and the input of a flow can be other flows? Why does that mean the upstream table is a view?

@aakash-db (Contributor Author) commented May 29, 2025

Because for views, we don't have a ViewInput object - we instead map the view's downstream directly to the view's upstream. Thus, if a flow's upstream input is another flow, the dataset that upstream flow writes to is a view. If a flow's upstream were a table, there would be a TableInput object here.
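
To make that distinction concrete, a toy sketch with hypothetical simplified types (not the PR's Input/Flow classes): tables surface as table inputs, while a flow appearing as an input means the upstream dataset was a view.

sealed trait ToyResolvedInput
final case class ToyTableInput(identifier: String) extends ToyResolvedInput
final case class ToyFlowInput(identifier: String)  extends ToyResolvedInput // upstream dataset was a view

object ToyInputFiltering {
  // Keep only table-backed inputs; view-backed inputs surface as flows because
  // views are "inlined" (their downstream is mapped directly to their upstream).
  def tableDependencies(inputs: Seq[ToyResolvedInput]): Seq[String] =
    inputs.collect { case ToyTableInput(id) => id }
}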

@sryza (Contributor) commented May 30, 2025

@aakash-db

I'm still seeing some test failures:

@cloud-fan (Contributor):
Yeah, the Docker test is generally flaky and we can ignore it.

/**
* Returns a [[TableInput]], if one is available, that can be read from by downstream flows.
*/
def tableInput(identifier: TableIdentifier): Option[TableInput] = table.get(identifier)
Contributor

If tables are both input and output, shall we just have a single name to table map, instead of a output map and a tableInput method?

Contributor

Actually, there is already a table map here. Why do we need the output map, which is identical to the table map but just gives a different error message for duplication?

Contributor

Hmm, yeah, it looks like we do not need a separate tableInput and we should take this out. I suspect it was a relic of an older version of the code where there was a subclass of DataflowGraph that overrode tableInput.

output is a little bit forward-looking. We will eventually allow DataflowGraphs to have sinks (described in the SPIP but not yet implemented here), and output will include both sinks and tables. However, we can leave it out for now and add it back when we add sinks.

Contributor Author

Removed tableInput.
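
For reference, a minimal sketch of the single-map approach settled on in this thread (toy types, hypothetical; the PR's DataflowGraph is more involved): one identifier-to-table map can serve both the read-side and write-side lookups, so a separate tableInput method is unnecessary until sinks are added.

// Toy sketch only: a single identifier -> table map serving both lookups.
final case class ToyGraphTable(identifier: String)

final class ToyDataflowGraph(tables: Seq[ToyGraphTable]) {
  // Single source of truth for tables in the graph.
  val table: Map[String, ToyGraphTable] = tables.map(t => t.identifier -> t).toMap

  def readFrom(id: String): Option[ToyGraphTable]  = table.get(id) // what a flow reads
  def writtenTo(id: String): Option[ToyGraphTable] = table.get(id) // what a flow writes
  // When sinks are added, an `output` map could unify tables and sinks.
}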

@aakash-db (Contributor Author):
Hm, I don't really understand the test failures:

Error:  Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:3.5.0:enforce (enforce-no-duplicate-dependencies) on project spark-pipelines_2.13: 
Error:  Rule 0: org.apache.maven.enforcer.rules.BanDuplicatePomDependencyVersions failed with message:
Error:  Found 1 duplicate dependency declaration in this project:
Error:   - dependencies.dependency[org.apache.spark:spark-core_${scala.binary.version}:jar] (2 times)
Error:  
Error:  -> [Help 1]
Error:  
Error:  To see the full stack trace of the errors, re-run Maven with the -e switch.
Error:  Re-run Maven using the -X switch to enable full debug logging.
Error:  
Error:  For more information about the errors and possible solutions, please read the following articles:
Error:  [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Error:  
Error:  After correcting the problems, you can resume the build with the command
Error:    mvn <args> -rf :spark-pipelines_2.13

We do not include spark-core twice. We have one dep for production and one for tests (i.e., test-jar). Is that wrong?

[error] /__w/spark/spark/sql/pipelines/target/java/org/apache/spark/sql/pipelines/graph/Input.java:7:1:  error: reference not found
[error]    * Returns a {@link DataFrame} that is a result of loading data from this {@link Input}.
[error]                       ^
[error] /__w/spark/spark/sql/pipelines/target/java/org/apache/spark/sql/pipelines/graph/Input.java:9:1:  error: reference not found
[error]    * @return Streaming or batch {@link DataFrame} of this Input's data.
[error]                                        ^

We definitely import DataFrame into this file.

@aakash-db aakash-db requested a review from cloud-fan May 30, 2025 16:49
/** Returns the [[Flow]]s that write to a given destination. */
lazy val flowsTo: Map[TableIdentifier, Seq[Flow]] = flows.groupBy(_.destinationIdentifier)

lazy val resolvedFlows: Seq[ResolvedFlow] = {

Should this be a function or a lazy val?

Contributor

lazy val makes sense to me – it gets used in a few places, and we'd be redundantly recomputing it if it were a function
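
For reference, a generic Scala illustration of the tradeoff (unrelated to the PR's actual resolvedFlows logic): a lazy val is computed once on first access and memoized, while a def recomputes on every call.

// Generic Scala semantics only.
class ResolutionExample(flows: Seq[Int]) {
  lazy val resolvedOnce: Seq[Int] = {
    println("resolving...")          // printed only on first access
    flows.map(_ * 2)
  }
  def resolvedEveryTime: Seq[Int] = {
    println("resolving...")          // printed on every call
    flows.map(_ * 2)
  }
}
// val e = new ResolutionExample(Seq(1, 2, 3))
// e.resolvedOnce; e.resolvedOnce             // "resolving..." printed once
// e.resolvedEveryTime; e.resolvedEveryTime   // printed twice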

Comment on lines +174 to +175
// if the new sql confs are different from the original sql confs the flow was resolved
// with, resolve again.

How can this happen? Can we document that?
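
For context, a toy sketch of the guard the quoted comment describes, with hypothetical simplified types (the PR's actual resolution logic may differ): a flow is re-resolved only if the current SQL confs differ from the confs it was originally resolved with.

final case class ToyResolvedFlow(name: String, sqlConf: Map[String, String])

object ToyReResolution {
  // Re-resolve only when the confs the flow was originally resolved with differ
  // from the confs in effect now; otherwise reuse the existing result.
  def maybeReresolve(
      flow: ToyResolvedFlow,
      currentConf: Map[String, String],
      resolve: String => ToyResolvedFlow): ToyResolvedFlow =
    if (flow.sqlConf != currentConf) resolve(flow.name) else flow
}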

name: String,
catalog: Option[String] = catalogInPipelineSpec,
schema: Option[String] = schemaInPipelineSpec,
isView: Boolean = false

not really obvious what this is for

Contributor Author

I think it's useful in tests to assert on the event log.

@sryza (Contributor) commented May 30, 2025

We definitely import DataFrame into this file.

Hmm you are right. Not sure what's going on here. This is in auto-generated Java code that doesn't appear to have DataFrame imported. Can we just take out the square brackets for now to get past this?

Generated Java code:
package org.apache.spark.sql.pipelines.graph;
/**
 * Holds the {@link DataFrame} returned by a {@link FlowFunction} along with the inputs used to
 * construct it.
 * param:  usedBatchInputs the identifiers of the complete inputs read by the flow
 * param:  usedStreamingInputs the identifiers of the incremental inputs read by the flow
 * param:  usedExternalInputs the identifiers of the external inputs read by the flow
 * param:  dataFrame the {@link DataFrame} expression executed by the flow if the flow can be resolved
 */
public  class FlowFunctionResult implements scala.Product, java.io.Serializable {
  static public abstract  R apply (T1 v1, T2 v2, T3 v3, T4 v4, T5 v5, T6 v6, T7 v7)  ;
  static public  java.lang.String toString ()  { throw new RuntimeException(); }
  public  scala.collection.immutable.Set<org.apache.spark.sql.catalyst.TableIdentifier> requestedInputs ()  { throw new RuntimeException(); }
  public  scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedBatchInputs ()  { throw new RuntimeException(); }
  public  scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedStreamingInputs ()  { throw new RuntimeException(); }
  public  scala.collection.immutable.Set<java.lang.String> usedExternalInputs ()  { throw new RuntimeException(); }
  public  scala.util.Try<org.apache.spark.sql.classic.Dataset<org.apache.spark.sql.Row>> dataFrame ()  { throw new RuntimeException(); }
  public  scala.collection.immutable.Map<java.lang.String, java.lang.String> sqlConf ()  { throw new RuntimeException(); }
  public  scala.collection.immutable.Seq<org.apache.spark.sql.pipelines.AnalysisWarning> analysisWarnings ()  { throw new RuntimeException(); }
  // not preceding
  public   FlowFunctionResult (scala.collection.immutable.Set<org.apache.spark.sql.catalyst.TableIdentifier> requestedInputs, scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedBatchInputs, scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedStreamingInputs, scala.collection.immutable.Set<java.lang.String> usedExternalInputs, scala.util.Try<org.apache.spark.sql.classic.Dataset<org.apache.spark.sql.Row>> dataFrame, scala.collection.immutable.Map<java.lang.String, java.lang.String> sqlConf, scala.collection.immutable.Seq<org.apache.spark.sql.pipelines.AnalysisWarning> analysisWarnings)  { throw new RuntimeException(); }
  /**
   * Returns the names of all of the {@link Input}s used when resolving this {@link Flow}. If the
   * flow failed to resolve, we return the all the datasets that were requested when evaluating the
   * flow.
   * @return (undocumented)
   */
  public  scala.collection.immutable.Set<org.apache.spark.sql.catalyst.TableIdentifier> inputs ()  { throw new RuntimeException(); }
  /** Names of {@link Input}s read completely by this {@link Flow}. */
  public  scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> batchInputs ()  { throw new RuntimeException(); }
  /** Names of {@link Input}s read incrementally by this {@link Flow}. */
  public  scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> streamingInputs ()  { throw new RuntimeException(); }
  /** Returns errors that occurred when attempting to analyze this {@link Flow}. */
  public  scala.collection.immutable.Seq<java.lang.Throwable> failure ()  { throw new RuntimeException(); }
  /** Whether this {@link Flow} is successfully analyzed. */
  public final  boolean resolved ()  { throw new RuntimeException(); }
}
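
For clarity, a before/after of the suggested workaround (illustrative; the exact doc comment in the PR may differ slightly): drop the [[...]] scaladoc link markers that the Java doc generation turns into unresolvable {@link} references.

// Illustrative only: the Input doc comment before and after the suggested change.

// Before (generates {@link DataFrame}, which javadoc cannot resolve):
//   /** Returns a [[DataFrame]] that is a result of loading data from this [[Input]]. */

// After (plain text, no unresolved link):
//   /** Returns a DataFrame that is a result of loading data from this Input. */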

@sryza (Contributor) commented May 30, 2025

We do not include spark-core twice. We have one dep for production and one for tests (i.e., test-jar).

Are we consistent with what other modules that depend on spark_core do? (e.g. SQL or streaming)

@aakash-db (Contributor Author):

Are we consistent with what other modules that depend on spark_core do? (e.g. SQL or streaming)

Yes, AFAICT: sql/hive/pom.xml, sql/core/pom.xml, streaming/pom.xml, and many others do the same thing.

@aakash-db aakash-db requested review from jonmio May 30, 2025 22:05