Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public class MManager {
private File logFile;
private MLogWriter logWriter;

private MTreeService mtree;
private MTreeService mtree = MTreeService.getInstance();
// device -> DeviceMNode
private LoadingCache<PartialPath, IMNode> mNodeCache;
private TagManager tagManager = TagManager.getInstance();
Expand Down Expand Up @@ -292,14 +292,25 @@ public synchronized void init() {
try {
isRecovering = true;

templateManager.init();
tagManager.init();
mtree = new MTreeService();
mtree.init();

int lineNumber = initFromLog(logFile);

logWriter = new MLogWriter(config.getSchemaDir(), MetadataConstant.METADATA_LOG);
logWriter.setLogNum(lineNumber);

// todo fix me by refactoring tag recover
for (PartialPath path : mtree.getMeasurementPaths(new PartialPath("root.**"))) {
IMeasurementMNode measurementMNode = mtree.getMeasurementMNode(path);
if (measurementMNode.getOffset() != -1) {
tagManager.recoverIndex(measurementMNode.getOffset(), measurementMNode);
} else {
mtree.unPinMNode(measurementMNode);
}
}

isRecovering = false;
} catch (MetadataException | IOException e) {
logger.error(
Expand Down Expand Up @@ -552,6 +563,20 @@ public void operation(PhysicalPlan plan) throws IOException, MetadataException {
logger.error("Unrecognizable command {}", plan.getOperatorType());
}
}

public void flushMetadata() {
if (!config.isEnablePersistentSchema()) {
return;
}
try {
templateManager.sync();
tagManager.sync();
mtree.sync();
logWriter.clear();
} catch (MetadataException | IOException e) {
logger.error("Exception occurred while flushing MManager");
}
}
// endregion

// region Interfaces for CQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ private MetadataConstant() {
MTREE_PREFIX + IoTDBConstant.FILE_NAME_SEPARATOR + MTREE_VERSION + ".snapshot.bin.tmp";
public static final String SCHEMA_FILE_DIR = "pst";
public static final String SCHEMA_FILE_SUFFIX = "pst";
public static final String TEMPLATE_FILE = "templates.bin";

public static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new String[] {"root", "**"});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ public class MTreeService implements Serializable {
private IMNode root;
private IMTreeStore store;

// region MTree Singleton
private static class MTreeServiceHolder {

private MTreeServiceHolder() {
// allowed to do nothing
}

private static final MTreeService INSTANCE = new MTreeService();
}

/** we should not use this function in other place, but only in IoTDB class */
public static MTreeService getInstance() {
return MTreeService.MTreeServiceHolder.INSTANCE;
}
// endregion

// region MTree initialization, clear and serialization
public MTreeService() {}

Expand All @@ -146,6 +162,10 @@ public void init() throws MetadataException, IOException {
this.root = store.getRoot();
}

public void sync() throws MetadataException, IOException {
store.sync();
}

public void clear() {
store.clear();
root = store.getRoot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.metadata.mtree.store.disk.cache.MemManager;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISchemaFileManager;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SFManager;
import org.apache.iotdb.db.service.IoTDB;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -342,19 +343,24 @@ public void unPin(IMNode node) {
@Override
public void createSnapshot() throws IOException {}

@Override
public void sync() throws MetadataException, IOException {
flushVolatileNodes();
}

/** clear all the data of MTreeStore in memory and disk. */
@Override
public void clear() {
if (flushTask != null) {
flushTask.shutdown();
while (!flushTask.isTerminated()) ;
flushTask = null;
}
root = null;
cacheStrategy.clear();
memManager.clear();
if (file != null) {
try {
file.clear();
file.close();
} catch (MetadataException | IOException e) {
logger.error(String.format("Error occurred during SchemaFile clear, %s", e.getMessage()));
Expand Down Expand Up @@ -399,7 +405,11 @@ private synchronized void registerFlushTask() {
return;
}
hasFlushTask = true;
flushTask.submit(this::flushVolatileNodes);
flushTask.submit(this::triggerMManagerFlush);
}

private void triggerMManagerFlush() {
IoTDB.metaManager.flushMetadata();
}

/** Sync all volatile nodes to schemaFile and execute memory release after flush. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public interface IMTreeStore {

void createSnapshot() throws IOException;

void sync() throws MetadataException, IOException;

void clear();

String toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
Expand Down Expand Up @@ -193,6 +194,9 @@ public void createSnapshot() throws IOException {
}
}

@Override
public void sync() throws MetadataException, IOException {}

public void serializeTo(String snapshotPath) throws IOException {
try (MLogWriter mLogWriter = new MLogWriter(snapshotPath)) {
root.serializeTo(mLogWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ protected SFManager() {
@Override
public IMNode init() throws MetadataException, IOException {
loadSchemaFiles();
return getUpperMTree();
IMNode result = MockSFManager.cloneMNode(root);
SchemaFile.setNodeAddress(result, 0);
return result;
}

@Override
Expand Down Expand Up @@ -388,6 +390,7 @@ private void appendStorageGroupNode(String[] nodes, long dataTTL, boolean isEnti
for (int i = 1; i < nodes.length - 1; i++) {
if (!cur.hasChild(nodes[i])) {
cur.addChild(new InternalMNode(cur, nodes[i]));
SchemaFile.setNodeAddress(cur.getChild(nodes[i]), 0L);
}
cur = cur.getChild(nodes[i]);
if (cur.isStorageGroup()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer)
}
}

public void sync() throws IOException {
fileChannel.force(true);
}

@Override
public void close() throws IOException {
fileChannel.force(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ public Pair<Map<String, String>, Map<String, String>> readTagFile(long tagFileOf
return tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
}

public void sync() throws IOException {
tagLogFile.sync();
}

public void clear() throws IOException {
this.tagIndex.clear();
if (tagLogFile != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.template;

import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;

import java.io.IOException;

public class TemplateFileReader implements AutoCloseable {

private MLogReader logReader;

public TemplateFileReader(String schemaDir, String fileName) throws IOException {
logReader = new MLogReader(schemaDir, fileName);
}

public boolean hasNext() {
return logReader.hasNext();
}

public PhysicalPlan next() {
return logReader.next();
}

@Override
public void close() {
logReader.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.template;

import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;

import java.io.IOException;

public class TemplateFileWriter {

private final MLogWriter logWriter;

public TemplateFileWriter(String schemaDir, String fileName) throws IOException {
logWriter = new MLogWriter(schemaDir, fileName);
}

public void createSchemaTemplate(CreateTemplatePlan plan) throws IOException {
logWriter.createSchemaTemplate(plan);
}

public void appendSchemaTemplate(AppendTemplatePlan plan) throws IOException {
logWriter.appendSchemaTemplate(plan);
}

public void pruneSchemaTemplate(PruneTemplatePlan plan) throws IOException {
logWriter.pruneSchemaTemplate(plan);
}

public void dropSchemaTemplate(DropTemplatePlan plan) throws IOException {
logWriter.dropSchemaTemplate(plan);
}

public void force() throws IOException {
logWriter.force();
}

public void close() throws IOException {
logWriter.close();
}

public void clear() throws IOException {
logWriter.clear();
}
}
Loading