
Commit 6e82b7b

0AyanamiRei authored and Your Name committed
[fix](broker-load) Fix the COLUMNS FROM PATH feature (#57309)
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #53399
1 parent a17454c commit 6e82b7b
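The change threads the key names declared in COLUMNS FROM PATH AS (columnsFromPathKeys) from the file group into each TFileRangeDesc, alongside the values already parsed from the path, presumably so each path value can be associated with its column by key rather than by position alone. Below is a minimal sketch of the key=value path-parsing idea, assuming a partition-style layout such as product=p1/code=107020/dt=20250202; the class and the extractPathColumns helper are hypothetical illustrations, not Doris code.

// Illustrative sketch only (not the Doris implementation).
import java.util.LinkedHashMap;
import java.util.Map;

public class PathColumnsSketch {
    // Split a load path into its key=value directory segments.
    static Map<String, String> extractPathColumns(String path) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                // e.g. "product=p1" -> key "product", value "p1"
                columns.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        Map<String, String> cols =
                extractPathColumns("s3://bucket/load/product=p1/code=107020/dt=20250202/data.csv");
        System.out.println(cols); // {product=p1, code=107020, dt=20250202}
    }
}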

3 files changed: +223 -8 lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java

Lines changed: 9 additions & 4 deletions
@@ -268,7 +268,9 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
-            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
+            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
+                    columnsFromPathKeys);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -312,12 +314,13 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath);
+                        columnsFromPath, columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset += rangeBytes;

@@ -326,7 +329,8 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
             } else {
-                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
+                        columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -397,14 +401,15 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         if (jobType == JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
             rangeDesc.setStartOffset(curFileOffset);
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setFileSize(fileStatus.size);
             rangeDesc.setColumnsFromPath(columnsFromPath);
+            rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
             if (getFileType() == TFileType.FILE_HDFS) {
                 URI fileUri = new Path(fileStatus.path).toUri();
                 rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsFileGroupInfo.java

Lines changed: 9 additions & 4 deletions
@@ -281,7 +281,9 @@ public void createScanRangeLocationsUnsplittable(NereidsParamCreateContext conte
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
-            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
+            TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
+                    columnsFromPathKeys);
             locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
         }
         scanRangeLocations.add(locations);
@@ -331,12 +333,13 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
+            List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != FileGroupInfo.JobType.STREAM_LOAD) {
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                        columnsFromPath);
+                        columnsFromPath, columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset += rangeBytes;

@@ -345,7 +348,8 @@ public void createScanRangeLocationsSplittable(NereidsParamCreateContext context
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
             } else {
-                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
+                        columnsFromPathKeys);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -416,14 +420,15 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
     }

     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         if (jobType == FileGroupInfo.JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
             rangeDesc.setStartOffset(curFileOffset);
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setFileSize(fileStatus.size);
             rangeDesc.setColumnsFromPath(columnsFromPath);
+            rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
             if (getFileType() == TFileType.FILE_HDFS) {
                 URI fileUri = new Path(fileStatus.path).toUri();
                 rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
Lines changed: 205 additions & 0 deletions
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_load_columns_from_path", "load_p0") {
    def s3BucketName = getS3BucketName()
    def s3Endpoint = getS3Endpoint()
    def s3Region = getS3Region()
    def ak = getS3AK()
    def sk = getS3SK()
    def tableName = "test_columns_from_path"
    def label = UUID.randomUUID().toString().replace("-", "0")
    def path = "s3://${s3BucketName}/load/product=p1/code=107020/dt=20250202/data.csv"

    sql """ DROP TABLE IF EXISTS ${tableName} """

    sql """
        CREATE TABLE ${tableName} (
            k1 INT,
            k2 INT,
            pd VARCHAR(20) NULL,
            code INT NULL,
            dt DATE
        )
        DUPLICATE KEY(`k1`)
        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1"
        );
    """

    // Case 1: extract all three columns from the path and map all three with SET
    try {
        sql """
            LOAD LABEL ${label}
            (
                DATA INFILE("${path}")
                INTO TABLE ${tableName}
                COLUMNS TERMINATED BY ","
                FORMAT AS "CSV"
                (k1, k2)
                COLUMNS FROM PATH AS (product, code, dt)
                SET
                (
                    pd = product,
                    code = code,
                    dt = dt
                )
            )
            WITH S3
            (
                "s3.access_key" = "${ak}",
                "s3.secret_key" = "${sk}",
                "s3.endpoint" = "${s3Endpoint}",
                "s3.region" = "${s3Region}"
            )
        """

        // Wait for the load job to finish
        def maxRetry = 60
        def result = ""
        for (int i = 0; i < maxRetry; i++) {
            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
                break
            }
            sleep(1000)
        }

        // Check the load job state
        assertEquals("FINISHED", result[0].State)

        // Verify the loaded data
        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
        assertTrue(rowCount[0][0] > 0, "No data was loaded")

        // Verify the columns from the path are extracted correctly
        def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
        assertEquals("p1", pathData[0][0])
        assertEquals(107020, pathData[0][1])
        assertEquals("2025-02-02", pathData[0][2].toString())

    } finally {
        sql """ TRUNCATE TABLE ${tableName} """
    }

    // Case 2: extract all three columns from the path, with SET only for the column whose name differs
    label = UUID.randomUUID().toString().replace("-", "1")
    try {
        sql """
            LOAD LABEL ${label}
            (
                DATA INFILE("${path}")
                INTO TABLE ${tableName}
                COLUMNS TERMINATED BY ","
                FORMAT AS "CSV"
                (k1, k2)
                COLUMNS FROM PATH AS (product, code, dt)
                SET (
                    pd = product
                )
            )
            WITH S3
            (
                "s3.access_key" = "${ak}",
                "s3.secret_key" = "${sk}",
                "s3.endpoint" = "${s3Endpoint}",
                "s3.region" = "${s3Region}"
            )
        """

        // Wait for the load job to finish
        def maxRetry = 60
        def result = ""
        for (int i = 0; i < maxRetry; i++) {
            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
                break
            }
            sleep(1000)
        }

        // Check the load job state
        assertEquals("FINISHED", result[0].State)

        // Verify the loaded data
        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
        assertTrue(rowCount[0][0] > 0, "No data was loaded")

        // Verify the columns from the path are extracted correctly
        def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
        assertEquals("p1", pathData[0][0])
        assertEquals(107020, pathData[0][1])
        assertEquals("2025-02-02", pathData[0][2].toString())

    } finally {
        sql """ TRUNCATE TABLE ${tableName} """
    }

    // Case 3: extract only one column (product) from the path
    label = UUID.randomUUID().toString().replace("-", "2")
    try {
        sql """
            LOAD LABEL ${label}
            (
                DATA INFILE("${path}")
                INTO TABLE ${tableName}
                COLUMNS TERMINATED BY ","
                FORMAT AS "CSV"
                (k1, k2)
                COLUMNS FROM PATH AS (product)
                SET
                (
                    pd = product
                )
            )
            WITH S3
            (
                "s3.access_key" = "${ak}",
                "s3.secret_key" = "${sk}",
                "s3.endpoint" = "${s3Endpoint}",
                "s3.region" = "${s3Region}"
            )
        """

        // Wait for the load job to finish
        def maxRetry = 60
        def result = ""
        for (int i = 0; i < maxRetry; i++) {
            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
                break
            }
            sleep(1000)
        }

        // Check the load job state
        assertEquals("FINISHED", result[0].State)

        // Verify the loaded data
        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
        assertTrue(rowCount[0][0] > 0, "No data was loaded")

        // Verify only pd is extracted from the path; code and dt are loaded from the CSV file
        def pathData = sql "SELECT pd FROM ${tableName} LIMIT 1"
        assertEquals("p1", pathData[0][0])
        // code and dt should come from the CSV file data, not from the path
        // The actual values depend on the CSV file content

    } finally {
        sql """ DROP TABLE ${tableName} """
    }
}
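The third test case is the subset scenario the fix targets: only product is taken from the path while code and dt come from the CSV rows. A hedged sketch, assuming the keys and values produced in FileGroupInfo align by index, of how a consumer of TFileRangeDesc might pair columnsFromPathKeys with columnsFromPath; mapPathColumns and the class name are hypothetical, not Doris code.

// Illustrative sketch only (not the Doris implementation).
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PathColumnMappingSketch {
    // Pair each declared path-column key with the value parsed from the file path.
    static Map<String, String> mapPathColumns(List<String> keys, List<String> values) {
        Map<String, String> mapping = new LinkedHashMap<>();
        // Keys and values come from the same file group, so they align by index.
        for (int i = 0; i < Math.min(keys.size(), values.size()); i++) {
            mapping.put(keys.get(i), values.get(i));
        }
        return mapping;
    }

    public static void main(String[] args) {
        // e.g. COLUMNS FROM PATH AS (product) against .../product=p1/code=107020/dt=20250202/data.csv
        Map<String, String> cols = mapPathColumns(List.of("product"), List.of("p1"));
        System.out.println(cols); // {product=p1}
    }
}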
