Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 29 additions & 27 deletions dev-1/lesson-6.2/java/src/main/java/tech/ydb/app/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.common.transaction.TxMode;
Expand Down Expand Up @@ -124,7 +125,7 @@ public static void main(String[] args) throws IOException, InterruptedException,
queryServiceHelper.executeQuery("""
DECLARE $name AS Text;
DECLARE $line_num AS Int64;

UPSERT INTO write_file_progress(name, line_num) VALUES ($name, $line_num);
""",
TxMode.SERIALIZABLE_RW,
Expand Down Expand Up @@ -157,6 +158,7 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo
// В транзакции будет проверяться было ли уже обработано сообщение и если нет, то
// обрабатывать и сохранять прогресс. Эти операции должны быть атомарными, чтобы
// избежать ситуации, когда одно и тоже сообщение будет обработано несколько раз.
final int finalI = i;
retryCtx.supplyStatus(session -> {
var curTx = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
var tx = new TransactionHelper(curTx);
Expand Down Expand Up @@ -186,21 +188,26 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo
var messageData = new String(message.getData(), StandardCharsets.UTF_8).split(":");
var name = messageData[0];
var length = messageData[1].length();
var lineNumber = message.getSeqNo();

// Сохраняем информацию о строке в таблицу
tx.executeQuery("""
DECLARE $name AS Text;
DECLARE $line AS Int64;
DECLARE $length AS Int64;
UPSERT INTO file(name, line, length) VALUES ($name, $line, $length);
""",
Params.of(
"$name", PrimitiveValue.newText(name),
"$line", PrimitiveValue.newInt64(lineNumber),
"$length", PrimitiveValue.newInt64(length)
)
);
if (finalI == 0) {
// Сохраняем информацию о строке в таблицу
tx.executeQuery("""
DECLARE $name AS Text;
DECLARE $length AS Int64;
INSERT INTO file(name, length) VALUES ($name, $length);
""",
Params.of("$name", PrimitiveValue.newText(name), "$length", PrimitiveValue.newInt64(length))
);
} else {
// Обновляем информацию
tx.executeQuery("""
DECLARE $name AS Text;
DECLARE $length AS Int64;
UPDATE file SET length = length + $length WHERE name = $name;
""",
Params.of("$name", PrimitiveValue.newText(name), "$length", PrimitiveValue.newInt64(length))
);
}

// Обновляем прогресс обработки партиции. Если произойдёт сбой после коммита транзакции,
// но до коммита сообщения в топик, то на основе этого прогресса повторная обработка
Expand All @@ -212,7 +219,7 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo
""",
Params.of(
"$partition_id", PrimitiveValue.newInt64(partitionId),
"$last_offset", PrimitiveValue.newInt64(message.getOffset())
"$last_offset", PrimitiveValue.newInt64(lastOffset)
)
);

Expand All @@ -232,15 +239,11 @@ private static void runReadJob(int linesCount, SyncReader reader, SessionRetryCo
private static void printTableFile(QueryServiceHelper queryServiceHelper) {
// Выводим информацию об обработанных строках
var queryReader = queryServiceHelper.executeQuery(
"SELECT name, line, length FROM file;", TxMode.SERIALIZABLE_RW, Params.empty());
"SELECT name, length FROM file;", TxMode.SERIALIZABLE_RW, Params.empty());

for (ResultSetReader resultSet : queryReader) {
while (resultSet.next()) {
LOGGER.info(
"name: " + resultSet.getColumn(0).getText() +
", line: " + resultSet.getColumn(1).getInt64() +
", length: " + resultSet.getColumn(2).getInt64()
);
LOGGER.info("name: {}, length: {}", resultSet.getColumn(0).getText(), resultSet.getColumn(1).getInt64());
}
}
}
Expand All @@ -250,11 +253,10 @@ private static void createSchema(QueryServiceHelper queryServiceHelper) {
queryServiceHelper.executeQuery("""
CREATE TABLE IF NOT EXISTS file (
name Text NOT NULL,
line Int64 NOT NULL,
length Int64 NOT NULL,
PRIMARY KEY (name, line)
PRIMARY KEY (name)
);

-- Таблица для хранения прогресса обработки партиции
-- используется для того, чтобы не повторять обработку сообщений
-- при перезапуске процесса.
Expand All @@ -263,13 +265,13 @@ CREATE TABLE IF NOT EXISTS file_progress (
last_offset Int64 NOT NULL,
PRIMARY KEY (partition_id)
);

CREATE TABLE IF NOT EXISTS write_file_progress (
name Text NOT NULL,
line_num Int64 NOT NULL,
PRIMARY KEY (name)
);

CREATE TOPIC IF NOT EXISTS file_topic (
CONSUMER file_consumer
) WITH(
Expand Down