Skip to content

Commit

Permalink
Fix bug that backup may create an empty file on remote storage. (#1869)
Browse files Browse the repository at this point in the history
Sometime the broker writer failed to close, but we do not handle this failure.
This may create an empty file on remote storage but be treated as normal.

Also enhance some usabilities:
1. getting latest 2000 transactions instead of getting the earliest.
2. Show backend which download and upload tasks are being executed.
  • Loading branch information
morningman authored and imay committed Sep 27, 2019
1 parent 1c229fb commit e67b398
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 65 deletions.
29 changes: 19 additions & 10 deletions be/src/exec/broker_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ Status BrokerWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_l
return Status::OK();
}

void BrokerWriter::close() {
Status BrokerWriter::close() {
if (_is_closed) {
return;
return Status::OK();
}
TBrokerCloseWriterRequest request;

Expand All @@ -198,40 +198,49 @@ void BrokerWriter::close() {
TBrokerOperationStatus response;
try {
Status status;
BrokerServiceConnection client(client_cache(_env), broker_addr, 10000, &status);
// use 20 second because close may take longer in remote storage, sometimes.
// TODO(cmy): optimize this if necessary.
BrokerServiceConnection client(client_cache(_env), broker_addr, 20000, &status);
if (!status.ok()) {
LOG(WARNING) << "Create broker write client failed. broker=" << broker_addr
<< ", status=" << status.get_error_msg();
return;
return status;
}

try {
client->closeWriter(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "Close broker writer failed. broker=" << broker_addr
<< ", status=" << status.get_error_msg();
status = client.reopen();
if (!status.ok()) {
LOG(WARNING) << "Close broker writer failed. broker=" << broker_addr
LOG(WARNING) << "Reopen broker writer failed. broker=" << broker_addr
<< ", status=" << status.get_error_msg();
return;
return status;
}
client->closeWriter(response, request);
}
} catch (apache::thrift::TException& e) {
LOG(WARNING) << "Close broker writer failed, broker:" << broker_addr
std::stringstream ss;
ss << "Close broker writer failed, broker:" << broker_addr
<< " msg:" << e.what();
return;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

VLOG_ROW << "debug: send broker close writer response: "
<< apache::thrift::ThriftDebugString(response).c_str();

if (response.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << "Close broker writer failed, broker:" << broker_addr
std::stringstream ss;
ss << "Close broker writer failed, broker:" << broker_addr
<< " msg:" << response.message;
return;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

_is_closed = true;
return Status::OK();
}

} // end namespace doris
2 changes: 1 addition & 1 deletion be/src/exec/broker_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class BrokerWriter : public FileWriter {

virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override;

virtual void close() override;
virtual Status close() override;

private:
ExecEnv* _env;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class FileWriter {
// NOTE: the number of bytes written may be less than count if.
virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) = 0;

virtual void close() = 0;
virtual Status close() = 0;
};

} // end namespace doris
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/local_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ Status LocalFileWriter::write(const uint8_t* buf, size_t buf_len, size_t* writte
return Status::OK();
}

void LocalFileWriter::close() {
Status LocalFileWriter::close() {
if (_fp != nullptr) {
fclose(_fp);
_fp = nullptr;
}
return Status::OK();
}

} // end namespace doris
2 changes: 1 addition & 1 deletion be/src/exec/local_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LocalFileWriter : public FileWriter {

virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override;

virtual void close() override;
virtual Status close() override;

private:
std::string _path;
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
}

if (_input_rowsets.size() <= 1) {
LOG(WARNING) << "There is no enough rowsets to cumulative compaction."
<< " The size of rowsets to compact=" << candidate_rowsets.size()
<< ", cumulative_point=" << _tablet->cumulative_layer_point();
LOG(INFO) << "There is no enough rowsets to cumulative compaction."
<< ", the size of rowsets to compact=" << candidate_rowsets.size()
<< ", cumulative_point=" << _tablet->cumulative_layer_point();
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ Status SnapshotLoader::upload(
read_offset += read_len;
left_len -= read_len;
}

// close manually, because we need to check its close status
RETURN_IF_ERROR(broker_writer->close());

LOG(INFO) << "finished to write file via broker. file: " <<
full_local_file << ", length: " << file_len;
}
Expand Down
28 changes: 13 additions & 15 deletions fe/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -69,7 +68,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;


Expand All @@ -95,8 +93,8 @@ public enum BackupJobState {
private long snapshotFinishedTime = -1;
private long snapshopUploadFinishedTime = -1;

// save all tablets which tasks are not finished.
private Set<Long> unfinishedTaskIds = Sets.newConcurrentHashSet();
// save task id map to the backend it be executed
private Map<Long, Long> unfinishedTaskIds = Maps.newConcurrentMap();
// tablet id -> snapshot info
private Map<Long, SnapshotInfo> snapshotInfos = Maps.newConcurrentMap();
// save all related table[partition] info
Expand Down Expand Up @@ -164,12 +162,12 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT

snapshotInfos.put(task.getTabletId(), info);
taskProgress.remove(task.getTabletId());
boolean res = unfinishedTaskIds.remove(task.getTabletId());
Long oldValue = unfinishedTaskIds.remove(task.getTabletId());
taskErrMsg.remove(task.getTabletId());
LOG.debug("get finished snapshot info: {}, unfinished tasks num: {}, remove result: {}. {}",
info, unfinishedTaskIds.size(), res, this);
info, unfinishedTaskIds.size(), (oldValue != null), this);

return res;
return oldValue != null;
}

public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTaskRequest request) {
Expand Down Expand Up @@ -221,11 +219,11 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas
}

taskProgress.remove(task.getSignature());
boolean res = unfinishedTaskIds.remove(task.getSignature());
Long oldValue = unfinishedTaskIds.remove(task.getSignature());
taskErrMsg.remove(task.getTabletId());
LOG.debug("get finished upload snapshot task, unfinished tasks num: {}, remove result: {}. {}",
unfinishedTaskIds.size(), res, this);
return res;
unfinishedTaskIds.size(), (oldValue != null), this);
return oldValue != null;
}

