
Commit 954ef96

cloud-fan authored and gatorsmile committed
[SPARK-25530][SQL] data source v2 API refactor (batch write)
## What changes were proposed in this pull request?

Adjust the batch write API to match the read API refactor after apache#23086.

The doc with the high-level ideas: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing

Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`, and renames `WriteSupport` to `Write`. It also cleans up some code now that the batch API is complete.

This PR also removes the test from apache#22688. Now a data source must return a table for read/write.

A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for the streaming APIs.
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think the streaming APIs will keep using `OutputMode`, and the new end-user write APIs will apply to batch only, at least in the near future.
3. We will remove `SaveMode` from the data source API: https://issues.apache.org/jira/browse/SPARK-26356

## How was this patch tested?

Existing tests.

Closes apache#23208 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent 1b75f3b commit 954ef96

Showing 21 changed files with 330 additions and 245 deletions.
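
The refactored batch write path fits together roughly as follows. This is a hedged sketch for orientation, not code from the commit: the class and variable names are invented, while the interfaces and methods (`SupportsBatchWrite`, `newWriteBuilder`, `withQueryId`, `withInputDataSchema`, `buildForBatch`) come from the diffs below.

```java
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.types.StructType;

class BatchWritePathSketch {
  // Roughly what Spark's batch write path does once it holds a table that
  // mixes in SupportsBatchWrite (a sketch, not Spark's actual internals).
  static BatchWrite configureWrite(
      SupportsBatchWrite table,
      DataSourceOptions options,
      String queryId,
      StructType inputSchema) {
    WriteBuilder builder = table
        .newWriteBuilder(options)           // from SupportsWrite
        .withQueryId(queryId)               // identifies the (possibly restarted) query
        .withInputDataSchema(inputSchema);  // schema of the rows to be written
    return builder.buildForBatch();         // batch-writable tables must implement this
  }
}
```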

sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java (-59)

This file was deleted.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java (+1 -9)

@@ -20,15 +20,7 @@
 import org.apache.spark.annotation.Evolving;
 
 /**
- * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
- *
- * Note that this is an empty interface. Data source implementations must mix in interfaces such as
- * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
- * batch or streaming read/write support instances. Otherwise it's just a dummy data source which
- * is un-readable/writable.
- *
- * If Spark fails to execute any methods in the implementations of this interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
+ * TODO: remove it when we finish the API refactor for streaming side.
 */
 @Evolving
 public interface DataSourceV2 {}

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java (new file, +32)

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+
+/**
+ * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
+ * <p>
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * with {@link WriteBuilder#buildForBatch()} implemented.
+ * </p>
+ */
+@Evolving
+public interface SupportsBatchWrite extends SupportsWrite {}
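
To make the contract above concrete, a hypothetical table mixing in `SupportsBatchWrite` might look like the abbreviated sketch below (the `Table` members and the actual `BatchWrite` construction are omitted, and all names are invented):

```java
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

class MyBatchWritableTable implements SupportsBatchWrite {
  @Override
  public WriteBuilder newWriteBuilder(DataSourceOptions options) {
    // Per the javadoc above, the returned builder must implement buildForBatch().
    return new WriteBuilder() {
      @Override
      public BatchWrite buildForBatch() {
        // A real source would return its BatchWrite implementation here.
        throw new UnsupportedOperationException("not implemented in this sketch");
      }
    };
  }
  // Table's own members (e.g. schema()) are omitted in this sketch.
}
```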

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java (+4 -1)

@@ -29,7 +29,10 @@ interface SupportsRead extends Table {
 
   /**
    * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this
-   * method to configure each scan.
+   * method to configure each data source scan.
+   *
+   * @param options The options for reading, which is an immutable case-insensitive
+   *                string-to-string map.
    */
   ScanBuilder newScanBuilder(DataSourceOptions options);
 }

sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java (new file, +35)

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+
+/**
+ * An internal base interface of mix-in interfaces for writable {@link Table}. This adds
+ * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write
+ * for batch or streaming.
+ */
+interface SupportsWrite extends Table {
+
+  /**
+   * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
+   * this method to configure each data source write.
+   */
+  WriteBuilder newWriteBuilder(DataSourceOptions options);
+}

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java → sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java (renamed, +1 -1)

@@ -38,7 +38,7 @@
  * Please refer to the documentation of commit/abort methods for detailed specifications.
  */
 @Evolving
-public interface BatchWriteSupport {
+public interface BatchWrite {
 
   /**
    * Creates a writer factory which will be serialized and sent to executors.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java (+6 -6)

@@ -36,11 +36,11 @@
  *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
- * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data
+ * {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data
  * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
  * exception will be sent to the driver side, and Spark may retry this writing task a few times.
  * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
- * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])}
+ * different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])}
  * when the configured number of retries is exhausted.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -71,11 +71,11 @@ public interface DataWriter<T> {
   /**
    * Commits this writer after all records are written successfully, returns a commit message which
    * will be sent back to driver side and passed to
-   * {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
+   * {@link BatchWrite#commit(WriterCommitMessage[])}.
    *
    * The written data should only be visible to data source readers after
-   * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method
-   * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to
+   * {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method
+   * should still "hide" the written data and ask the {@link BatchWrite} at driver side to
    * do the final commit via {@link WriterCommitMessage}.
    *
    * If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -93,7 +93,7 @@ public interface DataWriter<T> {
    * failed.
    *
    * If this method fails(by throwing an exception), the underlying data source may have garbage
-   * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
+   * that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually,
    * but these garbage should not be visible to data source readers.
    *
    * @throws IOException if failure happens during disk/network IO like writing files.

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java (+1 -1)

@@ -24,7 +24,7 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 
 /**
- * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link BatchWrite#createBatchWriterFactory()},
  * which is responsible for creating and initializing the actual data writer at executor side.
  *
  * Note that, the writer factory will be serialized and sent to executors, then the data writer
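
Continuing the hypothetical staging example above, the matching factory is small. The property stated in the javadoc is the important one: the factory is serialized and shipped to executors, so it must not capture non-serializable driver-side state.

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;

// Hypothetical factory pairing with StagingDataWriter from the sketch above.
// partitionId and taskId come from Spark and keep staging locations unique.
class StagingDataWriterFactory implements DataWriterFactory {
  @Override
  public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
    return new StagingDataWriter(partitionId, taskId);
  }
}
```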

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java (new file, +26)

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.sql.SaveMode;
+
+// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before
+// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details.
+public interface SupportsSaveMode extends WriteBuilder {
+  WriteBuilder mode(SaveMode mode);
+}
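
As a hedged sketch of how this temporary trait might be used (all names invented), a builder can record the mode and let `buildForBatch()` react to it. Note that the `WriteBuilder` javadoc below explicitly allows returning null under `SaveMode` to signal that no write is needed.

```java
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

class MySaveModeWriteBuilder implements SupportsSaveMode {
  private SaveMode mode = SaveMode.ErrorIfExists; // assumed default, not mandated by the API

  @Override
  public WriteBuilder mode(SaveMode mode) {
    this.mode = mode;
    return this;
  }

  @Override
  public BatchWrite buildForBatch() {
    if (mode == SaveMode.Ignore && targetExists()) {
      return null; // allowed while SupportsSaveMode exists: nothing to write
    }
    if (mode == SaveMode.ErrorIfExists && targetExists()) {
      throw new IllegalStateException("target already exists");
    }
    // A real source would construct and return its BatchWrite here.
    throw new UnsupportedOperationException("sketch only");
  }

  private boolean targetExists() { return false; } // placeholder existence check
}
```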

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java (new file, +69)

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to
+ * support different ways to write data to data sources.
+ *
+ * Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder is to
+ * append data without affecting existing data.
+ */
+@Evolving
+public interface WriteBuilder {
+
+  /**
+   * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. It's
+   * possible that there are many queries running at the same time, or a query is restarted and
+   * resumed. {@link BatchWrite} can use this id to identify the query.
+   *
+   * @return a new builder with the `queryId`. By default it returns `this`, which means the given
+   *         `queryId` is ignored. Please override this method to take the `queryId`.
+   */
+  default WriteBuilder withQueryId(String queryId) {
+    return this;
+  }
+
+  /**
+   * Passes the schema of the input data from Spark to data source.
+   *
+   * @return a new builder with the `schema`. By default it returns `this`, which means the given
+   *         `schema` is ignored. Please override this method to take the `schema`.
+   */
+  default WriteBuilder withInputDataSchema(StructType schema) {
+    return this;
+  }
+
+  /**
+   * Returns a {@link BatchWrite} to write data to batch source. By default this method throws
+   * exception, data sources must overwrite this method to provide an implementation, if the
+   * {@link Table} that creates this scan implements {@link SupportsBatchWrite}.
+   *
+   * Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
+   * to indicate that no writing is needed. We can clean it up after removing
+   * {@link SupportsSaveMode}.
+   */
+  default BatchWrite buildForBatch() {
+    throw new UnsupportedOperationException("Batch scans are not supported");
+  }
+}
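
Since both `withQueryId` and `withInputDataSchema` default to ignoring their arguments, a source that wants them must override both and return `this` (or a copy) to keep the fluent chain intact. A minimal hypothetical override:

```java
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.types.StructType;

class CapturingWriteBuilder implements WriteBuilder {
  private String queryId;
  private StructType inputSchema;

  @Override
  public WriteBuilder withQueryId(String queryId) {
    this.queryId = queryId; // identifies the query, even across restarts
    return this;
  }

  @Override
  public WriteBuilder withInputDataSchema(StructType schema) {
    this.inputSchema = schema; // the rows handed to the writers match this schema
    return this;
  }

  @Override
  public BatchWrite buildForBatch() {
    // A real source would validate inputSchema against its table schema and
    // return a BatchWrite parameterized by queryId; omitted in this sketch.
    throw new UnsupportedOperationException("sketch only");
  }
}
```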

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java (+1 -1)

@@ -24,7 +24,7 @@
 
 /**
  * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
- * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or
+ * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
  * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
  *
  * This is an empty interface, data sources should define their own message class and use it when

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala (+3 -5)

@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         case _ => provider.getTable(dsOptions)
       }
       table match {
-        case s: SupportsBatchRead =>
-          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-            provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema))
-
+        case _: SupportsBatchRead =>
+          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
         case _ => loadV1Source(paths: _*)
       }
     } else {