diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 80c0491..3f0367f 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -19,7 +19,7 @@ jobs: - name: Setup .NET uses: actions/setup-dotnet@v4 with: - dotnet-version: '6.0.x' + dotnet-version: '9.0.x' - name: Restore dependencies run: dotnet restore "src/Apache.IoTDB/Apache.IoTDB.csproj" - name: Check License Header diff --git a/.github/workflows/e2e-multinode.yml b/.github/workflows/e2e-multinode.yml new file mode 100644 index 0000000..60d0a03 --- /dev/null +++ b/.github/workflows/e2e-multinode.yml @@ -0,0 +1,25 @@ +name: E2E Tests in MultiNode IoTDB + +on: + push: + branches: [ main, dev/* ] + pull_request: + branches: [ main ] + +jobs: + + build: + name: e2e test in MultiNode IoTDB + runs-on: ubuntu-latest + steps: + + - name: Check out code into the CSharp module directory + uses: actions/checkout@v4 + + - name: Set Docker & Run Test + run: | + docker compose -f docker-compose-2c2d.yml up --build --abort-on-container-exit --remove-orphans + + - name: Clean IoTDB & Shut Down Docker + run: | + docker compose -f docker-compose-2c2d.yml down diff --git a/docker-compose-2c2d.yml b/docker-compose-2c2d.yml new file mode 100644 index 0000000..318e8e8 --- /dev/null +++ b/docker-compose-2c2d.yml @@ -0,0 +1,133 @@ +version: "3" +services: + + # ConfigNode 1 + confignode-1: + image: apache/iotdb:2.0.1-beta-standalone + command: ["bash", "-c", "entrypoint.sh confignode"] + restart: always + healthcheck: + test: ["CMD", "ls", "/iotdb/data"] + interval: 3s + timeout: 5s + retries: 30 + start_period: 30s + environment: + - cn_internal_address=127.0.0.1 + - cn_internal_port=10710 + - cn_consensus_port=10720 + - cn_seed_config_node=127.0.0.1:10710 + - schema_replication_factor=2 + - data_replication_factor=2 + privileged: true + volumes: + - ./iotdb/confignode-1/data:/iotdb/data + - ./iotdb/confignode-1/logs:/iotdb/logs + network_mode: host + + # ConfigNode 2 + confignode-2: + image: apache/iotdb:2.0.1-beta-standalone + command: ["bash", "-c", "entrypoint.sh confignode"] + restart: always + healthcheck: + test: ["CMD", "ls", "/iotdb/data"] + interval: 3s + timeout: 5s + retries: 30 + start_period: 30s + environment: + - cn_internal_address=127.0.0.1 + - cn_internal_port=10711 + - cn_consensus_port=10721 + - cn_seed_config_node=127.0.0.1:10710 + - schema_replication_factor=2 + - data_replication_factor=2 + privileged: true + volumes: + - ./iotdb/confignode-2/data:/iotdb/data + - ./iotdb/confignode-2/logs:/iotdb/logs + network_mode: host + + # DataNode 1 + datanode-1: + image: apache/iotdb:2.0.1-beta-standalone + command: ["bash", "-c", "entrypoint.sh datanode"] + restart: always + healthcheck: + test: ["CMD", "ls", "/iotdb/data/datanode/system"] + interval: 10s + timeout: 60s + retries: 30 + start_period: 30s + depends_on: + confignode-1: + condition: service_healthy + environment: + - dn_rpc_address=127.0.0.1 + - dn_internal_address=127.0.0.1 + - dn_seed_config_node=127.0.0.1:10710 + - dn_rpc_port=6667 + - dn_internal_port=10730 + - dn_mpp_data_exchange_port=10740 + - dn_schema_region_consensus_port=10750 + - dn_data_region_consensus_port=10760 + - schema_replication_factor=2 + - data_replication_factor=2 + privileged: true + volumes: + - ./iotdb/datanode-1/data:/iotdb/data + - ./iotdb/datanode-1/logs:/iotdb/logs + network_mode: host + + # DataNode 2 + datanode-2: + image: apache/iotdb:2.0.1-beta-standalone + command: ["bash", "-c", "entrypoint.sh datanode"] + restart: always + healthcheck: + test: ["CMD", "ls", "/iotdb/data/datanode/system"] + interval: 10s + timeout: 60s + retries: 30 + start_period: 30s + depends_on: + confignode-1: + condition: service_healthy + confignode-2: + condition: service_healthy + environment: + - dn_rpc_address=127.0.0.1 + - dn_internal_address=127.0.0.1 + - dn_seed_config_node=127.0.0.1:10710 + - dn_rpc_port=6668 + - dn_internal_port=10731 + - dn_mpp_data_exchange_port=10741 + - dn_schema_region_consensus_port=10751 + - dn_data_region_consensus_port=10761 + - schema_replication_factor=2 + - data_replication_factor=2 + privileged: true + volumes: + - ./iotdb/datanode-2/data:/iotdb/data + - ./iotdb/datanode-2/logs:/iotdb/logs + network_mode: host + + # C# Client + apache.iotdb.samples: + image: ${DOCKER_REGISTRY-}apacheiotdbsamples + depends_on: + confignode-1: + condition: service_healthy + confignode-2: + condition: service_healthy + datanode-1: + condition: service_healthy + datanode-2: + condition: service_healthy + build: + context: . + dockerfile: samples/Apache.IoTDB.Samples/Dockerfile + command: ["--multi", "localhost:6667", "localhost:6668"] + # command: ["sleep", "infinity"] + network_mode: host \ No newline at end of file diff --git a/docker-compose.dcproj b/docker-compose.dcproj deleted file mode 100644 index 322d139..0000000 --- a/docker-compose.dcproj +++ /dev/null @@ -1,15 +0,0 @@ - - - - 2.1 - Linux - 4d457769-80cb-401f-9155-c3125c04facd - - - - docker-compose.yml - - - - - \ No newline at end of file diff --git a/docker-compose.override.yml b/docker-compose.override.yml deleted file mode 100644 index 8e89b07..0000000 --- a/docker-compose.override.yml +++ /dev/null @@ -1 +0,0 @@ -version: '3.4' diff --git a/docker-compose.yml b/docker-compose.yml index 37c6d1b..3fa34a0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,7 @@ services: build: context: . dockerfile: samples/Apache.IoTDB.Samples/Dockerfile + command: ["--single", "iotdb"] networks: iotdb-network: ipv4_address: 172.18.0.2 diff --git a/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj b/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj index 7dd6f32..bae70a7 100644 --- a/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj +++ b/samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + net9.0 Linux ..\.. @@ -19,6 +19,7 @@ + diff --git a/samples/Apache.IoTDB.Samples/Dockerfile b/samples/Apache.IoTDB.Samples/Dockerfile index 77c600b..26ee0a9 100644 --- a/samples/Apache.IoTDB.Samples/Dockerfile +++ b/samples/Apache.IoTDB.Samples/Dockerfile @@ -17,10 +17,10 @@ #See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging. -FROM mcr.microsoft.com/dotnet/runtime:6.0 AS base +FROM mcr.microsoft.com/dotnet/runtime:9.0 AS base WORKDIR /app -FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build +FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build WORKDIR /src COPY ["samples/Apache.IoTDB.Samples/Apache.IoTDB.Samples.csproj", "samples/Apache.IoTDB.Samples/"] COPY ["src/Apache.IoTDB/Apache.IoTDB.csproj", "src/Apache.IoTDB/"] diff --git a/samples/Apache.IoTDB.Samples/Program.cs b/samples/Apache.IoTDB.Samples/Program.cs index 861ca27..bc557bb 100644 --- a/samples/Apache.IoTDB.Samples/Program.cs +++ b/samples/Apache.IoTDB.Samples/Program.cs @@ -19,21 +19,64 @@ using Microsoft.Extensions.Logging; using NLog.Extensions.Logging; -using System; using System.Threading.Tasks; +using System.CommandLine; +using System.Collections.Generic; +using System; namespace Apache.IoTDB.Samples { public static class Program { public static async Task Main(string[] args) + { + var singleOption = new Option( + "--single", + () => "localhost", + description: "Use single endpoint (e.g. --single localhost)"); + + var multiOption = new Option>( + "--multi", + description: "Use multiple endpoints (e.g. --multi localhost:6667 localhost:6668)") + { + AllowMultipleArgumentsPerToken = true + }; + + var rootCommand = new RootCommand + { + singleOption, + multiOption + }; + + rootCommand.SetHandler(async (string single, List multi) => { var utilsTest = new UtilsTest(); utilsTest.TestParseEndPoint(); - var sessionPoolTest = new SessionPoolTest("iotdb"); + + SessionPoolTest sessionPoolTest; + + if (!string.IsNullOrEmpty(single) && (multi == null || multi.Count == 0)) + { + sessionPoolTest = new SessionPoolTest(single); + } + else if (multi != null && multi.Count != 0) + { + sessionPoolTest = new SessionPoolTest(multi); + } + else + { + Console.WriteLine("Please specify either --single or --multi endpoints."); + return; + } + await sessionPoolTest.Test(); + var tableSessionPoolTest = new TableSessionPoolTest(sessionPoolTest); await tableSessionPoolTest.Test(); + + }, singleOption, multiOption); + + await rootCommand.InvokeAsync(args); } public static void OpenDebugMode(this SessionPool session) diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs index 030429a..f24a0a5 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs @@ -52,97 +52,104 @@ public partial class SessionPoolTest public List testMeasurements = new List( measurementCount.ConvertAll(x => testMeasurement + x.ToString()).ToArray() ); - - public SessionPoolTest(string _host = "localhost") { host = _host; nodeUrls.Add(host + ":" + port); } + public SessionPoolTest(List _nodeUrls) + { + nodeUrls = _nodeUrls; + } public async Task Test() { - await TestOpenWithNodeUrls(); + if(nodeUrls.Count == 1){ + await TestOpenWithNodeUrls(); - await TestOpenWith2NodeUrls(); + await TestOpenWith2NodeUrls(); - await TestOpenWithNodeUrlsAndInsertOneRecord(); + await TestOpenWithNodeUrlsAndInsertOneRecord(); - await TestInsertOneRecord(); + await TestInsertOneRecord(); - await TestInsertAlignedRecord(); + await TestInsertAlignedRecord(); - await TestInsertAlignedRecords(); + await TestInsertAlignedRecords(); - await TestInsertAlignedStringRecords(); + await TestInsertAlignedStringRecords(); - await TestInsertAlignedStringRecordsOfOneDevice(); + await TestInsertAlignedStringRecordsOfOneDevice(); - await TestInsertStringRecord(); + await TestInsertStringRecord(); - await TestInsertAlignedStringRecord(); + await TestInsertAlignedStringRecord(); - await TestInsertStringRecords(); + await TestInsertStringRecords(); - await TestInsertStringRecordsOfOneDevice(); + await TestInsertStringRecordsOfOneDevice(); - await TestInsertAlignedRecordsOfOneDevice(); + await TestInsertAlignedRecordsOfOneDevice(); - await TestInsertAlignedTablet(); + await TestInsertAlignedTablet(); - await TestInsertAlignedTablets(); + await TestInsertAlignedTablets(); - await TestInsertRecord(); + await TestInsertRecord(); - await TestCreateMultiTimeSeries(); + await TestCreateMultiTimeSeries(); - await TestInsertStrRecord(); + await TestInsertStrRecord(); - await TestInsertRecords(); + await TestInsertRecords(); - await TestInsertRecordsWithAllType(); + await TestInsertRecordsWithAllType(); - await TestInsertRecordsOfOneDevice(); + await TestInsertRecordsOfOneDevice(); - await TestInsertTablet(); + await TestInsertTablet(); - await TestInsertTabletWithAllType(); + await TestInsertTabletWithAllType(); - await TestInsertTabletWithNullValue(); + await TestInsertTabletWithNullValue(); - await TestInsertTablets(); + await TestInsertTablets(); - await TestSetAndUnsetSchemaTemplate(); + await TestSetAndUnsetSchemaTemplate(); - await TestCreateAlignedTimeseries(); + await TestCreateAlignedTimeseries(); - await TestCreateAndDropSchemaTemplate(); + await TestCreateAndDropSchemaTemplate(); - await TestGetTimeZone(); + await TestGetTimeZone(); - await TestCreateAndDeleteDatabase(); + await TestCreateAndDeleteDatabase(); - await TestCreateTimeSeries(); + await TestCreateTimeSeries(); - await TestDeleteTimeSeries(); + await TestDeleteTimeSeries(); - await TestDeleteDatabase(); + await TestDeleteDatabase(); - await TestCheckTimeSeriesExists(); + await TestCheckTimeSeriesExists(); - await TestSetTimeZone(); + await TestSetTimeZone(); - await TestDeleteData(); + await TestDeleteData(); - await TestNonSql(); + await TestNonSql(); - await TestRawDataQuery(); + await TestRawDataQuery(); - await TestLastDataQuery(); + await TestLastDataQuery(); - await TestSqlQuery(); + await TestSqlQuery(); - await TestNonSqlBy_ADO(); + await TestNonSqlBy_ADO(); + } + else { + await TestMultiNodeDataFetch(); + } } public async Task TestOpenWithNodeUrls() { @@ -454,7 +461,7 @@ await session_pool.ExecuteNonQueryStatementAsync( await res.Close(); Console.WriteLine("SHOW DEVICES sql passed!"); - res = await session_pool.ExecuteQueryStatementAsync("COUNT TIMESERIES root"); + res = await session_pool.ExecuteQueryStatementAsync($"COUNT TIMESERIES {testDatabaseName}"); res.ShowTableNames(); while (res.HasNext()) Console.WriteLine(res.Next()); @@ -572,5 +579,56 @@ public async Task TestLastDataQuery() await session_pool.Close(); Console.WriteLine("LastDataQuery Passed"); } + + public async Task TestMultiNodeDataFetch(){ + System.Diagnostics.Debug.Assert(nodeUrls.Count > 1, "nodeUrls.Count should be greater than 1 in MultiNode Test"); + var session_pool = new SessionPool.Builder() + .SetUsername(username) + .SetPassword(password) + .SetNodeUrl(nodeUrls) + .SetPoolSize(4) + .Build(); + await session_pool.Open(false); + if (debug) session_pool.OpenDebugMode(); + var status = await session_pool.DeleteDatabaseAsync(testDatabaseName); + var device_id = string.Format("{0}.{1}", testDatabaseName, testDevice); + var measurements = new List { testMeasurements[0], testMeasurements[1] }; + var data_type_lst = new List { TSDataType.BOOLEAN, TSDataType.FLOAT }; + var encoding_lst = new List { TSEncoding.PLAIN, TSEncoding.PLAIN }; + var compressor_lst = new List { Compressor.SNAPPY, Compressor.SNAPPY }; + var ts_path_lst = new List() { + string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[0]), + string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[1]) + }; + status = await session_pool.CreateMultiTimeSeriesAsync(ts_path_lst, data_type_lst, encoding_lst, compressor_lst); + + var records = new List(); + var values = new List() { true, 20.0f }; + var device_id_lst = new List() { }; + for (int i = 1; i <= fetchSize * processedSize * 4 + 783; i++) + { + var record = new RowRecord(i, values, measurements); + records.Add(record); + device_id_lst.Add(device_id); + } + + // insert data + status = await session_pool.InsertRecordsAsync(device_id_lst, records); + System.Diagnostics.Debug.Assert(status == 0); + // fetch data + var paths = new List() { string.Format("{0}.{1}", device_id, testMeasurements[0]), string.Format("{0}.{1}", device_id, testMeasurements[1]) }; + var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); + res.ShowTableNames(); + var count = 0; + while (res.HasNext()) + { + var record = res.Next(); + count++; + } + Console.WriteLine(count + " " + (fetchSize * processedSize * 4 + 783)); + System.Diagnostics.Debug.Assert(count == fetchSize * processedSize * 4 + 783); + await res.Close(); + Console.WriteLine("MultiNodeDataFetch Passed"); + } } } \ No newline at end of file diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index 32191a2..7ba49ea 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs @@ -40,6 +40,7 @@ public class SessionDataSet : System.IDisposable private List _valueBufferLst, _bitmapBufferLst; private ByteBuffer _timeBuffer; private readonly ConcurrentClientQueue _clientQueue; + private Client _client; private int _rowIndex; private bool _hasCatchedResult; private RowRecord _cachedRowRecord; @@ -52,9 +53,10 @@ public class SessionDataSet : System.IDisposable private int DefaultTimeout => 10000; public int FetchSize { get; set; } public int RowCount { get; set; } - public SessionDataSet(string sql, TSExecuteStatementResp resp, ConcurrentClientQueue clientQueue, long statementId) + public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, ConcurrentClientQueue clientQueue, long statementId) { _clientQueue = clientQueue; + _client = client; _sql = sql; _queryDataset = resp.QueryDataSet; _queryId = resp.QueryId; @@ -266,14 +268,13 @@ private bool IsNull(int loc, int row_index) private bool FetchResults() { _rowIndex = 0; - var myClient = _clientQueue.Take(); - var req = new TSFetchResultsReq(myClient.SessionId, _sql, FetchSize, _queryId, true) + var req = new TSFetchResultsReq(_client.SessionId, _sql, FetchSize, _queryId, true) { Timeout = DefaultTimeout }; try { - var task = myClient.ServiceClient.fetchResultsAsync(req); + var task = _client.ServiceClient.fetchResultsAsync(req); var resp = task.ConfigureAwait(false).GetAwaiter().GetResult(); @@ -302,18 +303,13 @@ private bool FetchResults() { throw new TException("Cannot fetch result from server, because of network connection", e); } - finally - { - _clientQueue.Add(myClient); - } } public async Task Close() { if (!_isClosed) { - var myClient = _clientQueue.Take(); - var req = new TSCloseOperationReq(myClient.SessionId) + var req = new TSCloseOperationReq(_client.SessionId) { QueryId = _queryId, StatementId = _statementId @@ -321,7 +317,7 @@ public async Task Close() try { - var status = await myClient.ServiceClient.closeOperationAsync(req); + var status = await _client.ServiceClient.closeOperationAsync(req); } catch (TException e) { @@ -329,7 +325,8 @@ public async Task Close() } finally { - _clientQueue.Add(myClient); + _clientQueue.Add(_client); + _client = null; } } } diff --git a/src/Apache.IoTDB/SessionPool.Builder.cs b/src/Apache.IoTDB/SessionPool.Builder.cs index 91c0a48..fcd10ce 100644 --- a/src/Apache.IoTDB/SessionPool.Builder.cs +++ b/src/Apache.IoTDB/SessionPool.Builder.cs @@ -93,7 +93,7 @@ public Builder SetConnectionTimeoutInMs(int timeout) return this; } - public Builder setNodeUrl(List nodeUrls) + public Builder SetNodeUrl(List nodeUrls) { _nodeUrls = nodeUrls; return this; diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index cb16f70..c6bd70f 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -149,7 +149,7 @@ protected internal SessionPool(List nodeUrls, string username, string pa _sqlDialect = sqlDialect; _database = database; } - public async Task ExecuteClientOperationAsync(AsyncOperation operation, string errMsg, bool retryOnFailure = true) + public async Task ExecuteClientOperationAsync(AsyncOperation operation, string errMsg, bool retryOnFailure = true, bool putClientBack=true) { Client client = _clients.Take(); try @@ -197,7 +197,10 @@ public async Task ExecuteClientOperationAsync(AsyncOperation @@ -705,12 +708,13 @@ public async Task DeleteTimeSeriesAsync(string tsPath) public async Task CheckTimeSeriesExistsAsync(string tsPath) { - // TBD by dalong try { var sql = "SHOW TIMESERIES " + tsPath; var sessionDataset = await ExecuteQueryStatementAsync(sql); - return sessionDataset.HasNext(); + bool timeSeriesExists = sessionDataset.HasNext(); + await sessionDataset.Close(); // be sure to close the sessionDataset to put the client back to the pool + return timeSeriesExists; } catch (TException e) { @@ -1332,12 +1336,13 @@ public async Task ExecuteQueryStatementAsync(string sql, long ti throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message)); } - return new SessionDataSet(sql, resp, _clients, client.StatementId) + return new SessionDataSet(sql, resp, client, _clients, client.StatementId) { FetchSize = _fetchSize, }; }, - errMsg: "Error occurs when executing query statement" + errMsg: "Error occurs when executing query statement", + putClientBack: false ); } @@ -1360,12 +1365,13 @@ public async Task ExecuteStatementAsync(string sql, long timeout throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message)); } - return new SessionDataSet(sql, resp, _clients, client.StatementId) + return new SessionDataSet(sql, resp, client, _clients, client.StatementId) { FetchSize = _fetchSize, }; }, - errMsg: "Error occurs when executing query statement" + errMsg: "Error occurs when executing query statement", + putClientBack: false ); } @@ -1435,12 +1441,13 @@ public async Task ExecuteRawDataQuery(List paths, long s throw new Exception(string.Format("execute raw data query failed, message: {0}", status.Message)); } - return new SessionDataSet("", resp, _clients, client.StatementId) + return new SessionDataSet("", resp, client, _clients, client.StatementId) { FetchSize = _fetchSize, }; }, - errMsg: "Error occurs when executing raw data query" + errMsg: "Error occurs when executing raw data query", + putClientBack: false ); } public async Task ExecuteLastDataQueryAsync(List paths, long lastTime) @@ -1462,12 +1469,13 @@ public async Task ExecuteLastDataQueryAsync(List paths, throw new Exception(string.Format("execute last data query failed, message: {0}", status.Message)); } - return new SessionDataSet("", resp, _clients, client.StatementId) + return new SessionDataSet("", resp, client, _clients, client.StatementId) { FetchSize = _fetchSize, }; }, - errMsg: "Error occurs when executing last data query" + errMsg: "Error occurs when executing last data query", + putClientBack: false ); } [Obsolete("This method is obsolete. Use SQL instead.", false)]