@Override
Expand Down Expand Up @@ -406,7 +404,7 @@ private void prepareAndSendSnapshotTask() {
visibleVersion, visibleVersionHash,
schemaHash, timeoutMs, false /* not restore task */);
batchTask.addTask(task);
unfinishedTaskIds.add(tablet.getId());
unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
}
}

Expand Down Expand Up @@ -505,7 +503,7 @@ private void uploadSnapshot() {
UploadTask task = new UploadTask(null, beId, signature, jobId, dbId, srcToDest,
brokers.get(0), repo.getStorage().getProperties());
batchTask.addTask(task);
unfinishedTaskIds.add(signature);
unfinishedTaskIds.put(signature, beId);
}
}

Expand Down Expand Up @@ -680,13 +678,13 @@ private void cancelInternal() {
switch (state) {
case SNAPSHOTING:
// remove all snapshot tasks in AgentTaskQueue
for (Long taskId : unfinishedTaskIds) {
for (Long taskId : unfinishedTaskIds.keySet()) {
AgentTaskQueue.removeTaskOfType(TTaskType.MAKE_SNAPSHOT, taskId);
}
break;
case UPLOADING:
// remove all upload tasks in AgentTaskQueue
for (Long taskId : unfinishedTaskIds) {
for (Long taskId : unfinishedTaskIds.keySet()) {
AgentTaskQueue.removeTaskOfType(TTaskType.UPLOAD, taskId);
}
break;
Expand Down Expand Up @@ -729,7 +727,7 @@ public List<String> getInfo() {
info.add(TimeUtils.longToTimeString(snapshotFinishedTime));
info.add(TimeUtils.longToTimeString(snapshopUploadFinishedTime));
info.add(TimeUtils.longToTimeString(finishedTime));
info.add(Joiner.on(", ").join(unfinishedTaskIds));
info.add(Joiner.on(", ").join(unfinishedTaskIds.entrySet()));
info.add(Joiner.on(", ").join(taskProgress.entrySet().stream().map(
e -> "[" + e.getKey() + ": " + e.getValue().first + "/" + e.getValue().second + "]").collect(
Collectors.toList())));
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ public List<String> getInfo() {
info.add(TimeUtils.longToTimeString(snapshotFinishedTime));
info.add(TimeUtils.longToTimeString(downloadFinishedTime));
info.add(TimeUtils.longToTimeString(finishedTime));
info.add(Joiner.on(", ").join(unfinishedSignatureToId.keySet()));
info.add(Joiner.on(", ").join(unfinishedSignatureToId.entrySet()));
info.add(Joiner.on(", ").join(taskProgress.entrySet().stream().map(
e -> "[" + e.getKey() + ": " + e.getValue().first + "/" + e.getValue().second + "]").collect(
Collectors.toList())));
Expand Down
16 changes: 2 additions & 14 deletions fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@

import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.transaction.GlobalTransactionMgr;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TransProcDir implements ProcDirInterface {
Expand Down Expand Up @@ -60,17 +57,8 @@ public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr();
List<List<Comparable>> infos = transactionMgr.getDbTransInfo(dbId, state.equals("running"), MAX_SHOW_ENTRIES);
// order by transactionId, desc
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(true, 0);
Collections.sort(infos, comparator);
for (List<Comparable> info : infos) {
List<String> row = new ArrayList<String>(info.size());
for (Comparable comparable : info) {
row.add(comparable.toString());
}
result.addRow(row);
}
List<List<String>> infos = transactionMgr.getDbTransInfo(dbId, state.equals("running"), MAX_SHOW_ENTRIES);
result.setRows(infos);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public class TransTablesProcDir implements ProcDirInterface {
.add("CommittedPartitionIds")
.build();

private long tid;
private long txnId;

public TransTablesProcDir(long tid) {
this.tid = tid;
public TransTablesProcDir(long txnId) {
this.txnId = txnId;
}

@Override
Expand All @@ -50,7 +50,7 @@ public boolean register(String name, ProcNodeInterface node) {
public ProcResult fetchResult() throws AnalysisException {
// get info
GlobalTransactionMgr transactionMgr = Catalog.getCurrentGlobalTransactionMgr();
List<List<Comparable>> tableInfos = transactionMgr.getTableTransInfo(tid);
List<List<Comparable>> tableInfos = transactionMgr.getTableTransInfo(txnId);
// sort by table id
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0);
Collections.sort(tableInfos, comparator);
Expand Down Expand Up @@ -83,6 +83,6 @@ public ProcNodeInterface lookup(String tableIdStr) throws AnalysisException {
throw new AnalysisException("Invalid table id format: " + tableIdStr);
}

return new TransPartitionProcNode(tid, tableId);
return new TransPartitionProcNode(txnId, tableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1190,33 +1190,34 @@ public List<List<String>> getDbTransStateInfo(long dbId) {
return infos;
}

public List<List<Comparable>> getDbTransInfo(long dbId, boolean running, int limit) throws AnalysisException {
List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
public List<List<String>> getDbTransInfo(long dbId, boolean running, int limit) throws AnalysisException {
List<List<String>> infos = new ArrayList<List<String>>();
readLock();
try {
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
throw new AnalysisException("Database[" + dbId + "] does not exist");
}

// get transaction order by txn id desc limit 'limit'
idToTransactionState.values().stream()
.filter(t -> (t.getDbId() == dbId && (running ? !t.getTransactionStatus().isFinalStatus()
: t.getTransactionStatus().isFinalStatus())))
: t.getTransactionStatus().isFinalStatus()))).sorted(TransactionState.TXN_ID_COMPARATOR)
.limit(limit)
.forEach(t -> {
List<Comparable> info = new ArrayList<Comparable>();
info.add(t.getTransactionId());
List<String> info = new ArrayList<String>();
info.add(String.valueOf(t.getTransactionId()));
info.add(t.getLabel());
info.add(t.getCoordinator());
info.add(t.getTransactionStatus());
info.add(t.getSourceType());
info.add(t.getTransactionStatus().name());
info.add(t.getSourceType().name());
info.add(TimeUtils.longToTimeString(t.getPrepareTime()));
info.add(TimeUtils.longToTimeString(t.getCommitTime()));
info.add(TimeUtils.longToTimeString(t.getFinishTime()));
info.add(t.getReason());
info.add(t.getErrorReplicas().size());
info.add(t.getCallbackId());
info.add(t.getTimeoutMs());
info.add(String.valueOf(t.getErrorReplicas().size()));
info.add(String.valueOf(t.getCallbackId()));
info.add(String.valueOf(t.getTimeoutMs()));
infos.add(info);
});
} finally {
Expand All @@ -1225,13 +1226,13 @@ public List<List<Comparable>> getDbTransInfo(long dbId, boolean running, int lim
return infos;
}

public List<List<Comparable>> getTableTransInfo(long tid) throws AnalysisException {
public List<List<Comparable>> getTableTransInfo(long txnId) throws AnalysisException {
List<List<Comparable>> tableInfos = new ArrayList<List<Comparable>>();
readLock();
try {
TransactionState transactionState = idToTransactionState.get(tid);
TransactionState transactionState = idToTransactionState.get(txnId);
if (null == transactionState) {
throw new AnalysisException("Transaction[" + tid + "] does not exist.");
throw new AnalysisException("Transaction[" + txnId + "] does not exist.");
}

for (Map.Entry<Long, TableCommitInfo> entry : transactionState.getIdToTableCommitInfos().entrySet()) {
Expand Down
Loading

0 comments on commit e67b398

Please sign in to comment.