Skip to content

Commit f639209

Browse files
committed
fix(filesystem): update LocalFileStorage to fallback to a simple move implements as copy+delete (#234)
Resolves: #234
1 parent e38f279 commit f639209

File tree

1 file changed

+47
-2
lines changed
  • connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader

1 file changed

+47
-2
lines changed

connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/LocalFileStorage.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import java.io.FileInputStream;
2929
import java.io.FileNotFoundException;
3030
import java.io.IOException;
31+
import java.io.InputStream;
3132
import java.net.URI;
3233
import java.nio.file.Files;
3334
import java.nio.file.Path;
3435
import java.nio.file.Paths;
3536
import java.nio.file.StandardCopyOption;
37+
import java.nio.file.attribute.BasicFileAttributeView;
38+
import java.nio.file.attribute.BasicFileAttributes;
3639

3740
import static io.streamthoughts.kafka.connect.filepulse.internal.IOUtils.createParentIfNotExists;
3841

@@ -97,8 +100,18 @@ public boolean move(final URI source, final URI dest) {
97100
);
98101
} catch (IOException inner) {
99102
inner.addSuppressed(outer);
100-
LOG.error("Error while moving file {}", source, inner);
101-
return false;
103+
try {
104+
doSimpleMove(sourcePath, destPath);
105+
LOG.debug("Simple move as copy+delete of {} to {} succeeded after move failed due to {}",
106+
source,
107+
dest,
108+
inner.getMessage()
109+
);
110+
} catch (IOException e) {
111+
e.addSuppressed(inner);
112+
LOG.error("Error while moving file {}", source, inner);
113+
return false;
114+
}
102115
}
103116
}
104117
return true;
@@ -112,4 +125,36 @@ public FileInputStream getInputStream(final URI uri) throws FileNotFoundExceptio
112125
return new FileInputStream(new File(uri));
113126
}
114127

128+
/**
129+
* Simple move implements as copy+delete.
130+
*
131+
* @param source the source path.
132+
* @param target the target path.
133+
*/
134+
static void doSimpleMove(final Path source, final Path target) throws IOException{
135+
// attributes of source file
136+
BasicFileAttributes attrs = Files.readAttributes(source, BasicFileAttributes.class);
137+
138+
if (attrs.isSymbolicLink())
139+
throw new IOException("Copying of symbolic links not supported");
140+
141+
// delete target if it exists
142+
Files.deleteIfExists(target);
143+
144+
// copy file
145+
try (InputStream in = Files.newInputStream(source)) {
146+
Files.copy(in, target);
147+
}
148+
149+
// try to copy basic attributes to target
150+
BasicFileAttributeView view = Files.getFileAttributeView(target, BasicFileAttributeView.class);
151+
try {
152+
view.setTimes(attrs.lastModifiedTime(), attrs.lastAccessTime(), attrs.creationTime());
153+
} catch (Throwable x) {
154+
LOG.debug("Failed to copy basic attributes while moving file {} to {}", source, target);
155+
}
156+
// finally delete source file
157+
Files.delete(source);
158+
}
159+
115160
}

0 commit comments

Comments
 (0)