Skip to content

Commit 2291484

Browse files
authored
[build] Enable ci (alibaba#149)
1 parent 8ae33af commit 2291484

File tree

24 files changed

+280
-125
lines changed

24 files changed

+280
-125
lines changed

.github/workflows/ci.yaml

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
################################################################################
2+
# Copyright (c) 2024 Alibaba Group Holding Ltd.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
################################################################################
16+
name: Tests on JDK 8
17+
on:
18+
push:
19+
pull_request:
20+
paths-ignore:
21+
- 'docs/**'
22+
- '**/*.md'
23+
concurrency:
24+
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
25+
cancel-in-progress: true
26+
27+
jobs:
28+
build:
29+
runs-on: self-hosted
30+
steps:
31+
- name: Checkout code
32+
uses: actions/checkout@v2
33+
- name: Test
34+
timeout-minutes: 60
35+
run: |
36+
mvn -B verify -Ptest-coverage
37+
env:
38+
MAVEN_OPTS: -Xmx4096m
39+
ARTIFACTS_OSS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_ENDPOINT }}
40+
ARTIFACTS_OSS_REGION: ${{ secrets.ARTIFACTS_OSS_REGION }}
41+
ARTIFACTS_OSS_BUCKET: ${{ secrets.ARTIFACTS_OSS_BUCKET }}
42+
ARTIFACTS_OSS_ACCESS_KEY: ${{ secrets.ARTIFACTS_OSS_ACCESS_KEY }}
43+
ARTIFACTS_OSS_SECRET_KEY: ${{ secrets.ARTIFACTS_OSS_SECRET_KEY }}
44+
ARTIFACTS_OSS_STS_ENDPOINT: ${{ secrets.ARTIFACTS_OSS_STS_ENDPOINT }}
45+
ARTIFACTS_OSS_ROLE_ARN: ${{ secrets.ARTIFACTS_OSS_ROLE_ARN }}

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

+1
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
245245

246246
// assert the cluster should have tablet server number to be 3
247247
assertHasTabletServerNumber(3);
248+
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
248249

249250
// we can create the table now
250251
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();

fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java

+19-8
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,13 @@ void testKvHeavyWriteAndScan() throws Exception {
275275
@ParameterizedTest
276276
@ValueSource(booleans = {true, false})
277277
void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
278+
TablePath tablePath =
279+
TablePath.of(
280+
"test_db_1",
281+
"test_scan_from_timestamp" + (isPartitioned ? "_partitioned" : ""));
278282
long tableId =
279283
createTable(
280-
DATA1_TABLE_PATH,
284+
tablePath,
281285
isPartitioned
282286
? DATA1_PARTITIONED_TABLE_INFO.getTableDescriptor()
283287
: DATA1_TABLE_INFO.getTableDescriptor(),
@@ -289,7 +293,7 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
289293
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
290294
} else {
291295
Map<String, Long> partitionNameAndIds =
292-
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
296+
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
293297
// just pick one partition
294298
Map.Entry<String, Long> partitionNameAndIdEntry =
295299
partitionNameAndIds.entrySet().iterator().next();
@@ -298,12 +302,12 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
298302
FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId, partitionId);
299303
}
300304

301-
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH, partitionName);
305+
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
302306

