diff --git a/dev-1/lesson-6.2/java/src/main/java/tech/ydb/app/Application.java b/dev-1/lesson-6.2/java/src/main/java/tech/ydb/app/Application.java index 394947c..7246fd0 100644 --- a/dev-1/lesson-6.2/java/src/main/java/tech/ydb/app/Application.java +++ b/dev-1/lesson-6.2/java/src/main/java/tech/ydb/app/Application.java @@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tech.ydb.common.transaction.TxMode; @@ -124,7 +125,7 @@ public static void main(String[] args) throws IOException, InterruptedException, queryServiceHelper.executeQuery(""" DECLARE $name AS Text; DECLARE $line_num AS Int64; - + UPSERT INTO write_file_progress(name, line_num) VALUES ($name, $line_num); """, TxMode.SERIALIZABLE_RW, @@ -157,6 +158,7 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo // В транзакции будет проверяться было ли уже обработано сообщение и если нет, то // обрабатывать и сохранять прогресс. Эти операции должны быть атомарными, чтобы // избежать ситуации, когда одно и тоже сообщение будет обработано несколько раз. + final int finalI = i; retryCtx.supplyStatus(session -> { var curTx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); var tx = new TransactionHelper(curTx); @@ -186,21 +188,26 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo var messageData = new String(message.getData(), StandardCharsets.UTF_8).split(":"); var name = messageData[0]; var length = messageData[1].length(); - var lineNumber = message.getSeqNo(); - // Сохраняем информацию о строке в таблицу - tx.executeQuery(""" - DECLARE $name AS Text; - DECLARE $line AS Int64; - DECLARE $length AS Int64; - UPSERT INTO file(name, line, length) VALUES ($name, $line, $length); - """, - Params.of( - "$name", PrimitiveValue.newText(name), - "$line", PrimitiveValue.newInt64(lineNumber), - "$length", PrimitiveValue.newInt64(length) - ) - ); + if (finalI == 0) { + // Сохраняем информацию о строке в таблицу + tx.executeQuery(""" + DECLARE $name AS Text; + DECLARE $length AS Int64; + INSERT INTO file(name, length) VALUES ($name, $length); + """, + Params.of("$name", PrimitiveValue.newText(name), "$length", PrimitiveValue.newInt64(length)) + ); + } else { + // Обновляем информацию + tx.executeQuery(""" + DECLARE $name AS Text; + DECLARE $length AS Int64; + UPDATE file SET length = length + $length WHERE name = $name; + """, + Params.of("$name", PrimitiveValue.newText(name), "$length", PrimitiveValue.newInt64(length)) + ); + } // Обновляем прогресс обработки партиции. Если произойдёт сбой после коммита транзакции, // но до коммита сообщения в топик, то на основе этого прогресса повторная обработка @@ -212,7 +219,7 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo """, Params.of( "$partition_id", PrimitiveValue.newInt64(partitionId), - "$last_offset", PrimitiveValue.newInt64(message.getOffset()) + "$last_offset", PrimitiveValue.newInt64(lastOffset) ) ); @@ -232,15 +239,11 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo private static void printTableFile(QueryServiceHelper queryServiceHelper) { // Выводим информацию об обработанных строках var queryReader = queryServiceHelper.executeQuery( - "SELECT name, line, length FROM file;", TxMode.SERIALIZABLE_RW, Params.empty()); + "SELECT name, length FROM file;", TxMode.SERIALIZABLE_RW, Params.empty()); for (ResultSetReader resultSet : queryReader) { while (resultSet.next()) { - LOGGER.info( - "name: " + resultSet.getColumn(0).getText() + - ", line: " + resultSet.getColumn(1).getInt64() + - ", length: " + resultSet.getColumn(2).getInt64() - ); + LOGGER.info("name: {}, length: {}", resultSet.getColumn(0).getText(), resultSet.getColumn(1).getInt64()); } } } @@ -250,11 +253,10 @@ private static void createSchema(QueryServiceHelper queryServiceHelper) { queryServiceHelper.executeQuery(""" CREATE TABLE IF NOT EXISTS file ( name Text NOT NULL, - line Int64 NOT NULL, length Int64 NOT NULL, - PRIMARY KEY (name, line) + PRIMARY KEY (name) ); - + -- Таблица для хранения прогресса обработки партиции -- используется для того, чтобы не повторять обработку сообщений -- при перезапуске процесса. @@ -263,13 +265,13 @@ CREATE TABLE IF NOT EXISTS file_progress ( last_offset Int64 NOT NULL, PRIMARY KEY (partition_id) ); - + CREATE TABLE IF NOT EXISTS write_file_progress ( name Text NOT NULL, line_num Int64 NOT NULL, PRIMARY KEY (name) ); - + CREATE TOPIC IF NOT EXISTS file_topic ( CONSUMER file_consumer ) WITH(