Skip to content

Commit

Permalink
[cherry-pick](fe) select BE local broker to scan Hive table when 'bro…
Browse files Browse the repository at this point in the history
…ker.name' in hms catalog is specified (apache#27122) (apache#27252)

Since apache#24830 introduced `broker.name` in the HMS catalog, data scans run on the specified brokers.
The [doris operator](https://github.com/selectdb/doris-operator) supports deploying a BE and a broker in the same pod, and having the BE access its local broker is the fastest way to access data.
In the previous logic, every inputSplit selects one BE to execute on and then randomly selects one broker for the actual data access, so the BE and its associated broker are usually located on separate K8S pods.
This PR optimizes the broker selection strategy to prioritize the BE-local broker when `broker.name` is specified in the HMS catalog.
  • Loading branch information
WinkerDu authored Nov 20, 2023
1 parent dbd9597 commit 76c0cf0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
@Getter
protected TableSample tableSample;

protected String brokerName;

/**
* External file scan node for Query hms table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
Expand Down Expand Up @@ -292,7 +294,6 @@ public void createScanRangeLocations() throws UserException {
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType = getLocationType(fileSplit.getPath().toString());
setLocationPropertiesIfNecessary(locationType, locationProperties);

TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
Expand Down Expand Up @@ -352,6 +353,7 @@ public void createScanRangeLocations() throws UserException {
} else {
selectedBackend = backendPolicy.getNextBe();
}
setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
location.setBackendId(selectedBackend.getId());
location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
curLocations.addToLocations(location);
Expand All @@ -368,7 +370,7 @@ public void createScanRangeLocations() throws UserException {
scanRangeLocations.size(), (System.currentTimeMillis() - start));
}

private void setLocationPropertiesIfNecessary(TFileType locationType,
private void setLocationPropertiesIfNecessary(Backend selectedBackend, TFileType locationType,
Map<String, String> locationProperties) throws UserException {
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
if (!params.isSetHdfsParams()) {
Expand All @@ -381,7 +383,15 @@ private void setLocationPropertiesIfNecessary(TFileType locationType,
params.setProperties(locationProperties);

if (!params.isSetBrokerAddresses()) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
FsBroker broker;
if (brokerName != null) {
broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost());
LOG.debug(String.format(
"Set location for broker [%s], selected BE host: [%s] selected broker host: [%s]",
brokerName, selectedBackend.getHost(), broker.host));
} else {
broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
}
if (broker == null) {
throw new UserException("No alive broker.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ public class HiveScanNode extends FileQueryScanNode {
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
}

public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
}

@Override
Expand Down

0 comments on commit 76c0cf0

Please sign in to comment.