303307
long firstStartTimestamp = System.currentTimeMillis();
304308
int batchRecordSize = 10;
305309
List<IndexedRow> expectedRows = new ArrayList<>();
306-
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
310+
try (Table table = conn.getTable(tablePath)) {
307311
// 1. first write one batch of data.
308312
AppendWriter appendWriter = table.getAppendWriter();
309313
for (int i = 0; i < batchRecordSize; i++) {
@@ -315,6 +319,9 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
315319
appendWriter.append(row).get();
316320
}
317321

322+
// sleep a while to avoid secondStartTimestamp is same with firstStartTimestamp
323+
Thread.sleep(10);
324+
318325
// record second batch start timestamp, we move this before first scan to make it
319326
// as early as possible to avoid potential time backwards
320327
// as early as possible to avoid potential time backwards
@@ -371,9 +378,13 @@ void testScanFromStartTimestamp(boolean isPartitioned) throws Exception {
371378
@ParameterizedTest
372379
@ValueSource(booleans = {true, false})
373380
void testScanFromLatestOffsets(boolean isPartitioned) throws Exception {
381+
TablePath tablePath =
382+
TablePath.of(
383+
"test_db_1",
384+
"test_scan_from_latest_offsets" + (isPartitioned ? "_partitioned" : ""));
374385
long tableId =
375386
createTable(
376-
DATA1_TABLE_PATH,
387+
tablePath,
377388
isPartitioned
378389
? DATA1_PARTITIONED_TABLE_INFO.getTableDescriptor()
379390
: DATA1_TABLE_INFO.getTableDescriptor(),
@@ -384,16 +395,16 @@ void testScanFromLatestOffsets(boolean isPartitioned) throws Exception {
384395
FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
385396
} else {
386397
Map<String, Long> partitionNameAndIds =
387-
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
398+
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
388399
// just pick one partition
389400
partitionName = partitionNameAndIds.keySet().iterator().next();
390401
partitionId = partitionNameAndIds.get(partitionName);
391402
FLUSS_CLUSTER_EXTENSION.waitUtilTablePartitionReady(tableId, partitionId);
392403
}
393-
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(DATA1_TABLE_PATH, partitionName);
404+
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
394405

395406
int batchRecordSize = 10;
396-
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
407+
try (Table table = conn.getTable(tablePath)) {
397408
// 1. first write one batch of data.
398409
AppendWriter appendWriter = table.getAppendWriter();
399410
for (int i = 0; i < batchRecordSize; i++) {

fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogDownloaderTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ void testPrefetchNum() throws Exception {
100100
retry(
101101
Duration.ofMinutes(1),
102102
() -> {
103-
for (int i = 0; i < 3; i++) {
103+
for (int i = 0; i < 4; i++) {
104104
assertThat(futures.get(i).isDone()).isTrue();
105105
}
106106
});

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,11 @@ void testPartitionedLogTable() throws Exception {
119119
@Test
120120
void testUnsubscribePartitionBucket() throws Exception {
121121
// write rows
122-
Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false);
122+
TablePath tablePath = TablePath.of("test_db_1", "unsubscribe_partition_bucket_table");
123+
Schema schema = createPartitionedTable(tablePath, false);
123124
Map<String, Long> partitionIdByNames =
124-
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
125-
Table table = conn.getTable(DATA1_TABLE_PATH);
125+
FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
126+
Table table = conn.getTable(tablePath);
126127

127128
Map<Long, List<InternalRow>> expectPartitionAppendRows =
128129
writeRows(table, schema, partitionIdByNames);

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.alibaba.fluss.utils.Preconditions;
4848

4949
import org.apache.commons.lang3.StringUtils;
50+
import org.junit.jupiter.api.Disabled;
5051
import org.junit.jupiter.api.Test;
5152
import org.junit.jupiter.params.ParameterizedTest;
5253
import org.junit.jupiter.params.provider.ValueSource;
@@ -101,6 +102,7 @@ void testAppendOnly() throws Exception {
101102

102103
@ParameterizedTest
103104
@ValueSource(booleans = {true, false})
105+
@Disabled("TODO, fix me in #116")
104106
void testAppendWithSmallBuffer(boolean indexedFormat) throws Exception {
105107
TableDescriptor desc =
106108
indexedFormat
@@ -186,8 +188,9 @@ void testUpsertWithSmallBuffer() throws Exception {
186188

187189
@Test
188190
void testPutAndLookup() throws Exception {
189-
createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
190-
verifyPutAndLookup(DATA1_TABLE_PATH_PK, DATA1_SCHEMA_PK, new Object[] {1, "a"});
191+
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_lookup_table");
192+
createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
193+
verifyPutAndLookup(tablePath, DATA1_SCHEMA_PK, new Object[] {1, "a"});
191194

192195
// test put/lookup data for primary table with pk index is not 0
193196
Schema schema =
@@ -319,7 +322,8 @@ void verifyPutAndLookup(TablePath tablePath, Schema tableSchema, Object[] fields
319322
try (Table table = conn.getTable(tablePath)) {
320323
UpsertWriter upsertWriter = table.getUpsertWriter();
321324
// put data.
322-
upsertWriter.upsert(row).get();
325+
upsertWriter.upsert(row);
326+
upsertWriter.flush();
323327
}
324328
// lookup this key.
325329
IndexedRow keyRow = keyRow(tableSchema, fields);
@@ -601,9 +605,10 @@ void testAppendAndProject() throws Exception {
601605
.column("d", DataTypes.BIGINT())
602606
.build();
603607
TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
604-
createTable(DATA1_TABLE_PATH, tableDescriptor, false);
608+
TablePath tablePath = TablePath.of("test_db_1", "test_append_and_project");
609+
createTable(tablePath, tableDescriptor, false);
605610

606-
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
611+
try (Table table = conn.getTable(tablePath)) {
607612
AppendWriter appendWriter = table.getAppendWriter();
608613
int expectedSize = 30;
609614
for (int i = 0; i < expectedSize; i++) {

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.alibaba.fluss.config.ConfigOptions;
2020
import com.alibaba.fluss.config.Configuration;
2121
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
22+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
23+
import com.alibaba.fluss.utils.ExceptionUtils;
2224

2325
import org.apache.flink.table.api.DataTypes;
2426
import org.apache.flink.table.api.Schema;
@@ -120,8 +122,15 @@ static void afterAll() {
120122

121123
@BeforeEach
122124
void beforeEach() throws Exception {
123-
if (catalog != null) {
125+
try {
124126
catalog.createDatabase(DEFAULT_DB, null, true);
127+
} catch (CatalogException e) {
128+
// the auto partitioned manager may create the db zk node
129+
// in an another thread, so if exception is NodeExistsException, just ignore
130+
if (!ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
131+
.isPresent()) {
132+
throw e;
133+
}
125134
}
126135
}
127136

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtilTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.Test;
2121

2222
import java.time.ZoneId;
23+
import java.util.TimeZone;
2324

2425
import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
2526
import static com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp;
@@ -41,7 +42,7 @@ void testParseTimestamp() {
4142
parseTimestamp(
4243
"2023-12-09 23:09:12",
4344
SCAN_STARTUP_TIMESTAMP.key(),
44-
ZoneId.systemDefault()))
45+
TimeZone.getTimeZone("Asia/Shanghai").toZoneId()))
4546
.isEqualTo(1702134552000L);
4647

4748
assertThatThrownBy(

fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSTestCredentials.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,15 @@ public class OSSTestCredentials {
4040
// ------------------------------------------------------------------------
4141

4242
public static boolean credentialsAvailable() {
43-
return ENDPOINT != null && BUCKET != null && ACCESS_KEY != null && SECRET_KEY != null;
43+
return isNotEmpty(ENDPOINT)
44+
&& isNotEmpty(BUCKET)
45+
&& isNotEmpty(ACCESS_KEY)
46+
&& isNotEmpty(SECRET_KEY);
47+
}
48+
49+
/** Checks if a String is not null and not empty. */
50+
private static boolean isNotEmpty(@Nullable String str) {
51+
return str != null && !str.isEmpty();
4452
}
4553

4654
public static void assumeCredentialsAvailable() {

fluss-filesystems/fluss-fs-oss/src/test/java/com/alibaba/fluss/fs/oss/OSSWithTokenFileSystemBehaviorITCase.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -32,37 +32,37 @@ class OSSWithTokenFileSystemBehaviorITCase extends OSSWithTokenFileSystemBehavio
3232

3333
private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
3434

35-
private static final FsPath basePath =
36-
new FsPath(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
37-
3835
@BeforeAll
3936
static void setup() throws Exception {
4037
// init a filesystem with ak/sk so that it can generate sts token
4138
initFileSystemWithSecretKey();
4239
// now, we can init with sts token
43-
initFileSystemWithToken();
40+
initFileSystemWithToken(getFsPath());
4441
}
4542

4643
@Override
4744
protected FileSystem getFileSystem() throws Exception {
48-
return basePath.getFileSystem();
45+
return getFsPath().getFileSystem();
4946
}
5047

5148
@Override
5249
protected FsPath getBasePath() {
53-
return basePath;
50+
return getFsPath();
51+
}
52+
53+
private static FsPath getFsPath() {
54+
return new FsPath(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
5455
}
5556

5657
@AfterAll
5758
static void clearFsConfig() {
5859
FileSystem.initialize(new Configuration(), null);
5960
}
6061

61-
private static void initFileSystemWithToken() throws Exception {
62+
private static void initFileSystemWithToken(FsPath fsPath) throws Exception {
6263
Configuration configuration = new Configuration();
6364
// obtain a security token and call onNewTokensObtained
64-
ObtainedSecurityToken obtainedSecurityToken =
65-
basePath.getFileSystem().obtainSecurityToken();
65+
ObtainedSecurityToken obtainedSecurityToken = fsPath.getFileSystem().obtainSecurityToken();
6666
OSSSecurityTokenReceiver ossSecurityTokenReceiver = new OSSSecurityTokenReceiver();
6767
ossSecurityTokenReceiver.onNewTokensObtained(obtainedSecurityToken);
6868

fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ private long prepareLogTable(
103103
if (isPartitioned) {
104104
Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
105105
for (String partition : partitionNameById.values()) {
106-
for (int i = 0; i < 10; i++) {
107-
flinkRows.addAll(writeRows(tablePath, 3, partition));
106+
for (int i = 0; i < 3; i++) {
107+
flinkRows.addAll(writeRows(tablePath, 10, partition));
108108
}
109109
}
110110
} else {
111-
for (int i = 0; i < 10; i++) {
112-
flinkRows.addAll(writeRows(tablePath, 3, null));
111+
for (int i = 0; i < 3; i++) {
112+
flinkRows.addAll(writeRows(tablePath, 10, null));
113113
}
114114
}
115115
return t1Id;

fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,9 @@ void testLogTableEnumerator() throws Throwable {
126126
TableBucket tableBucket = new TableBucket(t1Id, i);
127127
// no any paimon data written for the bucket
128128
if (bucketLogEndOffset.get(i) <= 0) {
129-
expectedAssignment.put(
130-
i, Collections.singletonList(new LogSplit(tableBucket, null, -2, 0)));
129+
expectedAssignment
130+
.computeIfAbsent(i, (k) -> new ArrayList<>())
131+
.add(new LogSplit(tableBucket, null, -2, 0));
131132
}
132133
}
133134
Map<Integer, List<SourceSplitBase>> actualAssignment =

fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/PaimonSyncTestBase.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,11 @@ protected PaimonDataBaseSyncSinkBuilder getDatabaseSyncSinkBuilder(
8787

8888
DataStreamSource<MultiplexCdcRecord> input =
8989
execEnv.fromSource(
90-
flussDatabaseSyncSource,
91-
WatermarkStrategy.noWatermarks(),
92-
"flinkSycDatabaseSource");
93-
90+
flussDatabaseSyncSource,
91+
WatermarkStrategy.noWatermarks(),
92+
"flinkSycDatabaseSource")
93+
// limit resource usage
94+
.setParallelism(2);
9495
Map<String, String> paimonCatalogConf = FlinkPaimonTestBase.getPaimonCatalogConf();
9596

9697
return new PaimonDataBaseSyncSinkBuilder(paimonCatalogConf, configuration).withInput(input);
@@ -120,7 +121,7 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
120121

121122
TableDescriptor.Builder tableBuilder =
122123
TableDescriptor.builder()
123-
.distributedBy(bucketNum)
124+
.distributedBy(bucketNum, "a")
124125
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
125126

126127
if (isPartitioned) {

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestChannel.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,15 @@ public RequestChannel(int queueCapacity) {
3636
this.requestQueue =
3737
new PriorityBlockingQueue<>(
3838
queueCapacity,
39-
(req1, req2) -> Integer.compare(req2.getPriority(), req1.getPriority()));
39+
(req1, req2) -> {
40+
// less value will be popped first
41+
int res = Integer.compare(req2.getPriority(), req1.getPriority());
42+
// if priority is same, we want to keep FIFO
43+
if (res == 0 && req1 != req2) {
44+
res = (req1.getRequestId() < req2.getRequestId() ? -1 : 1);
45+
}
46+
return res;
47+
});
4048
}
4149

4250
/**

0 commit comments

Comments
 (0)