diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 9ecccd7487..86cfbf4cc3 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -20,7 +20,6 @@ import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; import java.io.File; -import java.io.FileWriter; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -34,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; @@ -58,7 +58,7 @@ import com.google.gson.Gson; public class TaildirSource extends AbstractSource implements - PollableSource, Configurable, BatchSizeSupported { + PollableSource, Configurable, BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class); @@ -94,27 +94,27 @@ public synchronized void start() { logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths); try { reader = new ReliableTaildirEventReader.Builder() - .filePaths(filePaths) - .headerTable(headerTable) - .positionFilePath(positionFilePath) - .skipToEnd(skipToEnd) - .addByteOffset(byteOffsetHeader) - .cachePatternMatching(cachePatternMatching) - .annotateFileName(fileHeader) - .fileNameHeader(fileHeaderKey) - .build(); + .filePaths(filePaths) + .headerTable(headerTable) + .positionFilePath(positionFilePath) + .skipToEnd(skipToEnd) + .addByteOffset(byteOffsetHeader) + .cachePatternMatching(cachePatternMatching) + .annotateFileName(fileHeader) + .fileNameHeader(fileHeaderKey) + .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); } idleFileChecker = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build()); + new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build()); idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), - idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS); + idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS); positionWriter = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("positionWriter").build()); + new ThreadFactoryBuilder().setNameFormat("positionWriter").build()); positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), - writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS); + writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS); super.start(); logger.debug("TaildirSource started"); @@ -147,8 +147,8 @@ public synchronized void stop() { @Override public String toString() { return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, " - + "byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }", - positionFilePath, skipToEnd, byteOffsetHeader, idleTimeout, writePosInterval); + + "byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }", + positionFilePath, skipToEnd, byteOffsetHeader, idleTimeout, writePosInterval); } @Override @@ -157,9 +157,9 @@ public synchronized void configure(Context context) { Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS); filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX), - fileGroups.split("\\s+")); + fileGroups.split("\\s+")); Preconditions.checkState(!filePaths.isEmpty(), - "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); + "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'"); String homePath = System.getProperty("user.home").replace('\\', '/'); positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE); @@ -176,12 +176,12 @@ public synchronized void configure(Context context) { idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT); writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL); cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING, - DEFAULT_CACHE_PATTERN_MATCHING); + DEFAULT_CACHE_PATTERN_MATCHING); backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT, - PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); + PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP, - PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); + PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER); fileHeaderKey = context.getString(FILENAME_HEADER_KEY, @@ -190,7 +190,7 @@ public synchronized void configure(Context context) { if (maxBatchCount <= 0) { maxBatchCount = DEFAULT_MAX_BATCH_COUNT; logger.warn("Invalid maxBatchCount specified, initializing source " - + "default maxBatchCount of {}", maxBatchCount); + + "default maxBatchCount of {}", maxBatchCount); } if (sourceCounter == null) { @@ -262,7 +262,7 @@ public long getMaxBackOffSleepInterval() { } private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL) - throws IOException, InterruptedException { + throws IOException, InterruptedException { long batchCount = 0; while (true) { reader.setCurrentFile(tf); @@ -277,7 +277,7 @@ private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL) reader.commit(); } catch (ChannelException ex) { logger.warn("The channel is full or unexpected failure. " + - "The source will try again after " + retryInterval + " ms"); + "The source will try again after " + retryInterval + " ms"); sourceCounter.incrementChannelWriteFail(); TimeUnit.MILLISECONDS.sleep(retryInterval); retryInterval = retryInterval << 1; @@ -343,23 +343,14 @@ public void run() { private void writePosition() { File file = new File(positionFilePath); - FileWriter writer = null; try { - writer = new FileWriter(file); if (!existingInodes.isEmpty()) { String json = toPosInfoJson(); - writer.write(json); + FileUtils.write(file, json, "UTF-8", false); } } catch (Throwable t) { logger.error("Failed writing positionFile", t); sourceCounter.incrementGenericProcessingFail(); - } finally { - try { - if (writer != null) writer.close(); - } catch (IOException e) { - logger.error("Error: " + e.getMessage(), e); - sourceCounter.incrementGenericProcessingFail(); - } } }