diff --git a/example/influxdb-protocol-example/src/main/java/org/apache/iotdb/influxdb/InfluxDBExample.java b/example/influxdb-protocol-example/src/main/java/org/apache/iotdb/influxdb/InfluxDBExample.java index 64a5d178226a..cdb884dfc1b2 100644 --- a/example/influxdb-protocol-example/src/main/java/org/apache/iotdb/influxdb/InfluxDBExample.java +++ b/example/influxdb-protocol-example/src/main/java/org/apache/iotdb/influxdb/InfluxDBExample.java @@ -102,7 +102,7 @@ private static void queryData() { query = new Query( - "select count(temperature),first(temperature),last(temperature),max(temperature),mean(temperature),median(temperature),min(temperature),mode(temperature),spread(temperature),stddev(temperature),sum(temperature) from student where ((workshop=\"A1\" and production=\"B1\" and cell =\"C1\" ) or temperature< 15 )", + "select count(temperature),first(temperature),last(temperature),max(temperature),mean(temperature),median(temperature),min(temperature),mode(temperature),spread(temperature),stddev(temperature),sum(temperature) from factory where ((workshop=\"A1\" and production=\"B1\" and cell =\"C1\" ) or temperature< 15 )", database); result = influxDB.query(query); System.out.println("query2 result:" + result.getResults().get(0).getSeries().get(0).toString()); diff --git a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java index d8c97dc2b4ab..047cada0f145 100644 --- a/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java +++ b/influxdb-protocol/src/test/java/org/apache/iotdb/influxdb/integration/IoTDBInfluxDBIT.java @@ -33,11 +33,15 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.DockerImageName; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class IoTDBInfluxDBIT { @@ -164,8 +168,9 @@ public void testCommonQueryColumn() { QueryResult.Series series = result.getResults().get(0).getSeries().get(0); String[] retArray = new String[] {"time", "name", "sex", "province", "country", "score", "tel"}; + Set columnNames = new HashSet<>(Arrays.asList(retArray)); for (int i = 0; i < series.getColumns().size(); i++) { - assertEquals(retArray[i], series.getColumns().get(i)); + assertTrue(columnNames.contains(series.getColumns().get(i))); } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java index 358d37f7b8bc..7b3d243242db 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; import org.apache.iotdb.db.metadata.mtree.ConfigMTree; import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager; +import org.apache.iotdb.db.metadata.schemaregion.tagschemaregion.MockTagSchemaRegion; import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.external.api.ISeriesNumerLimiter; @@ -323,6 +324,11 @@ private ISchemaRegion createSchemaRegionWithoutExistenceCheck( new RSchemaRegionLoader() .loadRSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode); break; + case Tag: + schemaRegion = + new MockTagSchemaRegion( + storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter); + break; default: throw new UnsupportedOperationException( String.format( diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java index 9147b9374c01..6b3270e87612 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java @@ -22,5 +22,6 @@ public enum SchemaEngineMode { Memory, Schema_File, - Rocksdb_based + Rocksdb_based, + Tag } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/tagschemaregion/MockTagSchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/tagschemaregion/MockTagSchemaRegion.java new file mode 100644 index 000000000000..e4122375469c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/tagschemaregion/MockTagSchemaRegion.java @@ -0,0 +1,927 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.schemaregion.tagschemaregion; + +import org.apache.iotdb.common.rpc.thrift.TSchemaNode; +import org.apache.iotdb.commons.consensus.SchemaRegionId; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.metadata.LocalSchemaProcessor; +import org.apache.iotdb.db.metadata.idtable.IDTable; +import org.apache.iotdb.db.metadata.idtable.IDTableManager; +import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry; +import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory; +import org.apache.iotdb.db.metadata.idtable.entry.DiskSchemaEntry; +import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID; +import org.apache.iotdb.db.metadata.idtable.entry.InsertMeasurementMNode; +import org.apache.iotdb.db.metadata.idtable.entry.SHA256DeviceID; +import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry; +import org.apache.iotdb.db.metadata.mnode.EntityMNode; +import org.apache.iotdb.db.metadata.mnode.IMNode; +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; +import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils; +import org.apache.iotdb.db.metadata.template.Template; +import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo; +import org.apache.iotdb.db.mpp.common.schematree.MeasurementSchemaInfo; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan; +import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.dataset.ShowDevicesResult; +import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult; +import org.apache.iotdb.external.api.ISeriesNumerLimiter; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.jetbrains.annotations.NotNull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + +public class MockTagSchemaRegion implements ISchemaRegion { + + protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private final String TAIL = ".**"; + private final IStorageGroupMNode storageGroupMNode; + private String storageGroupFullPath; + private SchemaRegionId schemaRegionId; + + private Map>> tagInvertedIndex; + + private List deviceIDS; + + private IDTable idTable; + + private final ISeriesNumerLimiter seriesNumerLimiter; + + public MockTagSchemaRegion( + PartialPath storageGroup, + SchemaRegionId schemaRegionId, + IStorageGroupMNode storageGroupMNode, + ISeriesNumerLimiter seriesNumerLimiter) + throws MetadataException { + + storageGroupFullPath = storageGroup.getFullPath(); + this.schemaRegionId = schemaRegionId; + this.storageGroupMNode = storageGroupMNode; + this.deviceIDS = new ArrayList<>(); + this.seriesNumerLimiter = seriesNumerLimiter; + tagInvertedIndex = new ConcurrentHashMap<>(); + idTable = IDTableManager.getInstance().getIDTable(storageGroup); + init(); + } + + @NotNull + private Map pathToTags(String path) { + if (path.length() <= storageGroupFullPath.length()) return new TreeMap<>(); + String devicePath = path.substring(storageGroupFullPath.length() + 1); + String[] tags = devicePath.split("\\."); + Map tagsMap = new TreeMap<>(); + for (int i = 0; i < tags.length; i += 2) { + tagsMap.put(tags[i], tags[i + 1]); + } + return tagsMap; + } + + public String tagsToPath(Map tags) { + StringBuilder stringBuilder = new StringBuilder(storageGroupFullPath); + for (String tagKey : tags.keySet()) { + stringBuilder.append(".").append(tagKey).append(".").append(tags.get(tagKey)); + } + return stringBuilder.toString(); + } + + @Override + public void init() throws MetadataException { + if (!config.isEnableIDTableLogFile() + && config.getDeviceIDTransformationMethod().equals("SHA256")) { + throw new MetadataException( + "enableIDTableLogFile OR deviceIDTransformationMethod==\"Plain\""); + } + } + + @Override + public void clear() { + return; + } + + @Override + public void forceMlog() { + return; + } + + @Override + public SchemaRegionId getSchemaRegionId() { + return schemaRegionId; + } + + @Override + public String getStorageGroupFullPath() { + return storageGroupFullPath; + } + + @Override + public void deleteSchemaRegion() throws MetadataException { + return; + } + + @Override + public boolean createSnapshot(File snapshotDir) { + return false; + } + + @Override + public void loadSnapshot(File latestSnapshotRootDir) { + return; + } + + private void createTagInvertedIndex(PartialPath devicePath) { + IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(devicePath); + Map tagsMap = pathToTags(devicePath.getFullPath()); + + deviceIDS.add(deviceID); + + for (String tagkey : tagsMap.keySet()) { + String tagValue = tagsMap.get(tagkey); + Map> tagkeyMap = + tagInvertedIndex.computeIfAbsent(tagkey, key -> new HashMap<>()); + List ids = tagkeyMap.computeIfAbsent(tagValue, key -> new ArrayList<>()); + ids.add(deviceIDS.size() - 1); + } + } + + private void createTimeseries( + PartialPath path, + TSDataType dataType, + TSEncoding encoding, + CompressionType compressor, + Map props) + throws MetadataException { + createTimeseries( + new CreateTimeSeriesPlan(path, dataType, encoding, compressor, props, null, null, null), 0); + } + + private void createAlignedTimeSeries( + PartialPath prefixPath, + List measurements, + List dataTypes, + List encodings, + List compressors) + throws MetadataException { + createAlignedTimeSeries( + new CreateAlignedTimeSeriesPlan( + prefixPath, measurements, dataTypes, encodings, compressors, null, null, null)); + } + + @Override // [iotdb|newIotdb/创建非对齐时间序列] [newIotdb/insert 2自动创建时间序列] + public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException { + PartialPath devicePath = plan.getPath().getDevicePath(); + Map tags = pathToTags(devicePath.getFullPath()); + PartialPath path = new PartialPath(tagsToPath(tags) + "." + plan.getPath().getMeasurement()); + plan.setPath(path); + devicePath = plan.getPath().getDevicePath(); + DeviceEntry deviceEntry = idTable.getDeviceEntry(devicePath.getFullPath()); + if (deviceEntry != null) { + if (deviceEntry.isAligned()) { + throw new AlignedTimeseriesException( + "Timeseries under this entity is not aligned, please use createTimeseries or change entity.", + devicePath.getFullPath() + "." + plan.getPath().getMeasurement()); + } else if (deviceEntry.getMeasurementMap().containsKey(plan.getPath().getMeasurement())) { + throw new PathAlreadyExistException( + devicePath.getFullPath() + "." + plan.getPath().getMeasurement()); + } + } + idTable.createTimeseries(plan); + if (deviceEntry == null) { + createTagInvertedIndex(devicePath); + } + } + + @Override // [iotdb|newIotdb/对齐时间序列] [newIotdb/insert 2自动创建时间序列] + public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException { + PartialPath devicePath = plan.getPrefixPath(); + Map tags = pathToTags(devicePath.getFullPath()); + PartialPath path = new PartialPath(tagsToPath(tags)); + plan.setPrefixPath(path); + devicePath = plan.getPrefixPath(); + DeviceEntry deviceEntry = idTable.getDeviceEntry(devicePath.getFullPath()); + if (deviceEntry != null) { + if (!deviceEntry.isAligned()) { + throw new AlignedTimeseriesException( + "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.", + devicePath.getFullPath()); + } else { + List measurements = plan.getMeasurements(); + List dataTypes = plan.getDataTypes(); + List encodings = plan.getEncodings(); + List compressors = plan.getCompressors(); + + List tmpMeasurements = new LinkedList<>(); + List tmpDataTypes = new LinkedList<>(); + List tmpEncodings = new LinkedList<>(); + List tmpCompressors = new LinkedList<>(); + for (int i = 0; i < measurements.size(); i++) { + String measurement = measurements.get(i); + if (!deviceEntry.getMeasurementMap().containsKey(measurement)) { + tmpMeasurements.add(measurements.get(i)); + tmpDataTypes.add(dataTypes.get(i)); + tmpEncodings.add(encodings.get(i)); + tmpCompressors.add(compressors.get(i)); + } + } + if (tmpMeasurements.size() == 0) + throw new PathAlreadyExistException(devicePath.getFullPath()); + plan.setMeasurements(tmpMeasurements); + plan.setDataTypes(tmpDataTypes); + plan.setEncodings(tmpEncodings); + plan.setCompressors(tmpCompressors); + } + } + idTable.createAlignedTimeseries(plan); + if (deviceEntry == null) { + createTagInvertedIndex(devicePath); + } + } + + @Override + public Pair> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public int constructSchemaBlackList(PathPatternTree patternTree) throws MetadataException { + return 0; + } + + @Override + public void rollbackSchemaBlackList(PathPatternTree patternTree) throws MetadataException {} + + @Override + public List fetchSchemaBlackList(PathPatternTree patternTree) + throws MetadataException { + return null; + } + + @Override + public void deleteTimeseriesInBlackList(PathPatternTree patternTree) throws MetadataException {} + + @Override + public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public boolean isPathExist(PartialPath path) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + int res = 0; + List deviceIDs = getDeviceIdFromInvertedIndex(pathPattern); + for (IDeviceID deviceID : deviceIDs) { + res += idTable.getDeviceEntry(deviceID.toStringID()).getMeasurementMap().keySet().size(); + } + return res; + } + + @Override + public int getAllTimeseriesCount( + PartialPath pathPattern, Map templateMap, boolean isPrefixMatch) + throws MetadataException { + return 0; + } + + @Override + public int getAllTimeseriesCount( + PartialPath pathPattern, boolean isPrefixMatch, String key, String value, boolean isContains) + throws MetadataException { + return 0; + } + + @Override + public Map getMeasurementCountGroupByLevel( + PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public Map getMeasurementCountGroupByLevel( + PartialPath pathPattern, + int level, + boolean isPrefixMatch, + String key, + String value, + boolean isContains) + throws MetadataException { + return null; + } + + @Override + public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + if (pathPattern.getFullPath().length() <= storageGroupFullPath.length()) { + return deviceIDS.size(); + } else { + return getDeviceIDsByInvertedIndex(pathPattern).size(); + } + } + + @Override + public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch) + throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public List getNodesListInGivenLevel( + PartialPath pathPattern, + int nodeLevel, + boolean isPrefixMatch, + LocalSchemaProcessor.StorageGroupFilter filter) + throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public Set getChildNodePathInNextLevel(PartialPath pathPattern) + throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public Set getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public Set getBelongedDevices(PartialPath timeseries) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override // [newIotdb/show timeseries] [newIotdb/count device] [newIotdb/count timeseries] + public Set getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + List deviceIDs = getDeviceIdFromInvertedIndex(pathPattern); + Set res = new HashSet<>(); + String devicePath = pathPattern.getFullPath(); + if (!devicePath.endsWith(TAIL) && !devicePath.equals(storageGroupFullPath)) { + DeviceEntry deviceEntry = idTable.getDeviceEntry(devicePath); + if (deviceEntry != null) { + res.add(pathPattern); + } + return res; + } + for (IDeviceID deviceID : deviceIDs) { + if (deviceID instanceof SHA256DeviceID) { + DeviceEntry deviceEntry = idTable.getDeviceEntry(deviceID.toStringID()); + Map map = deviceEntry.getMeasurementMap(); + for (String m : map.keySet()) { + SchemaEntry schemaEntry = map.get(m); + List schemaEntries = new ArrayList<>(); + schemaEntries.add(schemaEntry); + List diskSchemaEntries = idTable.getDiskSchemaEntries(schemaEntries); + DiskSchemaEntry diskSchemaEntry = diskSchemaEntries.get(0); + res.add( + new PartialPath( + diskSchemaEntry.seriesKey.substring( + 0, + diskSchemaEntry.seriesKey.length() + - diskSchemaEntry.measurementName.length() + - 1))); + break; + } + } else { + res.add(new PartialPath(deviceID.toStringID())); + } + } + return res; + } + + @Override + public Pair, Integer> getMatchedDevices(ShowDevicesPlan plan) + throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override // [newIotDB / insert1,3] [newIotDB/select] [newIotdb/select count()] [newIotdb/select + // .. groupby level] + public List getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch) + throws MetadataException { + PartialPath devicePath = pathPattern.getDevicePath(); + // 批量查询.路径以".**"结尾,如: + // root.sg.tag1.a.** + // root.sg.tagx.c.tag2.v.** + // 点查询.路径不以".**",直接走IDTable,精确查询 + if (devicePath.getFullPath().endsWith(TAIL)) { + return getMeasurementPathsWithBatchQuery(devicePath, isPrefixMatch); + } else { + return getMeasurementPathsWithPointQuery(devicePath, isPrefixMatch); + } + } + + private List getMeasurementPathsWithPointQuery( + PartialPath devicePath, boolean isPrefixMatch) throws MetadataException { + List res = new LinkedList<>(); + String path = devicePath.getFullPath(); + Map tags = pathToTags(path); + path = tagsToPath(tags); + DeviceEntry deviceEntry = idTable.getDeviceEntry(path); + if (deviceEntry == null) return res; + Map schemaMap = deviceEntry.getMeasurementMap(); + for (String measurement : schemaMap.keySet()) { + SchemaEntry schemaEntry = schemaMap.get(measurement); + MeasurementPath measurementPath = + new MeasurementPath( + path, + measurement, + new MeasurementSchema( + measurement, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType())); + measurementPath.setUnderAlignedEntity(deviceEntry.isAligned()); + res.add(measurementPath); + } + + return res; + } + + private List getMeasurementPathsWithBatchQuery( + PartialPath devicePath, boolean isPrefixMatch) throws MetadataException { + List res = new LinkedList<>(); + List deviceIDs = getDeviceIdFromInvertedIndex(devicePath); + for (IDeviceID deviceID : deviceIDs) { + DeviceEntry deviceEntry = idTable.getDeviceEntry(deviceID.toStringID()); + Map schemaMap = deviceEntry.getMeasurementMap(); + if (deviceID instanceof SHA256DeviceID) { + for (String measurement : schemaMap.keySet()) { + SchemaEntry schemaEntry = schemaMap.get(measurement); + List schemaEntries = new ArrayList<>(); + schemaEntries.add(schemaEntry); + List diskSchemaEntries = idTable.getDiskSchemaEntries(schemaEntries); + DiskSchemaEntry diskSchemaEntry = diskSchemaEntries.get(0); + MeasurementPath measurementPath = + new MeasurementPath( + new PartialPath(diskSchemaEntry.seriesKey), + new MeasurementSchema( + measurement, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType())); + measurementPath.setUnderAlignedEntity(deviceEntry.isAligned()); + res.add(measurementPath); + } + } else { + for (String measurement : schemaMap.keySet()) { + SchemaEntry schemaEntry = schemaMap.get(measurement); + MeasurementPath measurementPath = + new MeasurementPath( + deviceID.toStringID(), + measurement, + new MeasurementSchema( + measurement, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType())); + measurementPath.setUnderAlignedEntity(deviceEntry.isAligned()); + res.add(measurementPath); + } + } + } + return res; + } + + // [iotdb/select] [iotdb/select last] [iotdb/select count()] [iotdb/select ...groupby level] + @Override + public Pair, Integer> getMeasurementPathsWithAlias( + PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch) + throws MetadataException { + List res = getMeasurementPaths(pathPattern, isPrefixMatch); + Pair, Integer> result = new Pair<>(res, 0); + return result; + } + + @Override + public List fetchSchema( + PartialPath pathPattern, Map templateMap) throws MetadataException { + return null; + } + + // show 时间序列 + @Override // [iotdb/show timeseries] + public Pair, Integer> showTimeseries( + ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException { + List res = new ArrayList<>(); + Pair, Integer> result = new Pair<>(res, 0); + String path = plan.getPath().getFullPath(); + if (!path.endsWith(TAIL)) { + Map tags = pathToTags(path); + path = tagsToPath(tags); + DeviceEntry deviceEntry = idTable.getDeviceEntry(path); + if (deviceEntry != null) { + Map measurementMap = deviceEntry.getMeasurementMap(); + for (String m : measurementMap.keySet()) { + SchemaEntry schemaEntry = measurementMap.get(m); + res.add( + new ShowTimeSeriesResult( + path + "." + m, + "null", + storageGroupFullPath, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType(), + schemaEntry.getLastTime(), + new HashMap<>(), + new HashMap<>())); + } + } + return result; + } + List deviceIDs = getDeviceIdFromInvertedIndex(plan.getPath()); + for (IDeviceID deviceID : deviceIDs) { + getTimeSeriesResultOfDeviceFromIDTable(res, deviceID); + } + return result; + } + + private List getDeviceIdFromInvertedIndex(PartialPath devicePath) + throws MetadataException { + String path = devicePath.getFullPath(); + if (path.endsWith(TAIL)) { + path = path.substring(0, path.length() - TAIL.length()); + devicePath = new PartialPath(path); + } + if (devicePath.getFullPath().length() <= storageGroupFullPath.length()) { + return deviceIDS; + } else { + List res = new LinkedList<>(); + List ids = getDeviceIDsByInvertedIndex(devicePath); + if (ids.size() > 0) { + for (int id : ids) { + res.add(deviceIDS.get(id)); + } + } + return res; + } + } + + private void getTimeSeriesResultOfDeviceFromIDTable( + List res, IDeviceID deviceID) { + Map measurementMap = + idTable.getDeviceEntry(deviceID.toStringID()).getMeasurementMap(); + if (deviceID instanceof SHA256DeviceID) { + for (String m : measurementMap.keySet()) { + SchemaEntry schemaEntry = measurementMap.get(m); + List schemaEntries = new ArrayList<>(); + schemaEntries.add(schemaEntry); + List diskSchemaEntries = idTable.getDiskSchemaEntries(schemaEntries); + DiskSchemaEntry diskSchemaEntry = diskSchemaEntries.get(0); + res.add( + new ShowTimeSeriesResult( + diskSchemaEntry.seriesKey, + "null", + storageGroupFullPath, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType(), + schemaEntry.getLastTime(), + new HashMap<>(), + new HashMap<>())); + } + } else { + for (String m : measurementMap.keySet()) { + SchemaEntry schemaEntry = measurementMap.get(m); + res.add( + new ShowTimeSeriesResult( + deviceID.toStringID() + "." + m, + "null", + storageGroupFullPath, + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType(), + schemaEntry.getLastTime(), + new HashMap<>(), + new HashMap<>())); + } + } + } + + private List getDeviceIDsByInvertedIndex(PartialPath path) { + Map tags = pathToTags(path.getFullPath()); + List idsCollection = new ArrayList<>(tags.keySet().size()); + for (String tagKey : tags.keySet()) { + if (!tagInvertedIndex.containsKey(tagKey) + || !tagInvertedIndex.get(tagKey).containsKey(tags.get(tagKey))) { + return new ArrayList<>(); + } + List ids = tagInvertedIndex.get(tagKey).get(tags.get(tagKey)); + idsCollection.add(new ArrayList(ids)); + } + if (idsCollection.size() == 0) return new ArrayList<>(); + List ids = idsCollection.get(0); + for (int i = 1; i < idsCollection.size(); i++) { + List list = idsCollection.get(i); + ids.retainAll(list); + } + return ids; + } + + @Override + public List getAllMeasurementByDevicePath(PartialPath devicePath) + throws PathNotExistException { + throw new UnsupportedOperationException(""); + } + + @Override + public IMNode getDeviceNode(PartialPath path) throws MetadataException { + DeviceEntry deviceEntry = idTable.getDeviceEntry(path.getFullPath()); + if (deviceEntry == null) throw new PathNotExistException(path.getFullPath()); + return new EntityMNode(storageGroupMNode, path.getFullPath()); + } + + @Override + public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + @Override + public void upsertTagsAndAttributes( + String alias, + Map tagsMap, + Map attributesMap, + PartialPath fullPath) + throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + @Override + public void addAttributes(Map attributesMap, PartialPath fullPath) + throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + @Override + public void addTags(Map tagsMap, PartialPath fullPath) + throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + @Override + public void dropTagsOrAttributes(Set keySet, PartialPath fullPath) + throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + @Override + public void setTagsOrAttributesValue(Map alterMap, PartialPath fullPath) + throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + @Override + public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath) + throws MetadataException, IOException { + throw new UnsupportedOperationException(""); + } + + // insert data + @Override // [iotdb/insert ] + public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) + throws MetadataException, IOException { + PartialPath devicePath = plan.getDevicePath(); + Map tags = pathToTags(devicePath.getFullPath()); + devicePath = new PartialPath(tagsToPath(tags)); + plan.setDevicePath(devicePath); + String[] measurementList = plan.getMeasurements(); + IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes(); + checkAlignedAndAutoCreateSeries(plan); + IMNode deviceMNode = getDeviceNode(devicePath); + IMeasurementMNode measurementMNode; + DeviceEntry deviceEntry = idTable.getDeviceEntry(devicePath.getFullPath()); + Map schemaMap = deviceEntry.getMeasurementMap(); + for (int i = 0; i < measurementList.length; i++) { + SchemaEntry schemaEntry = schemaMap.get(measurementList[i]); + // measurementMNode = + // new MeasurementMNode( + // deviceMNode, + // measurementList[i], + // new MeasurementSchema( + // measurementList[i], + // schemaEntry.getTSDataType(), + // schemaEntry.getTSEncoding(), + // schemaEntry.getCompressionType()), + // null); + measurementMNode = new InsertMeasurementMNode(measurementList[i], schemaEntry, null); + // check type is match + try { + SchemaRegionUtils.checkDataTypeMatch(plan, i, schemaEntry.getTSDataType()); + } catch (DataTypeMismatchException mismatchException) { + if (!config.isEnablePartialInsert()) { + throw mismatchException; + } else { + // mark failed measurement + plan.markFailedMeasurementInsertion(i, mismatchException); + continue; + } + } + measurementMNodes[i] = measurementMNode; + } + plan.setDeviceID(deviceEntry.getDeviceID()); + plan.setDevicePath(new PartialPath(deviceEntry.getDeviceID().toStringID(), false)); + return deviceMNode; + } + + private SchemaEntry getSchemaEntry(String devicePath, String measurementName) { + DeviceEntry deviceEntry = idTable.getDeviceEntry(devicePath); + if (deviceEntry == null) return null; + return deviceEntry.getSchemaEntry(measurementName); + } + + @Override + public DeviceSchemaInfo getDeviceSchemaInfoWithAutoCreate( + PartialPath devicePath, + String[] measurements, + Function getDataType, + boolean aligned) + throws MetadataException { + List measurementSchemaInfoList = new ArrayList<>(measurements.length); + for (int i = 0; i < measurements.length; i++) { + SchemaEntry schemaEntry = getSchemaEntry(devicePath.getFullPath(), measurements[i]); + if (schemaEntry == null) { + if (config.isAutoCreateSchemaEnabled()) { + if (aligned) { + internalAlignedCreateTimeseries( + devicePath, + Collections.singletonList(measurements[i]), + Collections.singletonList(getDataType.apply(i))); + + } else { + internalCreateTimeseries(devicePath.concatNode(measurements[i]), getDataType.apply(i)); + } + } + schemaEntry = getSchemaEntry(devicePath.getFullPath(), measurements[i]); + } + measurementSchemaInfoList.add( + new MeasurementSchemaInfo( + measurements[i], + new MeasurementSchema( + measurements[i], + schemaEntry.getTSDataType(), + schemaEntry.getTSEncoding(), + schemaEntry.getCompressionType()), + null)); + } + return new DeviceSchemaInfo(devicePath, aligned, measurementSchemaInfoList); + } + + private void checkAlignedAndAutoCreateSeries(InsertPlan plan) throws MetadataException { + String[] measurementList = plan.getMeasurements(); + try { + if (plan.isAligned()) { + internalAlignedCreateTimeseries( + plan.getDevicePath(), + Arrays.asList(measurementList), + Arrays.asList(plan.getDataTypes())); + } else { + internalCreateTimeseries( + plan.getDevicePath().concatNode(measurementList[0]), plan.getDataTypes()[0]); + } + } catch (MetadataException e) { + if (!(e instanceof PathAlreadyExistException)) { + throw e; + } + } + } + + private void internalCreateTimeseries(PartialPath path, TSDataType dataType) + throws MetadataException { + createTimeseries( + path, + dataType, + getDefaultEncoding(dataType), + TSFileDescriptor.getInstance().getConfig().getCompressor(), + Collections.emptyMap()); + } + + private void internalAlignedCreateTimeseries( + PartialPath prefixPath, List measurements, List dataTypes) + throws MetadataException { + List encodings = new ArrayList<>(); + List compressors = new ArrayList<>(); + for (TSDataType dataType : dataTypes) { + encodings.add(getDefaultEncoding(dataType)); + compressors.add(TSFileDescriptor.getInstance().getConfig().getCompressor()); + } + createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings, compressors); + } + + @Override + public Set getPathsSetTemplate(String templateName) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public Set getPathsUsingTemplate(String templateName) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public boolean isTemplateAppendable(Template template, List measurements) + throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public void activateSchemaTemplate(ActivateTemplateInClusterPlan plan, Template template) + throws MetadataException {} + + @Override + public List getPathsUsingTemplate(int templateId) throws MetadataException { + return null; + } + + @Override + public IMNode getMNodeForTrigger(PartialPath fullPath) throws MetadataException { + throw new UnsupportedOperationException(""); + } + + @Override + public void releaseMNodeAfterDropTrigger(IMNode node) throws MetadataException { + throw new UnsupportedOperationException(""); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java index ae5e7b97b571..9974b982c11a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode; import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode; import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode; import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode; @@ -78,13 +79,49 @@ public Pair, Integer> searchMeasurementPaths( @Override public Pair, Integer> searchMeasurementPaths(PartialPath pathPattern) { - SchemaTreeMeasurementVisitor visitor = - new SchemaTreeMeasurementVisitor( - root, - pathPattern, - IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1, - 0, - false); + SchemaTreeMeasurementVisitor visitor; + switch (SchemaEngineMode.valueOf( + IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode())) { + case Memory: + case Schema_File: + visitor = + new SchemaTreeMeasurementVisitor( + root, + pathPattern, + IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1, + 0, + false); + break; + case Tag: + if (pathPattern.getFullPath().contains(".**")) { + String measurement = pathPattern.getMeasurement(); + visitor = + new SchemaTreeMeasurementVisitor( + root, + ALL_MATCH_PATTERN.concatNode(measurement), + IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1, + 0, + false); + } else { + visitor = + new SchemaTreeMeasurementVisitor( + root, + pathPattern, + IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1, + 0, + false); + } + break; + default: + visitor = + new SchemaTreeMeasurementVisitor( + root, + ALL_MATCH_PATTERN, + IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1, + 0, + false); + break; + } return new Pair<>(visitor.getAllResult(), visitor.getNextOffset()); } @@ -107,7 +144,20 @@ public List getMatchedDevices(PartialPath pathPattern, boolean @Override public List getMatchedDevices(PartialPath pathPattern) { - SchemaTreeDeviceVisitor visitor = new SchemaTreeDeviceVisitor(root, pathPattern, false); + SchemaTreeDeviceVisitor visitor; + switch (SchemaEngineMode.valueOf( + IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode())) { + case Memory: + case Schema_File: + visitor = new SchemaTreeDeviceVisitor(root, pathPattern, false); + break; + case Tag: + visitor = new SchemaTreeDeviceVisitor(root, ALL_MATCH_PATTERN, false); + break; + default: + visitor = new SchemaTreeDeviceVisitor(root, ALL_MATCH_PATTERN, false); + break; + } return visitor.getAllResult(); } diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java index ac7c1ce9842e..9f5aea798383 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/dto/IoTDBPoint.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager; +import org.apache.iotdb.db.protocol.influxdb.meta.IInfluxDBMetaManager; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.utils.DataTypeUtils; import org.apache.iotdb.db.utils.ParameterUtils; @@ -59,7 +59,7 @@ public IoTDBPoint( } public IoTDBPoint( - String database, Point point, AbstractInfluxDBMetaManager metaManager, long sessionID) { + String database, Point point, IInfluxDBMetaManager influxDBMetaManager, long sessionID) { String measurement = null; Map tags = new HashMap<>(); Map fields = new HashMap<>(); @@ -105,7 +105,8 @@ public IoTDBPoint( } ParameterUtils.checkNonEmptyString(database, "database"); ParameterUtils.checkNonEmptyString(measurement, "measurement name"); - String path = metaManager.generatePath(database, measurement, tags, sessionID); + String path = + influxDBMetaManager.generatePath(database, measurement, tags, fields.keySet(), sessionID); List measurements = new ArrayList<>(); List types = new ArrayList<>(); List values = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java index 78899f7e34bb..10bf4dced928 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/AbstractQueryHandler.java @@ -29,7 +29,7 @@ import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue; import org.apache.iotdb.db.protocol.influxdb.function.aggregator.InfluxAggregator; import org.apache.iotdb.db.protocol.influxdb.function.selector.InfluxSelector; -import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager; +import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManagerFactory; import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator; import org.apache.iotdb.db.protocol.influxdb.operator.InfluxSelectComponent; import org.apache.iotdb.db.protocol.influxdb.util.FilterUtils; @@ -40,7 +40,6 @@ import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; import org.apache.iotdb.db.qp.logical.crud.FilterOperator; -import org.apache.iotdb.db.service.basic.ServiceProvider; import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxQueryResultRsp; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -57,30 +56,24 @@ public abstract class AbstractQueryHandler { - abstract Map getFieldOrders( - String database, String measurement, ServiceProvider serviceProvider, long sessionId); - abstract InfluxFunctionValue updateByIoTDBFunc( - InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid); + String database, String measurement, InfluxFunction function, long sessionid); abstract QueryResult queryByConditions( String querySql, String database, String measurement, - ServiceProvider serviceProvider, + Map tagOrders, Map fieldOrders, long sessionId) throws AuthException; public final InfluxQueryResultRsp queryInfluxDB( - String database, - InfluxQueryOperator queryOperator, - long sessionId, - ServiceProvider serviceProvider) { + String database, InfluxQueryOperator queryOperator, long sessionId) { String measurement = queryOperator.getFromComponent().getPrefixPaths().get(0).getFullPath(); // The list of fields under the current measurement and the order of the specified rules Map fieldOrders = - getFieldOrders(database, measurement, serviceProvider, sessionId); + InfluxDBMetaManagerFactory.getInstance().getFieldOrders(database, measurement, sessionId); QueryResult queryResult; InfluxQueryResultRsp tsQueryResultRsp = new InfluxQueryResultRsp(); try { @@ -96,7 +89,6 @@ public final InfluxQueryResultRsp queryInfluxDB( : null, database, measurement, - serviceProvider, fieldOrders, sessionId); // step2 : select filter @@ -106,11 +98,7 @@ public final InfluxQueryResultRsp queryInfluxDB( else { queryResult = queryFuncWithoutFilter( - queryOperator.getSelectComponent(), - database, - measurement, - serviceProvider, - sessionId); + queryOperator.getSelectComponent(), database, measurement, sessionId); } return tsQueryResultRsp .setResultJsonString(JacksonUtils.bean2Json(queryResult)) @@ -274,18 +262,14 @@ else if (selectComponent.isHasCommonQuery()) { * @param selectComponent select data to query * @return select query result */ - public final QueryResult queryFuncWithoutFilter( - InfluxSelectComponent selectComponent, - String database, - String measurement, - ServiceProvider serviceProvider, - long sessionid) { + public QueryResult queryFuncWithoutFilter( + InfluxSelectComponent selectComponent, String database, String measurement, long sessionid) { // columns List columns = new ArrayList<>(); columns.add(InfluxSQLConstant.RESERVED_TIME); List functions = new ArrayList<>(); - String path = "root." + database + "." + measurement; + for (ResultColumn resultColumn : selectComponent.getResultColumns()) { Expression expression = resultColumn.getExpression(); if (expression instanceof FunctionExpression) { @@ -300,7 +284,7 @@ public final QueryResult queryFuncWithoutFilter( List> values = new ArrayList<>(); for (InfluxFunction function : functions) { InfluxFunctionValue functionValue = - updateByIoTDBFunc(function, serviceProvider, path, sessionid); + updateByIoTDBFunc(database, measurement, function, sessionid); // InfluxFunctionValue functionValue = function.calculateByIoTDBFunc(); if (value.size() == 0) { value.add(functionValue.getTimestamp()); @@ -330,40 +314,33 @@ public QueryResult queryExpr( FilterOperator operator, String database, String measurement, - ServiceProvider serviceProvider, Map fieldOrders, Long sessionId) throws AuthException { if (operator == null) { List expressions = new ArrayList<>(); - return queryByConditions( - expressions, database, measurement, serviceProvider, fieldOrders, sessionId); + return queryByConditions(expressions, database, measurement, fieldOrders, sessionId); } else if (operator instanceof BasicFunctionOperator) { List iExpressions = new ArrayList<>(); iExpressions.add(getIExpressionForBasicFunctionOperator((BasicFunctionOperator) operator)); - return queryByConditions( - iExpressions, database, measurement, serviceProvider, fieldOrders, sessionId); + return queryByConditions(iExpressions, database, measurement, fieldOrders, sessionId); } else { FilterOperator leftOperator = operator.getChildren().get(0); FilterOperator rightOperator = operator.getChildren().get(1); if (operator.getFilterType() == FilterConstant.FilterType.KW_OR) { return QueryResultUtils.orQueryResultProcess( - queryExpr(leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId), - queryExpr( - rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId)); + queryExpr(leftOperator, database, measurement, fieldOrders, sessionId), + queryExpr(rightOperator, database, measurement, fieldOrders, sessionId)); } else if (operator.getFilterType() == FilterConstant.FilterType.KW_AND) { if (canMergeOperator(leftOperator) && canMergeOperator(rightOperator)) { List iExpressions1 = getIExpressionByFilterOperatorOperator(leftOperator); List iExpressions2 = getIExpressionByFilterOperatorOperator(rightOperator); iExpressions1.addAll(iExpressions2); - return queryByConditions( - iExpressions1, database, measurement, serviceProvider, fieldOrders, sessionId); + return queryByConditions(iExpressions1, database, measurement, fieldOrders, sessionId); } else { return QueryResultUtils.andQueryResultProcess( - queryExpr( - leftOperator, database, measurement, serviceProvider, fieldOrders, sessionId), - queryExpr( - rightOperator, database, measurement, serviceProvider, fieldOrders, sessionId)); + queryExpr(leftOperator, database, measurement, fieldOrders, sessionId), + queryExpr(rightOperator, database, measurement, fieldOrders, sessionId)); } } } @@ -376,11 +353,10 @@ public QueryResult queryExpr( * @param expressions list of conditions, including tag and field condition * @return returns the results of the influxdb query */ - private QueryResult queryByConditions( + public QueryResult queryByConditions( List expressions, String database, String measurement, - ServiceProvider serviceProvider, Map fieldOrders, Long sessionId) throws AuthException { @@ -390,7 +366,8 @@ private QueryResult queryByConditions( List fieldExpressions = new ArrayList<>(); // maximum number of tags in the current query criteria int currentQueryMaxTagNum = 0; - Map tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement); + Map tagOrders = + InfluxDBMetaManagerFactory.getInstance().getTagOrders(database, measurement, sessionId); for (IExpression expression : expressions) { SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression); // the current condition is in tag @@ -445,8 +422,7 @@ private QueryResult queryByConditions( realQuerySql += " where " + realIotDBCondition; } realQuerySql += " align by device"; - return queryByConditions( - realQuerySql, database, measurement, serviceProvider, fieldOrders, sessionId); + return queryByConditions(realQuerySql, database, measurement, null, fieldOrders, sessionId); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java index ee8d0db5c06b..3d536df10a32 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/NewQueryHandler.java @@ -18,77 +18,31 @@ */ package org.apache.iotdb.db.protocol.influxdb.handler; -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant; import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant; import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction; import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue; -import org.apache.iotdb.db.protocol.influxdb.meta.NewInfluxDBMetaManager; import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils; import org.apache.iotdb.db.protocol.influxdb.util.StringUtils; -import org.apache.iotdb.db.service.basic.ServiceProvider; import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl; -import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; -import org.influxdb.InfluxDBException; import org.influxdb.dto.QueryResult; -import java.util.HashMap; import java.util.List; import java.util.Map; public class NewQueryHandler extends AbstractQueryHandler { - public static TSExecuteStatementResp executeStatement(String sql, long sessionId) { - TSExecuteStatementReq tsExecuteStatementReq = new TSExecuteStatementReq(); - tsExecuteStatementReq.setStatement(sql); - tsExecuteStatementReq.setSessionId(sessionId); - tsExecuteStatementReq.setStatementId( - NewInfluxDBServiceImpl.getClientRPCService().requestStatementId(sessionId)); - tsExecuteStatementReq.setFetchSize(InfluxConstant.DEFAULT_FETCH_SIZE); - TSExecuteStatementResp executeStatementResp = - NewInfluxDBServiceImpl.getClientRPCService().executeStatement(tsExecuteStatementReq); - TSStatus tsStatus = executeStatementResp.getStatus(); - if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new InfluxDBException(tsStatus.getMessage()); - } - return executeStatementResp; - } - - @Override - public Map getFieldOrders( - String database, String measurement, ServiceProvider serviceProvider, long sessionID) { - Map fieldOrders = new HashMap<>(); - String showTimeseriesSql = "show timeseries root." + database + '.' + measurement + ".**"; - TSExecuteStatementResp executeStatementResp = executeStatement(showTimeseriesSql, sessionID); - List paths = QueryResultUtils.getFullPaths(executeStatementResp); - Map tagOrders = NewInfluxDBMetaManager.getTagOrders(database, measurement); - int tagOrderNums = tagOrders.size(); - int fieldNums = 0; - for (String path : paths) { - String filed = StringUtils.getFieldByPath(path); - if (!fieldOrders.containsKey(filed)) { - // The corresponding order of fields is 1 + tagNum (the first is timestamp, then all tags, - // and finally all fields) - fieldOrders.put(filed, tagOrderNums + fieldNums + 1); - fieldNums++; - } - } - return fieldOrders; - } - - @Override - public InfluxFunctionValue updateByIoTDBFunc( - InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) { + public final InfluxFunctionValue updateByIoTDBFunc( + String path, InfluxFunction function, long sessionid) { switch (function.getFunctionName()) { case InfluxSQLConstant.COUNT: { String functionSql = StringUtils.generateFunctionSql( function.getFunctionName(), function.getParmaName(), path); - TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid); + TSExecuteStatementResp tsExecuteStatementResp = + NewInfluxDBServiceImpl.executeStatement(functionSql, sessionid); List list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp); for (InfluxFunctionValue influxFunctionValue : list) { @@ -101,7 +55,7 @@ public InfluxFunctionValue updateByIoTDBFunc( String functionSqlCount = StringUtils.generateFunctionSql("count", function.getParmaName(), path); TSExecuteStatementResp tsExecuteStatementResp = - executeStatement(functionSqlCount, sessionid); + NewInfluxDBServiceImpl.executeStatement(functionSqlCount, sessionid); List list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp); for (InfluxFunctionValue influxFunctionValue : list) { @@ -109,7 +63,8 @@ public InfluxFunctionValue updateByIoTDBFunc( } String functionSqlSum = StringUtils.generateFunctionSql("sum", function.getParmaName(), path); - tsExecuteStatementResp = executeStatement(functionSqlSum, sessionid); + tsExecuteStatementResp = + NewInfluxDBServiceImpl.executeStatement(functionSqlSum, sessionid); list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp); for (InfluxFunctionValue influxFunctionValue : list) { function.updateValueIoTDBFunc(null, influxFunctionValue); @@ -120,7 +75,8 @@ public InfluxFunctionValue updateByIoTDBFunc( { String functionSql = StringUtils.generateFunctionSql("sum", function.getParmaName(), path); - TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid); + TSExecuteStatementResp tsExecuteStatementResp = + NewInfluxDBServiceImpl.executeStatement(functionSql, sessionid); List list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp); for (InfluxFunctionValue influxFunctionValue : list) { @@ -142,7 +98,8 @@ public InfluxFunctionValue updateByIoTDBFunc( StringUtils.generateFunctionSql("last_value", function.getParmaName(), path); functionName = "last_value"; } - TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid); + TSExecuteStatementResp tsExecuteStatementResp = + NewInfluxDBServiceImpl.executeStatement(functionSql, sessionid); Map map = QueryResultUtils.getColumnNameAndValue(tsExecuteStatementResp); for (String colume : map.keySet()) { Object o = map.get(colume); @@ -152,7 +109,8 @@ public InfluxFunctionValue updateByIoTDBFunc( String.format( "select %s from %s where %s=%s", function.getParmaName(), devicePath, fullPath, o); - TSExecuteStatementResp resp = executeStatement(specificSql, sessionid); + TSExecuteStatementResp resp = + NewInfluxDBServiceImpl.executeStatement(specificSql, sessionid); List list = QueryResultUtils.getInfluxFunctionValues(resp); for (InfluxFunctionValue influxFunctionValue : list) { function.updateValueIoTDBFunc(influxFunctionValue); @@ -171,7 +129,8 @@ public InfluxFunctionValue updateByIoTDBFunc( functionSql = StringUtils.generateFunctionSql("min_value", function.getParmaName(), path); } - TSExecuteStatementResp tsExecuteStatementResp = executeStatement(functionSql, sessionid); + TSExecuteStatementResp tsExecuteStatementResp = + NewInfluxDBServiceImpl.executeStatement(functionSql, sessionid); List list = QueryResultUtils.getInfluxFunctionValues(tsExecuteStatementResp); for (InfluxFunctionValue influxFunctionValue : list) { @@ -185,15 +144,23 @@ public InfluxFunctionValue updateByIoTDBFunc( return function.calculateByIoTDBFunc(); } + @Override + public InfluxFunctionValue updateByIoTDBFunc( + String database, String measurement, InfluxFunction function, long sessionid) { + String path = "root." + database + "." + measurement; + return updateByIoTDBFunc(path, function, sessionid); + } + @Override public QueryResult queryByConditions( String querySql, String database, String measurement, - ServiceProvider serviceProvider, + Map tagOrders, Map fieldOrders, long sessionId) { - TSExecuteStatementResp executeStatementResp = executeStatement(querySql, sessionId); + TSExecuteStatementResp executeStatementResp = + NewInfluxDBServiceImpl.executeStatement(querySql, sessionId); return QueryResultUtils.iotdbResultConvertInfluxResult( executeStatementResp, database, measurement, fieldOrders); } diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java index b58b65f6bf5c..3a895bcb9a2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java @@ -27,14 +27,13 @@ import org.apache.iotdb.db.protocol.influxdb.constant.InfluxSQLConstant; import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction; import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue; -import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager; import org.apache.iotdb.db.protocol.influxdb.util.FieldUtils; import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils; import org.apache.iotdb.db.protocol.influxdb.util.StringUtils; -import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.service.basic.ServiceProvider; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.read.common.Field; @@ -49,62 +48,17 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; public class QueryHandler extends AbstractQueryHandler { - @Override - public Map getFieldOrders( - String database, String measurement, ServiceProvider serviceProvider, long sessionID) { - Map fieldOrders = new HashMap<>(); - long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true); - try { - String showTimeseriesSql = "show timeseries root." + database + '.' + measurement + ".**"; - PhysicalPlan physicalPlan = - serviceProvider.getPlanner().parseSQLToPhysicalPlan(showTimeseriesSql); - QueryContext queryContext = - serviceProvider.genQueryContext( - queryId, - true, - System.currentTimeMillis(), - showTimeseriesSql, - InfluxConstant.DEFAULT_CONNECTION_TIMEOUT_MS); - QueryDataSet queryDataSet = - serviceProvider.createQueryDataSet( - queryContext, physicalPlan, InfluxConstant.DEFAULT_FETCH_SIZE); - int fieldNums = 0; - Map tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement); - int tagOrderNums = tagOrders.size(); - while (queryDataSet.hasNext()) { - List fields = queryDataSet.next().getFields(); - String filed = StringUtils.getFieldByPath(fields.get(0).getStringValue()); - if (!fieldOrders.containsKey(filed)) { - // The corresponding order of fields is 1 + tagNum (the first is timestamp, then all tags, - // and finally all fields) - fieldOrders.put(filed, tagOrderNums + fieldNums + 1); - fieldNums++; - } - } - } catch (QueryProcessException - | TException - | StorageEngineException - | SQLException - | IOException - | InterruptedException - | QueryFilterOptimizationException - | MetadataException e) { - throw new InfluxDBException(e.getMessage()); - } finally { - ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId); - } - return fieldOrders; - } + ServiceProvider serviceProvider = IoTDB.serviceProvider; @Override public InfluxFunctionValue updateByIoTDBFunc( - InfluxFunction function, ServiceProvider serviceProvider, String path, long sessionid) { + String database, String measurement, InfluxFunction function, long sessionid) { + String path = "root." + database + "." + measurement; switch (function.getFunctionName()) { case InfluxSQLConstant.COUNT: { @@ -481,7 +435,7 @@ public QueryResult queryByConditions( String querySql, String database, String measurement, - ServiceProvider serviceProvider, + Map tagOrders, Map fieldOrders, long sessionId) throws AuthException { diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandlerFactory.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandlerFactory.java new file mode 100644 index 000000000000..bfdf413a9c30 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandlerFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.protocol.influxdb.handler; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode; +import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl; + +public class QueryHandlerFactory { + public static AbstractQueryHandler getInstance() { + if (IoTDBDescriptor.getInstance() + .getConfig() + .getRpcImplClassName() + .equals(ClientRPCServiceImpl.class.getName())) { + switch (SchemaEngineMode.valueOf( + IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode())) { + case Tag: + return new TagQueryHandler(); + default: + return new NewQueryHandler(); + } + } else { + return new QueryHandler(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/TagQueryHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/TagQueryHandler.java new file mode 100644 index 000000000000..5344bba68c90 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/TagQueryHandler.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.protocol.influxdb.handler; + +import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunction; +import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue; +import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManagerFactory; +import org.apache.iotdb.db.protocol.influxdb.util.FilterUtils; +import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils; +import org.apache.iotdb.db.protocol.influxdb.util.StringUtils; +import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; + +import org.influxdb.dto.QueryResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** use in tag schema region */ +public class TagQueryHandler extends NewQueryHandler { + + @Override + public InfluxFunctionValue updateByIoTDBFunc( + String database, String measurement, InfluxFunction function, long sessionid) { + String path = "root." + database + ".measurement." + measurement; + return updateByIoTDBFunc(path, function, sessionid); + } + + @Override + public QueryResult queryByConditions( + String querySql, + String database, + String measurement, + Map tagOrders, + Map fieldOrders, + long sessionId) { + TSExecuteStatementResp executeStatementResp = + NewInfluxDBServiceImpl.executeStatement(querySql, sessionId); + return QueryResultUtils.iotdbResultConvertInfluxResult( + executeStatementResp, database, measurement, tagOrders, fieldOrders); + } + + @Override + public QueryResult queryByConditions( + List expressions, + String database, + String measurement, + Map fieldOrders, + Long sessionId) { + List fieldExpressions = new ArrayList<>(); + List tagExpressions = new ArrayList<>(); + Map tagOrders = + InfluxDBMetaManagerFactory.getInstance().getTagOrders(database, measurement, sessionId); + for (IExpression expression : expressions) { + SingleSeriesExpression singleSeriesExpression = ((SingleSeriesExpression) expression); + // the current condition is in tag + if (tagOrders.containsKey(singleSeriesExpression.getSeriesPath().getFullPath())) { + tagExpressions.add(singleSeriesExpression); + } else { + fieldExpressions.add(singleSeriesExpression); + } + } + // construct the actual query path + StringBuilder curQueryPath = + new StringBuilder("root." + database + ".measurement." + measurement); + for (SingleSeriesExpression singleSeriesExpression : tagExpressions) { + String tagKey = singleSeriesExpression.getSeriesPath().getFullPath(); + String tagValue = + StringUtils.removeQuotation( + FilterUtils.getFilterStringValue(singleSeriesExpression.getFilter())); + curQueryPath.append(".").append(tagKey).append(".").append(tagValue); + } + curQueryPath.append(".**"); + + // construct actual query condition + StringBuilder realIotDBCondition = new StringBuilder(); + for (int i = 0; i < fieldExpressions.size(); i++) { + SingleSeriesExpression singleSeriesExpression = fieldExpressions.get(i); + if (i != 0) { + realIotDBCondition.append(" and "); + } + realIotDBCondition + .append(singleSeriesExpression.getSeriesPath().getFullPath()) + .append(" ") + .append((FilterUtils.getFilerSymbol(singleSeriesExpression.getFilter()))) + .append(" ") + .append(FilterUtils.getFilterStringValue(singleSeriesExpression.getFilter())); + } + // actual query SQL statement + String realQuerySql; + + realQuerySql = "select * from " + curQueryPath; + if (!(realIotDBCondition.length() == 0)) { + realQuerySql += " where " + realIotDBCondition; + } + realQuerySql += " align by device"; + return queryByConditions( + realQuerySql, database, measurement, tagOrders, fieldOrders, sessionId); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java index 513b06e59f7b..176230f39d91 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/AbstractInfluxDBMetaManager.java @@ -22,8 +22,9 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; -public abstract class AbstractInfluxDBMetaManager { +public abstract class AbstractInfluxDBMetaManager implements IInfluxDBMetaManager { protected static final String SELECT_TAG_INFO_SQL = "select database_name,measurement_name,tag_name,tag_order from root.TAG_INFO "; @@ -32,7 +33,8 @@ public abstract class AbstractInfluxDBMetaManager { protected static Map>> database2Measurement2TagOrders = new HashMap<>(); - public static Map getTagOrders(String database, String measurement) { + @Override + public Map getTagOrders(String database, String measurement, long sessionID) { Map tagOrders = new HashMap<>(); Map> measurement2TagOrders = database2Measurement2TagOrders.get(database); @@ -45,8 +47,6 @@ public static Map getTagOrders(String database, String measurem return tagOrders; } - abstract void recover(); - abstract void setStorageGroup(String database, long sessionID); abstract void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID); @@ -69,8 +69,13 @@ public final synchronized Map getTagOrdersWithAutoCreatingSchem return createDatabase(database, sessionID).computeIfAbsent(measurement, m -> new HashMap<>()); } + @Override public final synchronized String generatePath( - String database, String measurement, Map tags, long sessionID) { + String database, + String measurement, + Map tags, + Set fields, + long sessionID) { Map tagKeyToLayerOrders = getTagOrdersWithAutoCreatingSchema(database, measurement, sessionID); // to support rollback if fails to persisting new tag info diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/IInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/IInfluxDBMetaManager.java new file mode 100644 index 000000000000..e3804ca5e76b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/IInfluxDBMetaManager.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.protocol.influxdb.meta; + +import java.util.Map; +import java.util.Set; + +/** used to manage influxdb metadata */ +public interface IInfluxDBMetaManager { + + void recover(); + + Map getFieldOrders(String database, String measurement, long sessionId); + + String generatePath( + String database, + String measurement, + Map tags, + Set fields, + long sessionID); + + Map getTagOrders(String database, String measurement, long sessionID); +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java index f2e58de977ed..e59d96dfbee3 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManager.java @@ -25,7 +25,10 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant; +import org.apache.iotdb.db.protocol.influxdb.util.StringUtils; import org.apache.iotdb.db.qp.Planner; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; @@ -142,6 +145,53 @@ public void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID) } } + @Override + public Map getFieldOrders(String database, String measurement, long sessionID) { + Map fieldOrders = new HashMap<>(); + long queryId = ServiceProvider.SESSION_MANAGER.requestQueryId(true); + try { + String showTimeseriesSql = "show timeseries root." + database + '.' + measurement + ".**"; + PhysicalPlan physicalPlan = + serviceProvider.getPlanner().parseSQLToPhysicalPlan(showTimeseriesSql); + QueryContext queryContext = + serviceProvider.genQueryContext( + queryId, + true, + System.currentTimeMillis(), + showTimeseriesSql, + InfluxConstant.DEFAULT_CONNECTION_TIMEOUT_MS); + QueryDataSet queryDataSet = + serviceProvider.createQueryDataSet( + queryContext, physicalPlan, InfluxConstant.DEFAULT_FETCH_SIZE); + int fieldNums = 0; + Map tagOrders = + InfluxDBMetaManagerFactory.getInstance().getTagOrders(database, measurement, sessionID); + int tagOrderNums = tagOrders.size(); + while (queryDataSet.hasNext()) { + List fields = queryDataSet.next().getFields(); + String filed = StringUtils.getFieldByPath(fields.get(0).getStringValue()); + if (!fieldOrders.containsKey(filed)) { + // The corresponding order of fields is 1 + tagNum (the first is timestamp, then all tags, + // and finally all fields) + fieldOrders.put(filed, tagOrderNums + fieldNums + 1); + fieldNums++; + } + } + } catch (QueryProcessException + | TException + | StorageEngineException + | SQLException + | IOException + | InterruptedException + | QueryFilterOptimizationException + | MetadataException e) { + throw new InfluxDBException(e.getMessage()); + } finally { + ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId); + } + return fieldOrders; + } + private static class InfluxDBMetaManagerHolder { private static final InfluxDBMetaManager INSTANCE = new InfluxDBMetaManager(); diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManagerFactory.java new file mode 100644 index 000000000000..abad6b3e0ad0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/InfluxDBMetaManagerFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.protocol.influxdb.meta; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode; +import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl; + +public class InfluxDBMetaManagerFactory { + public static IInfluxDBMetaManager getInstance() { + if (IoTDBDescriptor.getInstance() + .getConfig() + .getRpcImplClassName() + .equals(ClientRPCServiceImpl.class.getName())) { + switch (SchemaEngineMode.valueOf( + IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode())) { + case Tag: + return TagInfluxDBMetaManager.getInstance(); + default: + return NewInfluxDBMetaManager.getInstance(); + } + } else { + return InfluxDBMetaManager.getInstance(); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java index 5269a2bf443b..10685303c675 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/NewInfluxDBMetaManager.java @@ -19,8 +19,8 @@ package org.apache.iotdb.db.protocol.influxdb.meta; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.db.protocol.influxdb.handler.NewQueryHandler; import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils; +import org.apache.iotdb.db.protocol.influxdb.util.StringUtils; import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl; import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -57,10 +57,13 @@ public void recover() { try { TSOpenSessionResp tsOpenSessionResp = clientRPCService.openSession( - new TSOpenSessionReq().setUsername("root").setPassword("root")); + new TSOpenSessionReq() + .setUsername("root") + .setPassword("root") + .setZoneId("Asia/Shanghai")); sessionID = tsOpenSessionResp.getSessionId(); TSExecuteStatementResp resp = - NewQueryHandler.executeStatement(SELECT_TAG_INFO_SQL, sessionID); + NewInfluxDBServiceImpl.executeStatement(SELECT_TAG_INFO_SQL, sessionID); IoTDBJDBCDataSet dataSet = QueryResultUtils.creatIoTJDBCDataset(resp); try { Map> measurement2TagOrders; @@ -121,6 +124,29 @@ public void updateTagInfoRecords(TagInfoRecords tagInfoRecords, long sessionID) } } + @Override + public Map getFieldOrders(String database, String measurement, long sessionID) { + Map fieldOrders = new HashMap<>(); + String showTimeseriesSql = "show timeseries root." + database + '.' + measurement + ".**"; + TSExecuteStatementResp executeStatementResp = + NewInfluxDBServiceImpl.executeStatement(showTimeseriesSql, sessionID); + List paths = QueryResultUtils.getFullPaths(executeStatementResp); + Map tagOrders = + InfluxDBMetaManagerFactory.getInstance().getTagOrders(database, measurement, sessionID); + int tagOrderNums = tagOrders.size(); + int fieldNums = 0; + for (String path : paths) { + String filed = StringUtils.getFieldByPath(path); + if (!fieldOrders.containsKey(filed)) { + // The corresponding order of fields is 1 + tagNum (the first is timestamp, then all tags, + // and finally all fields) + fieldOrders.put(filed, tagOrderNums + fieldNums + 1); + fieldNums++; + } + } + return fieldOrders; + } + private static class InfluxDBMetaManagerHolder { private static final NewInfluxDBMetaManager INSTANCE = new NewInfluxDBMetaManager(); diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfluxDBMetaManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfluxDBMetaManager.java new file mode 100644 index 000000000000..87fd7da44e7f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/meta/TagInfluxDBMetaManager.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.protocol.influxdb.meta; + +import org.apache.iotdb.db.protocol.influxdb.util.QueryResultUtils; +import org.apache.iotdb.db.protocol.influxdb.util.StringUtils; +import org.apache.iotdb.db.service.thrift.impl.NewInfluxDBServiceImpl; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** use in tag schema region */ +public class TagInfluxDBMetaManager implements IInfluxDBMetaManager { + private static final String STORAGE_GROUP_PATH = "root.influxdbmate"; + + private static final String TAGS_SET = "set.tags"; + + private static final String FIELDS_SET = "set.fields"; + + private TagInfluxDBMetaManager() {} + + public static TagInfluxDBMetaManager getInstance() { + return TagInfluxDBMetaManagerHolder.INSTANCE; + } + + /** use tag schema region to save state information, no need to recover here */ + @Override + public void recover() {} + + /** + * get the fields information of influxdb corresponding database and measurement through tag + * schema region + * + * @param database influxdb database + * @param measurement influxdb measurement + * @param sessionId session id + * @return field information + */ + @Override + public Map getFieldOrders(String database, String measurement, long sessionId) { + return getTimeseriesFieldOrders(database, measurement, FIELDS_SET, sessionId); + } + + /** + * convert the database,measurement,and tags of influxdb to device path of IoTDB,and save the tags + * and fields information of the database and measurement to the tag schema region + * + * @param database influxdb database + * @param measurement influxdb measurement + * @param tags influxdb tags + * @param fields influxdb fields + * @param sessionID session id + * @return device path + */ + @Override + public String generatePath( + String database, + String measurement, + Map tags, + Set fields, + long sessionID) { + createInfluxDBMetaTimeseries(database, measurement, tags, fields, sessionID); + return generateDevicesPath(database, measurement, tags); + } + + private void createInfluxDBMetaTimeseries( + String database, + String measurement, + Map tags, + Set fields, + long sessionID) { + List fieldsList = new ArrayList<>(tags.keySet()); + createInfluxDBMetaTimeseries(database, measurement, TAGS_SET, fieldsList, sessionID); + fieldsList.clear(); + fieldsList.addAll(fields); + createInfluxDBMetaTimeseries(database, measurement, FIELDS_SET, fieldsList, sessionID); + } + + private void createInfluxDBMetaTimeseries( + String database, String measurement, String device, List fields, long sessionID) { + String statement = generateTimeseriesStatement(database, measurement, device, fields); + NewInfluxDBServiceImpl.executeStatement(statement, sessionID); + } + + private String generateTimeseriesStatement( + String database, String measurement, String device, List fields) { + StringBuilder timeseriesStatement = + new StringBuilder( + "create aligned timeseries " + + STORAGE_GROUP_PATH + + ".database." + + database + + ".measurement." + + measurement + + "." + + device + + "("); + for (int i = 0; i < fields.size() - 1; i++) { + String field = fields.get(i); + timeseriesStatement.append(field).append(" BOOLEAN, "); + } + timeseriesStatement.append(fields.get(fields.size() - 1)).append(" BOOLEAN)"); + return timeseriesStatement.toString(); + } + + /** + * get the tags information of influxdb corresponding database and measurement through tag schema + * region + * + * @param database influxdb database + * @param measurement influxdb measurement + * @param sessionID session id + * @return tags information + */ + @Override + public Map getTagOrders(String database, String measurement, long sessionID) { + return getTimeseriesFieldOrders(database, measurement, TAGS_SET, sessionID); + } + + private Map getTimeseriesFieldOrders( + String database, String measurement, String device, long sessionID) { + TSExecuteStatementResp statementResp = + NewInfluxDBServiceImpl.executeStatement( + "show timeseries " + + STORAGE_GROUP_PATH + + ".database." + + database + + ".measurement." + + measurement + + "." + + device, + sessionID); + List timeseriesPaths = QueryResultUtils.getFullPaths(statementResp); + Map fieldOrders = new HashMap<>(); + for (String timeseriesPath : timeseriesPaths) { + String field = StringUtils.getFieldByPath(timeseriesPath); + fieldOrders.put(field, fieldOrders.size()); + } + return fieldOrders; + } + + /** + * convert the database,measurement,and tags of influxdb to device path of IoTDB,ensure that + * influxdb records with the same semantics generate the same device path, so the device path is + * generated in order after sorting the tags + * + * @param database influxdb database + * @param measurement influxdb measurement + * @param tags influxdb tags + * @return device path + */ + private String generateDevicesPath( + String database, String measurement, Map tags) { + TreeMap tagsMap = new TreeMap<>(tags); + tagsMap.put("measurement", measurement); + StringBuilder devicePath = new StringBuilder("root." + database); + for (String tagKey : tagsMap.keySet()) { + devicePath.append(".").append(tagKey).append(".").append(tagsMap.get(tagKey)); + } + return devicePath.toString(); + } + + private static class TagInfluxDBMetaManagerHolder { + private static final TagInfluxDBMetaManager INSTANCE = new TagInfluxDBMetaManager(); + + private TagInfluxDBMetaManagerHolder() {} + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java index 325971df2598..25199f6429f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java @@ -20,7 +20,7 @@ import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant; import org.apache.iotdb.db.protocol.influxdb.function.InfluxFunctionValue; -import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager; +import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManagerFactory; import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; import org.apache.iotdb.rpc.IoTDBJDBCDataSet; import org.apache.iotdb.rpc.StatementExecutionException; @@ -82,7 +82,8 @@ public static QueryResult iotdbResultConvertInfluxResult( QueryResult.Series series = new QueryResult.Series(); series.setName(measurement); // gets the reverse map of the tag - Map tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement); + Map tagOrders = + InfluxDBMetaManagerFactory.getInstance().getTagOrders(database, measurement, -1); Map tagOrderReversed = tagOrders.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); @@ -294,6 +295,12 @@ public static boolean checkQueryResultNull(QueryResult queryResult) { return queryResult.getResults().get(0).getSeries() == null; } + /** + * parse time series paths from query results + * + * @param tsExecuteStatementResp query results + * @return time series paths + */ public static List getFullPaths(TSExecuteStatementResp tsExecuteStatementResp) { List res = new ArrayList<>(); IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp); @@ -309,6 +316,13 @@ public static List getFullPaths(TSExecuteStatementResp tsExecuteStatemen return res; } + /** + * Convert align by device query result of NewIoTDB to the query result of influxdb,used for + * Memory and schema_file schema region + * + * @param tsExecuteStatementResp NewIoTDB execute statement resp to be converted + * @return query results in influxdb format + */ public static QueryResult iotdbResultConvertInfluxResult( TSExecuteStatementResp tsExecuteStatementResp, String database, @@ -321,7 +335,8 @@ public static QueryResult iotdbResultConvertInfluxResult( QueryResult.Series series = new QueryResult.Series(); series.setName(measurement); // gets the reverse map of the tag - Map tagOrders = InfluxDBMetaManager.getTagOrders(database, measurement); + Map tagOrders = + InfluxDBMetaManagerFactory.getInstance().getTagOrders(database, measurement, -1); Map tagOrderReversed = tagOrders.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); @@ -381,6 +396,87 @@ public static QueryResult iotdbResultConvertInfluxResult( return queryResult; } + /** + * Convert align by device query result of NewIoTDB to the query result of influxdb,used for tag + * schema region + * + * @param tsExecuteStatementResp NewIoTDB execute statement resp to be converted + * @return query results in influxdb format + */ + public static QueryResult iotdbResultConvertInfluxResult( + TSExecuteStatementResp tsExecuteStatementResp, + String database, + String measurement, + Map tagOrders, + Map fieldOrders) { + if (tsExecuteStatementResp == null) { + return getNullQueryResult(); + } + // generate series + QueryResult.Series series = new QueryResult.Series(); + series.setName(measurement); + Map tagOrderReversed = + tagOrders.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); + int tagSize = tagOrderReversed.size(); + Map fieldOrdersReversed = + fieldOrders.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); + ArrayList tagList = new ArrayList<>(); + for (int i = 0; i < tagSize; i++) { + tagList.add(tagOrderReversed.get(i)); + } + + ArrayList fieldList = new ArrayList<>(); + for (int i = 0; i < fieldOrders.size(); i++) { + fieldList.add(fieldOrdersReversed.get(i)); + } + + ArrayList columns = new ArrayList<>(); + columns.add("time"); + columns.addAll(tagList); + columns.addAll(fieldList); + // insert columns into series + series.setColumns(columns); + List> values = new ArrayList<>(); + IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp); + try { + while (ioTDBJDBCDataSet.hasCachedResults()) { + Object[] value = new Object[columns.size()]; + ioTDBJDBCDataSet.constructOneRow(); + value[0] = Long.valueOf(ioTDBJDBCDataSet.getValueByName("Time")); + String deviceName = ioTDBJDBCDataSet.getValueByName("Device"); + String[] deviceNameList = deviceName.split("\\."); + for (int i = 2; i < deviceNameList.length; i += 2) { + if (tagOrders.containsKey(deviceNameList[i])) { + int position = tagOrders.get(deviceNameList[i]) + 1; + value[position] = deviceNameList[i + 1]; + } + } + for (int i = 3; i <= ioTDBJDBCDataSet.columnNameList.size(); i++) { + Object o = ioTDBJDBCDataSet.getObject(ioTDBJDBCDataSet.findColumnNameByIndex(i)); + if (o != null) { + // insert the value of filed into it + int position = fieldOrders.get(ioTDBJDBCDataSet.findColumnNameByIndex(i)) + tagSize + 1; + value[position] = o; + } + } + values.add(Arrays.asList(value)); + } + } catch (Exception e) { + e.printStackTrace(); + } + + series.setValues(values); + + QueryResult queryResult = new QueryResult(); + QueryResult.Result result = new QueryResult.Result(); + result.setSeries(new ArrayList<>(Arrays.asList(series))); + queryResult.setResults(new ArrayList<>(Arrays.asList(result))); + + return queryResult; + } + public static List getInfluxFunctionValues( TSExecuteStatementResp tsExecuteStatementResp) { IoTDBJDBCDataSet ioTDBJDBCDataSet = creatIoTJDBCDataset(tsExecuteStatementResp); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java index 89e5429dd19a..8209b434876a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java @@ -151,8 +151,7 @@ public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) { public InfluxQueryResultRsp query(InfluxQueryReq req) throws TException { Operator operator = InfluxDBLogicalGenerator.generate(req.command); queryHandler.checkInfluxDBQueryOperator(operator); - return queryHandler.queryInfluxDB( - req.database, (InfluxQueryOperator) operator, req.sessionId, IoTDB.serviceProvider); + return queryHandler.queryInfluxDB(req.database, (InfluxQueryOperator) operator, req.sessionId); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java index 422bc27fd46a..1c40c94bc7d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/NewInfluxDBServiceImpl.java @@ -19,17 +19,17 @@ package org.apache.iotdb.db.service.thrift.impl; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.protocol.influxdb.constant.InfluxConstant; import org.apache.iotdb.db.protocol.influxdb.dto.IoTDBPoint; import org.apache.iotdb.db.protocol.influxdb.handler.AbstractQueryHandler; -import org.apache.iotdb.db.protocol.influxdb.handler.NewQueryHandler; +import org.apache.iotdb.db.protocol.influxdb.handler.QueryHandlerFactory; import org.apache.iotdb.db.protocol.influxdb.input.InfluxLineParser; -import org.apache.iotdb.db.protocol.influxdb.meta.AbstractInfluxDBMetaManager; -import org.apache.iotdb.db.protocol.influxdb.meta.NewInfluxDBMetaManager; +import org.apache.iotdb.db.protocol.influxdb.meta.IInfluxDBMetaManager; +import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManagerFactory; import org.apache.iotdb.db.protocol.influxdb.operator.InfluxQueryOperator; import org.apache.iotdb.db.protocol.influxdb.sql.InfluxDBLogicalGenerator; import org.apache.iotdb.db.protocol.influxdb.util.InfluxReqAndRespUtils; import org.apache.iotdb.db.qp.logical.Operator; -import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.DataTypeUtils; import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCloseSessionReq; import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxCreateDatabaseReq; @@ -42,6 +42,8 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; @@ -57,13 +59,14 @@ public class NewInfluxDBServiceImpl implements IInfluxDBServiceWithHandler { private static final ClientRPCServiceImpl clientRPCService = new ClientRPCServiceImpl(); - private final AbstractInfluxDBMetaManager metaManager; + private final IInfluxDBMetaManager metaManager; private final AbstractQueryHandler queryHandler; public NewInfluxDBServiceImpl() { - metaManager = NewInfluxDBMetaManager.getInstance(); - queryHandler = new NewQueryHandler(); + metaManager = InfluxDBMetaManagerFactory.getInstance(); + metaManager.recover(); + queryHandler = QueryHandlerFactory.getInstance(); } public static ClientRPCServiceImpl getClientRPCService() { @@ -117,8 +120,19 @@ public InfluxTSStatus createDatabase(InfluxCreateDatabaseReq req) { public InfluxQueryResultRsp query(InfluxQueryReq req) throws TException { Operator operator = InfluxDBLogicalGenerator.generate(req.command); queryHandler.checkInfluxDBQueryOperator(operator); - return queryHandler.queryInfluxDB( - req.database, (InfluxQueryOperator) operator, req.sessionId, IoTDB.serviceProvider); + return queryHandler.queryInfluxDB(req.database, (InfluxQueryOperator) operator, req.sessionId); + } + + public static TSExecuteStatementResp executeStatement(String sql, long sessionId) { + TSExecuteStatementReq tsExecuteStatementReq = new TSExecuteStatementReq(); + tsExecuteStatementReq.setStatement(sql); + tsExecuteStatementReq.setSessionId(sessionId); + tsExecuteStatementReq.setStatementId( + NewInfluxDBServiceImpl.getClientRPCService().requestStatementId(sessionId)); + tsExecuteStatementReq.setFetchSize(InfluxConstant.DEFAULT_FETCH_SIZE); + TSExecuteStatementResp executeStatementResp = + NewInfluxDBServiceImpl.getClientRPCService().executeStatement(tsExecuteStatementReq); + return executeStatementResp; } @Override