Commit

Merge branch 'branch-2.0' into fix-isexlusive
amorynan authored Oct 10, 2024
2 parents c62e31f + 495987f commit ee50993
Showing 15 changed files with 275 additions and 22 deletions.
9 changes: 6 additions & 3 deletions be/src/olap/push_handler.cpp
@@ -161,16 +161,19 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
// check if version number exceed limit
if (tablet->exceed_version_limit(config::max_tablet_version_num)) {
return Status::Status::Error<TOO_MANY_VERSION>(
"failed to push data. version count: {}, exceed limit: {}, tablet: {}",
tablet->version_count(), config::max_tablet_version_num, tablet->full_name());
"failed to push data. version count: {}, exceed limit: {}, tablet: {}. Please "
"reduce the frequency of loading data or adjust the max_tablet_version_num in "
"be.conf to a larger value.",
tablet->version_count(), config::max_tablet_version_num, tablet->tablet_id());
}

int version_count = tablet->version_count() + tablet->stale_version_count();
if (tablet->avg_rs_meta_serialize_size() * version_count >
config::tablet_meta_serialize_size_limit) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. meta serialize size : {}, exceed limit: {}, "
"tablet: {}",
"tablet: {}. Please reduce the frequency of loading data or adjust the "
"max_tablet_version_num in be.conf to a larger value.",
tablet->avg_rs_meta_serialize_size() * version_count,
config::tablet_meta_serialize_size_limit, tablet->tablet_id());
}
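As an illustrative remedy for the condition this message now explains (not a recommendation; the right value depends on the deployment and its compaction capacity), an operator could batch loads so they arrive less often, or raise the limit in be.conf with a line such as max_tablet_version_num = 2000.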
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
@@ -791,7 +791,8 @@ Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::Segm
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}",
"_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
"the bucket number is too small or if the data is skewed.",
_context.tablet_id, _context.rowset_id.to_string(),
config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
_num_segcompacted, get_rowset_num_rows());
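The appended hint reflects how rows are spread at load time: each partition is split into as many tablets as it has buckets, and a load writes one rowset per tablet, so a table created with BUCKETS 1 funnels an entire load for a partition into a single rowset. A large or skewed load can then exceed config::max_segment_num_per_rowset, while a higher bucket count (for example DISTRIBUTED BY HASH(key1) BUCKETS 16, illustrative only) spreads the same rows over more tablets and rowsets.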
@@ -316,7 +316,7 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
TruncateTableRecord record = new TruncateTableRecord(info);
String data = record.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
}

// get binlog by dbId, return first binlog.version > version
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -141,6 +141,13 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE) {
TruncateTableRecord record = TruncateTableRecord.fromJson(binlog.data);
if (record != null) {
for (long partitionId : record.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}
}

if (tableIds == null) {
@@ -214,6 +221,11 @@ public void addBinlog(TBinlog binlog, Object raw) {
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}

switch (binlog.getType()) {
@@ -22,6 +22,11 @@

import com.google.gson.annotations.SerializedName;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class TruncateTableRecord {
@SerializedName(value = "dbId")
private long dbId;
@@ -35,6 +40,8 @@ public class TruncateTableRecord {
private boolean isEntireTable = false;
@SerializedName(value = "rawSql")
private String rawSql = "";
@SerializedName(value = "op")
private Map<Long, String> oldPartitions = new HashMap<>();

public TruncateTableRecord(TruncateTableInfo info) {
this.dbId = info.getDbId();
@@ -43,9 +50,18 @@ public TruncateTableRecord(TruncateTableInfo info) {
this.table = info.getTable();
this.isEntireTable = info.isEntireTable();
this.rawSql = info.getRawSql();
this.oldPartitions = info.getOldPartitions();
}

public Collection<Long> getOldPartitionIds() {
return oldPartitions == null ? new ArrayList<>() : oldPartitions.keySet();
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static TruncateTableRecord fromJson(String json) {
return GsonUtils.GSON.fromJson(json, TruncateTableRecord.class);
}
}
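The new "op" field plus the toJson/fromJson pair give TruncateTableRecord a symmetric Gson round-trip, and getOldPartitionIds() simply exposes the map's key set. A minimal, self-contained sketch of that behavior (plain Gson and a hypothetical stand-in class rather than Doris's GsonUtils wiring) might look like:

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in: only the field needed to show the round-trip.
    class TruncateRecordSketch {
        @SerializedName(value = "op")
        private Map<Long, String> oldPartitions = new HashMap<>();

        public static void main(String[] args) {
            Gson gson = new Gson();
            TruncateRecordSketch rec = new TruncateRecordSketch();
            rec.oldPartitions.put(10001L, "p20241010");

            String json = gson.toJson(rec);
            // json is {"op":{"10001":"p20241010"}}; map keys become JSON strings
            TruncateRecordSketch back = gson.fromJson(json, TruncateRecordSketch.class);
            System.out.println(back.oldPartitions.keySet()); // [10001], the analogue of getOldPartitionIds()
        }
    }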
@@ -253,7 +253,8 @@ private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int h
} catch (DdlException e) {
throw new DdlException("Failed to find enough backend for ssd storage medium. When setting "
+ DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store "
+ "in ssd. Please check the replication num,replication tag and storage medium.");
+ "in ssd. Please check the replication num,replication tag and storage medium."
+ Env.getCurrentSystemInfo().getDetailsForCreateReplica(replicaAlloc));
}
}

@@ -3194,13 +3194,12 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
}

// replace
truncateTableInternal(olapTable, newPartitions, truncateEntireTable);
List<Partition> oldPartitions = truncateTableInternal(olapTable, newPartitions, truncateEntireTable);

// write edit log
TruncateTableInfo info =
new TruncateTableInfo(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(),
newPartitions,
truncateEntireTable, truncateTableStmt.toSqlWithoutTable());
newPartitions, truncateEntireTable, truncateTableStmt.toSqlWithoutTable(), oldPartitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (DdlException e) {
failedCleanCallback.run();
@@ -3220,11 +3219,14 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti
LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames());
}

private void truncateTableInternal(OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
private List<Partition> truncateTableInternal(
OlapTable olapTable, List<Partition> newPartitions, boolean isEntireTable) {
// use new partitions to replace the old ones.
List<Partition> oldPartitions = Lists.newArrayList();
Set<Long> oldTabletIds = Sets.newHashSet();
for (Partition newPartition : newPartitions) {
Partition oldPartition = olapTable.replacePartition(newPartition);
oldPartitions.add(oldPartition);
// save old tablets to be removed
for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
index.getTablets().forEach(t -> {
@@ -3242,6 +3244,7 @@ private void truncateTableInternal(OlapTable olapTable, List<Partition> newParti
for (Long tabletId : oldTabletIds) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
return oldPartitions;
}

public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
@@ -47,8 +47,8 @@ public enum MysqlCommand {
COM_STMT_SEND_LONG_DATA("COM_STMT_SEND_LONG_DATA", 24),
COM_STMT_CLOSE("COM_STMT_CLOSE", 25),
COM_STMT_RESET("COM_STMT_RESET", 26),
COM_SET_OPTION("COM_RESET_CONNECTION", 27),
COM_STMT_FETCH("COM_RESET_CONNECTION", 28),
COM_SET_OPTION("COM_SET_OPTION", 27),
COM_STMT_FETCH("COM_STMT_FETCH", 28),
COM_DAEMON("COM_DAEMON", 29),
COM_RESET_CONNECTION("COM_RESET_CONNECTION", 31);

@@ -28,7 +28,9 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TruncateTableInfo implements Writable {
@SerializedName(value = "dbId")
@@ -45,20 +47,25 @@ public class TruncateTableInfo implements Writable {
private boolean isEntireTable = false;
@SerializedName(value = "rawSql")
private String rawSql = "";
@SerializedName(value = "op")
private Map<Long, String> oldPartitions = new HashMap<>();

public TruncateTableInfo() {

}

public TruncateTableInfo(long dbId, String db, long tblId, String table, List<Partition> partitions,
boolean isEntireTable, String rawSql) {
boolean isEntireTable, String rawSql, List<Partition> oldPartitions) {
this.dbId = dbId;
this.db = db;
this.tblId = tblId;
this.table = table;
this.partitions = partitions;
this.isEntireTable = isEntireTable;
this.rawSql = rawSql;
for (Partition partition : oldPartitions) {
this.oldPartitions.put(partition.getId(), partition.getName());
}
}

public long getDbId() {
@@ -81,6 +88,10 @@ public List<Partition> getPartitions() {
return partitions;
}

public Map<Long, String> getOldPartitions() {
return oldPartitions == null ? new HashMap<>() : oldPartitions;
}

public boolean isEntireTable() {
return isEntireTable;
}
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -548,6 +548,9 @@ private void dispatch() throws IOException {
case COM_STMT_CLOSE:
handleStmtClose();
break;
case COM_SET_OPTION:
handleSetOption();
break;
default:
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unsupported command(" + command + ")");
LOG.warn("Unsupported command(" + command + ")");
@@ -861,6 +864,15 @@ private void handleChangeUser() throws IOException {
ctx.getState().setOk();
}

private void handleSetOption() {
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_set_option.html
int optionOperation = MysqlProto.readInt2(packetBuf);
LOG.debug("option_operation {}", optionOperation);
// Do nothing for now.
// https://dev.mysql.com/doc/c-api/8.0/en/mysql-set-server-option.html
ctx.getState().setOk();
}

// Process a MySQL request
public void processOnce() throws IOException {
// set status of query to OK.
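Per the MySQL protocol page referenced in handleSetOption(), the COM_SET_OPTION payload is the 0x1B command byte (27, matching the corrected enum entry above) followed by a two-byte little-endian option flag, where 0 means MULTI_STATEMENTS_ON and 1 means MULTI_STATEMENTS_OFF. A small stand-alone sketch of that parse, using a plain ByteBuffer and a hypothetical class name instead of Doris's MysqlProto:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class SetOptionPacketSketch {
        public static void main(String[] args) {
            // Raw payload: command byte 0x1B (COM_SET_OPTION = 27), then option flag 1, little-endian.
            byte[] packet = {0x1B, 0x01, 0x00};
            ByteBuffer buf = ByteBuffer.wrap(packet).order(ByteOrder.LITTLE_ENDIAN);
            int command = buf.get() & 0xFF;                // 27 -> COM_SET_OPTION
            int optionOperation = buf.getShort() & 0xFFFF; // 1 -> MULTI_STATEMENTS_OFF
            System.out.println("command=" + command + ", optionOperation=" + optionOperation);
        }
    }

The handler itself just logs the flag and replies OK, which matches its "Do nothing for now" comment.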
65 changes: 65 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -233,6 +233,71 @@ public void setLoadDisabled(boolean isLoadDisabled) {
this.backendStatus.isLoadDisabled = isLoadDisabled;
}

public String getDetailsForCreateReplica() {
int hddBad = 0;
int hddExceedLimit = 0;
int hddOk = 0;
int ssdBad = 0;
int ssdExceedLimit = 0;
int ssdOk = 0;
for (DiskInfo disk : disksRef.values()) {
TStorageMedium storageMedium = disk.getStorageMedium();
if (storageMedium == TStorageMedium.HDD) {
if (!disk.isAlive()) {
hddBad++;
} else if (disk.exceedLimit(true)) {
hddExceedLimit++;
} else {
hddOk++;
}
} else if (storageMedium == TStorageMedium.SSD) {
if (!disk.isAlive()) {
ssdBad++;
} else if (disk.exceedLimit(true)) {
ssdExceedLimit++;
} else {
ssdOk++;
}
}
}

StringBuilder sb = new StringBuilder("[");
sb.append("backendId=").append(id);
sb.append(", host=").append(host);
if (!isAlive()) {
sb.append(", isAlive=false, exclude it");
} else if (isDecommissioned()) {
sb.append(", isDecommissioned=true, exclude it");
} else if (isComputeNode()) {
sb.append(", isComputeNode=true, exclude it");
} else {
sb.append(", hdd disks count={");
if (hddOk > 0) {
sb.append("ok=").append(hddOk).append(",");
}
if (hddBad > 0) {
sb.append("bad=").append(hddBad).append(",");
}
if (hddExceedLimit > 0) {
sb.append("capExceedLimit=").append(hddExceedLimit).append(",");
}
sb.append("}, ssd disk count={");
if (ssdOk > 0) {
sb.append("ok=").append(ssdOk).append(",");
}
if (ssdBad > 0) {
sb.append("bad=").append(ssdBad).append(",");
}
if (ssdExceedLimit > 0) {
sb.append("capExceedLimit=").append(ssdExceedLimit).append(",");
}
sb.append("}");
}
sb.append("]");

return sb.toString();
}

// for test only
public void updateOnce(int bePort, int httpPort, int beRpcPort) {
if (this.bePort != bePort) {
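Tracing the string building above with an illustrative backend (id 10001 on host 10.0.0.1, alive, two healthy HDDs, one SSD over its capacity limit) yields: [backendId=10001, host=10.0.0.1, hdd disks count={ok=2,}, ssd disk count={capExceedLimit=1,}]. A dead, decommissioned, or compute-only backend is instead reported with the reason it was excluded.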
@@ -526,14 +526,28 @@ public Pair<Map<Tag, List<Long>>, TStorageMedium> selectBackendIdsForReplicaCrea
if (!failedEntries.isEmpty()) {
String failedMsg = Joiner.on("\n").join(failedEntries);
throw new DdlException("Failed to find enough backend, please check the replication num,"
+ "replication tag and storage medium.\n" + "Create failed replications:\n" + failedMsg);
+ "replication tag and storage medium and avail capacity of backends "
+ "or maybe all be on same host." + getDetailsForCreateReplica(replicaAlloc) + "\n"
+ "Create failed replications:\n" + failedMsg);
}
}

Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum());
return Pair.of(chosenBackendIds, storageMedium);
}

public String getDetailsForCreateReplica(ReplicaAllocation replicaAlloc) {
StringBuilder sb = new StringBuilder(" Backends details: ");
for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
sb.append("backends with tag ").append(tag).append(" is ");
sb.append(idToBackendRef.values().stream().filter(be -> be.getLocationTag() == tag)
.map(Backend::getDetailsForCreateReplica)
.collect(Collectors.toList()));
sb.append(", ");
}
return sb.toString();
}

/**
* Select a set of backends by the given policy.
*
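getDetailsForCreateReplica(replicaAlloc) then strings those per-backend summaries together per tag, producing something like (tag rendering illustrative): Backends details: backends with tag {"location" : "default"} is [[backendId=10001, host=10.0.0.1, ...]], with one bracketed list per tag in the allocation. This is the text appended to the "Failed to find enough backend" DdlException above and interpolated by the updated test expectations below via Env.getCurrentSystemInfo().getDetailsForCreateReplica(...).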
@@ -305,15 +305,20 @@ public void testAbnormal() throws DdlException, ConfigException {

ConfigBase.setMutableConfig("disable_storage_medium_check", "false");
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "Failed to find enough backend, please check the replication num,replication tag and storage medium.\n"
.expectThrowsWithMsg(DdlException.class,
"Failed to find enough backend, please check the replication num,replication tag and storage medium and avail capacity of backends "
+ "or maybe all be on same host."
+ Env.getCurrentSystemInfo().getDetailsForCreateReplica(new ReplicaAllocation((short) 1)) + "\n"
+ "Create failed replications:\n"
+ "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: SSD",
() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
+ "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));

ExceptionChecker
.expectThrowsWithMsg(DdlException.class,
"Failed to find enough backend, please check the replication num,replication tag and storage medium.\n"
"Failed to find enough backend, please check the replication num,replication tag and storage medium and avail capacity of backends "
+ "or maybe all be on same host."
+ Env.getCurrentSystemInfo().getDetailsForCreateReplica(new ReplicaAllocation((short) 1)) + "\n"
+ "Create failed replications:\n"
+ "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: SSD",
() -> createTable("create table test.tb7_1(key1 int, key2 varchar(10))\n"