[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003
Conversation
flushing some comments
@@ -2025,6 +2031,18 @@
    ],
    "sqlState" : "42613"
  },
  "INCOMPATIBLE_BATCH_VIEW_READ": {
    "message": [
      "View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
What is the purpose of this conf and do we really need it?
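For context, a minimal sketch of how the conf named in the error message above would be toggled. This assumes a local SparkSession and that the flag behaves like an ordinary runtime conf, which is an assumption rather than something verified in this PR:

```scala
import org.apache.spark.sql.SparkSession

object DisableViewCheckSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()
    // Conf name taken verbatim from the error message; assumed to be settable at runtime.
    spark.conf.set("pipelines.incompatibleViewCheck.enabled", "false")
    spark.stop()
  }
}
```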
 * @param upstreamNodes Upstream nodes for the node
 * @return
 */
def processNode(node: GraphElement, upstreamNodes: Seq[GraphElement]): Seq[GraphElement] = {
Nit: Document return. I'm especially curious why this is a Seq and when processNode would return more than one element
Right now, it's mostly just for flexibility - in case one node maps to several in the future.
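A minimal sketch (with hypothetical stand-in types, not the PR's actual GraphElement classes) of why a Seq return type leaves room for that flexibility:

```scala
object NodeProcessorSketch {
  sealed trait Element { def name: String }
  case class TableElement(name: String) extends Element
  case class FlowElement(name: String, destination: String) extends Element

  // Today every node maps to itself, but the Seq return type means a later version
  // could emit extra derived elements for a node without changing the signature.
  def processNode(node: Element, upstreamNodes: Seq[Element]): Seq[Element] = node match {
    case t: TableElement => Seq(t) // could hypothetically become Seq(t, derivedFlowFor(t)) later
    case other           => Seq(other)
  }
}
```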
val materializedFlowIdentifiers: Set[TableIdentifier] = materializedFlows.map(_.identifier).toSet

/** Returns a [[Table]] given its identifier */
lazy val table: Map[TableIdentifier, Table] =
TableIdentifier only supports a 3-level namespace. Shall we use Seq[String] to better support DS v2, which can have an arbitrary level of namespace?
Seq[String] is a bit hard to use here. We can switch to the DS v2 API after we create an encapsulation class to match TableIdentifier.
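A small sketch of the trade-off being discussed, assuming the 3-argument TableIdentifier constructor (table, database, catalog):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

object IdentifierDepthSketch {
  // TableIdentifier tops out at catalog.database.table:
  val threeLevel: TableIdentifier = TableIdentifier("events", Some("bronze"), Some("my_catalog"))

  // A DS v2-style multi-part name has no fixed depth; a Seq[String] (or a small
  // wrapper class over it) could represent e.g. my_catalog.ns1.ns2.events:
  val multiPart: Seq[String] = Seq("my_catalog", "ns1", "ns2", "events")
}
```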
f.inputs.toSeq
  .map(availableResolvedInputs(_))
  .filter {
    // Input is a flow implies that the upstream table is a View.
I find it hard to understand this comment. We are resolving a flow, and the input of a flow can be other flows? Why does that mean the upstream table is a view?
Because for views, we don't have a ViewInput object - we instead map the view's downstream directly to the view's upstream. Thus if a flow's upstream is a flow, the node it is writing to is a view. If a flow's upstream was a table, then there would be a TableInput object here.
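A simplified sketch of that resolution detail, using hypothetical types rather than the PR's actual Input classes: a view contributes no dedicated input node, so a flow that reads a view sees the view's backing flow as its resolved input instead of a TableInput.

```scala
object ViewInputSketch {
  sealed trait Input
  case class TableInput(name: String) extends Input // upstream dataset is a table
  case class FlowInput(name: String) extends Input  // upstream is a flow, i.e. it writes to a view

  val availableResolvedInputs: Map[String, Input] = Map(
    "bronze_table" -> TableInput("bronze_table"),
    "silver_view" -> FlowInput("silver_view_flow"))

  // Keep only inputs backed by real tables; a flow-typed input means the upstream
  // dataset is a view whose definition is inlined rather than read as a table.
  val tableBackedInputs: Seq[TableInput] = Seq("bronze_table", "silver_view")
    .flatMap(availableResolvedInputs.get)
    .collect { case t: TableInput => t }
}
```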
I'm still seeing some test failures:

Yeah, the docker test is generally flaky and we can ignore it.
/**
 * Returns a [[TableInput]], if one is available, that can be read from by downstream flows.
 */
def tableInput(identifier: TableIdentifier): Option[TableInput] = table.get(identifier)
If tables are both input and output, shall we just have a single name-to-table map, instead of an output map and a tableInput method?
Actually, there is already a table map here, so why do we need the output map, which is identical to the table map but just gives a different error message for duplication?
Hmm yeah, it looks like we do not need a separate tableInput and we should take this out. I suspect this was a relic of an older version of the code where there was a subclass of DataflowGraph that overrode tableInput.

output is a little bit forward-looking. We will eventually allow DataflowGraphs to have sinks (described in the SPIP but not yet implemented here), and output will include both sinks and tables. However, we can leave it out for now and add it back when we add sinks.
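A sketch of the direction described above, with simplified stand-in types rather than the PR's actual classes: once sinks exist, output would be a superset of table; until then a single identifier-keyed map can cover both the duplicate check and table lookups.

```scala
object GraphOutputsSketch {
  sealed trait Output { def identifier: String }
  case class Table(identifier: String) extends Output
  case class Sink(identifier: String) extends Output // described in the SPIP, not implemented yet

  val tables: Seq[Table] = Seq(Table("catalog.db.bronze"), Table("catalog.db.silver"))
  val sinks: Seq[Sink] = Seq.empty // empty until sinks land

  // One map per concern; `output` simply unions tables and sinks.
  lazy val table: Map[String, Table] = tables.map(t => t.identifier -> t).toMap
  lazy val output: Map[String, Output] = (tables ++ sinks).map(o => o.identifier -> o).toMap
}
```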
Removed tableInput.
Hm, I don't really understand the test failures:
- We do not include spark_core twice. We have one dep for production and one for tests (i.e. test-jar). Is that wrong?
- We definitely import DataFrame into this file.
/** Returns the [[Flow]]s that write to a given destination. */
lazy val flowsTo: Map[TableIdentifier, Seq[Flow]] = flows.groupBy(_.destinationIdentifier)

lazy val resolvedFlows: Seq[ResolvedFlow] = {
Should this be a function or a lazy val?
lazy val makes sense to me – it gets used in a few places, and we'd be redundantly recomputing it if it were a function
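A toy illustration of the trade-off, unrelated to the PR's actual classes: a lazy val computes and caches the grouping once, while a def redoes the work on every access.

```scala
class LazyValVsDefSketch(flows: Seq[String]) {
  lazy val groupedOnce: Map[Char, Seq[String]] = {
    println("grouping...") // printed a single time, on first access
    flows.groupBy(_.head)
  }

  def groupedEveryTime: Map[Char, Seq[String]] = {
    println("grouping...") // printed on every call
    flows.groupBy(_.head)
  }
}
```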
// if the new sql confs are different from the original sql confs the flow was resolved
// with, resolve again.
How can this happen? Can we document that?
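For reference, a hedged sketch (hypothetical names, not the PR's actual code) of the guard that the quoted comment describes: a flow remembers the SQL confs it was resolved with and is analyzed again if the confs now in effect differ.

```scala
object ReResolveSketch {
  case class ResolvedFlow(name: String, sqlConf: Map[String, String])

  def maybeReResolve(
      flow: ResolvedFlow,
      currentConf: Map[String, String],
      resolve: String => ResolvedFlow): ResolvedFlow = {
    if (flow.sqlConf != currentConf) resolve(flow.name) // confs changed: resolve again
    else flow                                           // cached resolution is still valid
  }
}
```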
name: String,
catalog: Option[String] = catalogInPipelineSpec,
schema: Option[String] = schemaInPipelineSpec,
isView: Boolean = false
not really obvious what this is for
I think it's useful in tests to assert on the event log.
Hmm you are right. Not sure what's going on here. This is in auto-generated Java code that doesn't appear to have Java code:

package org.apache.spark.sql.pipelines.graph;
/**
* Holds the {@link DataFrame} returned by a {@link FlowFunction} along with the inputs used to
* construct it.
* param: usedBatchInputs the identifiers of the complete inputs read by the flow
* param: usedStreamingInputs the identifiers of the incremental inputs read by the flow
* param: usedExternalInputs the identifiers of the external inputs read by the flow
* param: dataFrame the {@link DataFrame} expression executed by the flow if the flow can be resolved
*/
public class FlowFunctionResult implements scala.Product, java.io.Serializable {
static public abstract R apply (T1 v1, T2 v2, T3 v3, T4 v4, T5 v5, T6 v6, T7 v7) ;
static public java.lang.String toString () { throw new RuntimeException(); }
public scala.collection.immutable.Set<org.apache.spark.sql.catalyst.TableIdentifier> requestedInputs () { throw new RuntimeException(); }
public scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedBatchInputs () { throw new RuntimeException(); }
public scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedStreamingInputs () { throw new RuntimeException(); }
public scala.collection.immutable.Set<java.lang.String> usedExternalInputs () { throw new RuntimeException(); }
public scala.util.Try<org.apache.spark.sql.classic.Dataset<org.apache.spark.sql.Row>> dataFrame () { throw new RuntimeException(); }
public scala.collection.immutable.Map<java.lang.String, java.lang.String> sqlConf () { throw new RuntimeException(); }
public scala.collection.immutable.Seq<org.apache.spark.sql.pipelines.AnalysisWarning> analysisWarnings () { throw new RuntimeException(); }
// not preceding
public FlowFunctionResult (scala.collection.immutable.Set<org.apache.spark.sql.catalyst.TableIdentifier> requestedInputs, scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedBatchInputs, scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> usedStreamingInputs, scala.collection.immutable.Set<java.lang.String> usedExternalInputs, scala.util.Try<org.apache.spark.sql.classic.Dataset<org.apache.spark.sql.Row>> dataFrame, scala.collection.immutable.Map<java.lang.String, java.lang.String> sqlConf, scala.collection.immutable.Seq<org.apache.spark.sql.pipelines.AnalysisWarning> analysisWarnings) { throw new RuntimeException(); }
/**
* Returns the names of all of the {@link Input}s used when resolving this {@link Flow}. If the
* flow failed to resolve, we return the all the datasets that were requested when evaluating the
* flow.
* @return (undocumented)
*/
public scala.collection.immutable.Set<org.apache.spark.sql.catalyst.TableIdentifier> inputs () { throw new RuntimeException(); }
/** Names of {@link Input}s read completely by this {@link Flow}. */
public scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> batchInputs () { throw new RuntimeException(); }
/** Names of {@link Input}s read incrementally by this {@link Flow}. */
public scala.collection.immutable.Set<org.apache.spark.sql.pipelines.graph.ResolvedInput> streamingInputs () { throw new RuntimeException(); }
/** Returns errors that occurred when attempting to analyze this {@link Flow}. */
public scala.collection.immutable.Seq<java.lang.Throwable> failure () { throw new RuntimeException(); }
/** Whether this {@link Flow} is successfully analyzed. */
public final boolean resolved () { throw new RuntimeException(); }
}
Are we consistent with what other modules that depend on spark_core do? (e.g. SQL or streaming)

Yes, AFAICT.
What changes were proposed in this pull request?
This PR introduces the DataflowGraph, a container for Declarative Pipelines datasets and flows, as described in the Declarative Pipelines SPIP. It also adds functionality for constructing these graphs (via GraphRegistrationContext) and for resolving them.

It also introduces various secondary changes:
- Changes to SparkBuild to support declarative pipelines.
- A pom.xml for the module.

Why are the changes needed?
In order to implement Declarative Pipelines.
Does this PR introduce any user-facing change?
No changes to existing behavior.
How was this patch tested?
New test suites:
- ConnectValidPipelineSuite – test cases where the graph can be successfully resolved
- ConnectInvalidPipelineSuite – test cases where the graph fails to be resolved

Was this patch authored or co-authored using generative AI tooling?
No