[SPARK-52283][SQL] Declarative Pipelines DataflowGraph creation and resolution #51003

Open
wants to merge 16 commits into base: master
Changes from 4 commits
32 changes: 32 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1328,6 +1328,12 @@
],
"sqlState" : "42713"
},
"DUPLICATE_FLOW_SQL_CONF": {
"message": [
"Found duplicate sql conf for dataset '<datasetName>': '<key>' is defined by both '<flowName1>' and '<flowName2>'"
],
"sqlState": "42710"
},
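Note: the new entry follows the standard error-conditions.json contract, a message template with <param> placeholders plus an ANSI SQLSTATE. A rough sketch of how such a condition is typically raised from Scala follows; the exact exception type and call site used by the pipelines code are not visible in this diff, so the constructor and parameter values below are assumptions for illustration only.

    // Hypothetical call site, not part of this PR. Parameter names match the
    // DUPLICATE_FLOW_SQL_CONF message template above; the values are made up.
    import org.apache.spark.sql.AnalysisException

    throw new AnalysisException(
      errorClass = "DUPLICATE_FLOW_SQL_CONF",
      messageParameters = Map(
        "datasetName" -> "sales_daily",
        "key"         -> "spark.sql.shuffle.partitions",
        "flowName1"   -> "flow_a",
        "flowName2"   -> "flow_b"))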
"DUPLICATED_MAP_KEY" : {
"message" : [
"Duplicate map key <key> was found, please check the input data.",
@@ -2025,6 +2031,18 @@
],
"sqlState" : "42613"
},
"INCOMPATIBLE_BATCH_VIEW_READ": {
"message": [
"View <datasetIdentifier> is not a streaming view and must be referenced using read. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
Review comment: What is the purpose of this conf and do we really need it?
],
"sqlState": "42000"
},
"INCOMPATIBLE_STREAMING_VIEW_READ": {
"message": [
"View <datasetIdentifier> is a streaming view and must be referenced using readStream. This check can be disabled by setting Spark conf pipelines.incompatibleViewCheck.enabled = false."
],
"sqlState": "42000"
},
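Note: these two conditions encode the rule that a batch view must be consumed with read and a streaming view with readStream. A minimal sketch of that distinction at the DataFrame API level, using the standard spark.read.table / spark.readStream.table entry points and hypothetical view names; the pipelines-specific resolution logic that actually performs this check is not shown in this file.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // A batch (non-streaming) view must be referenced with read;
    // reading it with readStream is what INCOMPATIBLE_BATCH_VIEW_READ reports.
    val batchDf = spark.read.table("events_view")               // "events_view" is hypothetical

    // A streaming view must be referenced with readStream;
    // reading it with read is what INCOMPATIBLE_STREAMING_VIEW_READ reports.
    val streamingDf = spark.readStream.table("events_stream_view")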
"INCOMPATIBLE_VIEW_SCHEMA_CHANGE" : {
"message" : [
"The SQL query of view <viewName> has an incompatible schema change and column <colName> cannot be resolved. Expected <expectedNum> columns named <colName> but got <actualCols>.",
@@ -6566,6 +6584,20 @@
],
"sqlState" : "P0001"
},
"USER_SPECIFIED_AND_INFERRED_SCHEMA_NOT_COMPATIBLE": {
"message": [
"Table '<tableName>' has a user-specified schema that is incompatible with the schema",
" inferred from its query.",
"<streamingTableHint>",
"",
"Declared schema:",
"<specifiedSchema>",
"",
"Inferred schema:",
"<inferredDataSchema>"
],
"sqlState": "42000"
},
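Note: the message prints the declared and inferred schemas side by side. A small illustration of the kind of mismatch it describes, built with plain DataFrame APIs and hypothetical column names; the actual comparison performed during graph resolution is not part of this file.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().getOrCreate()

    // User-specified (declared) schema: id LONG, name STRING  (hypothetical)
    val declared = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // Schema inferred from the defining query: id INT, name DATE
    val inferred = spark.sql("SELECT 1 AS id, DATE'2024-01-01' AS name").schema

    // The two schemas disagree on both column types; this is the situation
    // USER_SPECIFIED_AND_INFERRED_SCHEMA_NOT_COMPATIBLE is meant to surface.
    println(declared == inferred) // false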
"VARIABLE_ALREADY_EXISTS" : {
"message" : [
"Cannot create the variable <variableName> because it already exists.",
90 changes: 90 additions & 0 deletions project/SparkBuild.scala
@@ -418,6 +418,8 @@ object SparkBuild extends PomBuild {

enable(HiveThriftServer.settings)(hiveThriftServer)

enable(SparkDeclarativePipelines.settings)(pipelines)

enable(SparkConnectCommon.settings)(connectCommon)
enable(SparkConnect.settings)(connect)
enable(SparkConnectClient.settings)(connectClient)
@@ -884,6 +886,94 @@ object SparkConnectClient {
)
}

object SparkDeclarativePipelines {
import BuildCommons.protoVersion

lazy val settings = Seq(
// For some reason the resolution from the imported Maven build does not work for some
// of these dependencies that we need to shade later on.
libraryDependencies ++= {
val guavaVersion =
SbtPomKeys.effectivePom.value.getProperties.get(
"connect.guava.version").asInstanceOf[String]
val guavaFailureaccessVersion =
SbtPomKeys.effectivePom.value.getProperties.get(
"guava.failureaccess.version").asInstanceOf[String]
Seq(
"com.google.guava" % "guava" % guavaVersion,
"com.google.guava" % "failureaccess" % guavaFailureaccessVersion,
"com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf"
)
},

dependencyOverrides ++= {
val guavaVersion =
SbtPomKeys.effectivePom.value.getProperties.get(
"connect.guava.version").asInstanceOf[String]
val guavaFailureaccessVersion =
SbtPomKeys.effectivePom.value.getProperties.get(
"guava.failureaccess.version").asInstanceOf[String]
Seq(
"com.google.guava" % "guava" % guavaVersion,
"com.google.guava" % "failureaccess" % guavaFailureaccessVersion,
"com.google.protobuf" % "protobuf-java" % protoVersion
)
},

(assembly / test) := { },

(assembly / logLevel) := Level.Info,

// Exclude `scala-library` from assembly.
(assembly / assemblyPackageScala / assembleArtifact) := false,

// SPARK-46733: Include `spark-connect-*.jar`, `unused-*.jar`,`guava-*.jar`,
// `failureaccess-*.jar`, `annotations-*.jar`, `grpc-*.jar`, `protobuf-*.jar`,
// `gson-*.jar`, `error_prone_annotations-*.jar`, `j2objc-annotations-*.jar`,
// `animal-sniffer-annotations-*.jar`, `perfmark-api-*.jar`,
// `proto-google-common-protos-*.jar` in assembly.
// This needs to be consistent with the content of `maven-shade-plugin`.
(assembly / assemblyExcludedJars) := {
val cp = (assembly / fullClasspath).value
val validPrefixes = Set("spark-connect", "unused-", "guava-", "failureaccess-",
"annotations-", "grpc-", "protobuf-", "gson", "error_prone_annotations",
"j2objc-annotations", "animal-sniffer-annotations", "perfmark-api",
"proto-google-common-protos")
cp filterNot { v =>
validPrefixes.exists(v.data.getName.startsWith)
}
},

(assembly / assemblyShadeRules) := Seq(
ShadeRule.rename("io.grpc.**" -> "org.sparkproject.connect.grpc.@0").inAll,
ShadeRule.rename("com.google.common.**" -> "org.sparkproject.connect.guava.@1").inAll,
ShadeRule.rename("com.google.thirdparty.**" -> "org.sparkproject.connect.guava.@1").inAll,
ShadeRule.rename("com.google.protobuf.**" -> "org.sparkproject.connect.protobuf.@1").inAll,
ShadeRule.rename("android.annotation.**" -> "org.sparkproject.connect.android_annotation.@1").inAll,
ShadeRule.rename("io.perfmark.**" -> "org.sparkproject.connect.io_perfmark.@1").inAll,
ShadeRule.rename("org.codehaus.mojo.animal_sniffer.**" -> "org.sparkproject.connect.animal_sniffer.@1").inAll,
ShadeRule.rename("com.google.j2objc.annotations.**" -> "org.sparkproject.connect.j2objc_annotations.@1").inAll,
ShadeRule.rename("com.google.errorprone.annotations.**" -> "org.sparkproject.connect.errorprone_annotations.@1").inAll,
ShadeRule.rename("org.checkerframework.**" -> "org.sparkproject.connect.checkerframework.@1").inAll,
ShadeRule.rename("com.google.gson.**" -> "org.sparkproject.connect.gson.@1").inAll,
ShadeRule.rename("com.google.api.**" -> "org.sparkproject.connect.google_protos.api.@1").inAll,
ShadeRule.rename("com.google.cloud.**" -> "org.sparkproject.connect.google_protos.cloud.@1").inAll,
ShadeRule.rename("com.google.geo.**" -> "org.sparkproject.connect.google_protos.geo.@1").inAll,
ShadeRule.rename("com.google.logging.**" -> "org.sparkproject.connect.google_protos.logging.@1").inAll,
ShadeRule.rename("com.google.longrunning.**" -> "org.sparkproject.connect.google_protos.longrunning.@1").inAll,
ShadeRule.rename("com.google.rpc.**" -> "org.sparkproject.connect.google_protos.rpc.@1").inAll,
ShadeRule.rename("com.google.type.**" -> "org.sparkproject.connect.google_protos.type.@1").inAll
),

(assembly / assemblyMergeStrategy) := {
case m if m.toLowerCase(Locale.ROOT).endsWith("manifest.mf") => MergeStrategy.discard
// Drop all proto files that are not needed as artifacts of the build.
case m if m.toLowerCase(Locale.ROOT).endsWith(".proto") => MergeStrategy.discard
case _ => MergeStrategy.first
}
)
}

object SparkProtobuf {
import BuildCommons.protoVersion

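Note: the shade rules above relocate the bundled gRPC, Guava, protobuf, and related classes under org.sparkproject.connect so the assembly cannot clash with user-supplied copies, mirroring the maven-shade-plugin configuration as the comment requires. A rough illustration of how one of these jarjar-style rename rules rewrites a class name, assuming the usual placeholder semantics (@1 expands to the text matched by the first ** wildcard, @0 to the entire matched name); this is an explanatory sketch, not additional build code.

    // Illustration only (assumed placeholder semantics), not part of the build:
    // ShadeRule.rename("com.google.common.**" -> "org.sparkproject.connect.guava.@1").inAll
    //   com.google.common.collect.ImmutableList
    //     -> org.sparkproject.connect.guava.collect.ImmutableList
    //
    // ShadeRule.rename("io.grpc.**" -> "org.sparkproject.connect.grpc.@0").inAll
    //   io.grpc.ManagedChannel
    //     -> org.sparkproject.connect.grpc.io.grpc.ManagedChannel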
196 changes: 195 additions & 1 deletion sql/pipelines/pom.xml
@@ -16,7 +16,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
@@ -39,9 +40,202 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--PROTOBUF DEPS-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${connect.guava.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
<version>${guava.failureaccess.version}</version>
<scope>compile</scope>
</dependency>

<!--GRPC BUILTINS-->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${io.grpc.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-api_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-shims_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module; otherwise testing commands
that exclude it will yield errors.
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<protoSourceRoot>src/main/protobuf</protoSourceRoot>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.9.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- os-maven-plugin helps resolve ${os.detected.classifier} automatically -->
</plugins>
</build>
</project>