diff --git a/.gitignore b/.gitignore index f6292ddac..cfb9e7aea 100644 --- a/.gitignore +++ b/.gitignore @@ -46,4 +46,4 @@ docker/prometheus-data nim.cfg tests/integration/logs -data/ +data*/ diff --git a/codex/bittorrent/bencoding.nim b/codex/bittorrent/bencoding.nim new file mode 100644 index 000000000..1b5c457f1 --- /dev/null +++ b/codex/bittorrent/bencoding.nim @@ -0,0 +1,29 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/strformat + +import pkg/stew/byteutils + +func bencode*(value: uint64): seq[byte] = + fmt"i{value}e".toBytes + +func bencode*(value: int64): seq[byte] = + fmt"i{value}e".toBytes + +func bencode*(value: openArray[byte]): seq[byte] = + fmt"{value.len}:".toBytes & @value + +func bencode*(value: string): seq[byte] = + bencode(value.toBytes) + +proc bencode*[T: not byte](value: openArray[T]): seq[byte] = + fmt"l{value.mapIt(bencode(it).toString).join}e".toBytes diff --git a/codex/bittorrent/magnetlink.nim b/codex/bittorrent/magnetlink.nim new file mode 100644 index 000000000..f0e113c8f --- /dev/null +++ b/codex/bittorrent/magnetlink.nim @@ -0,0 +1,107 @@ +import std/strutils +import std/sequtils + +import pkg/stew/byteutils +import pkg/libp2p/[multicodec, multihash] +import pkg/questionable +import pkg/questionable/results + +import ../errors +import ./manifest/manifest + +type + TorrentVersion* = enum + v1 + v2 + hybrid + + MagnetLink* = ref object + version: TorrentVersion + infoHashV1: ?MultiHash + infoHashV2: ?MultiHash + +proc version*(self: MagnetLink): TorrentVersion = + ## Get the version of the magnet link + ## + ## returns: the version of the magnet link + result = self.version + +proc infoHashV1*(self: MagnetLink): 
?MultiHash = + ## Get the info hash of the magnet link + ## + ## returns: the info hash of the magnet link + result = self.infoHashV1 + +proc infoHashV2*(self: MagnetLink): ?MultiHash = + ## Get the info hash of the magnet link + ## + ## returns: the info hash of the magnet link + result = self.infoHashV2 + +proc parseMagnetLink(link: string): ?!MagnetLink = + let prefix = "magnet:?" + if not link.startsWith(prefix): + return failure("Invalid magnet link format (missing 'magnet:?' prefix)") + let infoHashParts = link[prefix.len .. ^1].split("&").filterIt(it.startsWith("xt=")) + if infoHashParts.len < 1: + return + failure("Invalid magnet link format (at least one info hash part is required)") + let v1Prefix = "xt=urn:btih:" + let v2Prefix = "xt=urn:btmh:" + var infoHashV1 = none(MultiHash) + var infoHashV2 = none(MultiHash) + for infoHashPart in infoHashParts: + # var a = infoHashPart[v1Prefix.len .. ^1] + if infoHashPart.startsWith(v1Prefix): + without infoHash =? BitTorrentInfo.buildMultiHash( + infoHashPart[v1Prefix.len .. ^1] + ), err: + return failure("Error parsing info hash: " & err.msg) + infoHashV1 = some(infoHash) + elif infoHashPart.startsWith(v2Prefix): + without infoHash =? BitTorrentInfo.buildMultiHash( + infoHashPart[v2Prefix.len .. ^1] + ), err: + return failure("Error parsing info hash: " & err.msg) + infoHashV2 = some(infoHash) + + if infoHashV1.isNone and infoHashV2.isNone: + return failure("Invalid magnet link format (missing info hash part)") + + var version: TorrentVersion + if infoHashV1.isSome and infoHashV2.isSome: + version = TorrentVersion.hybrid + elif infoHashV1.isSome: + version = TorrentVersion.v1 + else: + version = TorrentVersion.v2 + + let magnetLink = + MagnetLink(version: version, infoHashV1: infoHashV1, infoHashV2: infoHashV2) + return success(magnetLink) + +proc getHashHex(multiHash: MultiHash): string = + ## Get the info hash of the magnet link as a hex string + result = byteutils.toHex(multiHash.data.buffer[multiHash.dpos .. 
^1]).toUpperAscii() + +proc `$`*(self: MagnetLink): string = + ## Convert the magnet link to a string + ## + ## returns: the magnet link as a string + if self.version == TorrentVersion.hybrid: + result = + "magnet:?xt=urn:btih:" & (!self.infoHashV1).getHashHex() & "&xt=urn:btmh:" & + (!self.infoHashV2).hex + elif self.version == v1: + result = "magnet:?xt=urn:btih:" & (!self.infoHashV1).getHashHex() + else: + result = "magnet:?xt=urn:btmh:" & (!self.infoHashV2).hex + +proc newMagnetLink*(magnetLinkString: string): ?!MagnetLink = + ## Create a new magnet link + ## + ## version: the version of the magnet link + ## magnetLinkString: text containing the magnet link + ## + ## returns: a Result containing a magnet link object or a failure + parseMagnetLink(magnetLinkString) diff --git a/codex/bittorrent/manifest.nim b/codex/bittorrent/manifest.nim new file mode 100644 index 000000000..ac8cdb58c --- /dev/null +++ b/codex/bittorrent/manifest.nim @@ -0,0 +1,5 @@ +import ./manifest/manifest +import ./manifest/encoding +import ./manifest/decoding + +export manifest, encoding, decoding diff --git a/codex/bittorrent/manifest/decoding.nim b/codex/bittorrent/manifest/decoding.nim new file mode 100644 index 000000000..91cbff5a9 --- /dev/null +++ b/codex/bittorrent/manifest/decoding.nim @@ -0,0 +1,90 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+ +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/multihash +import pkg/libp2p/protobuf/minprotobuf + +import pkg/questionable/results + +import ../../blocktype +import ./manifest + +func decode*(_: type BitTorrentManifest, data: openArray[byte]): ?!BitTorrentManifest = + # ```protobuf + # Message BitTorrentManifest { + # Message Piece { + # bytes data = 1; + # } + # + # Message BitTorrentInfo { + # uint32 length = 1; + # uint32 pieceLength = 2; + # repeated Piece pieces = 3; + # optional string name = 4; + # } + # + # BitTorrentInfo info = 1; + # bytes codexManifestCid = 2; + # ``` + + var + pbNode = initProtoBuffer(data) + pbInfo: ProtoBuffer + length: uint64 + pieceLength: uint32 + pieces: seq[MultiHash] + piecesBytes: seq[seq[byte]] + name: string + cidBuf = newSeq[byte]() + codexManifestCid: Cid + + if pbNode.getField(1, pbInfo).isErr: + return failure("Unable to decode `info` from BitTorrentManifest") + + if pbInfo.getField(1, length).isErr: + return failure("Unable to decode `length` from BitTorrentInfo") + + if pbInfo.getField(2, pieceLength).isErr: + return failure("Unable to decode `pieceLength` from BitTorrentInfo") + + if ?pbInfo.getRepeatedField(3, piecesBytes).mapFailure: + for piece in piecesBytes: + var pbPiece = initProtoBuffer(piece) + var dataBuf = newSeq[byte]() + if pbPiece.getField(1, dataBuf).isErr: + return failure("Unable to decode piece `data` to MultiHash") + without mhash =? MultiHash.init(dataBuf).mapFailure, err: + return failure(err.msg) + pieces.add(mhash) + discard ?pbInfo.getField(4, name).mapFailure + + if ?pbNode.getField(2, cidBuf).mapFailure: + without cid =? 
Cid.init(cidBuf).mapFailure, err: + return failure(err.msg) + codexManifestCid = cid + + let info = BitTorrentInfo( + length: length, + pieceLength: pieceLength, + pieces: pieces, + name: if name.len > 0: name.some else: string.none, + ) + BitTorrentManifest(info: info, codexManifestCid: codexManifestCid).success + +func decode*(_: type BitTorrentManifest, blk: Block): ?!BitTorrentManifest = + ## Decode a manifest using `decoder` + ## + + if not ?blk.cid.isTorrentInfoHash: + return failure "Cid not a torrent info hash codec" + + BitTorrentManifest.decode(blk.data) diff --git a/codex/bittorrent/manifest/encoding.nim b/codex/bittorrent/manifest/encoding.nim new file mode 100644 index 000000000..96317d3e9 --- /dev/null +++ b/codex/bittorrent/manifest/encoding.nim @@ -0,0 +1,59 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/multihash +import pkg/libp2p/protobuf/minprotobuf + +import pkg/questionable/results + +import ./manifest + +proc write(pb: var ProtoBuffer, field: int, value: MultiHash) = + var ipb = initProtoBuffer() + ipb.write(1, value.data.buffer) + ipb.finish() + pb.write(field, ipb) + +proc write(pb: var ProtoBuffer, field: int, value: BitTorrentInfo) = + var ipb = initProtoBuffer() + ipb.write(1, value.length) + ipb.write(2, value.pieceLength) + for piece in value.pieces: + ipb.write(3, piece) + if name =? 
value.name: + ipb.write(4, name) + ipb.finish() + pb.write(field, ipb) + +proc encode*(manifest: BitTorrentManifest): seq[byte] = + # ```protobuf + # Message BitTorrentManifest { + # Message Piece { + # bytes data = 1; + # } + # + # Message BitTorrentInfo { + # uint32 length = 1; + # uint32 pieceLength = 2; + # repeated Piece pieces = 3; + # optional string name = 4; + # } + # + # BitTorrentInfo info = 1; + # bytes codexManifestCid = 2; + # ``` + + var ipb = initProtoBuffer() + ipb.write(1, manifest.info) + ipb.write(2, manifest.codexManifestCid.data.buffer) + ipb.finish() + ipb.buffer diff --git a/codex/bittorrent/manifest/manifest.nim b/codex/bittorrent/manifest/manifest.nim new file mode 100644 index 000000000..682938031 --- /dev/null +++ b/codex/bittorrent/manifest/manifest.nim @@ -0,0 +1,103 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+ +{.push raises: [].} + +import pkg/libp2p +import pkg/stew/byteutils +import pkg/questionable +import pkg/questionable/results + +import ../../merkletree/codex/codex +import ../../utils/json + +import ../../errors +import ../../codextypes +import ../bencoding + +type + BitTorrentInfo* = ref object + length* {.serialize.}: uint64 + pieceLength* {.serialize.}: uint32 + pieces* {.serialize.}: seq[MultiHash] + name* {.serialize.}: ?string + + BitTorrentManifest* = ref object + info* {.serialize.}: BitTorrentInfo + codexManifestCid* {.serialize.}: Cid + +proc `$`*(self: BitTorrentInfo): string = + "BitTorrentInfo(length: " & $self.length & ", pieceLength: " & $self.pieceLength & + ", pieces: " & $self.pieces & ", name: " & $self.name & ")" + +proc `$`*(self: BitTorrentManifest): string = + "BitTorrentManifest(info: " & $self.info & ", codexManifestCid: " & + $self.codexManifestCid & ")" + +func `==`*(a: BitTorrentInfo, b: BitTorrentInfo): bool = + a.length == b.length and a.pieceLength == b.pieceLength and a.pieces == b.pieces and + a.name == b.name + +func `==`*(a: BitTorrentManifest, b: BitTorrentManifest): bool = + a.info == b.info and a.codexManifestCid == b.codexManifestCid + +proc newBitTorrentManifest*( + info: BitTorrentInfo, codexManifestCid: Cid +): BitTorrentManifest = + BitTorrentManifest(info: info, codexManifestCid: codexManifestCid) + +func bencode*(info: BitTorrentInfo): seq[byte] = + # flatten pieces + var pieces: seq[byte] + for mh in info.pieces: + pieces.add(mh.data.buffer.toOpenArray(mh.dpos, mh.dpos + mh.size - 1)) + result = @['d'.byte] + result.add(bencode("length") & bencode(info.length)) + if name =? 
info.name: + result.add(bencode("name") & bencode(name)) + result.add(bencode("piece length") & bencode(info.pieceLength)) + result.add(bencode("pieces") & bencode(pieces)) + result.add('e'.byte) + +func validate*(self: BitTorrentManifest, cid: Cid): ?!bool = + # First stage of validation: + # (1) bencode the info dictionary from the torrent manifest + # (2) hash the bencoded info dictionary + # (3) compare the hash with the info hash in the cid + # + # This will prove that our info metadata is correct. + # It still does not proof that the "codexManifestCid" from the torrent manifest + # points to genuine content. This validation will be done while fetching blocks + # where we will be able to detect that the aggregated pieces do not match + # the hashes in the info dictionary from the torrent manifest. + let infoBencoded = bencode(self.info) + without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err: + return failure(err.msg) + without cidInfoHash =? cid.mhash.mapFailure, err: + return failure(err.msg) + return success(infoHash == cidInfoHash) + +func buildMultiHash*(_: type BitTorrentInfo, input: string): ?!MultiHash = + without bytes =? input.hexToSeqByte.catch, err: + return failure err.msg + without hash =? MultiHash.init(bytes): + without mhashMetaSha1 =? Sha1HashCodec.mhash, err: + return failure err.msg + if bytes.len == mhashMetaSha1.size: + without hash =? MultiHash.init($Sha1HashCodec, bytes).mapFailure, err: + return failure err.msg + return success hash + without mhashMetaSha256 =? Sha256HashCodec.mhash, err: + return failure err.msg + if bytes.len == mhashMetaSha256.size: + without hash =? 
MultiHash.init($Sha256HashCodec, bytes).mapFailure, err: + return failure err.msg + return success hash + return failure "given bytes is not a correct multihash" + return success hash diff --git a/codex/bittorrent/piecevalidator.nim b/codex/bittorrent/piecevalidator.nim new file mode 100644 index 000000000..3e9d3902d --- /dev/null +++ b/codex/bittorrent/piecevalidator.nim @@ -0,0 +1,146 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/sequtils +import pkg/chronos +import pkg/libp2p/multihash +import pkg/questionable/results + +import ../utils/iter +import ../manifest +import ../blocktype +import ./manifest + +type + PieceHandle* = Future[void].Raising([CancelledError]) + TorrentPieceValidator* = ref object + torrentManifest: BitTorrentManifest + numberOfPieces: int + numberOfBlocksPerPiece: int + pieces: seq[PieceHandle] + waitIter: Iter[int] + confirmIter: Iter[int] + validationIter: Iter[int] + +proc newTorrentPieceValidator*( + torrentManifest: BitTorrentManifest, codexManifest: Manifest +): TorrentPieceValidator = + let numOfPieces = torrentManifest.info.pieces.len + let numOfBlocksPerPiece = + torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + let pieces = newSeqWith( + numOfPieces, + cast[PieceHandle](newFuture[void]("PieceValidator.newTorrentPieceValidator")), + ) + + TorrentPieceValidator( + torrentManifest: torrentManifest, + numberOfPieces: numOfPieces, + numberOfBlocksPerPiece: numOfBlocksPerPiece, + pieces: pieces, + waitIter: Iter[int].new(0 ..< numOfPieces), + confirmIter: Iter[int].new(0 ..< numOfPieces), + validationIter: Iter[int].new(0 ..< numOfPieces), + ) + +func numberOfBlocksPerPiece*(self: 
TorrentPieceValidator): int = + self.numberOfBlocksPerPiece + +proc getNewPieceIterator*(self: TorrentPieceValidator): Iter[int] = + Iter[int].new(0 ..< self.numberOfPieces) + +proc getNewBlocksPerPieceIterator*(self: TorrentPieceValidator): Iter[int] = + Iter[int].new(0 ..< self.numberOfBlocksPerPiece) + +proc waitForNextPiece*( + self: TorrentPieceValidator +): Future[int] {.async: (raises: [CancelledError]).} = + if self.waitIter.finished: + return -1 + let pieceIndex = self.waitIter.next() + await self.pieces[pieceIndex] + pieceIndex + +proc confirmCurrentPiece*(self: TorrentPieceValidator): int {.raises: [].} = + if self.confirmIter.finished: + return -1 + let pieceIndex = self.confirmIter.next() + self.pieces[pieceIndex].complete() + pieceIndex + +proc cancel*(self: TorrentPieceValidator): Future[void] {.async: (raises: []).} = + await noCancel allFutures(self.pieces.mapIt(it.cancelAndWait)) + +proc validatePiece*( + self: TorrentPieceValidator, blocks: seq[Block] +): int {.raises: [].} = + var pieceHashCtx: sha1 + pieceHashCtx.init() + + for blk in blocks: + pieceHashCtx.update(blk.data) + + let computedPieceHash = pieceHashCtx.finish() + + let pieceIndex = self.validationIter.next() + if (computedPieceHash != self.torrentManifest.info.pieces[pieceIndex]): + return -1 + + pieceIndex + +################################################################# +# Previous API, keeping it for now, probably will not be needed +# +################################################################# + +proc waitForPiece*( + self: TorrentPieceValidator, index: int +): Future[?!void] {.async: (raises: [CancelledError]).} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + await self.pieces[index] + success() + +proc cancelPiece*( + self: TorrentPieceValidator, index: int +): Future[?!void] {.async: (raises: [CancelledError]).} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + await noCancel 
self.pieces[index].cancelAndWait() + success() + +proc markPieceAsValid*(self: TorrentPieceValidator, index: int): ?!void {.raises: [].} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + self.pieces[index].complete() + success() + +proc validatePiece*( + self: TorrentPieceValidator, blocks: seq[Block], index: int +): ?!void {.raises: [].} = + if index < 0 or index >= self.pieces.len: + return failure("Invalid piece index") + + var pieceHashCtx: sha1 + pieceHashCtx.init() + + for blk in blocks: + pieceHashCtx.update(blk.data) + + let computedPieceHash = pieceHashCtx.finish() + + # if index == 1: + # return failure("Piece verification failed (simulated)") + + if (computedPieceHash != self.torrentManifest.info.pieces[index]): + return failure("Piece verification failed") + + success() diff --git a/codex/bittorrent/torrentdownloader.nim b/codex/bittorrent/torrentdownloader.nim new file mode 100644 index 000000000..43f56466d --- /dev/null +++ b/codex/bittorrent/torrentdownloader.nim @@ -0,0 +1,318 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+ +{.push raises: [].} + +import std/sequtils +import std/sugar +import pkg/chronos +import pkg/libp2p/multihash +import pkg/questionable/results + +import ../logutils +import ../utils/iter +import ../utils/safeasynciter +import ../utils/trackedfutures +import ../errors +import ../manifest +import ../blocktype +import ../stores/networkstore +import ./manifest + +logScope: + topics = "codex node torrentdownloader" + +type + PieceHandle* = Future[void].Raising([CancelledError]) + TorrentPiece* = ref object + pieceIndex: int + pieceHash: MultiHash + blockIndexStart: int + blockIndexEnd: int + handle: PieceHandle + + TorrentDownloader* = ref object + torrentManifest: BitTorrentManifest + codexManifest: Manifest + networkStore: NetworkStore + numberOfPieces: int + numberOfBlocksPerPiece: int + pieces: seq[TorrentPiece] + waitIter: Iter[int] + blockIter: Iter[int] + pieceIndex: int + queue: AsyncQueue[TorrentPiece] + trackedFutures: TrackedFutures + +func numberOfBlocks(piece: TorrentPiece): int = + piece.blockIndexEnd - piece.blockIndexStart + 1 + +proc getNewBlockIterator(piece: TorrentPiece): Iter[int] = + Iter[int].new(piece.blockIndexStart .. 
piece.blockIndexEnd) + +proc validate(piece: TorrentPiece, blocks: seq[Block]): ?!void {.raises: [].} = + var pieceHashCtx: sha1 + pieceHashCtx.init() + + for blk in blocks: + pieceHashCtx.update(blk.data) + + let computedPieceHash = pieceHashCtx.finish() + + if (computedPieceHash != piece.pieceHash): + return failure("Piece verification failed") + + success() + +proc allBlocksFinished(futs: seq[Future[?!Block]]): seq[?!Block] {.raises: [].} = + ## If all futures have finished, return corresponding values, + ## otherwise return failure + ## + + try: + let values = collect: + for b in futs: + if b.finished: + b.read + return values + except CatchableError as e: + raiseAssert e.msg + +proc deleteBlocks( + self: TorrentDownloader, piece: TorrentPiece +): Future[void] {.async: (raises: [CancelledError]).} = + let treeCid = self.codexManifest.treeCid + let blockIter = piece.getNewBlockIterator() + while not blockIter.finished: + # deleting a block that is not in localStore is harmless + # blocks that are in localStore and in use will not be deleted + try: + if err =? 
(await self.networkStore.localStore.delBlock(treeCid, blockIter.next())).errorOption: + warn "Could not delete block", err = err.msg + continue + except CatchableError as e: + warn "Could not delete block", error = e.msg + continue + +proc getSuccessfulBlocks(futs: seq[Future[?!Block]]): ?!seq[Block] {.raises: [].} = + let blockResults = allBlocksFinished(futs) + if blockResults.len != futs.len or blockResults.anyIt(it.isFailure): + return failure("Some blocks failed to fetch") + success blockResults.mapIt(it.get) + +proc fetchPiece( + self: TorrentDownloader, piece: TorrentPiece +): Future[?!void] {.async: (raises: [CancelledError]).} = + let treeCid = self.codexManifest.treeCid + let blockIter = piece.getNewBlockIterator() + var blockFutures = newSeq[Future[?!Block]]() + for blockIndex in blockIter: + let address = BlockAddress.init(treeCid, blockIndex) + blockFutures.add(self.networkStore.getBlock(address)) + + await allFutures(blockFutures) + + without blocks =? getSuccessfulBlocks(blockFutures), err: + await self.deleteBlocks(piece) + return failure(err) + + # all blocks in piece are there: we are ready for validation + if err =? 
piece.validate(blocks).errorOption: + # we do not know on which block validation failed + # thus we try to delete as many as we can + await self.deleteBlocks(piece) + return failure(err) + + success() + +func numberOfBlocksInPiece*(self: TorrentDownloader, pieceIndex: int): ?!int = + if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: + return failure("Invalid piece index") + let piece = self.pieces[pieceIndex] + success(piece.numberOfBlocks) + +########################################################################### +# Public API +########################################################################### + +proc getNewBlockIterator*(self: TorrentDownloader, pieceIndex: int): ?!Iter[int] = + if pieceIndex < 0 or pieceIndex >= self.numberOfPieces: + return failure("Invalid piece index") + let piece = self.pieces[pieceIndex] + success(piece.getNewBlockIterator()) + +proc getNewPieceIterator*(self: TorrentDownloader): Iter[int] = + Iter[int].new(0 ..< self.numberOfPieces) + +proc waitForNextPiece*( + self: TorrentDownloader +): Future[int] {.async: (raises: [CancelledError]).} = + if self.waitIter.finished: + return -1 + let pieceIndex = self.waitIter.next() + await self.pieces[pieceIndex].handle + pieceIndex + +proc cancel*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = + await noCancel allFutures(self.pieces.mapIt(it.handle.cancelAndWait)) + +proc downloadPieces*(self: TorrentDownloader): Future[void] {.async: (raises: []).} = + try: + while not self.queue.empty: + let piece = self.queue.popFirstNoWait() + trace "Downloading piece", pieceIndex = piece.pieceIndex + if err =? 
(await self.fetchPiece(piece)).errorOption: + error "Could not fetch piece", err = err.msg + # add the piece to the end of the queue + # to try to fetch the piece again + self.queue.addLastNoWait(piece) + continue + else: + # piece fetched and validated successfully + # mark it as ready + trace "Piece fetched and validated", pieceIndex = piece.pieceIndex + piece.handle.complete() + if not self.queue.empty: + await sleepAsync(1.millis) + except CancelledError: + trace "Downloading pieces cancelled" + except AsyncQueueFullError as e: + error "Queue is full", error = e.msg + except AsyncQueueEmptyError as e: + error "Trying to pop from empty queue", error = e.msg + finally: + await noCancel self.cancel() + +proc finished*(self: TorrentDownloader): bool = + self.pieceIndex == -1 + +proc getNext*( + self: TorrentDownloader +): Future[?!(int, seq[byte])] {.async: (raises: [CancelledError]).} = + try: + if self.pieceIndex == -1: + return success((-1, newSeq[byte]())) + if self.blockIter.finished: + trace "Waiting for piece to be ready" + self.pieceIndex = await self.waitForNextPiece() + trace "Got piece", pieceIndex = self.pieceIndex + if self.pieceIndex == -1: + return success((-1, newSeq[byte]())) + else: + let piece = self.pieces[self.pieceIndex] + self.blockIter = piece.getNewBlockIterator() + let blockIndex = self.blockIter.next() + if blockIndex == self.codexManifest.blocksCount - 1: + self.pieceIndex = -1 + let address = BlockAddress.init(self.codexManifest.treeCid, blockIndex) + without blk =? 
(await self.networkStore.localStore.getBlock(address)), err: + error "Could not get block from local store", error = err.msg + return failure("Could not get block from local store: " & err.msg) + success((blockIndex, blk.data)) + except CancelledError as e: + trace "Getting next block from downloader cancelled" + raise e + except CatchableError as e: + warn "Could not get block from local store", error = e.msg + return failure("Could not get block from local store: " & e.msg) + +proc getAsyncBlockIterator*(self: TorrentDownloader): SafeAsyncIter[(int, seq[byte])] = + proc genNext(): Future[?!(int, seq[byte])] {. + async: (raw: true, raises: [CancelledError]) + .} = + self.getNext() + + proc isFinished(): bool = + self.finished() + + SafeAsyncIter[(int, seq[byte])].new( + genNext = genNext, isFinished = isFinished, finishOnErr = true + ) + +proc start*(self: TorrentDownloader) = + self.trackedFutures.track(self.downloadPieces()) + +proc stop*(self: TorrentDownloader) {.async: (raises: []).} = + self.pieceIndex = -1 + await noCancel self.cancel() + await noCancel self.trackedFutures.cancelTracked() + +proc newTorrentPiece*( + pieceIndex: int, pieceHash: MultiHash, blockIndexStart: int, blockIndexEnd: int +): TorrentPiece = + proc newRaisingFuture[T]( + fromProc: static[string] = "" + ): Future[T] {.async: (raw: true, raises: [CancelledError]).} = + let fut = newFuture[T](fromProc) + return fut + + TorrentPiece( + pieceIndex: pieceIndex, + pieceHash: pieceHash, + blockIndexStart: blockIndexStart, + blockIndexEnd: blockIndexEnd, + handle: newRaisingFuture[void]("PieceValidator.newTorrentPiece"), + ) + +proc newTorrentDownloader*( + torrentManifest: BitTorrentManifest, + codexManifest: Manifest, + networkStore: NetworkStore, +): ?!TorrentDownloader = + let + blocksCount = codexManifest.blocksCount + numOfPieces = torrentManifest.info.pieces.len + numOfBlocksPerPiece = + torrentManifest.info.pieceLength.int div codexManifest.blockSize.int + numOfBlocksInLastPiece = 
blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) + + let pieces = collect: + for i in 0 ..< numOfPieces: + var blockIndexEnd = ((i + 1) * numOfBlocksPerPiece) - 1 + if i == numOfPieces - 1: + # last piece can have less blocks than numOfBlocksPerPiece + blockIndexEnd = i * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1 + + let piece = newTorrentPiece( + pieceIndex = i, + pieceHash = torrentManifest.info.pieces[i], + blockIndexStart = i * numOfBlocksPerPiece, + blockIndexEnd = blockIndexEnd, + ) + piece + + let queue = newAsyncQueue[TorrentPiece](maxsize = numOfPieces) + + let iter = Iter.new(0 ..< numOfPieces) + var pieceDownloadSequence = newSeqWith(numOfPieces, iter.next()) + # optional: randomize the order of pieces + # not sure if this is such a great idea when streaming content + # Rng.instance.shuffle(pieceDownloadSequence) + + trace "Piece download sequence", pieceDownloadSequence + + for i in pieceDownloadSequence: + try: + queue.addLastNoWait(pieces[i]) + except AsyncQueueFullError: + raiseAssert "Fatal: could not add pieces to queue" + + TorrentDownloader( + torrentManifest: torrentManifest, + codexManifest: codexManifest, + networkStore: networkStore, + numberOfPieces: numOfPieces, + numberOfBlocksPerPiece: numOfBlocksPerPiece, + pieces: pieces, + waitIter: Iter[int].new(0 ..< numOfPieces), + blockIter: Iter[int].empty(), + pieceIndex: 0, + queue: queue, + trackedFutures: TrackedFutures(), + ).success diff --git a/codex/bittorrent/torrentparser.nim b/codex/bittorrent/torrentparser.nim new file mode 100644 index 000000000..0050fd319 --- /dev/null +++ b/codex/bittorrent/torrentparser.nim @@ -0,0 +1,41 @@ +import std/strutils +import std/re + +import pkg/questionable/results +import pkg/stew/byteutils +import pkg/stew/base10 + +import ../errors + +proc extractInfoFromTorrent*(torrentBytes: seq[byte]): ?!seq[byte] = + ## Extract the info from a torrent file + ## + ## params: + ## torrentBytes: the torrent file bytes + ## + ## returns: the bytes 
containing only the content of the info dictionary + ## or a failure if info is not found or invalid + let torrentStr = string.fromBytes(torrentBytes) + if torrentStr.contains("file tree") or torrentStr.contains("piece layers"): + return failure("Torrent v2 provided. Only v1 is currently supported.") + let infoKeyPos = torrentStr.find("info") + if infoKeyPos == -1: + return failure("Torrent file does not contain info dictionary.") + let infoStartPos = infoKeyPos + "info".len + if torrentStr[infoStartPos] != 'd': + return failure("Torrent file does not contain valid info dictionary.") + + var matches = newSeq[tuple[first, last: int]](1) + let (_, piecesEndIndex) = torrentStr.findBounds(re"pieces(\d+):", matches) + if matches.len == 1: + let (first, last) = matches[0] + let piecesLenStr = torrentStr[first .. last] + without piecesLen =? Base10.decode(uint, piecesLenStr).mapFailure, err: + return failure("Error decoding pieces length: " & err.msg) + let piecesEndMarkerPos = piecesEndIndex + 1 + piecesLen.int + if torrentStr[piecesEndMarkerPos] != 'e': + return failure("Torrent file does not contain valid pieces.") + let infoDirStr = torrentStr[infoStartPos .. piecesEndMarkerPos] + infoDirStr.toBytes().success + else: + return failure("Torrent file does not contain valid pieces.") diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index ef1465f9a..8adb70563 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -27,6 +27,9 @@ import ../../stores/blockstore import ../../logutils import ../../manifest +# tarballs +import ../../tarballs/[directorymanifest, decoding] + logScope: topics = "codex discoveryengine advertiser" @@ -56,7 +59,20 @@ proc addCidToQueue(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]). trace "Advertising", cid +proc advertiseInfoHash(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]).} = + if (infoHashCid =? 
cid.isTorrentInfoHash): + # announce torrent info hash + await b.addCidToQueue(cid) + return + await b.addCidToQueue(cid) + proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]).} = + without isTorrent =? cid.isTorrentInfoHash, err: + warn "Unable to determine if cid is torrent info hash" + return + if isTorrent: + await b.addCidToQueue(cid) + return without isM =? cid.isManifest, err: warn "Unable to determine if cid is manifest" return @@ -69,6 +85,11 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]) without manifest =? Manifest.decode(blk), err: error "Unable to decode as manifest", err = err.msg + # Try if it not a directory manifest + without manifest =? DirectoryManifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + await b.addCidToQueue(cid) return # announce manifest cid and tree cid @@ -83,6 +104,12 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]) proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = try: while b.advertiserRunning: + if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Torrent): + trace "Advertiser begins iterating torrent blocks..." + for c in cidsIter: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating torrent blocks finished." if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Manifest): trace "Advertiser begins iterating blocks..." 
for c in cidsIter: diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 36d00cf0b..e3f1c4937 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -23,7 +23,6 @@ import ../../rng import ../../stores/blockstore import ../../blocktype import ../../utils -import ../../utils/exceptions import ../../utils/trackedfutures import ../../merkletree import ../../logutils diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 7e13493d8..72f35b232 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -79,6 +79,9 @@ proc init*(_: type BlockAddress, cid: Cid): BlockAddress = proc init*(_: type BlockAddress, treeCid: Cid, index: Natural): BlockAddress = BlockAddress(leaf: true, treeCid: treeCid, index: index) +func isTorrentCid*(cid: Cid): ?!bool = + success (InfoHashV1Codec == ?cid.contentType().mapFailure(CodexError)) + proc `$`*(b: Block): string = result &= "cid: " & $b.cid result &= "\ndata: " & string.fromBytes(b.data) @@ -108,7 +111,12 @@ proc new*( ## creates a new block for both storage and network IO ## - if verify: + without isTorrent =? 
cid.isTorrentCid, err: + return "Unable to determine if cid is torrent info hash".failure + + # info hash cids are "fake cids" - they will not validate + # info hash validation is done outside of the cid itself + if verify and not isTorrent: let mhash = ?cid.mhash.mapFailure computedMhash = ?MultiHash.digest($mhash.mcodec, data).mapFailure diff --git a/codex/codextypes.nim b/codex/codextypes.nim index 274b9be0a..da9cfce25 100644 --- a/codex/codextypes.nim +++ b/codex/codextypes.nim @@ -24,6 +24,11 @@ import ./errors export tables const + # BitTorrent specific + # DefaultPieceLength* = NBytes 1024 * 1024 * 4 # 4MiB + DefaultPieceLength* = NBytes 1024 * 256 # 256KiB + BitTorrentBlockSize* = NBytes 1024 * 16 + # Size of blocks for storage / network exchange, DefaultBlockSize* = NBytes 1024 * 64 DefaultCellSize* = NBytes 2048 @@ -36,6 +41,7 @@ const DefaultSamplesNum* = 5 # hashes + Sha1HashCodec* = multiCodec("sha1") Sha256HashCodec* = multiCodec("sha2-256") Sha512HashCodec* = multiCodec("sha2-512") Pos2Bn128SpngCodec* = multiCodec("poseidon2-alt_bn_128-sponge-r2") @@ -55,6 +61,9 @@ const CodexSlotCellCodec, ] + # BitTorrent + InfoHashV1Codec* = multiCodec("torrent-info") + proc initEmptyCidTable(): ?!Table[(CidVersion, MultiCodec, MultiCodec), Cid] = ## Initialize padding blocks table ## diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 30e0c7ca7..63ab19cdc 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -258,3 +258,8 @@ func decode*(_: type Manifest, blk: Block): ?!Manifest = return failure "Cid not a manifest codec" Manifest.decode(blk.data) + +func new*(T: type Manifest, data: openArray[byte]): ?!Manifest = + ## Create a manifest instance from given data + ## + Manifest.decode(data) diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index cbb0bace0..5b28aaaa6 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -134,6 +134,12 @@ func mimetype*(self: Manifest): ?string = # 
Operations on block list ############################################################ +func isTorrentInfoHash*(cid: Cid): ?!bool = + success (InfoHashV1Codec == ?cid.contentType().mapFailure(CodexError)) + +func isTorrentInfoHash*(mc: MultiCodec): ?!bool = + success (mc == InfoHashV1Codec) + func isManifest*(cid: Cid): ?!bool = success (ManifestCodec == ?cid.contentType().mapFailure(CodexError)) @@ -360,9 +366,3 @@ func new*( filename: manifest.filename, mimetype: manifest.mimetype, ) - -func new*(T: type Manifest, data: openArray[byte]): ?!Manifest = - ## Create a manifest instance from given data - ## - - Manifest.decode(data) diff --git a/codex/namespaces.nim b/codex/namespaces.nim index c159ab1a0..23baca653 100644 --- a/codex/namespaces.nim +++ b/codex/namespaces.nim @@ -15,6 +15,7 @@ const # number of blocks in the repo CodexBlocksNamespace* = CodexRepoNamespace & "/blocks" # blocks namespace CodexManifestNamespace* = CodexRepoNamespace & "/manifests" # manifest namespace + TorrentInfoHashNamespace* = CodexRepoNamespace & "/torrents" # info hashes namespace CodexBlocksTtlNamespace* = # Cid TTL CodexMetaNamespace & "/ttl" CodexBlockProofNamespace* = # Cid and Proof diff --git a/codex/node.nim b/codex/node.nim index b742df2cc..b9104e463 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -47,6 +47,11 @@ import ./logutils import ./utils/asynciter import ./utils/trackedfutures +# bittorrent +from ./codextypes import InfoHashV1Codec +import ./bittorrent/manifest +import ./bittorrent/torrentdownloader + export logutils logScope: @@ -77,6 +82,8 @@ type CodexNodeRef* = ref CodexNode + Torrent* = tuple[torrentManifest: BitTorrentManifest, codexManifest: Manifest] + OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].} BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. 
gcsafe, async: (raises: [CancelledError]) @@ -94,6 +101,26 @@ func engine*(self: CodexNodeRef): BlockExcEngine = func discovery*(self: CodexNodeRef): Discovery = return self.discovery +proc storeBitTorrentManifest*( + self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: MultiHash +): Future[?!bt.Block] {.async.} = + let encodedManifest = manifest.encode() + + without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error: + trace "Unable to create CID for BitTorrent info hash" + return failure(error) + + without blk =? bt.Block.new(data = encodedManifest, cid = infoHashCid, verify = false), + error: + trace "Unable to create block from manifest" + return failure(error) + + if err =? (await self.networkStore.putBlock(blk)).errorOption: + trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg + return failure(err) + + success blk + proc storeManifest*( self: CodexNodeRef, manifest: Manifest ): Future[?!bt.Block] {.async.} = @@ -134,7 +161,38 @@ proc fetchManifest*( trace "Decoded manifest", cid - return manifest.success + manifest.success + +proc fetchTorrentManifest*( + self: CodexNodeRef, infoHashCid: Cid +): Future[?!BitTorrentManifest] {.async: (raises: [CancelledError]).} = + if err =? infoHashCid.isTorrentInfoHash.errorOption: + return failure "CID has invalid content type for torrent info hash {$cid}" + + trace "Retrieving torrent manifest for infoHashCid", infoHashCid + without blk =? await self.networkStore.getBlock(BlockAddress.init(infoHashCid)), err: + trace "Error retrieve manifest block", infoHashCid, err = err.msg + return failure err + + trace "Successfully retrieved torrent manifest with given block cid", + cid = blk.cid, infoHashCid + trace "Decoding torrent manifest" + + without torrentManifest =? 
BitTorrentManifest.decode(blk), err: + trace "Unable to decode torrent manifest", err = err.msg + return failure("Unable to decode torrent manifest") + + trace "Decoded torrent manifest", infoHashCid, torrentManifest = $torrentManifest + + without isValid =? torrentManifest.validate(infoHashCid), err: + trace "Error validating torrent manifest", infoHashCid, err = err.msg + return failure(err.msg) + + if not isValid: + trace "Torrent manifest does not match torrent info hash", infoHashCid + return failure "Torrent manifest does not match torrent info hash {$infoHashCid}" + + return torrentManifest.success proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} = ## Find peer using the discovery service from the given CodexNode @@ -233,7 +291,7 @@ proc fetchBatched*( self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal) proc fetchDatasetAsync*( - self: CodexNodeRef, manifest: Manifest, fetchLocal = true + self: CodexNodeRef, manifest: Manifest, fetchLocal = true, onBatch: BatchProc = nil ): Future[void] {.async: (raises: []).} = ## Asynchronously fetch a dataset in the background. ## This task will be tracked and cleaned up on node shutdown. @@ -241,7 +299,10 @@ proc fetchDatasetAsync*( try: if err =? ( await self.fetchBatched( - manifest = manifest, batchSize = DefaultFetchBatch, fetchLocal = fetchLocal + manifest = manifest, + batchSize = DefaultFetchBatch, + fetchLocal = fetchLocal, + onBatch = onBatch, ) ).errorOption: error "Unable to fetch blocks", err = err.msg @@ -338,6 +399,29 @@ proc retrieve*( await self.streamEntireDataset(manifest, cid) +proc getTorrentDownloader*( + self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest +): ?!TorrentDownloader = + newTorrentDownloader(torrentManifest, codexManifest, self.networkStore) + +proc retrieveTorrent*( + self: CodexNodeRef, infoHash: MultiHash +): Future[?!Torrent] {.async: (raises: [CancelledError]).} = + without infoHashCid =? 
Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error: + trace "Unable to create CID for BitTorrent info hash" + return failure(error) + + without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err: + trace "Unable to fetch Torrent Manifest" + return failure(err) + + without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)), + err: + trace "Unable to fetch Codex Manifest for torrent info hash" + return failure(err) + + success (torrentManifest: torrentManifest, codexManifest: codexManifest) + proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = if err =? (await self.networkStore.delBlock(cid)).errorOption: error "Error deleting block", cid, err = err.msg @@ -403,6 +487,7 @@ proc store*( filename: ?string = string.none, mimetype: ?string = string.none, blockSize = DefaultBlockSize, + pad = true, ): Future[?!Cid] {.async.} = ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest @@ -412,7 +497,7 @@ proc store*( let hcodec = Sha256HashCodec dataCodec = BlockCodec - chunker = LPStreamChunker.new(stream, chunkSize = blockSize) + chunker = LPStreamChunker.new(stream, chunkSize = blockSize, pad) var cids: seq[Cid] @@ -478,6 +563,168 @@ proc store*( return manifestBlk.cid.success +proc storePieces*( + self: CodexNodeRef, + stream: LPStream, + filename: ?string = string.none, + mimetype: ?string = string.none, + blockSize: NBytes, + pieceLength = DefaultPieceLength, +): Future[?!BitTorrentManifest] {.async.} = + ## Save stream contents as dataset with given blockSize + ## to nodes's BlockStore, and return Cid of its manifest + ## + info "Storing pieces" + + let + hcodec = Sha256HashCodec + dataCodec = BlockCodec + chunker = LPStreamChunker.new(stream, chunkSize = blockSize, pad = false) + numOfBlocksPerPiece = pieceLength.int div blockSize.int + + var + cids: seq[Cid] + pieces: seq[MultiHash] + pieceHashCtx: sha1 + pieceIter = 
Iter[int].new(0 ..< numOfBlocksPerPiece) + + pieceHashCtx.init() + + trace "number of blocks per piece: ", numOfBlocksPerPiece + + try: + while (let chunk = await chunker.getBytes(); chunk.len > 0): + trace "storing block...", chunkLength = chunk.len + if pieceIter.finished: + trace "finishing piece..." + without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, + err: + return failure(err) + pieces.add(mh) + trace "successfully computed piece multihash", + piece = $mh, numberOfPieces = pieces.len + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + pieceHashCtx.init() + without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err: + return failure(err) + + without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err: + return failure(err) + + without blk =? bt.Block.new(cid, chunk, verify = false): + return failure("Unable to init block from chunk!") + + cids.add(cid) + + if err =? (await self.networkStore.putBlock(blk)).errorOption: + error "Unable to store block", cid = blk.cid, err = err.msg + return failure(&"Unable to store block {blk.cid}") + pieceHashCtx.update(chunk) + let idx = pieceIter.next() + trace "stored block in piece with index", idx + if chunk.len < blockSize.int: + trace "no more blocks to read" + break + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure(exc.msg) + finally: + await stream.close() + + without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, err: + return failure(err) + pieces.add(mh) + + trace "successfully computed last piece multihash", piece = $mh + + trace "finished processing blocks", numberOfPieces = pieces.len + + without tree =? CodexTree.init(cids), err: + return failure(err) + + without treeCid =? tree.rootCid(CIDv1, dataCodec), err: + return failure(err) + + for index, cid in cids: + without proof =? tree.getProof(index), err: + return failure(err) + if err =? 
+ (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption: + # TODO add log here + return failure(err) + + let manifest = Manifest.new( + treeCid = treeCid, + blockSize = blockSize, + datasetSize = NBytes(chunker.offset), + version = CIDv1, + hcodec = hcodec, + codec = dataCodec, + filename = filename, + mimetype = mimetype, + ) + + without manifestBlk =? await self.storeManifest(manifest), err: + error "Unable to store manifest" + return failure(err) + + info "Stored data", + manifestCid = manifestBlk.cid, + treeCid = treeCid, + blocks = manifest.blocksCount, + datasetSize = manifest.datasetSize, + filename = manifest.filename, + mimetype = manifest.mimetype + + let info = BitTorrentInfo( + length: manifest.datasetSize.uint64, + pieceLength: pieceLength.uint32, + pieces: pieces, + name: filename, + ) + + let torrentManifest = + newBitTorrentManifest(info = info, codexManifestCid = manifestBlk.cid) + + return torrentManifest.success + +proc storeTorrent*( + self: CodexNodeRef, + stream: LPStream, + filename: ?string = string.none, + mimetype: ?string = string.none, +): Future[?!MultiHash] {.async.} = + info "Storing BitTorrent data" + + without bitTorrentManifest =? + await self.storePieces( + stream, filename = filename, mimetype = mimetype, blockSize = BitTorrentBlockSize + ): + return failure("Unable to store BitTorrent data") + + trace "Created BitTorrent manifest", bitTorrentManifest = $bitTorrentManifest + + let infoBencoded = bencode(bitTorrentManifest.info) + + trace "BitTorrent Info successfully bencoded" + + without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err: + return failure(err) + + trace "computed info hash", infoHash = $infoHash + + without manifestBlk =? 
await self.storeBitTorrentManifest( + bitTorrentManifest, infoHash + ), err: + error "Unable to store manifest" + return failure(err) + + info "Stored BitTorrent data", + infoHash = $infoHash, codexManifestCid = bitTorrentManifest.codexManifestCid + + success infoHash + proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest): warn "Failed to listBlocks" diff --git a/codex/rest/api.nim b/codex/rest/api.nim index e31a0f590..c75361c49 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -38,7 +38,14 @@ import ../erasure/erasure import ../manifest import ../streams/asyncstreamwrapper import ../stores +import ../utils/safeasynciter import ../utils/options +import ../bittorrent/manifest +import ../bittorrent/torrentdownloader +import ../bittorrent/magnetlink +import ../bittorrent/torrentparser + +import ../tarballs/[directorymanifest, directorydownloader, tarballnodeextensions] import ./coders import ./json @@ -52,6 +59,16 @@ declareCounter(codex_api_downloads, "codex API downloads") proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} = 0 +proc formatTorrentManifest( + infoHash: MultiHash, torrentManifest: BitTorrentManifest +): RestTorrentContent = + return RestTorrentContent.init(infoHash, torrentManifest) + +proc formatDirectoryManifest( + cid: Cid, manifest: DirectoryManifest +): RestDirectoryContent = + return RestDirectoryContent.init(cid, manifest) + proc formatManifest(cid: Cid, manifest: Manifest): RestContent = return RestContent.init(cid, manifest) @@ -151,6 +168,126 @@ proc retrieveCid( if not lpStream.isNil: await lpStream.close() +proc retrieveDirectory( + node: CodexNodeRef, cid: Cid, resp: HttpResponseRef +): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = + ## Download torrent from the node in a streaming + ## manner + ## + let directoryDownloader = newDirectoryDownloader(node) + + var bytes = 0 + try: 
+ without directoryManifest =? (await node.fetchDirectoryManifest(cid)), err: + error "Unable to fetch Directory Metadata", err = err.msg + resp.status = Http404 + await resp.sendBody(err.msg) + return + + resp.addHeader("Content-Type", "application/octet-stream") + + resp.setHeader( + "Content-Disposition", "attachment; filename=\"" & directoryManifest.name & "\"" + ) + + # ToDo: add contentSize to the directory manifest + # let contentLength = codexManifest.datasetSize + # resp.setHeader("Content-Length", $(contentLength.int)) + + await resp.prepare(HttpResponseStreamType.Plain) + + echo "streaming directory: ", cid + directoryDownloader.start(cid) + + echo "streaming directory started: ", cid + await sleepAsync(1.seconds) + echo "after sleep..." + + while true: + echo "getNext: ", directoryDownloader.queue.len, " entries in queue" + let data = await directoryDownloader.getNext() + echo "getNext[2]: ", data.len, " bytes" + await sleepAsync(1.seconds) + if data.len == 0: + break + bytes += data.len + await resp.sendChunk(addr data[0], data.len) + + echo "out of loop: ", directoryDownloader.queue.len, " entries in queue" + await resp.finish() + codex_api_downloads.inc() + except CancelledError as exc: + info "Streaming directory cancelled", exc = exc.msg + raise exc + finally: + info "Sent bytes for directory", cid, bytes + await directoryDownloader.stop() + +proc retrieveInfoHash( + node: CodexNodeRef, infoHash: MultiHash, resp: HttpResponseRef +): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} = + ## Download torrent from the node in a streaming + ## manner + ## + var torrentDownloader: TorrentDownloader + + var bytes = 0 + try: + without torrent =? 
(await node.retrieveTorrent(infoHash)), err: + error "Unable to fetch Torrent Metadata", err = err.msg + resp.status = Http404 + await resp.sendBody(err.msg) + return + let (torrentManifest, codexManifest) = torrent + + if codexManifest.mimetype.isSome: + resp.setHeader("Content-Type", codexManifest.mimetype.get()) + else: + resp.addHeader("Content-Type", "application/octet-stream") + + if codexManifest.filename.isSome: + resp.setHeader( + "Content-Disposition", + "attachment; filename=\"" & codexManifest.filename.get() & "\"", + ) + else: + resp.setHeader("Content-Disposition", "attachment") + + let contentLength = codexManifest.datasetSize + resp.setHeader("Content-Length", $(contentLength.int)) + + await resp.prepare(HttpResponseStreamType.Plain) + + without downloader =? node.getTorrentDownloader(torrentManifest, codexManifest), err: + error "Unable to stream torrent", err = err.msg + resp.status = Http500 + await resp.sendBody(err.msg) + return + + torrentDownloader = downloader + torrentDownloader.start() + + for blockFut in torrentDownloader.getAsyncBlockIterator(): + let blockRes = await blockFut + without (blockIndex, data) =? 
(blockRes), err: + error "Error streaming blocks", err = err.msg + resp.status = Http500 + if resp.isPending(): + await resp.sendBody(err.msg) + return + trace "streaming block", blockIndex, len = data.len + bytes += data.len + await resp.sendChunk(addr data[0], data.len) + + await resp.finish() + codex_api_downloads.inc() + except CancelledError as exc: + info "Stream cancelled", exc = exc.msg + raise exc + finally: + info "Sent bytes for torrent", infoHash = $infoHash, bytes + await torrentDownloader.stop() + proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] ): seq[(string, string)] = @@ -181,7 +318,150 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string = return filename[0 ..^ 2].some proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) = - let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion + let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition + + router.api(MethodOptions, "/api/codex/v1/tar") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? 
allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.rawApi(MethodPost, "/api/codex/v1/tar") do() -> RestApiResponse: + ## Upload a file in a streaming manner + ## + + trace "Handling upload of a tar file" + var bodyReader = request.getBodyReader() + if bodyReader.isErr(): + return RestApiResponse.error(Http500, msg = bodyReader.error()) + + # Attempt to handle `Expect` header + # some clients (curl), wait 1000ms + # before giving up + # + await request.handleExpect() + + var mimetype = request.headers.getString(ContentTypeHeader).some + + if mimetype.get() != "": + let mimetypeVal = mimetype.get() + var m = newMimetypes() + let extension = m.getExt(mimetypeVal, "") + if extension == "": + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." + ) + else: + mimetype = string.none + + const ContentDispositionHeader = "Content-Disposition" + let contentDisposition = request.headers.getString(ContentDispositionHeader) + let filename = getFilenameFromContentDisposition(contentDisposition) + + if filename.isSome and not isValidFilename(filename.get()): + return RestApiResponse.error(Http422, "The filename is not valid.") + + # Here we could check if the extension matches the filename if needed + + let reader = bodyReader.get() + let stream = AsyncStreamReader(reader) + + try: + without json =? 
(await node.storeTarball(stream = stream)), error: + error "Error uploading tarball", exc = error.msg + return RestApiResponse.error(Http500, error.msg) + + codex_api_uploads.inc() + trace "Uploaded tarball", result = json + return RestApiResponse.response(json, contentType = "application/json") + except CancelledError: + trace "Upload cancelled error" + return RestApiResponse.error(Http500) + except AsyncStreamError: + trace "Async stream error" + return RestApiResponse.error(Http500) + finally: + await reader.closeWait() + + router.api(MethodOptions, "/api/codex/v1/torrent") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse: + ## Upload a file in a streaming manner + ## + + trace "Handling file upload" + var bodyReader = request.getBodyReader() + if bodyReader.isErr(): + return RestApiResponse.error(Http500, msg = bodyReader.error()) + + # Attempt to handle `Expect` header + # some clients (curl), wait 1000ms + # before giving up + # + await request.handleExpect() + + var mimetype = request.headers.getString(ContentTypeHeader).some + + if mimetype.get() != "": + let mimetypeVal = mimetype.get() + var m = newMimetypes() + let extension = m.getExt(mimetypeVal, "") + if extension == "": + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." 
+ ) + else: + mimetype = string.none + + const ContentDispositionHeader = "Content-Disposition" + let contentDisposition = request.headers.getString(ContentDispositionHeader) + let filename = getFilenameFromContentDisposition(contentDisposition) + + if filename.isSome and not isValidFilename(filename.get()): + return RestApiResponse.error(Http422, "The filename is not valid.") + + # Here we could check if the extension matches the filename if needed + + let reader = bodyReader.get() + + try: + without infoHash =? ( + await node.storeTorrent( + AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)), + filename = filename, + mimetype = mimetype, + ) + ), error: + error "Error uploading file", exc = error.msg + return RestApiResponse.error(Http500, error.msg) + + codex_api_uploads.inc() + trace "Uploaded torrent", infoHash = $infoHash + return RestApiResponse.response(infoHash.hex) + except CancelledError: + trace "Upload cancelled error" + return RestApiResponse.error(Http500) + except AsyncStreamError: + trace "Async stream error" + return RestApiResponse.error(Http500) + finally: + await reader.closeWait() router.api(MethodOptions, "/api/codex/v1/data") do( resp: HttpResponseRef @@ -347,6 +627,136 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") await node.retrieveCid(cid.get(), local = false, resp = resp) + router.api(MethodGet, "/api/codex/v1/dir/{cid}/network/stream") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Download a file from the network in a streaming + ## manner + ## + + var headers = buildCorsHeaders("GET", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + if corsOrigin =? 
allowedOrigin: + resp.setCorsHeaders("GET", corsOrigin) + resp.setHeader("Access-Control-Headers", "X-Requested-With") + + resp.setHeader("Access-Control-Expose-Headers", "Content-Disposition") + await node.retrieveDirectory(cid.get(), resp = resp) + + router.api(MethodOptions, "/api/codex/v1/torrent/magnet") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.api(MethodPost, "/api/codex/v1/torrent/magnet") do( + contentBody: Option[ContentBody], resp: HttpResponseRef + ) -> RestApiResponse: + let mimeType = request.headers.getString(ContentTypeHeader) + echo "mimeType: ", mimeType + if mimeType != "text/plain" and mimeType != "application/octet-stream": + return RestApiResponse.error( + Http422, + "Missing \"Content-Type\" header: expected either \"text/plain\" or \"application/octet-stream\".", + ) + without magnetLinkBytes =? contentBody .? data: + return RestApiResponse.error(Http422, "No magnet link provided.") + echo "magnetLinkBytes: ", bytesToString(magnetLinkBytes).strip + without magnetLink =? newMagnetLink(bytesToString(magnetLinkBytes).strip), err: + return RestApiResponse.error(Http422, err.msg) + if magnetLink.version != TorrentVersion.v1: + return RestApiResponse.error( + Http422, "Only torrents version 1 are currently supported!" + ) + without infoHash =? magnetLink.infoHashV1: + return RestApiResponse.error( + Http422, "The magnet link does not contain a valid info hash." + ) + await node.retrieveInfoHash(infoHash, resp = resp) + # return + # RestApiResponse.response($magnetLink, Http200, "text/plain") + + router.api(MethodOptions, "/api/codex/v1/torrent/torrent-file") do( + resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? 
allowedOrigin: + resp.setCorsHeaders("POST", corsOrigin) + resp.setHeader( + "Access-Control-Allow-Headers", "content-type, content-disposition" + ) + + resp.status = Http204 + await resp.sendBody("") + + router.api(MethodPost, "/api/codex/v1/torrent/torrent-file") do( + contentBody: Option[ContentBody], resp: HttpResponseRef + ) -> RestApiResponse: + let mimeType = request.headers.getString(ContentTypeHeader) + echo "mimeType: ", mimeType + if mimeType != "application/json" and mimeType != "application/octet-stream": + return RestApiResponse.error( + Http422, + "Missing \"Content-Type\" header: expected either \"application/json\" or \"application/octet-stream\".", + ) + without torrentBytes =? contentBody .? data: + return RestApiResponse.error(Http422, "No torrent file content provided.") + var infoBytes: seq[byte] + if mimeType == "application/json": + without torrentManifest =? BitTorrentManifest.fromJson(torrentBytes): + return RestApiResponse.error(Http422, "Invalid torrent JSON file content.") + echo "torrentManifest: ", torrentManifest + let torrentInfo = torrentManifest.info + # very basic validation for now + if torrentInfo.length == 0 or torrentInfo.pieceLength == 0 or + torrentInfo.pieces.len == 0: + return + RestApiResponse.error(Http422, "The torrent file is invalid or incomplete.") + # return RestApiResponse.response($torrentInfo, contentType = "text/plain") + infoBytes = bencode(torrentInfo) + else: + without infoBencoded =? extractInfoFromTorrent(torrentBytes), err: + return RestApiResponse.error( + Http422, "Failed extracting info directory from the torrent file." + ) + infoBytes = infoBencoded + without infoHash =? MultiHash.digest($Sha1HashCodec, infoBytes).mapFailure, err: + return RestApiResponse.error( + Http422, "The torrent file does not contain a valid info hash." 
+ ) + return await node.retrieveInfoHash(infoHash, resp = resp) + + router.api(MethodGet, "/api/codex/v1/torrent/{infoHash}/network/stream") do( + infoHash: MultiHash, resp: HttpResponseRef + ) -> RestApiResponse: + var headers = buildCorsHeaders("GET", allowedOrigin) + + without infoHash =? infoHash.mapFailure, error: + return RestApiResponse.error(Http400, error.msg, headers = headers) + + if infoHash.mcodec != Sha1HashCodec: + return RestApiResponse.error( + Http400, "Only torrents version 1 are currently supported!", headers = headers + ) + + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("GET", corsOrigin) + resp.setHeader("Access-Control-Headers", "X-Requested-With") + + trace "torrent requested: ", multihash = $infoHash + + await node.retrieveInfoHash(infoHash, resp = resp) + + # return RestApiResponse.response(Http200) + router.api(MethodGet, "/api/codex/v1/data/{cid}/network/manifest") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: @@ -365,6 +775,55 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute let json = %formatManifest(cid.get(), manifest) return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodGet, "/api/codex/v1/data/{cid}/network/dirmanifest") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Download only the directory manifest. + ## + + var headers = buildCorsHeaders("GET", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + without manifest =? 
(await node.fetchDirectoryManifest(cid.get())), err: + error "Failed to fetch directory manifest", err = err.msg + return RestApiResponse.error(Http404, err.msg, headers = headers) + + let json = %formatDirectoryManifest(cid.get(), manifest) + return RestApiResponse.response($json, contentType = "application/json") + + router.api(MethodGet, "/api/codex/v1/torrent/{infoHash}/network/manifest") do( + infoHash: MultiHash, resp: HttpResponseRef + ) -> RestApiResponse: + ## Download only the Bit Torrent manifest (if found) + ## + + var headers = buildCorsHeaders("GET", allowedOrigin) + + without infoHash =? infoHash.mapFailure, error: + return RestApiResponse.error(Http400, error.msg, headers = headers) + + if infoHash.mcodec != Sha1HashCodec: + return RestApiResponse.error( + Http400, "Only torrents version 1 are currently supported!", headers = headers + ) + + without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, err: + error "Unable to create CID for BitTorrent info hash", err = err.msg + resp.status = Http404 + await resp.sendBody(err.msg) + return + + without torrentManifest =? 
(await node.fetchTorrentManifest(infoHashCid)), err: + error "Unable to fetch Torrent Manifest", err = err.msg + resp.status = Http404 + await resp.sendBody(err.msg) + return + + let json = %formatTorrentManifest(infoHash, torrentManifest) + return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse: let json = %RestRepoStore( diff --git a/codex/rest/coders.nim b/codex/rest/coders.nim index 319ce3d65..88688db61 100644 --- a/codex/rest/coders.nim +++ b/codex/rest/coders.nim @@ -21,6 +21,9 @@ import ../sales import ../purchasing import ../utils/stintutils +from ../codextypes import Sha1HashCodec +import ../bittorrent/manifest + proc encodeString*(cid: type Cid): Result[string, cstring] = ok($cid) @@ -82,6 +85,16 @@ proc decodeString*( except ValueError as e: err e.msg.cstring +proc decodeString*(_: type MultiHash, value: string): Result[MultiHash, cstring] = + without mhash =? BitTorrentInfo.buildMultiHash(value), e: + return err e.msg.cstring + ok mhash + # try: + # let bytes = value.hexToSeqByte + # MultiHash.init($Sha1HashCodec, bytes) + # except ValueError as e: + # err e.msg.cstring + proc decodeString*[T: PurchaseId | RequestId | Nonce | SlotId | AvailabilityId]( _: type T, value: string ): Result[T, cstring] = diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 1b9459c12..2fe8c75f0 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -9,6 +9,9 @@ import ../utils/json import ../manifest import ../units +import ../bittorrent/manifest +import ../tarballs/directorymanifest + export json type @@ -47,6 +50,14 @@ type cid* {.serialize.}: Cid manifest* {.serialize.}: Manifest + RestDirectoryContent* = object + cid* {.serialize.}: Cid + manifest* {.serialize.}: DirectoryManifest + + RestTorrentContent* = object + infoHash* {.serialize.}: MultiHash + torrentManifest* {.serialize.}: BitTorrentManifest + RestContentList* = object content* {.serialize.}: seq[RestContent] 
@@ -81,6 +92,16 @@ proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent = RestContent(cid: cid, manifest: manifest) +proc init*( + _: type RestDirectoryContent, cid: Cid, manifest: DirectoryManifest +): RestDirectoryContent = + RestDirectoryContent(cid: cid, manifest: manifest) + +proc init*( + _: type RestTorrentContent, infoHash: MultiHash, torrentManifest: BitTorrentManifest +): RestTorrentContent = + RestTorrentContent(infoHash: infoHash, torrentManifest: torrentManifest) + proc init*(_: type RestNode, node: dn.Node): RestNode = RestNode( nodeId: RestNodeId.init(node.id), diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index bbe0bef8f..8c4696496 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -25,6 +25,7 @@ type BlockNotFoundError* = object of CodexError BlockType* {.pure.} = enum + Torrent Manifest Block Both diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 54bed1b8c..0b1876a1f 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -157,6 +157,9 @@ method listBlocks*( without cid =? cid, err: trace "Cannot get Cid from the iterator", err = err.msg return false + without isTorrent =? cid.isTorrentInfoHash, err: + trace "Error checking if cid is a torrent info hash", err = err.msg + return false without isManifest =? 
cid.isManifest, err: trace "Error checking if cid is a manifest", err = err.msg return false @@ -166,8 +169,10 @@ method listBlocks*( return true of BlockType.Manifest: return isManifest + of BlockType.Torrent: + return isTorrent of BlockType.Block: - return not isManifest + return not (isManifest or isTorrent) ) ) success(iter) diff --git a/codex/stores/keyutils.nim b/codex/stores/keyutils.nim index 0634b6a26..e8b545261 100644 --- a/codex/stores/keyutils.nim +++ b/codex/stores/keyutils.nim @@ -24,6 +24,7 @@ const CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet CodexTotalBlocksKey* = Key.init(CodexBlockTotalNamespace).tryGet CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet + TorrentInfoHashKey* = Key.init(TorrentInfoHashNamespace).tryGet BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet QuotaKey* = Key.init(CodexQuotaNamespace).tryGet @@ -33,7 +34,9 @@ const func makePrefixKey*(postFixLen: int, cid: Cid): ?!Key = let cidKey = ?Key.init(($cid)[^postFixLen ..^ 1] & "/" & $cid) - if ?cid.isManifest: + if ?cid.isTorrentInfoHash: + success TorrentInfoHashKey / cidKey + elif ?cid.isManifest: success CodexManifestKey / cidKey else: success CodexBlocksKey / cidKey diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index bea2971c7..902e71136 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -304,6 +304,7 @@ method listBlocks*( let key = case blockType + of BlockType.Torrent: TorrentInfoHashKey of BlockType.Manifest: CodexManifestKey of BlockType.Block: CodexBlocksKey of BlockType.Both: CodexRepoKey diff --git a/codex/tarballs/decoding.nim b/codex/tarballs/decoding.nim new file mode 100644 index 000000000..d1963d27e --- /dev/null +++ b/codex/tarballs/decoding.nim @@ -0,0 +1,60 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, 
([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/multihash +import pkg/libp2p/protobuf/minprotobuf + +import pkg/questionable/results + +import ../blocktype +import ./directorymanifest + +func decode*(_: type DirectoryManifest, data: openArray[byte]): ?!DirectoryManifest = + # ```protobuf + # Message DirectoryManifest { + # Message Cid { + # bytes data = 1; + # } + # string name = 1; + # repeated Cid cids = 2; + # ``` + + var + pbNode = initProtoBuffer(data) + pbInfo: ProtoBuffer + name: string + cids: seq[Cid] + cidsBytes: seq[seq[byte]] + + if pbNode.getField(1, name).isErr: + return failure("Unable to decode `name` from DirectoryManifest") + + if ?pbNode.getRepeatedField(2, cidsBytes).mapFailure: + for cidEntry in cidsBytes: + var pbCid = initProtoBuffer(cidEntry) + var dataBuf = newSeq[byte]() + if pbCid.getField(1, dataBuf).isErr: + return failure("Unable to decode piece `data` to Cid") + without cid =? Cid.init(dataBuf).mapFailure, err: + return failure(err.msg) + cids.add(cid) + + DirectoryManifest(name: name, cids: cids).success + +func decode*(_: type DirectoryManifest, blk: Block): ?!DirectoryManifest = + ## Decode a directory manifest using `decoder` + ## + + if not ?blk.cid.isManifest: + return failure "Cid is not a Directory Manifest Cid" + + DirectoryManifest.decode(blk.data) diff --git a/codex/tarballs/directorydownloader.nim b/codex/tarballs/directorydownloader.nim new file mode 100644 index 000000000..ffc78c404 --- /dev/null +++ b/codex/tarballs/directorydownloader.nim @@ -0,0 +1,250 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. 
+## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/os +import std/times +import std/strutils +import std/sequtils +import std/sugar +import pkg/chronos +import pkg/libp2p/[cid, multihash] +import pkg/libp2p/stream/lpstream +import pkg/questionable/results +import pkg/stew/byteutils + +import ../node +import ../logutils +import ../utils/iter +import ../utils/safeasynciter +import ../utils/trackedfutures +import ../errors +import ../manifest +import ../blocktype +import ../stores/blockstore + +import ./tarballs +import ./directorymanifest +import ./decoding + +logScope: + topics = "codex node directorydownloader" + +type DirectoryDownloader* = ref object + node: CodexNodeRef + queue*: AsyncQueue[seq[byte]] + finished: bool + trackedFutures: TrackedFutures + +proc printQueue(self: DirectoryDownloader) = + echo "Queue: ", self.queue.len, " entries" + for i in 0 ..< self.queue.len: + echo "Entry ", i, ": ", self.queue[i].len, " bytes" + +proc createEntryHeader( + self: DirectoryDownloader, entry: TarballEntry, basePath: string +): string = + echo "Creating entry header for ", entry.name + echo "basePath = ", basePath + echo "entry = ", entry + result = newStringOfCap(512) + result.add(entry.name) + result.setLen(100) + # ToDo: read permissions from the TarballEntry + if entry.kind == ekDirectory: + result.add("000755 \0") # Dir mode + else: + result.add("000644 \0") # File mode + result.add(toOct(0, 6) & " \0") # Owner's numeric user ID + result.add(toOct(0, 6) & " \0") # Group's numeric user ID + result.add(toOct(entry.contentLength, 11) & ' ') # File size + result.add(toOct(entry.lastModified.toUnix(), 11) & ' ') # Last modified time + result.add(" ") # Empty checksum for now + result.setLen(156) + result.add(ord(entry.kind).char) + result.setLen(257) + result.add("ustar\0") # UStar indicator + result.add(toOct(0, 2)) # UStar version + result.setLen(329) + result.add(toOct(0, 6) & "\0 ") 
# Device major number + result.add(toOct(0, 6) & "\0 ") # Device minor number + result.add(basePath) + result.setLen(512) + + var checksum: int + for i in 0 ..< result.len: + checksum += result[i].int + + let checksumStr = toOct(checksum, 6) & '\0' + for i in 0 ..< checksumStr.len: + result[148 + i] = checksumStr[i] + +proc fetchTarball( + self: DirectoryDownloader, cid: Cid, basePath = "" +): Future[?!void] {.async: (raises: [CancelledError]).} = + echo "fetchTarball: ", cid, " basePath = ", basePath + # we got a Cid - let's check if this is a manifest (can be either + # a directory or file manifest) + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is a manifest" + return failure("Unable to determine if cid is a manifest") + + if not isM: + # this is not a manifest, so we can return + return failure("given cid is not a manifest: " & $cid) + + # get the manifest + without blk =? await self.node.blockStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return + failure("Error retrieving manifest block (cid = " & $cid & "), err = " & err.msg) + + without manifest =? Manifest.decode(blk), err: + info "Unable to decode as manifest - trying to decode as directory manifest", + err = err.msg + # Try if it not a directory manifest + without manifest =? 
DirectoryManifest.decode(blk), err: + error "Unable to decode as directory manifest", err = err.msg + return failure("Unable to decode as valid manifest (cid = " & $cid & ")") + # this is a directory manifest + echo "Decoded directory manifest: ", $manifest + let dirEntry = TarballEntry( + kind: ekDirectory, + name: manifest.name, + lastModified: getTime(), # ToDo: store actual time in the manifest + permissions: parseFilePermissions(cast[uint32](0o755)), # same here + contentLength: 0, + ) + let header = self.createEntryHeader(dirEntry, basePath) + echo "header = ", header + await self.queue.addLast(header.toBytes()) + self.printQueue() + var entryLength = header.len + let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned + if alignedEntryLength - entryLength > 0: + echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" + var data = newSeq[byte]() + data.setLen(alignedEntryLength - entryLength) + await self.queue.addLast(data) + self.printQueue() + + for cid in manifest.cids: + echo "fetching directory content: ", cid + if err =? (await self.fetchTarball(cid, basePath / manifest.name)).errorOption: + error "Error fetching directory content", + cid, path = basePath / manifest.name, err = err.msg + return failure( + "Error fetching directory content (cid = " & $cid & "), err = " & err.msg + ) + echo "fetchTarball[DIR]: ", cid, " basePath = ", basePath, " done" + return success() + + # this is a regular file (Codex) manifest + echo "Decoded file manifest: ", $manifest + let fileEntry = TarballEntry( + kind: ekNormalFile, + name: manifest.filename |? 
"unknown", + lastModified: getTime(), # ToDo: store actual time in the manifest + permissions: parseFilePermissions(cast[uint32](0o644)), # same here + contentLength: manifest.datasetSize.int, + ) + let header = self.createEntryHeader(fileEntry, basePath) + await self.queue.addLast(header.toBytes()) + self.printQueue() + var contentLength = 0 + + proc onBatch( + blocks: seq[Block] + ): Future[?!void] {.async: (raises: [CancelledError]).} = + echo "onBatch: ", blocks.len, " blocks" + for blk in blocks: + echo "onBatch[blk.data]: ", string.fromBytes(blk.data) + # await self.queue.addLast(string.fromBytes(blk.data)) + await self.queue.addLast(blk.data) + self.printQueue() + contentLength += blk.data.len + # this can happen if the content was stored with padding + if contentLength > manifest.datasetSize.int: + contentLength = manifest.datasetSize.int + echo "onBatch[contentLength]: ", contentLength + success() + + await self.node.fetchDatasetAsync(manifest, fetchLocal = true, onBatch = onBatch) + + echo "contentLength: ", contentLength + echo "manifest.datasetSize.int: ", manifest.datasetSize.int + if contentLength != manifest.datasetSize.int: + echo "Warning: entry length mismatch, expected ", + manifest.datasetSize.int, " got ", contentLength + + let entryLength = header.len + contentLength + let alignedEntryLength = (entryLength + 511) and not 511 # 512 byte aligned + if alignedEntryLength - entryLength > 0: + echo "Adding ", alignedEntryLength - entryLength, " bytes of padding" + var data = newSeq[byte]() + echo "alignedEntryLength: ", alignedEntryLength + echo "entryLength: ", entryLength + echo "alignedEntryLength - entryLength: ", alignedEntryLength - entryLength + data.setLen(alignedEntryLength - entryLength) + echo "data.len: ", data.len + await self.queue.addLast(data) + self.printQueue() + echo "fetchTarball: ", cid, " basePath = ", basePath, " done" + return success() + +proc streamDirectory( + self: DirectoryDownloader, cid: Cid +): Future[void] {.async: 
(raises: []).} = + try: + if err =? (await self.fetchTarball(cid, basePath = "")).errorOption: + error "Error fetching directory content", cid, err = err.msg + return + # Two consecutive zero-filled records at end + var data = newSeq[byte]() + data.setLen(1024) + await self.queue.addLast(data) + self.printQueue() + # mark the end of the stream + self.finished = true + echo "streamDirectory: ", cid, " done" + except CancelledError: + info "Streaming directory cancelled:", cid + +########################################################################### +# Public API +########################################################################### + +proc start*(self: DirectoryDownloader, cid: Cid) = + ## Starts streaming the directory content + self.trackedFutures.track(self.streamDirectory(cid)) + +proc stop*(self: DirectoryDownloader) {.async: (raises: []).} = + await noCancel self.trackedFutures.cancelTracked() + +proc getNext*( + self: DirectoryDownloader +): Future[seq[byte]] {.async: (raises: [CancelledError]).} = + ## Returns the next entry from the queue + echo "getNext: ", self.queue.len, " entries in queue" + if (self.queue.len == 0 and self.finished): + return @[] + echo "getNext[2]: ", self.queue.len, " entries in queue" + let chunk = await self.queue.popFirst() + echo "getNext: ", chunk.len, " bytes" + return chunk + +proc newDirectoryDownloader*(node: CodexNodeRef): DirectoryDownloader = + ## Creates a new DirectoryDownloader instance + DirectoryDownloader( + node: node, + queue: newAsyncQueue[seq[byte]](), + finished: false, + trackedFutures: TrackedFutures(), + ) diff --git a/codex/tarballs/directorymanifest.nim b/codex/tarballs/directorymanifest.nim new file mode 100644 index 000000000..1e83158b7 --- /dev/null +++ b/codex/tarballs/directorymanifest.nim @@ -0,0 +1,26 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT 
license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import pkg/libp2p/cid +import ../utils/json + +type DirectoryManifest* = ref object + name* {.serialize.}: string + cids* {.serialize.}: seq[Cid] + +proc `$`*(self: DirectoryManifest): string = + "DirectoryManifest(name: " & self.name & ", cids: " & $self.cids & ")" + +func `==`*(a: DirectoryManifest, b: DirectoryManifest): bool = + a.name == b.name and a.cids == b.cids + +proc newDirectoryManifest*(name: string, cids: seq[Cid]): DirectoryManifest = + DirectoryManifest(name: name, cids: cids) diff --git a/codex/tarballs/encoding.nim b/codex/tarballs/encoding.nim new file mode 100644 index 000000000..8e25853cb --- /dev/null +++ b/codex/tarballs/encoding.nim @@ -0,0 +1,39 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+ +{.push raises: [].} + +import pkg/libp2p/cid +import pkg/libp2p/protobuf/minprotobuf + +import ./directorymanifest + +proc write(pb: var ProtoBuffer, field: int, value: Cid) = + var ipb = initProtoBuffer() + ipb.write(1, value.data.buffer) + ipb.finish() + pb.write(field, ipb) + +proc encode*(manifest: DirectoryManifest): seq[byte] = + # ```protobuf + # Message DirectoryManifest { + # Message Cid { + # bytes data = 1; + # } + # + # string name = 1; + # repeated Cid cids = 2; + # ``` + + var ipb = initProtoBuffer() + ipb.write(1, manifest.name) + for cid in manifest.cids: + ipb.write(2, cid) + ipb.finish() + ipb.buffer diff --git a/codex/tarballs/stdstreamwrapper.nim b/codex/tarballs/stdstreamwrapper.nim new file mode 100644 index 000000000..10e3aeb07 --- /dev/null +++ b/codex/tarballs/stdstreamwrapper.nim @@ -0,0 +1,72 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+ +{.push raises: [].} + +import std/streams +import pkg/libp2p +import pkg/chronos + +import ../logutils + +logScope: + topics = "libp2p stdstreamwrapper" + +const StdStreamWrapperName* = "StdStreamWrapper" + +type StdStreamWrapper* = ref object of LPStream + stream*: Stream + +method initStream*(self: StdStreamWrapper) = + if self.objName.len == 0: + self.objName = StdStreamWrapperName + + procCall LPStream(self).initStream() + +proc newStdStreamWrapper*(stream: Stream = nil): StdStreamWrapper = + let stream = StdStreamWrapper(stream: stream) + + stream.initStream() + return stream + +template withExceptions(body: untyped) = + try: + body + except CatchableError as exc: + raise newException(Defect, "Unexpected error in StdStreamWrapper", exc) + +method readOnce*( + self: StdStreamWrapper, pbytes: pointer, nbytes: int +): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} = + trace "Reading bytes from stream", bytes = nbytes + if isNil(self.stream): + error "StdStreamWrapper: stream is nil" + raiseAssert("StdStreamWrapper: stream is nil") + + if self.atEof: + raise newLPStreamEOFError() + + withExceptions: + return self.stream.readData(pbytes, nbytes) + +method atEof*(self: StdStreamWrapper): bool = + withExceptions: + return self.stream.atEnd() + +method closeImpl*(self: StdStreamWrapper) {.async: (raises: []).} = + try: + trace "Shutting down std stream" + + self.stream.close() + + trace "Shutdown async chronos stream" + except CatchableError as exc: + trace "Error closing std stream", msg = exc.msg + + await procCall LPStream(self).closeImpl() diff --git a/codex/tarballs/tarballnodeextensions.nim b/codex/tarballs/tarballnodeextensions.nim new file mode 100644 index 000000000..9aa36fbd0 --- /dev/null +++ b/codex/tarballs/tarballnodeextensions.nim @@ -0,0 +1,118 @@ +import std/streams + +import pkg/chronos +import pkg/libp2p/cid +import pkg/questionable/results + +import ../node +import ../blocktype +import ../manifest +import ../stores/blockstore + 
+import ./tarballs +import ./stdstreamwrapper +import ./directorymanifest +import ./encoding +import ./decoding + +proc fetchDirectoryManifest*( + self: CodexNodeRef, cid: Cid +): Future[?!DirectoryManifest] {.async: (raises: [CancelledError]).} = + ## Fetch and decode a manifest block + ## + + if err =? cid.isManifest.errorOption: + return failure "CID has invalid content type for manifest {$cid}" + + trace "Retrieving directory manifest for cid", cid + + without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err: + trace "Error retrieving directory manifest block", cid, err = err.msg + return failure err + + trace "Decoding directory manifest for cid", cid + + without manifest =? DirectoryManifest.decode(blk), err: + trace "Unable to decode as directory manifest", err = err.msg + return failure("Unable to decode as directory manifest") + + trace "Decoded directory manifest", cid + + manifest.success + +proc storeDirectoryManifest*( + self: CodexNodeRef, manifest: DirectoryManifest +): Future[?!Block] {.async.} = + let encodedManifest = manifest.encode() + + without blk =? Block.new(data = encodedManifest, codec = ManifestCodec), error: + trace "Unable to create block from manifest" + return failure(error) + + if err =? 
(await self.blockStore.putBlock(blk)).errorOption: + trace "Unable to store manifest block", cid = blk.cid, err = err.msg + return failure(err) + + success blk + +proc storeTarball*( + self: CodexNodeRef, stream: AsyncStreamReader +): Future[?!string] {.async.} = + info "Storing tarball data" + + # Just as a proof of concept, we process tar bar in memory + # Later to see how to do actual streaming to either store + # tarball locally in some tmp folder, or to process the + # tarball incrementally + let tarballBytes = await stream.read() + let stream = newStringStream(string.fromBytes(tarballBytes)) + + proc onProcessedTarFile( + stream: Stream, fileName: string + ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} = + try: + echo "onProcessedTarFile:name: ", fileName + let stream = newStdStreamWrapper(stream) + await self.store(stream, filename = some fileName, pad = false) + except CancelledError as e: + raise e + except CatchableError as e: + error "Error processing tar file", fileName, exc = e.msg + return failure(e.msg) + + proc onProcessedTarDir( + name: string, cids: seq[Cid] + ): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} = + try: + echo "onProcessedTarDir:name: ", name + echo "onProcessedTarDir:cids: ", cids + let directoryManifest = newDirectoryManifest(name = name, cids = cids) + without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err: + error "Unable to store manifest" + return failure(err) + manifestBlk.cid.success + except CancelledError as e: + raise e + except CatchableError as e: + error "Error processing tar dir", name, exc = e.msg + return failure(e.msg) + + let tarball = Tarball() + if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption: + error "Unable to open tarball", err = err.msg + return failure(err) + echo "tarball = ", $tarball + without root =? 
tarball.findRootDir(), err: + return failure(err.msg) + echo "root = ", root + let dirs = processDirEntries(tarball) + echo "dirs = ", dirs + without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err: + error "Unable to build tree", err = err.msg + return failure(err) + echo "" + echo "preorderTraversal:" + let json = newJArray() + preorderTraversal(tree, json) + echo "json = ", json + success($json) diff --git a/codex/tarballs/tarballs.nim b/codex/tarballs/tarballs.nim new file mode 100644 index 000000000..b0ba59e9d --- /dev/null +++ b/codex/tarballs/tarballs.nim @@ -0,0 +1,282 @@ +{.push raises: [].} + +import std/os +import std/times +import std/strutils +import std/strformat +import std/sequtils +import std/streams +import std/tables + +import std/random + +import pkg/chronos +import pkg/questionable/results +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/serde/json + +import ../blocktype +import ../manifest +import ./directorymanifest + +proc example2*(_: type Block, size: int = 4096): ?!Block = + let length = rand(size) + let bytes = newSeqWith(length, rand(uint8)) + Block.new(bytes) + +proc example2*(_: type Cid): ?!Cid = + Block.example2 .? 
cid + +const + TUREAD* = 0o00400'u32 # read by owner */ + TUWRITE* = 0o00200'u32 # write by owner */ + TUEXEC* = 0o00100'u32 # execute/search by owner */ + TGREAD* = 0o00040'u32 # read by group */ + TGWRITE* = 0o00020'u32 # write by group */ + TGEXEC* = 0o00010'u32 # execute/search by group */ + TOREAD* = 0o00004'u32 # read by other */ + TOWRITE* = 0o00002'u32 # write by other */ + TOEXEC* = 0o00001'u32 # execute/search by other */ + +type + EntryKind* = enum + ekNormalFile = '0' + ekDirectory = '5' + + TarballEntry* = object + kind*: EntryKind + name*: string + cid*: Cid + contentLength*: int + lastModified*: times.Time + permissions*: set[FilePermission] + + Tarball* = ref object + contents*: OrderedTable[string, TarballEntry] + + TarballError* = object of ValueError + + TarballTree* = ref object + name*: string + cid*: Cid + children*: seq[TarballTree] + + # ToDo: make sure we also record files permissions, modification time, etc... + # For now, only fileName so that we do not have to change the Codex manifest + # right away + OnProcessedTarFile* = proc(stream: Stream, fileName: string): Future[?!Cid] {. + gcsafe, async: (raises: [CancelledError]) + .} + + OnProcessedTarDir* = proc(name: string, cids: seq[Cid]): Future[?!Cid] {. + gcsafe, async: (raises: [CancelledError]) + .} + +proc `$`*(tarball: Tarball): string = + result = "Tarball with " & $tarball.contents.len & " entries" + for name, entry in tarball.contents.pairs: + var lastModified: string = "(unknown)" + try: + let lastModified = $entry.lastModified + except TimeFormatParseError: + discard + result.add( + "\n " & + fmt"{name}: name = {entry.name}, {entry.kind} ({entry.contentLength} bytes) @ {lastModified} [{entry.cid}]" + ) + +proc `$`*(tarballEntry: TarballEntry): string = + ## Returns a string representation of the tarball entry. 
+ result = fmt"({tarballEntry.kind}, {tarballEntry.name})" + +proc parseFilePermissions*(permissions: uint32): set[FilePermission] = + if defined(windows) or permissions == 0: + # Ignore file permissions on Windows. If they are absent (.zip made on + # Windows for example), set default permissions. + result.incl fpUserRead + result.incl fpUserWrite + result.incl fpGroupRead + result.incl fpOthersRead + else: + if (permissions and TUREAD) != 0: + result.incl(fpUserRead) + if (permissions and TUWRITE) != 0: + result.incl(fpUserWrite) + if (permissions and TUEXEC) != 0: + result.incl(fpUserExec) + if (permissions and TGREAD) != 0: + result.incl(fpGroupRead) + if (permissions and TGWRITE) != 0: + result.incl(fpGroupWrite) + if (permissions and TGEXEC) != 0: + result.incl(fpGroupExec) + if (permissions and TOREAD) != 0: + result.incl(fpOthersRead) + if (permissions and TOWRITE) != 0: + result.incl(fpOthersWrite) + if (permissions and TOEXEC) != 0: + result.incl(fpOthersExec) + +proc toUnixPath(path: string): string = + path.replace('\\', '/') + +proc clear*(tarball: Tarball) = + tarball.contents.clear() + +proc openStreamImpl( + tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raises: []).} = + tarball.clear() + + proc trim(s: string): string = + for i in 0 ..< s.len: + if s[i] == '\0': + return s[0 ..< i] + s + + try: + var data = stream.readAll() # TODO: actually treat as a stream + + var pos: int + while pos < data.len: + if pos + 512 > data.len: + return failure("Attempted to read past end of file, corrupted tarball?") + + let + header = data[pos ..< pos + 512] + fileName = header[0 ..< 100].trim() + + pos += 512 + + if fileName.len == 0: + continue + + let + fileSize = + try: + parseOctInt(header[124 .. 134]) + except ValueError: + raise newException(TarballError, "Unexpected error while opening tarball") + lastModified = + try: + parseOctInt(header[136 .. 
146]) + except ValueError: + raise newException(TarballError, "Unexpected error while opening tarball") + typeFlag = header[156] + fileMode = + try: + parseOctInt(header[100 ..< 106]) + except ValueError: + raise newException( + TarballError, "Unexpected error while opening tarball (mode)" + ) + fileNamePrefix = + if header[257 ..< 263] == "ustar\0": + header[345 ..< 500].trim() + else: + "" + + if pos + fileSize > data.len: + return failure("Attempted to read past end of file, corrupted tarball?") + + let normalizedFileName = normalizePathEnd(fileName) + + if typeFlag == '0' or typeFlag == '\0': + if not onProcessedTarFile.isNil: + let stream = newStringStream(data[pos ..< pos + fileSize]) + without cid =? + await onProcessedTarFile(stream, normalizedFileName.lastPathPart), err: + return failure(err.msg) + tarball.contents[(fileNamePrefix / fileName).toUnixPath()] = TarballEntry( + kind: ekNormalFile, + name: normalizedFileName, + contentLength: fileSize, + cid: cid, + lastModified: initTime(lastModified, 0), + permissions: parseFilePermissions(cast[uint32](fileMode)), + ) + elif typeFlag == '5': + tarball.contents[normalizePathEnd((fileNamePrefix / fileName).toUnixPath())] = TarballEntry( + kind: ekDirectory, + name: normalizedFileName, + lastModified: initTime(lastModified, 0), + permissions: parseFilePermissions(cast[uint32](fileMode)), + ) + + # Move pos by fileSize, where fileSize is 512 byte aligned + pos += (fileSize + 511) and not 511 + success() + except CatchableError as e: + return failure(e.msg) + +proc open*( + tarball: Tarball, bytes: string, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raw: true, raises: []).} = + let stream = newStringStream(bytes) + tarball.openStreamImpl(stream, onProcessedTarFile) + +proc open*( + tarball: Tarball, stream: Stream, onProcessedTarFile: OnProcessedTarFile = nil +): Future[?!void] {.async: (raw: true, raises: []).} = + tarball.openStreamImpl(stream, onProcessedTarFile) + +proc 
processDirEntries*(tarball: Tarball): Table[string, seq[TarballEntry]] = + result = initTable[string, seq[TarballEntry]]() + for name, entry in tarball.contents.pairs: + let path = normalizePathEnd(name) + if not isRootDir(path): + let (head, _) = splitPath(path) + result.withValue(head, value): + value[].add(entry) + do: + result[head] = @[entry] + +proc findRootDir*(tarball: Tarball): ?!string = + var rootDir = "" + for entry in tarball.contents.values: + if entry.kind == ekDirectory: + if isRootDir(entry.name): + return success(entry.name) + failure("No root directory found in tarball") + +proc buildTree*( + root: string, + dirs: Table[string, seq[TarballEntry]], + onProcessedTarDir: OnProcessedTarDir = nil, +): Future[?!TarballTree] {.async: (raises: [CancelledError]).} = + let tree = TarballTree(name: root.lastPathPart, children: @[]) + let entries = dirs.getOrDefault(root) + for entry in entries: + if entry.kind == ekDirectory: + without subTree =? + await buildTree(root = entry.name, dirs = dirs, onProcessedTarDir), err: + return failure(err.msg) + # compute Cid for the subtree + # let cids = subTree.children.mapIt(it.cid) + # if not onProcessedTarDir.isNil: + # without cid =? await onProcessedTarDir(subTree.name, cids), err: + # return failure(err.msg) + # subTree.cid = cid + tree.children.add(subTree) + else: + let child = + TarballTree(name: entry.name.lastPathPart, children: @[], cid: entry.cid) + tree.children.add(child) + let cids = tree.children.mapIt(it.cid) + if not onProcessedTarDir.isNil: + without cid =? 
await onProcessedTarDir(tree.name, cids), err: + return failure(err.msg) + tree.cid = cid + success(tree) + +proc preorderTraversal*(root: TarballTree, json: JsonNode) = + echo root.name + let jsonObj = newJObject() + jsonObj["name"] = newJString(root.name) + jsonObj["cid"] = newJString($root.cid) + json.add(jsonObj) + if root.children.len > 0: + let jsonArray = newJArray() + jsonObj["children"] = jsonArray + for child in root.children: + preorderTraversal(child, jsonArray) diff --git a/codex/utils/json.nim b/codex/utils/json.nim index 5bd168464..b4286ea67 100644 --- a/codex/utils/json.nim +++ b/codex/utils/json.nim @@ -2,7 +2,8 @@ import std/options import std/typetraits from pkg/ethers import Address from pkg/libp2p import - Cid, PeerId, SignedPeerRecord, MultiAddress, AddressInfo, init, `$` + Cid, PeerId, SignedPeerRecord, MultiAddress, AddressInfo, MultiHash, init, hex, `$` +import pkg/stew/byteutils import pkg/contractabi import pkg/codexdht/discv5/node as dn import pkg/serde/json @@ -35,3 +36,12 @@ func `%`*(obj: MultiAddress): JsonNode = func `%`*(address: ethers.Address): JsonNode = % $address + +proc fromJson*(_: type MultiHash, json: JsonNode): ?!MultiHash = + expectJsonKind(MultiHash, JString, json) + without bytes =? 
json.str.hexToSeqByte.catch, err: + return failure(err.msg) + MultiHash.init(bytes).mapFailure + +func `%`*(multiHash: MultiHash): JsonNode = + %multiHash.hex diff --git a/tests/codex/bittorrent/helpers.nim b/tests/codex/bittorrent/helpers.nim new file mode 100644 index 000000000..10bcaf261 --- /dev/null +++ b/tests/codex/bittorrent/helpers.nim @@ -0,0 +1,102 @@ +import pkg/chronos +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/questionable/results + +import pkg/codex/stores/cachestore +import pkg/codex/utils/iter + +import pkg/codex/manifest +import pkg/codex/bittorrent/manifest + +proc torrentInfoForCodexManifest*( + localStore: BlockStore, + codexManifest: Manifest, + pieceLength = DefaultPieceLength.int, + name = string.none, +): Future[?!BitTorrentInfo] {.async.} = + let treeCid = codexManifest.treeCid + let datasetSize = codexManifest.datasetSize + let blockSize = codexManifest.blockSize + let numOfBlocks = divUp(datasetSize, blockSize) + let blockIter = Iter.new(0 ..< numOfBlocks) + var blocks = newSeq[Block](numOfBlocks) + while not blockIter.finished: + let index = blockIter.next() + without blk =? (await localStore.getBlock(treeCid, index)), err: + return failure(err) + blocks[index] = blk + let + numOfBlocksPerPiece = pieceLength div BitTorrentBlockSize.int + numOfPieces = divUp(datasetSize, pieceLength.NBytes) + + var + pieces: seq[MultiHash] + pieceHashCtx: sha1 + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + + pieceHashCtx.init() + + for blk in blocks: + if blk.data.len == 0: + break + if pieceIter.finished: + without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, + err: + return failure(err) + pieces.add(mh) + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + pieceHashCtx.init() + pieceHashCtx.update(blk.data) + discard pieceIter.next() + + without mh =? 
MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, err: + return failure(err) + pieces.add(mh) + + let info = BitTorrentInfo( + length: datasetSize.uint64, + pieceLength: pieceLength.uint32, + pieces: pieces, + name: name, + ) + + success info + +proc storeCodexManifest*( + codexManifest: Manifest, localStore: BlockStore +): Future[?!Block] {.async.} = + without encodedManifest =? codexManifest.encode(), err: + trace "Unable to encode manifest", err = err.msg + return failure(err) + + without blk =? Block.new(data = encodedManifest, codec = ManifestCodec), err: + trace "Unable to create block from manifest", err = err.msg + return failure(err) + + if err =? (await localStore.putBlock(blk)).errorOption: + trace "Unable to store manifest block", cid = blk.cid, err = err.msg + return failure(err) + + success blk + +proc storeTorrentManifest*( + torrentManifest: BitTorrentManifest, localStore: BlockStore +): Future[?!Block] {.async.} = + let infoBencoded = torrentManifest.info.bencode() + let infoHash = MultiHash.digest($Sha1HashCodec, infoBencoded).tryGet + let encodedManifest = torrentManifest.encode() + + without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, err: + trace "Unable to create CID for BitTorrent info hash", err = err.msg + return failure(err) + + without blk =? Block.new(data = encodedManifest, cid = infoHashCid, verify = false), + err: + trace "Unable to create block from manifest", err = err.msg + return failure(err) + + if err =? 
(await localStore.putBlock(blk)).errorOption: + trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg + return failure(err) + + success blk diff --git a/tests/codex/bittorrent/testbencoding.nim b/tests/codex/bittorrent/testbencoding.nim new file mode 100644 index 000000000..57cb167f2 --- /dev/null +++ b/tests/codex/bittorrent/testbencoding.nim @@ -0,0 +1,116 @@ +import std/strformat +import std/sequtils + +import pkg/unittest2 +import pkg/nimcrypto +import pkg/stew/byteutils +import pkg/questionable + +import ../../examples +import ../../../codex/bittorrent/bencoding + +type ExampleObject* = ref object + length*: uint64 + pieceLength*: uint32 + pieces*: seq[seq[byte]] + name*: ?string + +func bencode(obj: ExampleObject): seq[byte] = + # flatten pieces + var pieces: seq[byte] + for piece in obj.pieces: + pieces.add(piece) + result = @['d'.byte] + result.add(bencode("length") & bencode(obj.length)) + if name =? obj.name: + result.add(bencode("name") & bencode(name)) + result.add(bencode("piece length") & bencode(obj.pieceLength)) + result.add(bencode("pieces") & bencode(pieces)) + result.add('e'.byte) + +proc toString(bytes: seq[byte]): string = + result = newStringOfCap(len(bytes)) + for b in bytes: + add(result, b.char) + +proc checkEncoding(actual: seq[byte], expected: string) = + check actual.toString == expected + +suite "b-encoding": + test "int": + checkEncoding(bencode(1'i8), "i1e") + checkEncoding(bencode(-1'i8), "i-1e") + checkEncoding(bencode(int8.low), fmt"i{int8.low}e") + checkEncoding(bencode(int8.high), fmt"i{int8.high}e") + checkEncoding(bencode(uint8.low), fmt"i{uint8.low}e") + checkEncoding(bencode(uint8.high), fmt"i{uint8.high}e") + checkEncoding(bencode(int16.low), fmt"i{int16.low}e") + checkEncoding(bencode(int16.high), fmt"i{int16.high}e") + checkEncoding(bencode(uint16.low), fmt"i{uint16.low}e") + checkEncoding(bencode(uint16.high), fmt"i{uint16.high}e") + checkEncoding(bencode(int32.low), fmt"i{int32.low}e") + 
checkEncoding(bencode(int32.high), fmt"i{int32.high}e") + checkEncoding(bencode(uint32.low), fmt"i{uint32.low}e") + checkEncoding(bencode(uint32.high), fmt"i{uint32.high}e") + checkEncoding(bencode(uint.high), fmt"i{uint.high}e") + checkEncoding(bencode(int64.low), fmt"i{int64.low}e") + checkEncoding(bencode(int64.high), fmt"i{int64.high}e") + checkEncoding(bencode(uint64.low), fmt"i{uint64.low}e") + checkEncoding(bencode(uint64.high), fmt"i{uint64.high}e") + checkEncoding(bencode(int.low), fmt"i{int.low}e") + checkEncoding(bencode(int.high), fmt"i{int.high}e") + + test "empty buffer": + let input: array[0, byte] = [] + check bencode(input) == "0:".toBytes + + test "buffer": + let input = [1.byte, 2, 3] + check bencode(input) == fmt"{input.len}:".toBytes() & @input + + test "longer buffer": + let input = toSeq(1.byte .. 127.byte) + check bencode(input) == fmt"{input.len}:".toBytes() & @input + + test "string": + let input = "abc" + check bencode(input) == "3:abc".toBytes + + test "longer string": + let input = exampleString(127) + check bencode(input) == fmt"{input.len}:{input}".toBytes + + test "empty string": + let input = "" + check bencode(input) == "0:".toBytes + + test "empty list": + let input: seq[string] = @[] + check bencode(input) == "le".toBytes + + test "list (of strings)": + let input = ["abc", "def"] + check bencode(input) == "l3:abc3:defe".toBytes + + test "list (of seq[byte])": + let seq1 = toSeq(1.byte .. 127.byte) + let seq2 = toSeq(128.byte .. 
150.byte) + let input = [seq1, seq2] + check bencode(input) == + fmt"l{seq1.len}:".toBytes & seq1 & fmt"{seq2.len}:".toBytes & seq2 & @['e'.byte] + + test "list (of integers)": + let input = [1, -2, 3, 0x7f, -0x80, 0xff] + check bencode(input) == "li1ei-2ei3ei127ei-128ei255ee".toBytes + + test "custom type": + let piece = "1cc46da027e7ff6f1970a2e58880dbc6a08992a0".hexToSeqByte + let obj = ExampleObject( + length: 40960, pieceLength: 65536, pieces: @[piece], name: "data40k.bin".some + ) + let encoded = bencode(obj) + check encoded == + "d6:lengthi40960e4:name11:data40k.bin12:piece lengthi65536e6:pieces20:".toBytes & + piece & @['e'.byte] + let expectedInfoHash = "1902d602db8c350f4f6d809ed01eff32f030da95" + check $sha1.digest(encoded) == expectedInfoHash.toUpperAscii diff --git a/tests/codex/bittorrent/testmagnetlink.nim b/tests/codex/bittorrent/testmagnetlink.nim new file mode 100644 index 000000000..253868cce --- /dev/null +++ b/tests/codex/bittorrent/testmagnetlink.nim @@ -0,0 +1,112 @@ +import std/strformat + +import pkg/unittest2 + +import pkg/libp2p/[multicodec, multihash] +import pkg/questionable/results +import pkg/stew/byteutils + +import ../examples + +import pkg/codex/bittorrent/magnetlink + +suite "bittorrent magnet links": + test "creates correct magnet link object from provided string": + let infoHash = "1902d602db8c350f4f6d809ed01eff32f030da95".toUpperAscii + let magnetLinkStr = fmt"magnet:?xt=urn:btih:{infoHash}" + let magnetLink = newMagnetLink(magnetLinkStr).tryGet() + check $magnetLink == magnetLinkStr + test "correctly parses magnet link version 1": + let multiHash = MultiHash.example(Sha1HashCodec) + let hash = multiHash.data.buffer[multiHash.dpos .. 
^1] + let magnetLinkStr = + fmt"magnet:?xt=urn:btih:{byteutils.toHex(hash).toUpperAscii}&dn=example.txt&tr=udp://tracker.example.com/announce&x.pe=31.205.250.200:8080" + let magnetLink = newMagnetLink(magnetLinkStr).tryGet() + check $magnetLink == magnetLinkStr.split("&")[0] + + test "correctly parses magnet link version 2": + let multiHash = MultiHash.example() + let magnetLinkStr = + fmt"magnet:?xt=urn:btmh:{multihash.hex}&dn=example.txt&tr=udp://tracker.example.com/announce&x.pe=31.205.250.200:8080" + let magnetLink = newMagnetLink(magnetLinkStr).tryGet() + check $magnetLink == magnetLinkStr.split("&")[0] + + test "correctly parses hybrid magnet links": + let multiHashV1 = MultiHash.example(Sha1HashCodec) + let hash = multiHashV1.data.buffer[multiHashV1.dpos .. ^1] + let multiHash = MultiHash.example() + let magnetLinkStr = + fmt"magnet:?xt=urn:btih:{byteutils.toHex(hash).toUpperAscii}&xt=urn:btmh:{multihash.hex}&dn=example.txt&tr=udp://tracker.example.com/announce&x.pe=31.205.250.200:8080" + let magnetLink = newMagnetLink(magnetLinkStr).tryGet() + check $magnetLink == magnetLinkStr.split("&")[0 .. 1].join("&") + + test "accepts hybrid magnet links with one info hash part incorrect (v1 part correct)": + let multiHashV1 = MultiHash.example(Sha1HashCodec) + let hash = multiHashV1.data.buffer[multiHashV1.dpos .. ^1] + let magnetLinkStr = + fmt"magnet:?xt=urn:btih:{byteutils.toHex(hash).toUpperAscii}&xt=urn:btmh&dn=example.txt&tr=udp://tracker.example.com/announce&x.pe=31.205.250.200:8080" + let magnetLink = newMagnetLink(magnetLinkStr).tryGet() + check $magnetLink == magnetLinkStr.split("&")[0] + + test "accepts hybrid magnet links with one info hash part incorrect (v2 part correct)": + let multiHash = MultiHash.example() + let magnetLinkStr = + fmt"magnet:?xt=urn:btih&xt=urn:btmh:{multihash.hex}&dn=example.txt&tr=udp://tracker.example.com/announce&x.pe=31.205.250.200:8080" + let magnetLink = newMagnetLink(magnetLinkStr).tryGet() + check $magnetLink == "magnet:?" 
& magnetLinkStr.split("&")[1] + + test "fails for magnet links without 'magnet' prefix": + let magnetLinkStr = "invalid_magnet_link" + let magnetLink = newMagnetLink(magnetLinkStr) + check magnetLink.isFailure + check magnetLink.error.msg == + "Invalid magnet link format (missing 'magnet:?' prefix)" + + test "fails for magnet links without 'infoHash' part": + let magnetLinkStr = + "magnet:?dn=example.txt&tr=udp://tracker.example.com/announce&x.pe=31.205.250.200:8080" + let magnetLink = newMagnetLink(magnetLinkStr) + check magnetLink.isFailure + check magnetLink.error.msg == + "Invalid magnet link format (at least one info hash part is required)" + + for (magnetLinkStr, errorMsg) in [ + ( + "magnet:?xt=urn:btih:", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btmh:", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btih:1234567890&xt=urn:btmh:", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btih:1234567890&xt=urn:btmh:1234567890", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btmh:1234567890&xt=urn:btih:", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btmh:1234567890&xt=urn:btih:1234567890", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btmh:&xt=urn:btih:1234567890", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ( + "magnet:?xt=urn:btih:&xt=urn:btmh:1234567890", + "Error parsing info hash: given bytes is not a correct multihash", + ), + ("magnet:?xt=urn:btih", "Invalid magnet link format (missing info hash part)"), + ("magnet:?xt=urn:btmh", "Invalid magnet link format (missing info hash part)"), + ]: + test fmt"fails for magnet links with invalid hashes: {magnetLinkStr}": + let magnetLink = newMagnetLink(magnetLinkStr) + check 
magnetLink.isFailure + check magnetLink.error.msg == errorMsg diff --git a/tests/codex/bittorrent/testmanifest.nim b/tests/codex/bittorrent/testmanifest.nim new file mode 100644 index 000000000..27b9b69ee --- /dev/null +++ b/tests/codex/bittorrent/testmanifest.nim @@ -0,0 +1,76 @@ +import std/strformat + +import pkg/unittest2 +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/stew/byteutils +import pkg/questionable + +import ../../examples + +import pkg/codex/bittorrent/manifest + +suite "BitTorrent manifest": + # In the tests below, we use an example info dictionary + # from a valid torrent file (v1 so far). + # { + # "info": { + # "length": 40960, + # "name": "data40k.bin", + # "piece length": 65536, + # "pieces": [ + # "1cc46da027e7ff6f1970a2e58880dbc6a08992a0" + # ] + # } + # } + let examplePieceHash = "1cc46da027e7ff6f1970a2e58880dbc6a08992a0".hexToSeqByte + let examplePieceMultihash = MultiHash.init($Sha1HashCodec, examplePieceHash).tryGet + let exampleInfo = BitTorrentInfo( + length: 40960, + pieceLength: 65536, + pieces: @[examplePieceMultihash], + name: "data40k.bin".some, + ) + let dummyCodexManifestCid = Cid.init( + CIDv1, ManifestCodec, MultiHash.digest($Sha256HashCodec, seq[byte].example()).tryGet + ).tryGet + + test "b-encoding info dictionary": + let infoEncoded = bencode(exampleInfo) + check infoEncoded == + "d6:lengthi40960e4:name11:data40k.bin12:piece lengthi65536e6:pieces20:".toBytes & + examplePieceHash & @['e'.byte] + let expectedInfoHash = "1902d602db8c350f4f6d809ed01eff32f030da95" + check $sha1.digest(infoEncoded) == expectedInfoHash.toUpperAscii + + test "validating against info hash Cid": + let infoHash = "1902d602db8c350f4f6d809ed01eff32f030da95".hexToSeqByte + let infoMultiHash = MultiHash.init($Sha1HashCodec, infoHash).tryGet + let infoHashCid = Cid.init(CIDv1, InfoHashV1Codec, infoMultiHash).tryGet + let bitTorrentManifest = newBitTorrentManifest( + info = exampleInfo, codexManifestCid = dummyCodexManifestCid + ) + + check 
bitTorrentManifest.validate(cid = infoHashCid).tryGet == true + + for testData in [ + ( + "1902d602db8c350f4f6d809ed01eff32f030da95", + "11141902D602DB8C350F4F6D809ED01EFF32F030DA95", + ), + ( + "499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + "1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + ), + ( + "1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + "1220499B3A24C2C653C9600D0C22B33EC504ECCA1999AAF56E559505F342A2062497", + ), + ( + "11141902D602DB8C350F4F6D809ED01EFF32F030DA95", + "11141902D602DB8C350F4F6D809ED01EFF32F030DA95", + ), + ]: + let (input, expectedOutput) = testData + test fmt"Build MultiHash from '{input}'": + let hash = BitTorrentInfo.buildMultiHash(input).tryGet + check hash.hex == expectedOutput diff --git a/tests/codex/bittorrent/testpiecevalidator.nim b/tests/codex/bittorrent/testpiecevalidator.nim new file mode 100644 index 000000000..d6e0b33cc --- /dev/null +++ b/tests/codex/bittorrent/testpiecevalidator.nim @@ -0,0 +1,154 @@ +import std/strformat + +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/questionable/results + +import ../../asynctest +import ../examples + +import pkg/codex/manifest +import pkg/codex/bittorrent/manifest +import pkg/codex/bittorrent/piecevalidator + +template createExampleTorrentData() {.dirty.} = + const numOfPieces = 10 + const pieceLength = 65536 + const contentLength = pieceLength * numOfPieces + let pieces = newSeqWith(numOfPieces, MultiHash.example(Sha1HashCodec)) + let exampleInfo = BitTorrentInfo( + length: contentLength, + pieceLength: pieceLength, + pieces: pieces, + name: "data.bin".some, + ) + let dummyCodexManifestCid = Cid.example() + let exampleTorrentManifest = + newBitTorrentManifest(info = exampleInfo, codexManifestCid = dummyCodexManifestCid) + let infoBencoded = exampleInfo.bencode() + let infoHash = MultiHash.digest($Sha1HashCodec, infoBencoded).tryGet + let exampleCodexManifest = Manifest.new( + treeCid = Cid.example, + 
blockSize = BitTorrentBlockSize.NBytes, + datasetSize = exampleInfo.length.NBytes, + filename = exampleInfo.name, + mimetype = "application/octet-stream".some, + ) + +suite "Torrent PieceValidator": + createExampleTorrentData() + + var pieceValidator: TorrentPieceValidator + + setup: + pieceValidator = + newTorrentPieceValidator(exampleTorrentManifest, exampleCodexManifest) + + test "correctly sets numberOfBlocksPerPiece": + check pieceValidator.numberOfBlocksPerPiece == + exampleInfo.pieceLength.int div exampleCodexManifest.blockSize.int + + test "reports an error when trying to wait for an invalid piece": + let res = await pieceValidator.waitForPiece(exampleTorrentManifest.info.pieces.len) + check isFailure(res) + check res.error.msg == "Invalid piece index" + + test "reports an error when trying to mark an invalid piece as valid": + let res = pieceValidator.markPieceAsValid(exampleTorrentManifest.info.pieces.len) + check isFailure(res) + check res.error.msg == "Invalid piece index" + + for i in 0 ..< exampleTorrentManifest.info.pieces.len: + test fmt"can await piece {i}": + let fut = pieceValidator.waitForPiece(i) + check pieceValidator.markPieceAsValid(i) == success() + check (await fut) == success() + + test "awaiting for piece can be cancelled": + let pieceIndex = 0 + let fut = pieceValidator.waitForPiece(pieceIndex) + check (await pieceValidator.cancelPiece(pieceIndex)) == success() + let res = catch(await fut) + check isFailure(res) + check res.error of CancelledError + + test "all pieces can be cancelled": + let fut1 = pieceValidator.waitForPiece(1) + let fut2 = pieceValidator.waitForPiece(2) + + await pieceValidator.cancel() + + let res1 = catch(await fut1) + check isFailure(res1) + check res1.error of CancelledError + let res2 = catch(await fut2) + check isFailure(res2) + check res2.error of CancelledError + + test "awaiting all pieces sequentially": + let numberOfPieces = exampleTorrentManifest.info.pieces.len + for i in 0 ..< numberOfPieces: + let fut 
= pieceValidator.waitForNextPiece() + check pieceValidator.confirmCurrentPiece() == i + check (await fut) == i + + test "awaiting is independent from confirming": + let numberOfPieces = exampleTorrentManifest.info.pieces.len + var futs = newSeq[Future[int]](numberOfPieces) + for i in 0 ..< numberOfPieces: + futs[i] = pieceValidator.waitForNextPiece() + for i in 0 ..< numberOfPieces: + check pieceValidator.confirmCurrentPiece() == i + for i in 0 ..< numberOfPieces: + check (await futs[i]) == i + + test "sequential validation of blocks": + let blocksInPieces = newSeqWith( + numOfPieces, + newSeqWith( + pieceLength div BitTorrentBlockSize.int, Block.example(BitTorrentBlockSize.int) + ), + ) + var pieces = newSeq[MultiHash](blocksInPieces.len) + for i in 0 ..< blocksInPieces.len: + let blocks = blocksInPieces[i] + var pieceHashCtx: sha1 + pieceHashCtx.init() + for blk in blocks: + pieceHashCtx.update(blk.data) + pieces[i] = MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).tryGet + + let info = BitTorrentInfo( + length: contentLength, + pieceLength: pieceLength, + pieces: pieces, + name: "data.bin".some, + ) + let manifestCid = Cid.example() + let torrentManifest = + newBitTorrentManifest(info = info, codexManifestCid = manifestCid) + let codexManifest = Manifest.new( + treeCid = Cid.example, + blockSize = BitTorrentBlockSize.NBytes, + datasetSize = info.length.NBytes, + filename = info.name, + mimetype = "application/octet-stream".some, + ) + + pieceValidator = newTorrentPieceValidator(torrentManifest, codexManifest) + + for pieceIndex, blks in blocksInPieces: + # streaming client will wait on the piece validator to validate the piece + let fut = pieceValidator.waitForNextPiece() + + # during prefetch we will validate each piece sequentially + # piece validator maintains internal iterators in its object + # to keep track of the validation order + check pieceValidator.validatePiece(blks) == pieceIndex + + # after piece is validated, the prefetch task will confirm 
the piece + # again, using internal state, the validator knows which piece to confirm + check pieceValidator.confirmCurrentPiece() == pieceIndex + + # the fut will be resolved after the piece is confirmed + # and the streaming client can continue + check (await fut) == pieceIndex diff --git a/tests/codex/bittorrent/testtorrentdownloader.nim b/tests/codex/bittorrent/testtorrentdownloader.nim new file mode 100644 index 000000000..ca1c1115f --- /dev/null +++ b/tests/codex/bittorrent/testtorrentdownloader.nim @@ -0,0 +1,281 @@ +import std/importutils # private access + +import pkg/libp2p/[cid, multicodec, multihash] +import pkg/questionable/results + +import pkg/codex/rng +import pkg/codex/blockexchange +import pkg/codex/stores +import pkg/codex/discovery +import pkg/codex/blocktype + +import pkg/codex/manifest +import pkg/codex/bittorrent/manifest +import pkg/codex/bittorrent/torrentdownloader + +import pkg/codex/utils/iter +import pkg/codex/utils/safeasynciter +import pkg/codex/logutils + +import ../../asynctest +import ./helpers +import ../helpers +import ../examples + +logScope: + topics = "testtorrentdownloader" + +privateAccess(TorrentPiece) +privateAccess(TorrentDownloader) + +template setupDependencies() {.dirty.} = + var + rng: Rng + seckey: PrivateKey + peerId: PeerId + wallet: WalletRef + blockDiscovery: Discovery + peerStore: PeerCtxStore + pendingBlocks: PendingBlocksManager + + network: BlockExcNetwork + localStore: CacheStore + discovery: DiscoveryEngine + advertiser: Advertiser + engine: BlockExcEngine + networkStore: NetworkStore + + setup: + rng = Rng.instance() + seckey = PrivateKey.random(rng[]).tryGet() + peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet() + wallet = WalletRef.example + blockDiscovery = Discovery.new() + peerStore = PeerCtxStore.new() + pendingBlocks = PendingBlocksManager.new() + + network = BlockExcNetwork() + localStore = CacheStore.new(chunkSize = BitTorrentBlockSize.NBytes) + discovery = + 
DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) + advertiser = Advertiser.new(localStore, blockDiscovery) + engine = BlockExcEngine.new( + localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks + ) + networkStore = NetworkStore.new(engine, localStore) + +asyncchecksuite "Torrent Downloader": + setupDependencies() + + const + pieceLength = 64.KiBs.int + blockSize = BitTorrentBlockSize.NBytes + + # this is an invariant that pieceLength is always power of two + # and multiple of blockSize + let numOfBlocksPerPiece = pieceLength div blockSize.int + + var + codexManifest: Manifest + torrentInfo: BitTorrentInfo + torrentManifest: BitTorrentManifest + + blocks: seq[Block] + codexManifestBlock: Block + torrentManifestBlock: Block + + torrentDownloader: TorrentDownloader + + proc createTestData(datasetSize: int) {.async.} = + trace "requested dataset", datasetSize + blocks = await makeRandomBlocks(datasetSize = datasetSize, blockSize = blockSize) + for index, blk in blocks: + trace "block created ", index, len = blk.data.len + codexManifest = await storeDataGetManifest(localStore, blocks) + codexManifestBlock = (await storeCodexManifest(codexManifest, localStore)).tryGet() + torrentInfo = ( + await torrentInfoForCodexManifest( + localStore, codexManifest, pieceLength = pieceLength, name = "data.bin".some + ) + ).tryGet() + torrentManifest = newBitTorrentManifest( + info = torrentInfo, codexManifestCid = codexManifestBlock.cid + ) + torrentManifestBlock = + (await storeTorrentManifest(torrentManifest, localStore)).tryGet() + + proc validatePiece(torrentDownloader: TorrentDownloader, pieceIndex: int) {.async.} = + let treeCid = codexManifest.treeCid + var pieceHashCtx: sha1 + pieceHashCtx.init() + let blockIter = torrentDownloader.getNewBlockIterator(pieceIndex).tryGet + let blks = newSeq[Block]() + while not blockIter.finished: + let blockIndex = blockIter.next() + let address = BlockAddress.init(treeCid, blockIndex) + 
let blk = (await localStore.getBlock(address)).tryGet() + trace "got block from local store", treeCid, blockIndex, cid = blk.cid + pieceHashCtx.update(blk.data) + let computedPieceHash = pieceHashCtx.finish() + let expectedPieceHash = torrentDownloader.pieces[pieceIndex].pieceHash + trace "comparing piece hashes", expectedPieceHash, computedPieceHash + check expectedPieceHash == computedPieceHash + trace "piece validated", treeCid, pieceIndex + + setup: + await createTestData(datasetSize = 72.KiBs.int) + + torrentDownloader = + newTorrentDownloader(torrentManifest, codexManifest, networkStore).tryGet() + + assert torrentInfo.pieces.len == 2 + assert codexManifest.blocksCount == 5 + + test "correctly sets up the pieces": + let blocksCount = codexManifest.blocksCount + let numOfPieces = torrentInfo.pieces.len + # last piece can have less blocks than numOfBlocksPerPiece + # we know how many blocks we have: + let numOfBlocksInLastPiece = blocksCount - (numOfBlocksPerPiece * (numOfPieces - 1)) + + # echo "codeManifest: ", $codexManifest + # echo "torrentInfo: ", $torrentInfo + # echo "torrentManifest: ", $torrentManifest + # echo "codexManifestBlockCid: ", $(codexManifestBlock.cid) + # echo "torrentManifestBlockCid: ", $(torrentManifestBlock.cid) + + check torrentDownloader.pieces.len == torrentInfo.pieces.len + check torrentDownloader.numberOfBlocksPerPiece == numOfBlocksPerPiece + + for index, piece in torrentDownloader.pieces: + assert index < numOfPieces + let + expectedBlockIndexStart = index * numOfBlocksPerPiece + expectedBlockIndexEnd = + if index < numOfPieces - 1: + (index + 1) * numOfBlocksPerPiece - 1 + else: + index * numOfBlocksPerPiece + numOfBlocksInLastPiece - 1 + expectedNumOfBlocksInPiece = expectedBlockIndexEnd - expectedBlockIndexStart + 1 + check piece.pieceIndex == index + check piece.pieceHash == torrentInfo.pieces[index] + check piece.blockIndexStart == expectedBlockIndexStart + check piece.blockIndexEnd == expectedBlockIndexEnd + check 
torrentDownloader.numberOfBlocksInPiece(index).tryGet == + expectedNumOfBlocksInPiece + let blockIterator = torrentDownloader.getNewBlockIterator(index).tryGet + for blkIndex in expectedBlockIndexStart .. expectedBlockIndexEnd: + check blkIndex == blockIterator.next() + check blockIterator.finished == true + check piece.handle.finished == false + + test "pieces are validated": + torrentDownloader.start() + + let pieceIter = torrentDownloader.getNewPieceIterator() + + while not pieceIter.finished: + let expectedPieceIndex = pieceIter.next() + trace "waiting for piece", expectedPieceIndex + let waitFut = torrentDownloader.waitForNextPiece() + let status = await waitFut.withTimeout(1.seconds) + assert status == true + let pieceIndex = await waitFut + trace "got piece", pieceIndex + check pieceIndex == expectedPieceIndex + await torrentDownloader.validatePiece(pieceIndex) + + check (await torrentDownloader.waitForNextPiece()) == -1 + check torrentDownloader.queue.empty + + test "get downloaded blocks": + torrentDownloader.start() + + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + while not torrentDownloader.finished: + let dataFut = torrentDownloader.getNext() + let status = await dataFut.withTimeout(1.seconds) + assert status == true + let (blockIndex, data) = (await dataFut).tryGet() + trace "got data", blockIndex, len = data.len + let expectedBlockIndex = blockIter.next() + check blockIndex == expectedBlockIndex + let treeCid = codexManifest.treeCid + let address = BlockAddress.init(treeCid, expectedBlockIndex) + let blk = (await localStore.getBlock(address)).tryGet() + check blk.data == data + + check blockIter.finished + await torrentDownloader.stop() + + test "get downloaded blocks using async iter": + torrentDownloader.start() + + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + for dataFut in torrentDownloader.getAsyncBlockIterator(): + let status = await dataFut.withTimeout(1.seconds) + assert status == true + let (blockIndex, data) 
= (await dataFut).tryGet() + trace "got data", blockIndex, len = data.len + let expectedBlockIndex = blockIter.next() + check blockIndex == expectedBlockIndex + let treeCid = codexManifest.treeCid + let address = BlockAddress.init(treeCid, expectedBlockIndex) + let blk = (await localStore.getBlock(address)).tryGet() + check blk.data == data + + check blockIter.finished + await torrentDownloader.stop() + + test "get downloaded blocks using async pairs iter": + torrentDownloader.start() + + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + for i, dataFut in torrentDownloader.getAsyncBlockIterator(): + let status = await dataFut.withTimeout(1.seconds) + assert status == true + let (blockIndex, data) = (await dataFut).tryGet() + trace "got data", blockIndex, len = data.len + let expectedBlockIndex = blockIter.next() + check i == expectedBlockIndex + check blockIndex == expectedBlockIndex + let treeCid = codexManifest.treeCid + let address = BlockAddress.init(treeCid, expectedBlockIndex) + let blk = (await localStore.getBlock(address)).tryGet() + check blk.data == data + + check blockIter.finished + await torrentDownloader.stop() + + test "canceling download": + torrentDownloader.start() + + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + var (blockIndex, data) = (await torrentDownloader.getNext()).tryGet() + + check blockIndex == 0 + check data.len > 0 + + await torrentDownloader.stop() + + (blockIndex, data) = (await torrentDownloader.getNext()).tryGet() + check blockIndex == -1 + check data.len == 0 + + test "stoping before starting (simulate cancellation)": + let blockIter = Iter.new(0 ..< codexManifest.blocksCount) + + # download did not even start, thus this one will not complete + let dataFut = torrentDownloader.getNext() + + # calling stop will cancel awaiting for the next block + await torrentDownloader.stop() + + assert dataFut.finished + + expect CancelledError: + discard await dataFut diff --git 
a/tests/codex/bittorrent/testtorrentparser.nim b/tests/codex/bittorrent/testtorrentparser.nim new file mode 100644 index 000000000..76369fd87 --- /dev/null +++ b/tests/codex/bittorrent/testtorrentparser.nim @@ -0,0 +1,48 @@ +import std/sequtils + +import pkg/unittest2 + +import pkg/libp2p/[multicodec, multihash] +import pkg/questionable/results +import pkg/stew/byteutils + +import ../examples + +import pkg/codex/bittorrent/manifest/manifest +import pkg/codex/bittorrent/torrentParser + +suite "torrentParser": + test "extracts info directory bytes from the torrent binary data": + let pieces = @[ + "21FEBA308CD51E9ACF88417193A9EA60F0F84646", + "3D4A8279853DA2DA355A574740217D446506E8EB", + "1AD686B48B9560B15B8843FD00E7EC1B59624B09", + "5015E7DA0C40350624C6B5A1FED1DB39720B726C", + ].map( + proc(hash: string): MultiHash = + let bytes = hash.hexToSeqByte.catch.tryGet() + MultiHash.init($Sha1HashCodec, bytes).mapFailure.tryGet() + ) + + let info = BitTorrentInfo( + length: 1048576, pieceLength: 262144, pieces: pieces, name: some("data1M.bin") + ) + let encodedInfo = info.bencode() + let infoHash = MultiHash.digest($Sha1HashCodec, encodedInfo).mapFailure.tryGet() + let torrentBytes = ("d4:info" & string.fromBytes(encodedInfo) & "e").toBytes() + # let torrentBytesHex = byteutils.toHex(torrentBytes) + + # check torrentBytesHex == "64343a696e666f64363a6c656e677468693130343835373665343a6e616d6531303a64617461314d2e62696e31323a7069656365206c656e6774686932363231343465363a70696563657338303a21feba308cd51e9acf88417193a9ea60f0f846463d4a8279853da2da355a574740217d446506e8eb1ad686b48b9560b15b8843fd00e7ec1b59624b095015e7da0c40350624c6b5a1fed1db39720b726c6565" + + let infoBytes = extractInfoFromTorrent(torrentBytes).tryGet() + + # echo string.fromBytes(infoBytes) + + # let infoBytesHex = byteutils.toHex(infoBytes) + + # check infoBytesHex == 
"64363a6c656e677468693130343835373665343a6e616d6531303a64617461314d2e62696e31323a7069656365206c656e6774686932363231343465363a70696563657338303a21feba308cd51e9acf88417193a9ea60f0f846463d4a8279853da2da355a574740217d446506e8eb1ad686b48b9560b15b8843fd00e7ec1b59624b095015e7da0c40350624c6b5a1fed1db39720b726c65" + + let extractedInfoHash = + MultiHash.digest($Sha1HashCodec, infoBytes).mapFailure.tryGet() + + check extractedInfoHash == infoHash diff --git a/tests/codex/testbittorrent.nim b/tests/codex/testbittorrent.nim new file mode 100644 index 000000000..4db7fe869 --- /dev/null +++ b/tests/codex/testbittorrent.nim @@ -0,0 +1,8 @@ +import ./bittorrent/testbencoding +import ./bittorrent/testmanifest +import ./bittorrent/testpiecevalidator +import ./bittorrent/testtorrentdownloader +import ./bittorrent/testmagnetlink +import ./bittorrent/testtorrentparser + +{.warning[UnusedImport]: off.} diff --git a/tests/examples.nim b/tests/examples.nim index 9ef4e2922..fd33f4a2c 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -88,10 +88,12 @@ proc example(_: type G2Point): G2Point = proc example*(_: type Groth16Proof): Groth16Proof = Groth16Proof(a: G1Point.example, b: G2Point.example, c: G1Point.example) -proc example*(_: type RandomChunker, blocks: int): Future[seq[byte]] {.async.} = +proc example*( + _: type RandomChunker, blocks: int, blockSize = DefaultBlockSize.int +): Future[seq[byte]] {.async.} = let rng = Rng.instance() let chunker = RandomChunker.new( - rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize + rng, size = blockSize.NBytes * blocks.NBytes, chunkSize = blockSize ) var data: seq[byte] while (let moar = await chunker.getBytes(); moar != []): diff --git a/tests/fixtures/tarballs/dir/dir1/file11.txt b/tests/fixtures/tarballs/dir/dir1/file11.txt new file mode 100644 index 000000000..f6854cfe7 --- /dev/null +++ b/tests/fixtures/tarballs/dir/dir1/file11.txt @@ -0,0 +1 @@ +File 11 diff --git a/tests/fixtures/tarballs/dir/dir1/file12.txt 
b/tests/fixtures/tarballs/dir/dir1/file12.txt new file mode 100644 index 000000000..e64f6a28a --- /dev/null +++ b/tests/fixtures/tarballs/dir/dir1/file12.txt @@ -0,0 +1 @@ +File 12 diff --git a/tests/fixtures/tarballs/dir/file1.txt b/tests/fixtures/tarballs/dir/file1.txt new file mode 100644 index 000000000..50fcd26d6 --- /dev/null +++ b/tests/fixtures/tarballs/dir/file1.txt @@ -0,0 +1 @@ +File 1 diff --git a/tests/fixtures/tarballs/dir/file2.txt b/tests/fixtures/tarballs/dir/file2.txt new file mode 100644 index 000000000..4475433e2 --- /dev/null +++ b/tests/fixtures/tarballs/dir/file2.txt @@ -0,0 +1 @@ +File 2 diff --git a/tests/fixtures/tarballs/testtarbar.tar b/tests/fixtures/tarballs/testtarbar.tar new file mode 100644 index 000000000..3fe060318 Binary files /dev/null and b/tests/fixtures/tarballs/testtarbar.tar differ diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 17ed6dd4c..cb16677b6 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -1,6 +1,5 @@ import std/strutils - -from pkg/libp2p import Cid, `$`, init +from pkg/libp2p import Cid, MultiHash, `$`, init, hex import pkg/stint import pkg/questionable/results import pkg/chronos/apps/http/[httpserver, shttpserver, httpclient, httptable] @@ -95,6 +94,16 @@ proc info*( let response = await client.get(client.baseurl & "/debug/info") return JsonNode.parse(await response.body) +proc connect*( + client: CodexClient, peerId: string, address: string +): Future[?!void] {.async: (raises: [CancelledError, HttpError]).} = + let url = client.baseurl & "/connect/" & peerId & "?addrs=" & address + let response = await client.get(url) + if response.status != 200: + return + failure("Cannot connect to node with peerId: " & peerId & ": " & $response.status) + return success() + proc setLogLevel*( client: CodexClient, level: string ): Future[void] {.async: (raises: [CancelledError, HttpError]).} = @@ -120,9 +129,37 @@ proc upload*( proc upload*( 
client: CodexClient, bytes: seq[byte] -): Future[?!Cid] {.async: (raw: true).} = +): Future[?!Cid] {.async: (raw: true), raises: [CancelledError, HttpError].} = return client.upload(string.fromBytes(bytes)) +proc uploadTorrent*( + client: CodexClient, + contents: string, + filename = string.none, + contentType = "application/octet-stream", +): Future[?!MultiHash] {.async: (raises: [CancelledError, HttpError]).} = + var headers = newSeq[HttpHeaderTuple]() + if name =? filename: + headers = + @[ + ("Content-Disposition", "filename=\"" & name & "\""), + ("Content-Type", contentType), + ] + let response = + await client.post(client.baseurl & "/torrent", body = contents, headers = headers) + if not response.status == 200: + return failure($response.status) + let body = await response.body + let bytesRes = catch(body.hexToSeqByte) + without bytes =? bytesRes, err: + return failure(err) + MultiHash.init(bytes).mapFailure + +proc uploadTorrent*( + client: CodexClient, bytes: seq[byte], filename = string.none +): Future[?!MultiHash] {.async: (raw: true), raises: [CancelledError, HttpError].} = + client.uploadTorrent(string.fromBytes(bytes), filename) + proc downloadRaw*( client: CodexClient, cid: string, local = false ): Future[HttpClientResponseRef] {. 
@@ -158,6 +195,54 @@ proc downloadNoStream*( success await response.body +proc downloadTorrent*( + client: CodexClient, infoHash: MultiHash +): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} = + let response = + await client.get(client.baseurl & "/torrent/" & infoHash.hex & "/network/stream") + + if response.status != 200: + return failure($response.status) + + success await response.body + +proc downloadTorrent*( + client: CodexClient, + contents: string, + contentType = "text/plain", + endpoint = "magnet", +): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} = + if contents.len == 0: + return failure("No content provided!") + if endpoint != "magnet" and endpoint != "torrent-file": + return failure( + "Invalid endpoint: has to be either 'magnet' or 'torrent-file' but got: " & + endpoint + ) + if endpoint == "magnet" and + (contentType != "application/octet-stream" and contentType != "text/plain"): + return failure( + "Invalid content type: for 'magnet' endpoint has to be either 'application/octet-stream' or 'text/plain' but got: " & + contentType + ) + if endpoint == "torrent-file" and + (contentType != "application/octet-stream" and contentType != "application/json"): + return failure( + "Invalid content type: for 'torrent-file' endpoint has to be either 'application/octet-stream' or 'application/json' but got: " & + contentType + ) + + var headers = newSeq[HttpHeaderTuple]() + headers = @[("Content-Type", contentType)] + + let response = await client.post( + client.baseurl & "/torrent/" & endpoint, body = contents, headers = headers + ) + if response.status != 200: + return failure($response.status) + + success await response.body + proc downloadManifestOnly*( client: CodexClient, cid: Cid ): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} = @@ -169,6 +254,17 @@ proc downloadManifestOnly*( success await response.body +proc downloadTorrentManifestOnly*( + client: CodexClient, infoHash: MultiHash +):
Future[?!RestTorrentContent] {.async: (raises: [CancelledError, HttpError]).} = + let response = + await client.get(client.baseurl & "/torrent/" & infoHash.hex & "/network/manifest") + + if response.status != 200: + return failure($response.status) + + RestTorrentContent.fromJson(await response.body) + proc deleteRaw*( client: CodexClient, cid: string ): Future[HttpClientResponseRef] {. diff --git a/tests/integration/testbittorrent.nim b/tests/integration/testbittorrent.nim new file mode 100644 index 000000000..10cd65c64 --- /dev/null +++ b/tests/integration/testbittorrent.nim @@ -0,0 +1,206 @@ +import std/net +import std/strformat +import std/sequtils +import std/json except `%`, `%*` +import pkg/nimcrypto +from pkg/libp2p import `==`, `$`, MultiHash, init, digest, hex +import pkg/codex/units +import pkg/codex/utils/iter +import pkg/codex/manifest +import pkg/codex/rest/json +import pkg/codex/bittorrent/manifest +import ./twonodes +import ../examples +import ../codex/examples + +proc createInfoDictionaryForContent( + content: seq[byte], pieceLength = DefaultPieceLength.int, name = string.none +): ?!BitTorrentInfo = + let + numOfBlocksPerPiece = pieceLength div BitTorrentBlockSize.int + numOfPieces = divUp(content.len.NBytes, pieceLength.NBytes) + + var + pieces: seq[MultiHash] + pieceHashCtx: sha1 + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + + pieceHashCtx.init() + + let chunks = content.distribute(num = numOfPieces, spread = false) + + for chunk in chunks: + if chunk.len == 0: + break + if pieceIter.finished: + without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, + err: + return failure(err) + pieces.add(mh) + pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece) + pieceHashCtx.init() + pieceHashCtx.update(chunk) + discard pieceIter.next() + + without mh =? 
MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, err: + return failure(err) + pieces.add(mh) + + let info = BitTorrentInfo( + length: content.len.uint64, + pieceLength: pieceLength.uint32, + pieces: pieces, + name: name, + ) + + success info + +twonodessuite "BitTorrent API": + setup: + # why we do not seem to need this? yet it is twice as fast with this + let infoPeer1 = (await client1.info()).tryGet + let peerId1 = infoPeer1["id"].getStr() + let announceAddress1 = infoPeer1["announceAddresses"][0].getStr() + (await client2.connect(peerId1, announceAddress1)).tryGet + + test "uploading and downloading the content", twoNodesConfig: + let exampleContent = exampleString(100) + let infoHash = (await client1.uploadTorrent(exampleContent)).tryGet + let downloadedContent = (await client2.downloadTorrent(infoHash)).tryGet + check downloadedContent == exampleContent + + test "downloading content using magnet link", twoNodesConfig: + let exampleContent = exampleString(100) + let multiHash = (await client1.uploadTorrent(exampleContent)).tryGet + let infoHash = byteutils.toHex(multiHash.data.buffer[multiHash.dpos .. 
^1]) + let magnetLink = fmt"magnet:?xt=urn:btih:{infoHash}" + let downloadedContent = (await client2.downloadTorrent(magnetLink)).tryGet + check downloadedContent == exampleContent + + test "downloading content using torrent file", twoNodesConfig: + let exampleFileName = "example.txt" + let exampleContent = exampleString(100) + let multiHash = ( + await client1.uploadTorrent( + contents = exampleContent, + filename = some exampleFileName, + contentType = "text/plain", + ) + ).tryGet + + let expectedInfo = createInfoDictionaryForContent( + content = exampleContent.toBytes, name = some exampleFileName + ).tryGet + + let expectedInfoBencoded = expectedInfo.bencode() + let expectedMultiHash = + MultiHash.digest($Sha1HashCodec, expectedInfoBencoded).mapFailure.tryGet() + + assert expectedMultiHash == multiHash + + let torrentFileContent = "d4:info" & string.fromBytes(expectedInfoBencoded) & "e" + + let downloadedContent = ( + await client2.downloadTorrent( + contents = torrentFileContent, + contentType = "application/octet-stream", + endpoint = "torrent-file", + ) + ).tryGet + check downloadedContent == exampleContent + + test "downloading content using torrent file (JSON format)", twoNodesConfig: + let exampleFileName = "example.txt" + let exampleContent = exampleString(100) + let multiHash = ( + await client1.uploadTorrent( + contents = exampleContent, + filename = some exampleFileName, + contentType = "text/plain", + ) + ).tryGet + + let expectedInfo = createInfoDictionaryForContent( + content = exampleContent.toBytes, name = some exampleFileName + ).tryGet + + let expectedInfoBencoded = expectedInfo.bencode() + let expectedMultiHash = + MultiHash.digest($Sha1HashCodec, expectedInfoBencoded).mapFailure.tryGet() + + assert expectedMultiHash == multiHash + + let infoJson = %*{"info": %expectedInfo} + + let torrentJson = $infoJson + + let downloadedContent = ( + await client2.downloadTorrent( + contents = torrentJson, + contentType = "application/json", + endpoint = 
"torrent-file", + ) + ).tryGet + check downloadedContent == exampleContent + + test "uploading and downloading the content (exactly one piece long)", twoNodesConfig: + let numOfBlocksPerPiece = int(DefaultPieceLength div BitTorrentBlockSize) + let bytes = await RandomChunker.example( + blocks = numOfBlocksPerPiece, blockSize = BitTorrentBlockSize.int + ) + + let infoHash = (await client1.uploadTorrent(bytes)).tryGet + let downloadedContent = (await client2.downloadTorrent(infoHash)).tryGet + check downloadedContent.toBytes == bytes + + test "uploading and downloading the content (exactly two pieces long)", twoNodesConfig: + let numOfBlocksPerPiece = int(DefaultPieceLength div BitTorrentBlockSize) + let bytes = await RandomChunker.example( + blocks = numOfBlocksPerPiece * 2, blockSize = BitTorrentBlockSize.int + ) + + let infoHash = (await client1.uploadTorrent(bytes)).tryGet + let downloadedContent = (await client2.downloadTorrent(infoHash)).tryGet + check downloadedContent.toBytes == bytes + + # use with debugging to see the content + # use: + # CodexConfigs.init(nodes = 2).debug().withLogTopics("restapi", "node").some + # in tests/integration/twonodes.nim + # await sleepAsync(2.seconds) + + test "retrieving torrent manifest for given info hash", twoNodesConfig: + let exampleFileName = "example.txt" + let exampleContent = exampleString(100) + let infoHash = ( + await client1.uploadTorrent( + contents = exampleContent, + filename = some exampleFileName, + contentType = "text/plain", + ) + ).tryGet + + let expectedInfo = createInfoDictionaryForContent( + content = exampleContent.toBytes, name = some exampleFileName + ).tryGet + + let restTorrentContent = + (await client2.downloadTorrentManifestOnly(infoHash)).tryGet + let torrentManifest = restTorrentContent.torrentManifest + let info = torrentManifest.info + + check info == expectedInfo + + let response = ( + await client2.downloadManifestOnly(cid = torrentManifest.codexManifestCid) + ).tryGet + + let restContent = 
RestContent.fromJson(response).tryGet + + check restContent.cid == torrentManifest.codexManifestCid + + let codexManifest = restContent.manifest + check codexManifest.datasetSize.uint64 == info.length + check codexManifest.blockSize == BitTorrentBlockSize + check codexManifest.filename == info.name + check codexManifest.mimetype == "text/plain".some diff --git a/tests/testCodex.nim b/tests/testCodex.nim index 6a9b107e9..7967b45cd 100644 --- a/tests/testCodex.nim +++ b/tests/testCodex.nim @@ -1,3 +1,4 @@ +import ./codex/testbittorrent import ./codex/teststores import ./codex/testblockexchange import ./codex/testasyncheapqueue diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index 152d22dde..02b7560af 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -1,3 +1,4 @@ +import ./integration/testbittorrent import ./integration/testcli import ./integration/testrestapi import ./integration/testrestapivalidation