diff --git a/README.md b/README.md index 4314260..147f1dd 100644 --- a/README.md +++ b/README.md @@ -1 +1,13 @@ -# java_homeworks \ No newline at end of file +### Torrent + +Для сборки выполнить + +``` +mvn install + +``` + +В папке target появятся client.jar и server.jar . +Клиенту при запуске можно передать имя хоста. По умолчанию localhost. + +У клиента есть команда `help`, показывающая список команд. \ No newline at end of file diff --git a/torrent/pom.xml b/torrent/pom.xml new file mode 100644 index 0000000..c7926bf --- /dev/null +++ b/torrent/pom.xml @@ -0,0 +1,99 @@ + + 4.0.0 + + 1 + torrent + 0.0.1-SNAPSHOT + + + torrent + http://maven.apache.org + + + UTF-8 + + + + + maven-compiler-plugin + + 1.9 + 1.9 + UTF-8 + + + + org.apache.maven.plugins + maven-assembly-plugin + + + torrent_server + package + + single + + + + + + torrent.server.Main + + + + + jar-with-dependencies + + server + false + + + + torrent_client + package + + single + + + + + + torrent.client.Main + + + + + jar-with-dependencies + + client + false + + + + + + + + + junit + junit + 4.12 + test + + + + + com.fasterxml.jackson.core + jackson-databind + 2.9.7 + + + + + org.apache.commons + commons-io + 1.3.2 + + + diff --git a/torrent/src/main/java/torrent/client/FileProblemException.java b/torrent/src/main/java/torrent/client/FileProblemException.java new file mode 100644 index 0000000..6cc213d --- /dev/null +++ b/torrent/src/main/java/torrent/client/FileProblemException.java @@ -0,0 +1,14 @@ +package torrent.client; + +public class FileProblemException extends Exception { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public FileProblemException(String string) { + super(string); + } + +} diff --git a/torrent/src/main/java/torrent/client/FilesDownloader.java b/torrent/src/main/java/torrent/client/FilesDownloader.java new file mode 100644 index 0000000..3a81e08 --- /dev/null +++ b/torrent/src/main/java/torrent/client/FilesDownloader.java @@ -0,0 +1,64 @@ +package torrent.client; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import torrent.client.FilesHolder.FileStatus; + +public class FilesDownloader { + + private FilesHolder filesHolder; + private SocketAddress toServer; + + private ExecutorService pool = Executors.newCachedThreadPool(); + + private Map> fileDownloadsFutures = new HashMap<>(); + + public FilesDownloader(FilesHolder stm, SocketAddress toServer) { + this.filesHolder = stm; + this.toServer = toServer; + stm.fileStatus.forEach((id, status) -> { + if (status == FileStatus.Downloading ) { + try { + startFileDownload(id); + } catch (IOException e) { + System.err.println("Failed. to start download at startup"); + e.printStackTrace(); + } + } + }); + } + + public boolean startFileDownload(int fileId) throws IOException { + if (fileDownloadsFutures.containsKey(fileId)) { + return false; + } + + filesHolder.fileStatus.put(fileId, FileStatus.Downloading); + + SingleFileDownloader downloader = new SingleFileDownloader(toServer, filesHolder, fileId, this); + fileDownloadsFutures.put(fileId, pool.submit(downloader)); + return true; + } + + public void stopFileDownload(int fileId) { + if (!filesHolder.fileStatus.containsKey(fileId)) { + throw new IllegalStateException("This file wasn't been downloading"); + } + + if (filesHolder.fileStatus.get(fileId) != FileStatus.Downloading) { + return; + } + + fileDownloadsFutures.get(fileId).cancel(true); + fileDownloadsFutures.remove(fileId); + + filesHolder.fileStatus.put(fileId, FileStatus.Paused); + } + +} diff --git a/torrent/src/main/java/torrent/client/FilesHolder.java b/torrent/src/main/java/torrent/client/FilesHolder.java new file mode 100644 index 0000000..e21f102 --- /dev/null +++ b/torrent/src/main/java/torrent/client/FilesHolder.java @@ -0,0 +1,216 @@ +package torrent.client; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class FilesHolder implements Closeable{ + + public final int pieceSize = 0xA00000; + //рабочие данные + + public enum FileStatus {Complete, Downloading, Paused}; + + private final Map files = new ConcurrentHashMap<>(); + public Map filePaths = new ConcurrentHashMap<>(); + + public Map fileStatus = new ConcurrentHashMap<>(); + public Map> completePieces = new ConcurrentHashMap<>(); + public Map fileSize = new ConcurrentHashMap<>(); + // + + ObjectMapper mapper = new ObjectMapper(); + private final Path mapPath; + private final Path filePathsPath; + private final Path fileStatusPath; + private final Path comletePiecesPath; + private final Path fileSizePath; + + public int numParts(int fileId) { + return Math.toIntExact((fileSize.get(fileId) + pieceSize - 1) / pieceSize); + } + + public int pieceOffset(int fileId, int numPart) { + return pieceSize * numPart; + } + + public int pieceLenght(int fileId, int numPart) { + int file_length; + file_length = Math.toIntExact(fileSize.get(fileId)); + return (numPart + 1) * pieceSize <= file_length + ? pieceSize + : file_length - numPart * pieceSize; + } + + public FilesHolder(String path) throws IOException { + mapPath = Paths.get(path); + filePathsPath = mapPath.resolve("filepaths"); + fileStatusPath = mapPath.resolve("filesStatus"); + comletePiecesPath = mapPath.resolve("completePieces"); + fileSizePath = mapPath.resolve("fileSizes"); + load(); + } + + public void writeMaps() throws IOException { + if (!Files.exists(mapPath)) + Files.createDirectories(mapPath); + + filePathsPath.toFile().createNewFile(); + mapper.writeValue(filePathsPath.toFile(), filePaths); + + fileStatusPath.toFile().createNewFile(); + mapper.writeValue(fileStatusPath.toFile(), fileStatus); + + comletePiecesPath.toFile().createNewFile(); + mapper.writeValue(comletePiecesPath.toFile(), completePieces); + + fileSizePath.toFile().createNewFile(); + mapper.writeValue(fileSizePath.toFile(), fileSize); + } + + public void save() throws JsonGenerationException, JsonMappingException, IOException { + writeMaps(); + } + + public void load() throws JsonGenerationException, JsonMappingException, IOException { + if (filePathsPath.toFile().exists()) + filePaths = mapper.readValue(filePathsPath.toFile(), new TypeReference>() {}); + if (fileStatusPath.toFile().exists()) + fileStatus = mapper.readValue(fileStatusPath.toFile(), new TypeReference>() {}); + if (comletePiecesPath.toFile().exists()) + completePieces = mapper.readValue(comletePiecesPath.toFile(), new TypeReference>>() {}); + if (fileSizePath.toFile().exists()) + fileSize = mapper.readValue(fileSizePath.toFile(), new TypeReference>() {}); + + if (!filePaths.keySet().equals(fileStatus.keySet()) + || !filePaths.keySet().equals(completePieces.keySet()) + || !filePaths.keySet().equals(fileSize.keySet())) { + throw new RuntimeException("maps key sets not equal"); + } + + for (Entry ent : filePaths.entrySet()) { + if (!Paths.get(ent.getValue()).toFile().exists()) { + completePieces.get(ent.getKey()).clear(); + fileStatus.put(ent.getKey(), FileStatus.Downloading); + } + } + } + + public void deleteFile(int id) { + if (! files.containsKey(id)) { + throw new IllegalStateException("File isn't known."); + } + try { + files.get(id).close(); + } catch (IOException e) {} + files.remove(id); + filePaths.remove(id); + fileStatus.remove(id); + completePieces.remove(id); + fileSize.remove(id); + } + + public void addFileToDownload(int id, long size, String filePath) throws FileProblemException, FileNotFoundException, IOException { + if (files.containsKey(id)) { + throw new FileProblemException("id already used"); + } + + if (filePaths.containsValue(filePath)) { + throw new FileProblemException("file with specified path used"); + } + + filePaths.put(id, filePath); + completePieces.put(id, ConcurrentHashMap.newKeySet()); + fileStatus.put(id, FileStatus.Paused); + fileSize.put(id, size); + + writeMaps(); + } + + public void addExistingFile(int id, Path path) throws FileProblemException, FileNotFoundException, IOException { + if (files.containsKey(id)) { + throw new FileProblemException("id already used"); + } + + if (filePaths.containsValue(path.toString())) { + throw new FileProblemException("file with specified path used"); + } + + fileSize.put(id, path.toFile().length()); + filePaths.put(id, path.toString()); + fileStatus.put(id, FileStatus.Complete); + + completePieces.put(id, + Stream.iterate(0, i -> i + 1) + .limit(numParts(id)) + .collect(Collectors.toSet())); + writeMaps(); + } + + private RandomAccessFile getOrCreateFile(int fileId) throws IOException { + if (!files.containsKey(fileId)) { + synchronized (files) { + if (!files.containsKey(fileId)) { + Path parent = Paths.get(filePaths.get(fileId)).getParent(); + if (parent != null) { + Files.createDirectories(parent); + } + RandomAccessFile res_local = new RandomAccessFile(filePaths.get(fileId), "rws"); + if (res_local.length() == 0) + res_local.setLength(fileSize.get(fileId)); + files.put(fileId, res_local); + } + } + } + + return files.get(fileId); + } + + public byte[] getPiece(int fileId, int pieceId) throws IOException { + byte[] buf = new byte[pieceLenght(fileId, pieceId)]; + RandomAccessFile file = getOrCreateFile(fileId); + synchronized (file) { + file.seek(pieceOffset(fileId, pieceId)); + file.readFully(buf); + } + return buf; + } + + public void putPiece(int fileId, int pieceId, byte[] buf) throws IOException { + if (buf.length != pieceLenght(fileId, pieceId)) { + throw new IllegalStateException("Attemp to put block of incorrect size"); + } + RandomAccessFile file = getOrCreateFile(fileId); + synchronized (file) { + file.seek(pieceOffset(fileId, pieceId)); + file.write(buf); + } + } + + @Override + public void close() throws IOException { + files.forEach((i, f) -> { + try { + f.close(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } + +} diff --git a/torrent/src/main/java/torrent/client/GetHandler.java b/torrent/src/main/java/torrent/client/GetHandler.java new file mode 100644 index 0000000..379bad1 --- /dev/null +++ b/torrent/src/main/java/torrent/client/GetHandler.java @@ -0,0 +1,40 @@ +package torrent.client; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; + +import torrent.common.ConcreteTaskHandler; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class GetHandler implements ConcreteTaskHandler { + + private final FilesHolder fHolder; + + GetHandler(FilesHolder fHolder) { + this.fHolder = fHolder; + } + + @Override + public MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf) { + try { + int id = in.readInt(); + int partNum = in.readInt(); + + if (partNum >= fHolder.numParts(id)) { + return MessageProcessStatus.ERROR; + } + + out.write(fHolder.getPiece(id, partNum)); + + return MessageProcessStatus.SUCCESS; + } catch (EOFException e) { + return MessageProcessStatus.INCOMPLETE; + } catch (IOException e) { + return MessageProcessStatus.ERROR; + } + } + +} diff --git a/torrent/src/main/java/torrent/client/Main.java b/torrent/src/main/java/torrent/client/Main.java new file mode 100644 index 0000000..8eb93b0 --- /dev/null +++ b/torrent/src/main/java/torrent/client/Main.java @@ -0,0 +1,24 @@ +package torrent.client; + +import java.io.IOException; + +public class Main { + + public static void main(String[] args) throws IOException { + + String host = args.length == 0 ? "localhost" : args[0]; + + Thread t = new Thread(new MainInner(host, 8082, System.out, System.in, 100000, new FilesHolder("torrentData"))); + t.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("\nSaving client state..."); + t.interrupt(); + } + }); + + } + +} diff --git a/torrent/src/main/java/torrent/client/MainInner.java b/torrent/src/main/java/torrent/client/MainInner.java new file mode 100644 index 0000000..3dd1f58 --- /dev/null +++ b/torrent/src/main/java/torrent/client/MainInner.java @@ -0,0 +1,85 @@ +package torrent.client; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import torrent.common.ConcreteTaskHandler; +import torrent.common.ServerProcess; + +public class MainInner implements Runnable { + + private final String host; + private final PrintStream out; + private final InputStream sinp; + private final long updateTime; + private final FilesHolder filesHolder; + private final int myport; + + public MainInner(String host, int port, PrintStream out, InputStream sinp, long updateTime, FilesHolder fh) { + this.host = host; + this.out = out; + this.sinp = sinp; + this.updateTime = updateTime; + this.filesHolder = fh; + this.myport = port; + } + + @Override + public void run() { + Thread server = null, updater = null, replTh = null; + try { + final SocketAddress toServer = new InetSocketAddress(host, 8081); + + server = new Thread(new ServerProcess( + myport, + new ConcreteTaskHandler[] { + new StatHandler(filesHolder), + new GetHandler(filesHolder) + })); + + updater = new Thread(new UpdatePerformer(filesHolder, toServer, myport, updateTime)); + + REPL repl = new REPL( + filesHolder, + new FilesDownloader(filesHolder, toServer), + toServer, + out, + sinp); + + replTh = new Thread(repl::startRepl); + + server.start(); + updater.start(); + replTh.start(); + + while (true) { + Thread.sleep(10000); + } + } catch (IOException e) { + out.println(e.getMessage()); + } catch (InterruptedException e) { + } finally { + server.interrupt(); + updater.interrupt(); + replTh.interrupt(); + + try { + server.join(); + updater.join(); + replTh.join(); + } catch (InterruptedException e2) { + e2.printStackTrace(); + } + + try { + filesHolder.save(); + filesHolder.close(); + } catch (IOException e1) { + out.println("Saving client state failed"); + out.print(e1.getMessage()); + } + } + } +} diff --git a/torrent/src/main/java/torrent/client/PieceDownloader.java b/torrent/src/main/java/torrent/client/PieceDownloader.java new file mode 100644 index 0000000..c293f24 --- /dev/null +++ b/torrent/src/main/java/torrent/client/PieceDownloader.java @@ -0,0 +1,66 @@ +package torrent.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.Semaphore; + +public class PieceDownloader implements CompletionHandler { + private int fileId; + private int pieceLength; + private int pieceIdx; + private SingleFileDownloader singleFileDownloader; + + private ByteBuffer buffer; + private final Semaphore pieceSemaphore; + + public PieceDownloader(SingleFileDownloader filesDownloader, + int fileId, int pieceIdx, + Semaphore pieceSemaphore) { + this.singleFileDownloader = filesDownloader; + this.fileId = fileId; + + this.pieceIdx = pieceIdx; + + this.pieceLength = filesDownloader.filesHolder.pieceLenght(fileId, pieceIdx); + + buffer = ByteBuffer.wrap(new byte[pieceLength]); + + this.pieceSemaphore = pieceSemaphore; + } + + public ByteBuffer getBuffer() { + return buffer; + } + + @Override + public void completed(Integer result, AsynchronousSocketChannel attachment) { + if (buffer.hasRemaining()) { + attachment.read(buffer, attachment, this); + } else { + try { + singleFileDownloader.filesHolder.putPiece(fileId, pieceIdx, buffer.array()); + singleFileDownloader.filesHolder.completePieces.get(fileId).add(pieceIdx); + singleFileDownloader.checkIfComplete(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + attachment.close(); + } catch (IOException e) { + e.printStackTrace(); + } + pieceSemaphore.release(); + } + } + } + + @Override + public void failed(Throwable exc, AsynchronousSocketChannel attachment) { + pieceSemaphore.release(); + System.err.println("PieceDownloader failed"); + } + + +} diff --git a/torrent/src/main/java/torrent/client/REPL.java b/torrent/src/main/java/torrent/client/REPL.java new file mode 100644 index 0000000..7606abe --- /dev/null +++ b/torrent/src/main/java/torrent/client/REPL.java @@ -0,0 +1,217 @@ +package torrent.client; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.SocketAddress; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.InputMismatchException; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Scanner; + +public class REPL { + + private final FilesHolder filesHolder; + private final FilesDownloader downloader; + + private Map listedFilesSize = new HashMap<>(); + private Map listedFilesName = new HashMap<>(); + + private SocketAddress toServer; + private final PrintStream out; + private final InputStream inStream; + + public REPL( + FilesHolder filesHolder, + FilesDownloader downloader, + SocketAddress toServer, + PrintStream out, + InputStream inStream) { + this.filesHolder = filesHolder; + this.downloader = downloader; + this.toServer = toServer; + this.out = out; + this.inStream = inStream; + } + + void printLineForExistingFile(int id, String printName) { + out.print(id + " " + printName + " "); + switch (filesHolder.fileStatus.get(id)) { + case Complete: + { + out.print("complete"); + break; + } + case Downloading: + { + out.print("->"); + break; + } + case Paused: + { + out.print("||"); + break; + } + } + out.println(" " + filesHolder.completePieces.get(id).size() + "/" + + filesHolder.numParts(id)); + } + + void printStatus() throws IOException { + + if (filesHolder.fileStatus.isEmpty()) { + out.println("No files yet"); + } + + for (Integer id : filesHolder.fileStatus.keySet()) { + printLineForExistingFile(id, filesHolder.filePaths.get(id)); + } + } + + void listAvaliableFiles() { + try { + ServerFilesLister.list(toServer, listedFilesSize, listedFilesName); + + if (listedFilesName.isEmpty()) { + out.println("No files avaliable on server"); + } + for (int id : listedFilesName.keySet()) { + if (!filesHolder.fileStatus.containsKey(id)) { + out.print(id + " " + listedFilesName.get(id) + " "); + out.println(0 + "/" + getNPieces(listedFilesSize.get(id))); + } else { + printLineForExistingFile(id, listedFilesName.get(id)); + } + } + + } catch (IOException e) { + out.println("Failed to list avaliable files.\n" + e.getMessage()); + } + } + + void startDownload(int id, String filename) throws FileNotFoundException, FileProblemException, IOException { + filename = filename.trim(); + Long size = listedFilesSize.get(id); + if (size == null) { + out.println("Unknown file id. Make list first."); + return; + } + filesHolder.addFileToDownload(id, size, filename); + downloader.startFileDownload(id); + } + + void deleteFile(int id) { + try { + downloader.stopFileDownload(id); + } catch (IllegalStateException e) {} + + filesHolder.deleteFile(id); + } + + void publishFile(String path) throws FileProblemException { + Path pathTo = Paths.get(path.trim()); + try { + if (filesHolder.filePaths.containsValue(pathTo.toString())) { + throw new FileProblemException("file with specified path used"); + } + + int id = Uploader.uploadAndGetId(toServer, pathTo); + + filesHolder.addExistingFile(id, pathTo); + out.println("The file has an id " + id); + } catch (FileNotFoundException e) { + //e.printStackTrace(); + out.println("Failed to publish file.\n" + + pathTo + " not exists."); + } catch (IOException e) { + //e.printStackTrace(); + out.println("Failed to publish file.\n" + e.getMessage()); + } + } + + int getNPieces(long size) { + return Math.toIntExact((size - 1 + filesHolder.pieceSize) / filesHolder.pieceSize); + } + + public final static String helpMessage = + "This is torrent client.\n" + + "commands:\n" + + "list\n" + + "publish \n" + + "delete \n" + + "get \n" + + "pause \n" + + "resume \n" + + "status\n" + + "help"; + + public void startRepl() { + + try (Scanner in = new Scanner(inStream)) { + while (true) { + try { + out.print(">"); + String command = in.next(); + switch (command) { + case "help": + { + out.println(helpMessage); + } + break; + case "list": + { + listAvaliableFiles(); + } + break; + case "publish": + { + publishFile(in.nextLine()); + } + break; + case "delete": + { + deleteFile(in.nextInt()); + } + break; + case "get": + { + startDownload(in.nextInt(), in.next()); + } + break; + case "pause": + { + downloader.stopFileDownload(in.nextInt()); + } + break; + case "resume": + { + downloader.startFileDownload(in.nextInt()); + } + break; + case "status": + { + printStatus(); + } + break; + + default: + { + out.println("unknown command " + command); + } + } + } catch (IOException | FileProblemException | IllegalStateException e) { + out.println(e.getMessage()); + } catch (InputMismatchException imme) { + in.next(); + out.println("Input mismatch"); + } catch (NoSuchElementException e) { + return; + } + } + } + } +} diff --git a/torrent/src/main/java/torrent/client/ServerFilesLister.java b/torrent/src/main/java/torrent/client/ServerFilesLister.java new file mode 100644 index 0000000..31c9536 --- /dev/null +++ b/torrent/src/main/java/torrent/client/ServerFilesLister.java @@ -0,0 +1,47 @@ +package torrent.client; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.Map; + +public class ServerFilesLister { + public static void list(SocketAddress addr, Map fileSizes, Map filesNames) throws IOException { + Socket s = null; + DataOutputStream dout = null; + DataInputStream dinp = null; + try { + s = new Socket(); + s.connect(addr); + dout = new DataOutputStream(s.getOutputStream()); + dinp = new DataInputStream(s.getInputStream()); + + dout.writeByte(1); + dout.flush(); + + fileSizes.clear(); + filesNames.clear(); + + int count = dinp.readInt(); + + for (int i = 0; i < count; i++) { + int id = dinp.readInt(); + String name = dinp.readUTF(); + long size = dinp.readLong(); + + fileSizes.put(id, size); + filesNames.put(id, name); + } + } catch (IOException e) { + if (s == null || dinp == null || dout == null) { + throw new IOException("Failed to connect to server"); + } + throw e; + } finally { + if (s != null) + s.close(); + } + } +} diff --git a/torrent/src/main/java/torrent/client/SingleFileDownloader.java b/torrent/src/main/java/torrent/client/SingleFileDownloader.java new file mode 100644 index 0000000..647d004 --- /dev/null +++ b/torrent/src/main/java/torrent/client/SingleFileDownloader.java @@ -0,0 +1,233 @@ +package torrent.client; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + +import torrent.client.FilesHolder.FileStatus; + +public class SingleFileDownloader implements Runnable { + + private SocketAddress srvAddr; + final FilesHolder filesHolder; + private final int fileId; + private final int numPieces; + private final FilesDownloader filesDownloader; + + private Random rand = new Random(3); + + private List fileSources = new ArrayList<>(); + private Map> pieceSources = new HashMap<>(); + private Map pieceChannels = new HashMap<>(); + + private final int numOfActivePieces = 5; + private final ExecutorService pieceQueue = Executors.newFixedThreadPool(numOfActivePieces); + private final Semaphore pieceSemaphore = new Semaphore(numOfActivePieces); + + public SingleFileDownloader(SocketAddress srvAddr, FilesHolder stm, int fileId, FilesDownloader fdl) { + this.srvAddr = srvAddr; + this.filesHolder = stm; + this.fileId = fileId; + + numPieces = stm.numParts(fileId); + + this.filesDownloader = fdl; + } + + private void updateFileSources() { + DataOutputStream out; + DataInputStream in; + try (Socket toServer = new Socket()) { + toServer.connect(srvAddr); + out = new DataOutputStream(toServer.getOutputStream()); + in = new DataInputStream(toServer.getInputStream()); + out.writeByte(3); + out.writeInt(fileId); + + int clientsCount = in.readInt(); + fileSources.clear(); + byte[] ipBuf = new byte[4]; + for (int i = 0; i < clientsCount; i++) { + in.readFully(ipBuf); + + fileSources.add(new InetSocketAddress(InetAddress.getByAddress(ipBuf), + in.readShort())); + } + } catch (IOException e) { + System.out.println("SourceUpdater: creation of output stream failed while update file sources"); + } + } + + private void updatePieceSources() { + DataOutputStream out; + DataInputStream in; + + pieceSources.clear(); + for (int i = 0; i < numPieces; i++) { + pieceSources.put(i, new ArrayList<>()); + } + + for (SocketAddress addr : fileSources) { + try (Socket othClient = new Socket()) { + othClient.connect(addr, 10000); + out = new DataOutputStream(othClient.getOutputStream()); + in = new DataInputStream(othClient.getInputStream()); + + out.writeByte(1); + out.writeInt(fileId); + + int partCount = in.readInt(); + + for (int i = 0; i < partCount; i++) { + int partNum = in.readInt(); + pieceSources.get(partNum).add(addr); + } + + } catch (IOException e) { + System.out.println("SourceUpdater: failed in updatePieceSources"); + } + } + } + + + private boolean getRequest(AsynchronousSocketChannel chan, int fileId, int partNum) throws IOException, InterruptedException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(baos); + + dout.writeByte(2); + dout.writeInt(fileId); + dout.writeInt(partNum); + + dout.close(); + ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); + + do { + try { + chan.write(bb).get(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } while (bb.hasRemaining()); + + return true; + } + + private void dispatchPieceDownloaders() throws InterruptedException { + for (int i = 0; i < numPieces; i++) { + if (filesHolder.completePieces.get(fileId).contains(i)) { + pieceChannels.remove(i); + continue; + } + + AsynchronousSocketChannel chan = pieceChannels.get(i); + if (chan != null) { + if (!chan.isOpen()) { + pieceChannels.remove(i); + } else { + continue; + } + } + + List sources = pieceSources.get(i); + if (!sources.isEmpty()) { + int rIdx = rand.nextInt(sources.size()); + + int nPiece = i; + pieceQueue.execute(() -> { + try { + pieceSemaphore.acquire(); + } catch (InterruptedException e) { + return; + } + AsynchronousSocketChannel pieceChan; + try { + pieceChan = AsynchronousSocketChannel.open(); + pieceChan.connect(sources.get(rIdx)).get(); + if (!getRequest(pieceChan, fileId, nPiece)) { + System.out.println("SFD: get request failed"); + return; + } + } catch (IOException e) { + e.printStackTrace(); + return; + } catch (InterruptedException e) { + return; + } catch (ExecutionException e) { + return; + } finally { + pieceSemaphore.release(); + } + + PieceDownloader pdl = new PieceDownloader(this, fileId, nPiece, pieceSemaphore); + pieceChan.read(pdl.getBuffer(), pieceChan, pdl); + pieceChannels.put(nPiece, pieceChan); + }); + + } + } + } + + boolean checkIfComplete() { + if (filesHolder.completePieces.get(fileId).size() < filesHolder.numParts(fileId)) { + return false; + } + try { + filesDownloader.stopFileDownload(fileId); + filesHolder.fileStatus.put(fileId, FileStatus.Complete); + filesHolder.save(); + } catch (IOException e){ + e.printStackTrace(); + } + return true; + } + + private void closeAllChannels() { + pieceQueue.shutdownNow(); + pieceChannels.forEach((i, ch) -> { + try { + ch.close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + }); + } + + @Override + public void run() { + try { + while (true) { + if (checkIfComplete()) { + return; + } + + updateFileSources(); + updatePieceSources(); + + try { + dispatchPieceDownloaders(); + Thread.sleep(10000); + } catch (InterruptedException e) { + return; + } + } + } finally { + closeAllChannels(); + } + } +} diff --git a/torrent/src/main/java/torrent/client/StatHandler.java b/torrent/src/main/java/torrent/client/StatHandler.java new file mode 100644 index 0000000..aa429d1 --- /dev/null +++ b/torrent/src/main/java/torrent/client/StatHandler.java @@ -0,0 +1,47 @@ +package torrent.client; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Set; + +import torrent.common.ConcreteTaskHandler; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class StatHandler implements ConcreteTaskHandler { + + private final FilesHolder fHolder; + + StatHandler(FilesHolder fHolder) { + this.fHolder = fHolder; + } + + @Override + public MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf) { + try { + int id = in.readInt(); + + Set pieces = fHolder.completePieces.get(id); + + if (pieces == null) { + return MessageProcessStatus.ERROR; + } + + out.writeInt(pieces.size()); + + for (int i : pieces) { + out.writeInt(i); + } + + return MessageProcessStatus.SUCCESS; + } catch (EOFException e) { + return MessageProcessStatus.INCOMPLETE; + } catch (IOException e) { + return MessageProcessStatus.ERROR; + } + + } + +} diff --git a/torrent/src/main/java/torrent/client/UpdatePerformer.java b/torrent/src/main/java/torrent/client/UpdatePerformer.java new file mode 100644 index 0000000..0a38b89 --- /dev/null +++ b/torrent/src/main/java/torrent/client/UpdatePerformer.java @@ -0,0 +1,60 @@ +package torrent.client; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; + +public class UpdatePerformer implements Runnable { + + private SocketAddress toServer; + private FilesHolder filesHolder; + private final int myPort; + + private final long updateTime; + public UpdatePerformer(FilesHolder filesHolder, SocketAddress toServer, int myPort, long updateTime) { + this.filesHolder = filesHolder; + this.toServer = toServer; + this.myPort = myPort; + this.updateTime = updateTime; + } + + @Override + public void run() { + + while(true) { + Socket s = null; + DataOutputStream out = null; + try { + s = new Socket(); + s.connect(toServer); + out = new DataOutputStream(s.getOutputStream()); + + out.writeByte(4); + out.writeShort(myPort); + + out.writeInt(filesHolder.completePieces.size()); + for (Integer fileId : filesHolder.completePieces.keySet()) { + out.writeInt(fileId); + } + + } catch (IOException e1) { + System.out.println("\nUpdate to server failed"); + } finally { + try { + out.close(); + s.close(); + } catch (Exception e) { + } + + } + + try { + Thread.sleep(updateTime); + } catch (InterruptedException e) { + return; + } + } + } +} + diff --git a/torrent/src/main/java/torrent/client/Uploader.java b/torrent/src/main/java/torrent/client/Uploader.java new file mode 100644 index 0000000..dd4ec9a --- /dev/null +++ b/torrent/src/main/java/torrent/client/Uploader.java @@ -0,0 +1,46 @@ +package torrent.client; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; + +public class Uploader { + public static int uploadAndGetId(SocketAddress addr, Path filePath) throws IOException { + Socket s = null; + DataOutputStream dout = null; + DataInputStream dinp = null; + try { + s = new Socket(); + + s.connect(addr); + + dout = new DataOutputStream(s.getOutputStream()); + dinp = new DataInputStream(s.getInputStream()); + + if (!Files.exists(filePath)) { + throw new FileNotFoundException(); + } + + long len = filePath.toFile().length(); + dout.writeByte(2); + dout.writeUTF(filePath.getFileName().toString()); + dout.writeLong(len); + dout.flush(); + + return dinp.readInt(); + } catch (IOException e) { + if (s == null || dinp == null || dout == null) { + throw new IOException("Failed to connect to server"); + } + throw e; + } finally { + if (s != null) + s.close(); + } + } +} diff --git a/torrent/src/main/java/torrent/common/ConcreteTaskHandler.java b/torrent/src/main/java/torrent/common/ConcreteTaskHandler.java new file mode 100644 index 0000000..4514a21 --- /dev/null +++ b/torrent/src/main/java/torrent/common/ConcreteTaskHandler.java @@ -0,0 +1,10 @@ +package torrent.common; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.net.InetSocketAddress; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public interface ConcreteTaskHandler { + MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf); +} diff --git a/torrent/src/main/java/torrent/common/FileInformation.java b/torrent/src/main/java/torrent/common/FileInformation.java new file mode 100644 index 0000000..01fbe2c --- /dev/null +++ b/torrent/src/main/java/torrent/common/FileInformation.java @@ -0,0 +1,14 @@ +package torrent.common; + +public class FileInformation { + public int id; + public String name; + public long size; + + public FileInformation() {}; + public FileInformation(int id2, String name2, long size2) { + id = id2; + name = name2; + size = size2; + } +} diff --git a/torrent/src/main/java/torrent/common/RequestCompletionHandler.java b/torrent/src/main/java/torrent/common/RequestCompletionHandler.java new file mode 100644 index 0000000..7835989 --- /dev/null +++ b/torrent/src/main/java/torrent/common/RequestCompletionHandler.java @@ -0,0 +1,69 @@ +package torrent.common; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class RequestCompletionHandler implements CompletionHandler { + private AsynchronousSocketChannel clientChannel; + + public RequestCompletionHandler(AsynchronousSocketChannel sch) { + clientChannel = sch; + } + + @Override + public void completed(Integer result, ServerRequestHandler handler) { + MessageProcessStatus status; + + try { + status = handler.messageProcessAttemp((InetSocketAddress) clientChannel.getRemoteAddress()); + } catch (IOException e) { + return; + } + + if (status == MessageProcessStatus.SUCCESS) { + ByteBuffer toTransmit = handler.getTransmittingBuffer(); + clientChannel.write(toTransmit, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Object attachment) { + if (toTransmit.hasRemaining()) { + clientChannel.write(toTransmit, null, this); + } else { + try { + clientChannel.close(); + } catch (IOException e) { + System.out.println("RequestCompletionHandler: Error while closing channel: " + e.getMessage()); + } + } + } + + @Override + public void failed(Throwable exc, Object attachment) { + System.out.println("Fail while transmitting data: " + exc.getMessage()); + } + }); + } + + if (status == MessageProcessStatus.INCOMPLETE){ + clientChannel.read(handler.getReceivingBuffer(), handler, this); + } + + if (status == MessageProcessStatus.ERROR) { + try { + clientChannel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @Override + public void failed(Throwable exc, ServerRequestHandler attachment) { + System.out.println("Fail while receiving data: " + exc.getMessage()); + } +} diff --git a/torrent/src/main/java/torrent/common/ServerProcess.java b/torrent/src/main/java/torrent/common/ServerProcess.java new file mode 100644 index 0000000..dc7e5ed --- /dev/null +++ b/torrent/src/main/java/torrent/common/ServerProcess.java @@ -0,0 +1,53 @@ +package torrent.common; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.ExecutionException; + +public class ServerProcess implements Runnable { + + final int myPort; + + private final AsynchronousServerSocketChannel srvChannel = AsynchronousServerSocketChannel.open(); + + private final ConcreteTaskHandler[] concreteHandlers; + + public ServerProcess(int port, ConcreteTaskHandler[] concreteHandlers) throws IOException { + myPort = port; + this.concreteHandlers = concreteHandlers; + srvChannel.bind(new InetSocketAddress(myPort)); + } + + @Override + public void run() { + while (true) { + AsynchronousSocketChannel clientChannel = null; + try { + clientChannel = srvChannel.accept().get(); + } catch (InterruptedException e) { + try { + srvChannel.close(); + } catch (IOException e1) { + System.out.println("Exception while closing srvChannel " + e1.getMessage());; + } + return; + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + ServerRequestHandler handler = new ServerRequestHandler(concreteHandlers); + + try { + clientChannel.read(handler.getReceivingBuffer(), handler, + new RequestCompletionHandler(clientChannel)); + } catch (Exception e) { + System.out.println("Server: Exception while reading from channel."); + } + } + + } + +} diff --git a/torrent/src/main/java/torrent/common/ServerRequestHandler.java b/torrent/src/main/java/torrent/common/ServerRequestHandler.java new file mode 100644 index 0000000..07f9254 --- /dev/null +++ b/torrent/src/main/java/torrent/common/ServerRequestHandler.java @@ -0,0 +1,64 @@ +package torrent.common; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class ServerRequestHandler{ + + private ByteBuffer inputBuffer = ByteBuffer.allocate(1000); + private ByteBuffer outputBuffer; + + public enum MessageProcessStatus {INCOMPLETE, ERROR, SUCCESS}; + + private ConcreteTaskHandler[] handlers; + + public ServerRequestHandler(ConcreteTaskHandler[] handlers) { + this.handlers = handlers; + } + + public ByteBuffer getReceivingBuffer() { + return inputBuffer; + } + + public MessageProcessStatus messageProcessAttemp(InetSocketAddress clientInf) throws IOException { + try ( + DataInputStream dInp = new DataInputStream( + new ByteArrayInputStream(inputBuffer.array(), + 0, + inputBuffer.position())); + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + DataOutputStream dOut = new DataOutputStream(bout); + ) { + byte typeIndex; + try { + typeIndex = dInp.readByte(); + } catch (IOException e) { + return MessageProcessStatus.INCOMPLETE; + } + + if (typeIndex > handlers.length || typeIndex < 1) { + System.out.println("Operation index is bad"); + return MessageProcessStatus.ERROR; + } + + MessageProcessStatus status = handlers[typeIndex - 1].computeResult(dInp, dOut, clientInf); + if (status != MessageProcessStatus.SUCCESS) { + return status; + } + + outputBuffer = ByteBuffer.wrap(bout.toByteArray()); + return status; + } + } + + public ByteBuffer getTransmittingBuffer() { + return outputBuffer; + } + +} diff --git a/torrent/src/main/java/torrent/server/AbstractServerTaskHandler.java b/torrent/src/main/java/torrent/server/AbstractServerTaskHandler.java new file mode 100644 index 0000000..83e6e52 --- /dev/null +++ b/torrent/src/main/java/torrent/server/AbstractServerTaskHandler.java @@ -0,0 +1,21 @@ +package torrent.server; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.net.InetSocketAddress; +import torrent.common.ConcreteTaskHandler; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public abstract class AbstractServerTaskHandler implements ConcreteTaskHandler{ + protected final StorageManager storage; + + public AbstractServerTaskHandler(StorageManager storage) { + this.storage = storage; + } + + @Override + public abstract MessageProcessStatus computeResult( + DataInputStream in, + DataOutputStream out, + InetSocketAddress clientInf); +} diff --git a/torrent/src/main/java/torrent/server/ListHandler.java b/torrent/src/main/java/torrent/server/ListHandler.java new file mode 100644 index 0000000..f7deb1d --- /dev/null +++ b/torrent/src/main/java/torrent/server/ListHandler.java @@ -0,0 +1,40 @@ +package torrent.server; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import torrent.common.FileInformation; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class ListHandler extends AbstractServerTaskHandler { + + public ListHandler(StorageManager stm) { + super(stm); + } + + @Override + public MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf){ + try { + + List cl = new ArrayList<>(storage.data.map.values()); + + out.writeInt(cl.size()); + + for(FileInformation d : cl) { + out.writeInt(d.id); + out.writeUTF(d.name); + out.writeLong(d.size); + } + + } catch (IOException e) { + return MessageProcessStatus.ERROR; + } + + return MessageProcessStatus.SUCCESS; + } + +} diff --git a/torrent/src/main/java/torrent/server/Main.java b/torrent/src/main/java/torrent/server/Main.java new file mode 100644 index 0000000..63776d1 --- /dev/null +++ b/torrent/src/main/java/torrent/server/Main.java @@ -0,0 +1,19 @@ +package torrent.server; + +public class Main { + public static void main(String[] args) { + + Thread t = new Thread(new MainInner(5 * 60 * 1000)); + t.start(); + + System.out.println("Server started"); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("\nSaving server state..."); + t.interrupt(); + } + }); + } +} diff --git a/torrent/src/main/java/torrent/server/MainInner.java b/torrent/src/main/java/torrent/server/MainInner.java new file mode 100644 index 0000000..dad933a --- /dev/null +++ b/torrent/src/main/java/torrent/server/MainInner.java @@ -0,0 +1,60 @@ +package torrent.server; + +import java.io.IOException; + +import torrent.common.ConcreteTaskHandler; +import torrent.common.ServerProcess; + +public class MainInner implements Runnable { + + private final long updateMillis; + + public MainInner (long updateMillis) { + this.updateMillis = updateMillis; + } + + @Override + public void run() { + StorageManager storageManager = null; + Thread cleanThread = null; + Thread srvThread = null; + try { + storageManager = new StorageManager("serverFile"); + + cleanThread = new Thread(new OldClientsCleaner(storageManager, updateMillis)); + cleanThread.setDaemon(true); + cleanThread.start(); + + srvThread = new Thread(new ServerProcess( + 8081, + new ConcreteTaskHandler[] { + new ListHandler(storageManager), + new UploadHandler(storageManager), + new SourcesHandler(storageManager), + new UpdateHandler(storageManager)})); + + srvThread.setDaemon(true); + srvThread.start(); + + while (true) { + Thread.sleep(10000); + } + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + } finally { + try { + cleanThread.interrupt(); + srvThread.interrupt(); + cleanThread.join(); + srvThread.join(); + storageManager.save(); + } catch (IOException e1) { + e1.printStackTrace(); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + +} diff --git a/torrent/src/main/java/torrent/server/OldClientsCleaner.java b/torrent/src/main/java/torrent/server/OldClientsCleaner.java new file mode 100644 index 0000000..4dfbe7f --- /dev/null +++ b/torrent/src/main/java/torrent/server/OldClientsCleaner.java @@ -0,0 +1,43 @@ +package torrent.server; + +public class OldClientsCleaner implements Runnable { + + public final long updateTime;// = 5 * 60 * 1000; + + private StorageManager storage; + + public OldClientsCleaner(StorageManager storage, long updateTimeMillis) { + this.storage = storage; + updateTime = updateTimeMillis; + } + + @Override + public void run() { + while (true) { + try { + Thread.sleep(updateTime); + } catch (InterruptedException e) { + return; + } + + long currTime = System.currentTimeMillis(); + + storage.lastClientUpdate + .entrySet() + .removeIf(ent -> currTime - ent.getValue() > updateTime); + + storage.clients.values().forEach(set -> + set.removeIf(s -> { + return !storage.lastClientUpdate.containsKey(s); + })); + + try { + storage.save(); + } catch (Exception e) { + System.out.println("Cleaner: error while saving. " + e.getMessage()); + } + + } + } + +} diff --git a/torrent/src/main/java/torrent/server/ServerData.java b/torrent/src/main/java/torrent/server/ServerData.java new file mode 100644 index 0000000..2bae9cb --- /dev/null +++ b/torrent/src/main/java/torrent/server/ServerData.java @@ -0,0 +1,11 @@ +package torrent.server; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import torrent.common.FileInformation; + +public class ServerData { + public int filesCount = 0; + public Map map = new ConcurrentHashMap<>(); +} diff --git a/torrent/src/main/java/torrent/server/SourcesHandler.java b/torrent/src/main/java/torrent/server/SourcesHandler.java new file mode 100644 index 0000000..c0b095f --- /dev/null +++ b/torrent/src/main/java/torrent/server/SourcesHandler.java @@ -0,0 +1,51 @@ +package torrent.server; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class SourcesHandler extends AbstractServerTaskHandler { + + public SourcesHandler(StorageManager stm) { + super(stm); + } + + @Override + public MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf) { + try { + int id = in.readInt(); + + if (!storage.data.map.containsKey(id)) { + out.writeInt(0); + return MessageProcessStatus.SUCCESS; + } + if (storage.clients.get(id) == null) { + storage.clients.put(id, ConcurrentHashMap.newKeySet()); + } + + Set clients = new HashSet<>(storage.clients.get(id)); + + out.writeInt(clients.size()); + + for (InetSocketAddress addr : storage.clients.get(id)) { + out.write(addr.getAddress().getAddress()); + out.writeShort(addr.getPort()); + } + + } catch (EOFException e){ + return MessageProcessStatus.INCOMPLETE; + } catch (IOException e){ + return MessageProcessStatus.ERROR; + } + + return MessageProcessStatus.SUCCESS; + } + +} diff --git a/torrent/src/main/java/torrent/server/StorageManager.java b/torrent/src/main/java/torrent/server/StorageManager.java new file mode 100644 index 0000000..4122465 --- /dev/null +++ b/torrent/src/main/java/torrent/server/StorageManager.java @@ -0,0 +1,45 @@ +package torrent.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import torrent.server.ServerData; + +public class StorageManager { + + private final Path savePath; + private final ObjectMapper mapper = new ObjectMapper(); + + //public final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public volatile ServerData data = new ServerData(); + public volatile Map lastClientUpdate = new ConcurrentHashMap<>(); + public volatile Map> clients = new ConcurrentHashMap<>(); + + public StorageManager(String savePath) { + this.savePath = Paths.get(savePath); + try { + load(); + } catch (Exception e) { + e.printStackTrace(); + data = new ServerData(); + } + } + + public void save() throws IOException { + savePath.toFile().createNewFile(); + mapper.writeValue(savePath.toFile(), data); + } + + public void load() throws IOException { + if (savePath.toFile().exists()) + data = mapper.readValue(savePath.toFile(), ServerData.class); + } +} diff --git a/torrent/src/main/java/torrent/server/UpdateHandler.java b/torrent/src/main/java/torrent/server/UpdateHandler.java new file mode 100644 index 0000000..8f5ea0b --- /dev/null +++ b/torrent/src/main/java/torrent/server/UpdateHandler.java @@ -0,0 +1,58 @@ +package torrent.server; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; + +import torrent.common.FileInformation; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class UpdateHandler extends AbstractServerTaskHandler { + + public UpdateHandler(StorageManager stm) { + super(stm); + } + + @Override + public MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf) { + try { + short clientPort = in.readShort(); + int count = in.readInt(); + + for (int i = 0; i < count; i++) { + int id = in.readInt(); + + InetSocketAddress clientAddr = new InetSocketAddress(clientInf.getAddress(), clientPort); + FileInformation fInf = storage.data.map.get(id); + + if (fInf == null) { + out.writeBoolean(false); + return MessageProcessStatus.SUCCESS; + } + + if (!storage.clients.containsKey(id)) { + new ConcurrentHashMap<>(); + storage.clients.put(id, ConcurrentHashMap.newKeySet()); + } + storage.clients.get(id).add(clientAddr); + storage.lastClientUpdate.put(clientAddr, System.currentTimeMillis()); + //System.out.println("Update handler: " + clientAddr + " added"); + } + storage.save(); + out.writeBoolean(true); + } catch (EOFException e){ + return MessageProcessStatus.INCOMPLETE; + } catch (IOException e) { + try { + out.writeBoolean(false); + } catch (IOException e1) {} + return MessageProcessStatus.ERROR; + } + + return MessageProcessStatus.SUCCESS; + } + +} diff --git a/torrent/src/main/java/torrent/server/UploadHandler.java b/torrent/src/main/java/torrent/server/UploadHandler.java new file mode 100644 index 0000000..2dcb9b6 --- /dev/null +++ b/torrent/src/main/java/torrent/server/UploadHandler.java @@ -0,0 +1,36 @@ +package torrent.server; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import torrent.common.FileInformation; +import torrent.common.ServerRequestHandler.MessageProcessStatus; + +public class UploadHandler extends AbstractServerTaskHandler { + + public UploadHandler(StorageManager stm) { + super(stm); + } + + @Override + public MessageProcessStatus computeResult(DataInputStream in, DataOutputStream out, InetSocketAddress clientInf) { + try { + String name = in.readUTF(); + long size = in.readLong(); + + int id = storage.data.filesCount++; + storage.data.map.put(id, new FileInformation(id, name, size)); + storage.save(); + out.writeInt(id); + } catch (EOFException e) { + return MessageProcessStatus.INCOMPLETE; + } catch (IOException e){ + return MessageProcessStatus.ERROR; + } + + return MessageProcessStatus.SUCCESS; + } + +} diff --git a/torrent/src/test/java/torrent/InteractionTests.java b/torrent/src/test/java/torrent/InteractionTests.java new file mode 100644 index 0000000..2b5d3d2 --- /dev/null +++ b/torrent/src/test/java/torrent/InteractionTests.java @@ -0,0 +1,351 @@ +package torrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import torrent.client.FilesHolder; +import torrent.client.FilesHolder.FileStatus; +import torrent.client.REPL; +import torrent.server.MainInner; + +public class InteractionTests { + + Thread server; + + final int ncl = 3; + Thread[] cl = new Thread[ncl]; + + PrintWriter[] inp = new PrintWriter[ncl]; + ByteArrayOutputStream[] outp = new ByteArrayOutputStream[ncl]; + FilesHolder[] fh = new FilesHolder[ncl]; + Path root[] = new Path[ncl]; + + String getOutput(int i) throws InterruptedException { + Thread.sleep(1000); + String res = outp[i].toString(); + outp[i].reset(); + return res; + } + + void commandTo(int i, String s) { + inp[i].println(s); + inp[i].flush(); + } + + public void startServer() throws InterruptedException { + server = new Thread(new MainInner(3000)); + server.start(); + Thread.sleep(1000); + } + + public void initClient(int i) throws IOException { + outp[i] = new ByteArrayOutputStream(); + PipedInputStream pin = new PipedInputStream(); + inp[i] = new PrintWriter(new PipedOutputStream(pin)); + root[i] = Paths.get("torrentData/client" + i); + fh[i] = new FilesHolder(root[i].toString()); + cl[i] = new Thread(new torrent.client.MainInner("localhost", 8090 + i, + new PrintStream(outp[i]), + pin, + 2000, + fh[i])); + } + + final int testFileSize = 145 * (1 << 20); + public void createClientFiles() throws IOException { + byte[] arr = new byte[testFileSize]; + Random r = new Random(5); + r.nextBytes(arr); + for (int i = 0; i < 2; i++) { + Files.createDirectories(root[i]); + OutputStream out = new FileOutputStream(root[i].resolve("file1").toString()); + out.write(arr); + out.close(); + } + } + + @Before + public void setUp() throws InterruptedException, IOException { + //startServer(); + for (int i = 0; i < 3; i++) { + initClient(i); + } + createClientFiles(); + } + + + public void stopServer() throws IOException, InterruptedException { + if (server == null) + return; + System.out.println("Stopping server"); + server.interrupt(); + server.join(); + } + + public void stopClients() throws InterruptedException { + for (int i = 0; i < 3; i++) { + cl[i].interrupt(); + cl[i].join(); + } + } + + @After + public void cleanUp() throws IOException, InterruptedException { + stopServer(); + stopClients(); + Files.deleteIfExists(Paths.get("serverFile")); + FileUtils.deleteDirectory(Paths.get("torrentData").toFile()); + } + + @Test + public void testBaseCommands() throws InterruptedException { + cl[0].start(); + Thread.sleep(2000); + assertEquals(">", getOutput(0)); + + commandTo(0, "help"); + assertEquals(REPL.helpMessage + "\n>", getOutput(0)); + + commandTo(0, "delete a"); + assertEquals("Input mismatch\n>", getOutput(0)); + + commandTo(0, "list"); + assertEquals( + "Failed to list avaliable files.\n" + + "Failed to connect to server\n" + + ">", getOutput(0)); + + commandTo(0, "status"); + assertEquals("No files yet\n>", getOutput(0)); + + commandTo(0, "publish nonExstFile"); + assertEquals("Failed to publish file.\n" + + "Failed to connect to server\n>", getOutput(0)); + + commandTo(0, "pause 1"); + assertEquals("This file wasn't been downloading\n>", getOutput(0)); + + commandTo(0, "delete 1"); + assertEquals("File isn't known.\n>", getOutput(0)); + + commandTo(0, "get 1 torrentData/client0/file1"); + assertEquals("Unknown file id. Make list first.\n>", getOutput(0)); + + //################################ + startServer(); + //################################ + + commandTo(0, "list"); + assertEquals("No files avaliable on server\n>", getOutput(0)); + + commandTo(0, "status"); + assertEquals("No files yet\n>", getOutput(0)); + + commandTo(0, "publish nonExstFile"); + assertEquals("Failed to publish file.\n" + + "nonExstFile not exists.\n>", getOutput(0)); + + commandTo(0, "publish " + root[0].resolve("file1")); + getOutput(0); + commandTo(0, "list"); + assertEquals("0 file1 complete 15/15\n>", getOutput(0)); + + commandTo(0, "publish " + root[0].resolve("file1")); + assertEquals("file with specified path used\n" + + ">", getOutput(0)); + + commandTo(0, "list"); + assertEquals("0 file1 complete 15/15\n>", getOutput(0)); + } + + @Test + public void testDownloadFromSinglePeer() throws InterruptedException, IOException { + startServer(); + + cl[0].start(); + + commandTo(0, "publish " + root[0].resolve("file1")); + Thread.sleep(1000); + assertEquals(">The file has an id 0\n" + + ">", getOutput(0)); + + commandTo(0, "status"); + assertEquals("0 torrentData/client0/file1 complete 15/15" + + "\n>", getOutput(0)); + + + commandTo(0, "status"); + assertEquals("0 torrentData/client0/file1 complete 15/15" + + "\n>", getOutput(0)); + + commandTo(0, "list"); + assertEquals("0 file1 complete 15/15\n" + + ">", getOutput(0)); + + cl[2].start(); + assertEquals(">", getOutput(2)); + + commandTo(2, "list"); + assertEquals("0 file1 0/15\n" + + ">", getOutput(2)); + + cl[0].interrupt(); + cl[0].join(); + + commandTo(2, "list"); + assertEquals("0 file1 0/15\n" + + ">", getOutput(2)); + + commandTo(2, "get 0 " + root[2].resolve("file1_copy")); + assertEquals(">", getOutput(2)); + + commandTo(2, "status"); + assertEquals("0 torrentData/client2/file1_copy -> 0/15\n" + + ">", getOutput(2)); + + cl[2].interrupt(); + cl[2].join(); + initClient(2); + cl[2].start(); + + getOutput(2); + commandTo(2, "status"); + assertEquals("0 torrentData/client2/file1_copy -> 0/15\n" + + ">", getOutput(2)); + + initClient(0); + cl[0].start(); + assertEquals(">", getOutput(0)); + + commandTo(0, "status"); + assertEquals("0 torrentData/client0/file1 complete 15/15" + + "\n>", getOutput(0)); + + String outp; + do { + commandTo(2, "status"); + outp = getOutput(2); + System.out.println(outp); + } while (!"0 torrentData/client2/file1_copy complete 15/15\n>".equals(outp)); + + cl[2].interrupt(); + cl[2].join(); + assertTrue(FileUtils.contentEquals(root[0].resolve("file1").toFile(), + Paths.get("torrentData/client2/file1_copy").toFile())); + } + + @Test + public void testDownloadFromTwoPeers() throws InterruptedException, FileNotFoundException, IOException { + startServer(); + + cl[0].start(); + cl[1].start(); + + commandTo(0, "publish " + root[0].resolve("file1")); + assertEquals(">The file has an id 0\n" + + ">", getOutput(0)); + fh[0] + .completePieces + .get(0) + .remove(0); + fh[0].fileStatus.put(0, FileStatus.Paused); + + fh[1].filePaths.put(0, root[1].resolve("file1").toString()); + { + Set pieces = ConcurrentHashMap.newKeySet(); + pieces.add(0); + fh[1].completePieces.put(0, pieces); + } + fh[1].fileStatus.put(0, FileStatus.Paused); + fh[1].fileSize.put(0, (long) testFileSize); + fh[1].writeMaps(); + fh[1].load(); + + + cl[2].start(); + Thread.sleep(3000); + commandTo(2, "list"); + Thread.sleep(1000); + commandTo(2, "get 0 " + root[2].resolve("file1_copy")); + + getOutput(2); + String outp; + do { + commandTo(2, "status"); + outp = getOutput(2); + System.out.println(outp); + Thread.sleep(10); + } while (!"0 torrentData/client2/file1_copy complete 15/15\n>".equals(outp)); + + cl[2].interrupt(); + cl[2].join(); + assertTrue(FileUtils.contentEquals(root[0].resolve("file1").toFile(), + Paths.get("torrentData/client2/file1_copy").toFile())); + } + + @Test + public void testPause() throws InterruptedException { + startServer(); + + cl[0].start(); + commandTo(0, "publish " + root[0].resolve("file1").toString()); + Thread.sleep(1000); + assertEquals(">The file has an id 0\n" + + ">", getOutput(0)); + + commandTo(0, "status"); + assertEquals("0 torrentData/client0/file1 complete 15/15\n>", getOutput(0)); + + commandTo(0, "pause 0"); + getOutput(0); + commandTo(0, "status"); + assertEquals("0 torrentData/client0/file1 complete 15/15\n>", getOutput(0)); + + cl[0].interrupt(); + cl[0].join(); + + cl[2].start(); + getOutput(2); + commandTo(2, "list"); + assertEquals("0 file1 0/15\n>", getOutput(2)); + commandTo(2, "status"); + assertEquals("No files yet\n>", getOutput(2)); + commandTo(2, "get 0 " + root[2].resolve("file1_copy")); + assertEquals(">", getOutput(2)); + commandTo(2, "status"); + assertEquals("0 torrentData/client2/file1_copy -> 0/15\n>", getOutput(2)); + + commandTo(2, "pause 0"); + getOutput(2); + + commandTo(2, "status"); + assertEquals("0 torrentData/client2/file1_copy || 0/15\n>", getOutput(2)); + + commandTo(2, "resume 0"); + getOutput(2); + + commandTo(2, "status"); + assertEquals("0 torrentData/client2/file1_copy -> 0/15\n>", getOutput(2)); + } +} diff --git a/torrent/src/test/java/torrent/server/ServerTests.java b/torrent/src/test/java/torrent/server/ServerTests.java new file mode 100644 index 0000000..15e3b1b --- /dev/null +++ b/torrent/src/test/java/torrent/server/ServerTests.java @@ -0,0 +1,275 @@ +package torrent.server; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import torrent.server.MainInner; + + +public class ServerTests { + + Thread server; + + Socket socket = new Socket(); + + DataOutputStream out; + DataInputStream in; + + void establishConnection() throws IOException { + socket = new Socket("localhost", 8081); + out = new DataOutputStream(socket.getOutputStream()); + in = new DataInputStream(socket.getInputStream()); + } + + void closeConnection() throws IOException { + in.close(); + out.close(); + socket.close(); + } + + public void startServer() throws InterruptedException { + //Thread.sleep(1000); + server = new Thread(new MainInner(3000)); + server.start(); + Thread.sleep(1000); + } + + @Before + public void setUp() throws InterruptedException { + startServer(); + } + + + public void stopServer() throws IOException, InterruptedException { + System.out.println("Stopping server"); + server.interrupt(); + server.join(); + } + + @After + public void cleanUp() throws IOException, InterruptedException { + stopServer(); + Files.delete(Paths.get("serverFile")); + } + + @Test + public void testFantasy() throws UnknownHostException, IOException, InterruptedException { + + establishConnection(); + out.writeByte(1); //list + assertEquals(0, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(2); //upload + out.writeUTF("file1.txt"); + + out.writeLong(11); + + assertEquals(0, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(1); + assertEquals(1, in.readInt()); + assertEquals(0, in.readInt()); + assertEquals("file1.txt", in.readUTF()); + assertEquals(11, in.readLong()); + + closeConnection(); + establishConnection(); + System.out.println("Sources for non existing id"); + out.writeByte(3); //sources + out.writeInt(1); + + assertEquals(0, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(3); + out.writeInt(0); + + assertEquals(0, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(4); //update + out.writeShort(1234); + out.writeInt(1); + out.writeInt(0); + assertTrue(in.readBoolean()); + + closeConnection(); + establishConnection(); + + out.writeByte(3); + out.writeInt(0); + + assertEquals(1, in.readInt()); + + byte[] ip = new byte[4]; + in.readFully(ip); + assertArrayEquals(new byte[] {127,0,0,1}, ip); + + assertEquals(1234, in.readShort()); + + Thread.sleep(1000); + + closeConnection(); + establishConnection(); + + out.writeByte(2); //upload + out.writeUTF("file2.txt"); + + out.writeLong(12); + + assertEquals(1, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(1); + assertEquals(2, in.readInt()); + assertEquals(0, in.readInt()); + assertEquals("file1.txt", in.readUTF()); + assertEquals(11, in.readLong()); + assertEquals(1, in.readInt()); + assertEquals("file2.txt", in.readUTF()); + assertEquals(12, in.readLong()); + + Thread.sleep(2000); + + closeConnection(); + establishConnection(); + + out.writeByte(1); + assertEquals(2, in.readInt()); + assertEquals(0, in.readInt()); + assertEquals("file1.txt", in.readUTF()); + assertEquals(11, in.readLong()); + assertEquals(1, in.readInt()); + assertEquals("file2.txt", in.readUTF()); + assertEquals(12, in.readLong()); + + closeConnection(); + establishConnection(); + + out.writeByte(4); //update + out.writeShort(1235); + out.writeInt(1); + out.writeInt(1); + assertTrue(in.readBoolean()); + + closeConnection(); + establishConnection(); + //Thread.sleep(1000); + out.writeByte(3); //sources + out.writeInt(1); + System.out.println("before 1 sources"); + assertEquals(1, in.readInt()); + + do { + closeConnection(); + establishConnection(); + + out.writeByte(3); //sources + out.writeInt(0); + + Thread.sleep(10); + } while (0 != in.readInt()); + } + + @Test + public void testSourcesCleaner() throws IOException, InterruptedException { + establishConnection(); + out.writeByte(1); //list + assertEquals(0, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(2); //upload + out.writeUTF("file1.txt"); + + out.writeLong(11); + + assertEquals(0, in.readInt()); + + closeConnection(); + establishConnection(); + + out.writeByte(4); //update + out.writeShort(1235); + out.writeInt(1); + out.writeInt(0); + assertTrue(in.readBoolean()); + + do { + closeConnection(); + establishConnection(); + + out.writeByte(3); //sources + out.writeInt(0); + + Thread.sleep(10); + //assertEquals(0, in.readInt()); + } while (0 != in.readInt()); + + } + + @Test + public void testSavingState() throws IOException, InterruptedException { + establishConnection(); + + out.writeByte(2); //upload + out.writeUTF("file1.txt"); + + out.writeLong(11); + closeConnection(); + establishConnection(); + + out.writeByte(2); //upload + out.writeUTF("file2.txt"); + + out.writeLong(12); + + assertEquals(1, in.readInt()); + + closeConnection(); + stopServer(); + Thread.sleep(2000); + startServer(); + + establishConnection(); + + out.writeByte(1); + + System.out.println("check count"); + assertEquals(2, in.readInt()); + + assertEquals(0, in.readInt()); + assertEquals("file1.txt", in.readUTF()); + assertEquals(11, in.readLong()); + assertEquals(1, in.readInt()); + assertEquals("file2.txt", in.readUTF()); + assertEquals(12, in.readLong()); + } + +}