
Commit 3b0b52a

Flink: Support log store
1 parent 8e79259 commit 3b0b52a

33 files changed: +1726 −59 lines

core/src/main/java/org/apache/iceberg/io/WriteResult.java (+32 −3)
```diff
@@ -21,22 +21,30 @@
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.CharSequenceSet;
 
 public class WriteResult implements Serializable {
   private DataFile[] dataFiles;
   private DeleteFile[] deleteFiles;
   private CharSequence[] referencedDataFiles;
 
+  private Map<Integer, Long> logStorePartitionOffsets;
+
   private WriteResult(
-      List<DataFile> dataFiles, List<DeleteFile> deleteFiles, CharSequenceSet referencedDataFiles) {
+      List<DataFile> dataFiles,
+      List<DeleteFile> deleteFiles,
+      CharSequenceSet referencedDataFiles,
+      Map<Integer, Long> logStorePartitionOffsets) {
     this.dataFiles = dataFiles.toArray(new DataFile[0]);
     this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]);
     this.referencedDataFiles = referencedDataFiles.toArray(new CharSequence[0]);
+    this.logStorePartitionOffsets = logStorePartitionOffsets;
   }
 
   public DataFile[] dataFiles() {
@@ -51,6 +59,14 @@ public CharSequence[] referencedDataFiles() {
     return referencedDataFiles;
   }
 
+  public Map<Integer, Long> logStorePartitionOffsets() {
+    return logStorePartitionOffsets;
+  }
+
+  public void setLogStorePartitionOffsets(Map<Integer, Long> logStorePartitionOffsets) {
+    this.logStorePartitionOffsets = logStorePartitionOffsets;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -59,18 +75,20 @@ public static class Builder {
     private final List<DataFile> dataFiles;
     private final List<DeleteFile> deleteFiles;
     private final CharSequenceSet referencedDataFiles;
+    private Map<Integer, Long> logStorePartitionOffsets;
 
     private Builder() {
       this.dataFiles = Lists.newArrayList();
       this.deleteFiles = Lists.newArrayList();
       this.referencedDataFiles = CharSequenceSet.empty();
+      this.logStorePartitionOffsets = Maps.newHashMap();
     }
 
     public Builder add(WriteResult result) {
       addDataFiles(result.dataFiles);
       addDeleteFiles(result.deleteFiles);
       addReferencedDataFiles(result.referencedDataFiles);
-
+      addOffsets(result.logStorePartitionOffsets);
       return this;
     }
 
@@ -109,8 +127,19 @@ public Builder addReferencedDataFiles(Iterable<CharSequence> files) {
       return this;
     }
 
+    public Builder addOffsets(Map<Integer, Long> newLogStorePartitionOffsets) {
+      for (Map.Entry<Integer, Long> entry : newLogStorePartitionOffsets.entrySet()) {
+        Long oldOffset = this.logStorePartitionOffsets.get(entry.getKey());
+        Long newOffset = entry.getValue();
+        if (oldOffset == null || oldOffset < newOffset) {
+          this.logStorePartitionOffsets.put(entry.getKey(), newOffset);
+        }
+      }
+      return this;
+    }
+
     public WriteResult build() {
-      return new WriteResult(dataFiles, deleteFiles, referencedDataFiles);
+      return new WriteResult(dataFiles, deleteFiles, referencedDataFiles, logStorePartitionOffsets);
     }
   }
 }
```
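
A minimal sketch (not part of the commit) of the merge semantics that `Builder.addOffsets` introduces: when several `WriteResult`s report offsets for the same log-store partition, the highest offset per partition wins. The class name `OffsetMergeSketch` is hypothetical; the calls are the ones added above.

```java
import java.util.Map;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class OffsetMergeSketch {
  public static void main(String[] args) {
    Map<Integer, Long> first = Maps.newHashMap();
    first.put(0, 100L); // partition 0 at offset 100
    first.put(1, 50L);  // partition 1 at offset 50

    Map<Integer, Long> second = Maps.newHashMap();
    second.put(0, 90L); // older offset for partition 0; the merge keeps 100
    second.put(2, 10L); // first offset seen for partition 2

    WriteResult merged = WriteResult.builder()
        .addOffsets(first)
        .addOffsets(second)
        .build();

    // Expected contents: {0=100, 1=50, 2=10} -- per partition, the highest offset wins.
    System.out.println(merged.logStorePartitionOffsets());
  }
}
```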

flink-example/build.gradle (+79)

```diff
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+String flinkVersion = '1.16.0'
+String flinkMajorVersion = '1.16'
+String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
+
+project(":iceberg-flink-example") {
+  apply plugin: 'com.github.johnrengelman.shadow'
+
+  tasks.jar.dependsOn tasks.shadowJar
+
+  dependencies {
+    implementation project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}")
+    implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
+    implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
+    implementation "org.apache.flink:flink-runtime:${flinkVersion}"
+    implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
+    // implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
+    implementation "org.apache.flink:flink-sql-connector-hive-2.3.9_2.12:${flinkVersion}"
+    implementation "org.apache.flink:flink-json:${flinkVersion}"
+    implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}"
+    implementation "org.apache.flink:flink-connector-base:${flinkVersion}"
+    implementation "org.apache.flink:flink-connector-files:${flinkVersion}"
+    implementation "org.apache.flink:flink-clients:${flinkVersion}"
+    implementation "org.apache.hadoop:hadoop-client"
+    implementation "org.apache.flink:flink-runtime-web:${flinkVersion}"
+    implementation "org.apache.flink:flink-sql-gateway-api:${flinkVersion}"
+    implementation "org.apache.flink:flink-table-planner_2.12:${flinkVersion}"
+    implementation "org.apache.flink:flink-csv:${flinkVersion}"
+  }
+
+  shadowJar {
+    configurations = [project.configurations.runtimeClasspath]
+
+    zip64 true
+
+    // include the LICENSE and NOTICE files for the shaded Jar
+    from(projectDir) {
+      include 'LICENSE'
+      include 'NOTICE'
+    }
+
+    // Relocate dependencies to avoid conflicts
+    relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
+    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+    relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
+    relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
+    relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes'
+    relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework'
+    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+    relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
+    relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
+    relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
+    relocate 'org.apache.httpcomponents.client5', 'org.apache.iceberg.shaded.org.apache.httpcomponents.client5'
+
+    classifier null
+  }
+
+  jar {
+    enabled = false
+  }
+}
```
flink-example/src/main/java/org/apache/iceberg/flink/LogStoreExample.java (+97)

```diff
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+public class LogStoreExample {
+
+  private LogStoreExample() {}
+
+  public static void main(String[] args) throws Exception {
+
+    Configuration configuration = new Configuration();
+    // configuration.setString("table.exec.iceberg.use-flip27-source", "true");
+    configuration.setString("execution.checkpointing.interval", "60s");
+    configuration.setString("state.checkpoint-storage", "jobmanager");
+    configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
+    StreamExecutionEnvironment env =
+        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
+    TableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+    tEnv.executeSql(
+        "CREATE CATALOG hive_catalog WITH (\n"
+            + "  'type'='iceberg',\n"
+            + "  'uri'='thrift://flink03:9083',\n"
+            + "  'warehouse'='hdfs://ns1/dtInsight/hive/warehouse'\n"
+            + ")");
+
+    tEnv.executeSql("USE CATALOG hive_catalog");
+
+    tEnv.executeSql("USE sec_index");
+
+    tEnv.executeSql(
+        "CREATE TABLE default_catalog.default_database.f (\n"
+            + "  id BIGINT,\n"
+            + "  name STRING\n"
+            + ") WITH (\n"
+            + "  'connector' = 'filesystem',\n"
+            + "  'path' = 'file:///Users/ada/tmp/log-store',\n"
+            + "  'format' = 'csv'\n"
+            + ")");
+
+    // tEnv.executeSql("CREATE TABLE log_store_v2 (\n"
+    //     + "  id BIGINT COMMENT 'unique id',\n"
+    //     + "  name STRING\n"
+    //     + ") WITH (\n"
+    //     + "  'format-version' = '2',\n"
+    //     + "  'log-store' = 'kafka',\n"
+    //     + "  'kafka.bootstrap.servers' = '172.16.100.109:9092',\n"
+    //     + "  'kafka.topic' = 'log-store.v2'\n"
+    //     + ")");
+
+    // tEnv.executeSql("ALTER TABLE log_store_v1 SET ('log-store.kafka.bootstrap.servers'='172.16.100.109:9092')");
+    // tEnv.executeSql("ALTER TABLE log_store_v1 SET ('log-store.kafka.topic'='log-store.v2')");
+
+    // tEnv.executeSql("INSERT INTO log_store_v2 VALUES (3, 'bar')");
+    // tEnv.executeSql(
+    //         "SELECT * FROM log_store_v2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */")
+    //     .print();
+
+    tEnv.executeSql(
+        "INSERT INTO default_catalog.default_database.f SELECT * FROM log_store_v2 "
+            + "/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'log-store'='none') */");
+
+    // tEnv.executeSql("INSERT INTO default_catalog.default_database.f VALUES (1, 'foo')");
+
+    // tEnv.executeSql("SELECT * FROM log_store_v2 /*+ OPTIONS('log-store'='') */").print();
+  }
+}
```

flink/v1.16/build.gradle (+10)

```diff
@@ -33,6 +33,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
+    implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}"
+
     // for dropwizard histogram metrics implementation
     compileOnly "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}"
     compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}"
@@ -80,6 +82,14 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')
 
+    testImplementation("org.apache.kafka:kafka_${scalaVersion}:2.8.1") {
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+      exclude group: 'com.fasterxml.jackson.core'
+      exclude group: 'com.fasterxml.jackson.dataformat'
+      exclude group: 'com.fasterxml.jackson.module'
+      exclude group: 'com.fasterxml.jackson.datatype'
+    }
+
     // By default, hive-exec is a fat/uber jar and it exports a guava library
     // that's really old. We use the core classifier to be able to override our guava
     // version. Luckily, hive-exec seems to work okay so far with this version of guava
```
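
The slf4j-log4j12 and Jackson exclusions on the Kafka test dependency are presumably there to keep the test classpath free of duplicate logging bindings and of Jackson versions that clash with the ones the build already manages.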

flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java (+12)

```diff
@@ -104,4 +104,16 @@ private FlinkConfigOptions() {}
                       SplitAssignerType.SIMPLE
                           + ": simple assigner that doesn't provide any guarantee on order or locality."))
               .build());
+
+  public static final ConfigOption<String> LOG_KEY_FORMAT =
+      ConfigOptions.key("log.key.format")
+          .stringType()
+          .defaultValue("json")
+          .withDescription("Specify the key message format of the log system with a primary key.");
+
+  public static final ConfigOption<String> LOG_FORMAT =
+      ConfigOptions.key("log.format")
+          .stringType()
+          .defaultValue("json")
+          .withDescription("Specify the message format of the log system.");
 }
```
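
A minimal sketch (hypothetical class `LogFormatOptionsSketch`, not in the commit) of how the two options above resolve against a Flink `Configuration`; an unset key falls back to the declared `json` default:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkConfigOptions;

public class LogFormatOptionsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString("log.key.format", "json"); // set explicitly
    // "log.format" is left unset, so the declared default applies.

    String keyFormat = conf.get(FlinkConfigOptions.LOG_KEY_FORMAT);
    String format = conf.get(FlinkConfigOptions.LOG_FORMAT); // "json" via defaultValue
    System.out.println(keyFormat + " / " + format);
  }
}
```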
