Skip to content

Commit 8a608e8

Browse files
committed
adds torrent uploading API
1 parent 60dec8f commit 8a608e8

File tree

3 files changed

+198
-10
lines changed

3 files changed

+198
-10
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ nim.cfg
4747
tests/integration/logs
4848

4949
data/
50+
data40k.bin

codex/node.nim

Lines changed: 123 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -618,33 +618,146 @@ proc store*(
618618

619619
return manifestBlk.cid.success
620620

621+
proc storePieces*(
622+
self: CodexNodeRef,
623+
stream: LPStream,
624+
filename: ?string = string.none,
625+
mimetype: ?string = string.none,
626+
blockSize: NBytes,
627+
pieceLength = NBytes 1024 * 64,
628+
): Future[?!BitTorrentManifest] {.async.} =
629+
## Save stream contents as dataset with given blockSize
630+
## to nodes's BlockStore, and return Cid of its manifest
631+
##
632+
info "Storing data"
633+
634+
let
635+
hcodec = Sha256HashCodec
636+
dataCodec = BlockCodec
637+
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
638+
numOfBlocksPerPiece = pieceLength.int div blockSize.int
639+
640+
var
641+
cids: seq[Cid]
642+
pieces: seq[MultiHash]
643+
pieceHashCtx: sha1
644+
pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece)
645+
646+
pieceHashCtx.init()
647+
648+
try:
649+
while (let chunk = await chunker.getBytes(); chunk.len > 0):
650+
if pieceIter.finished:
651+
without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure,
652+
err:
653+
return failure(err)
654+
pieces.add(mh)
655+
pieceIter = Iter[int].new(0 ..< numOfBlocksPerPiece)
656+
pieceHashCtx.init()
657+
without mhash =? MultiHash.digest($hcodec, chunk).mapFailure, err:
658+
return failure(err)
659+
660+
without cid =? Cid.init(CIDv1, dataCodec, mhash).mapFailure, err:
661+
return failure(err)
662+
663+
without blk =? bt.Block.new(cid, chunk, verify = false):
664+
return failure("Unable to init block from chunk!")
665+
666+
cids.add(cid)
667+
668+
if err =? (await self.networkStore.putBlock(blk)).errorOption:
669+
error "Unable to store block", cid = blk.cid, err = err.msg
670+
return failure(&"Unable to store block {blk.cid}")
671+
pieceHashCtx.update(chunk)
672+
discard pieceIter.next()
673+
except CancelledError as exc:
674+
raise exc
675+
except CatchableError as exc:
676+
return failure(exc.msg)
677+
finally:
678+
await stream.close()
679+
680+
without mh =? MultiHash.init($Sha1HashCodec, pieceHashCtx.finish()).mapFailure, err:
681+
return failure(err)
682+
pieces.add(mh)
683+
684+
without tree =? CodexTree.init(cids), err:
685+
return failure(err)
686+
687+
without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
688+
return failure(err)
689+
690+
for index, cid in cids:
691+
without proof =? tree.getProof(index), err:
692+
return failure(err)
693+
if err =?
694+
(await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
695+
# TODO add log here
696+
return failure(err)
697+
698+
let manifest = Manifest.new(
699+
treeCid = treeCid,
700+
blockSize = blockSize,
701+
datasetSize = NBytes(chunker.offset),
702+
version = CIDv1,
703+
hcodec = hcodec,
704+
codec = dataCodec,
705+
filename = filename,
706+
mimetype = mimetype,
707+
)
708+
709+
without manifestBlk =? await self.storeManifest(manifest), err:
710+
error "Unable to store manifest"
711+
return failure(err)
712+
713+
info "Stored data",
714+
manifestCid = manifestBlk.cid,
715+
treeCid = treeCid,
716+
blocks = manifest.blocksCount,
717+
datasetSize = manifest.datasetSize,
718+
filename = manifest.filename,
719+
mimetype = manifest.mimetype
720+
721+
let info = BitTorrentInfo(
722+
length: manifest.datasetSize.uint64,
723+
pieceLength: pieceLength.uint32,
724+
pieces: pieces,
725+
name: filename,
726+
)
727+
728+
let torrentManifest =
729+
newBitTorrentManifest(info = info, codexManifestCid = manifestBlk.cid)
730+
731+
return torrentManifest.success
732+
621733
proc storeTorrent*(
622734
self: CodexNodeRef,
623735
stream: LPStream,
624-
info: BitTorrentInfo,
625-
infoHash: MultiHash,
736+
filename: ?string = string.none,
626737
mimetype: ?string = string.none,
627-
): Future[?!Cid] {.async.} =
738+
): Future[?!MultiHash] {.async.} =
628739
info "Storing BitTorrent data"
629740

630-
without codexManifestCid =?
631-
await self.store(
632-
stream, filename = info.name, mimetype = mimetype, blockSize = NBytes 1024 * 16
741+
without bitTorrentManifest =?
742+
await self.storePieces(
743+
stream, filename = filename, mimetype = mimetype, blockSize = NBytes 1024 * 16
633744
):
634745
return failure("Unable to store BitTorrent data")
635746

636-
let bitTorrentManifest = newBitTorrentManifest(info, codexManifestCid)
747+
let infoBencoded = bencode(bitTorrentManifest.info)
748+
749+
without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err:
750+
return failure(err)
637751

638752
without manifestBlk =? await self.storeBitTorrentManifest(
639753
bitTorrentManifest, infoHash
640754
), err:
641755
error "Unable to store manifest"
642756
return failure(err)
643757

644-
info "Stored BitTorrent data",
645-
manifestCid = manifestBlk.cid, codeManifestCid = codexManifestCid
758+
info "Stored BitTorrent data", infoHash = $infoHash, codexManifestCid
646759

647-
success manifestBlk.cid
760+
success infoHash
648761

649762
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
650763
without cids =? await self.networkStore.listBlocks(BlockType.Manifest):

codex/rest/api.nim

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,80 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string =
235235
proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
236236
let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition
237237

238+
router.api(MethodOptions, "/api/codex/v1/torrent") do(
239+
resp: HttpResponseRef
240+
) -> RestApiResponse:
241+
if corsOrigin =? allowedOrigin:
242+
resp.setCorsHeaders("POST", corsOrigin)
243+
resp.setHeader(
244+
"Access-Control-Allow-Headers", "content-type, content-disposition"
245+
)
246+
247+
resp.status = Http204
248+
await resp.sendBody("")
249+
250+
router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse:
251+
## Upload a file in a streaming manner
252+
##
253+
254+
trace "Handling file upload"
255+
var bodyReader = request.getBodyReader()
256+
if bodyReader.isErr():
257+
return RestApiResponse.error(Http500, msg = bodyReader.error())
258+
259+
# Attempt to handle `Expect` header
260+
# some clients (curl), wait 1000ms
261+
# before giving up
262+
#
263+
await request.handleExpect()
264+
265+
var mimetype = request.headers.getString(ContentTypeHeader).some
266+
267+
if mimetype.get() != "":
268+
let mimetypeVal = mimetype.get()
269+
var m = newMimetypes()
270+
let extension = m.getExt(mimetypeVal, "")
271+
if extension == "":
272+
return RestApiResponse.error(
273+
Http422, "The MIME type '" & mimetypeVal & "' is not valid."
274+
)
275+
else:
276+
mimetype = string.none
277+
278+
const ContentDispositionHeader = "Content-Disposition"
279+
let contentDisposition = request.headers.getString(ContentDispositionHeader)
280+
let filename = getFilenameFromContentDisposition(contentDisposition)
281+
282+
if filename.isSome and not isValidFilename(filename.get()):
283+
return RestApiResponse.error(Http422, "The filename is not valid.")
284+
285+
# Here we could check if the extension matches the filename if needed
286+
287+
let reader = bodyReader.get()
288+
289+
try:
290+
without infoHash =? (
291+
await node.storeTorrent(
292+
AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
293+
filename = filename,
294+
mimetype = mimetype,
295+
)
296+
), error:
297+
error "Error uploading file", exc = error.msg
298+
return RestApiResponse.error(Http500, error.msg)
299+
300+
codex_api_uploads.inc()
301+
trace "Uploaded torrent", infoHash = $infoHash
302+
return RestApiResponse.response(infoHash.hex)
303+
except CancelledError:
304+
trace "Upload cancelled error"
305+
return RestApiResponse.error(Http500)
306+
except AsyncStreamError:
307+
trace "Async stream error"
308+
return RestApiResponse.error(Http500)
309+
finally:
310+
await reader.closeWait()
311+
238312
router.api(MethodOptions, "/api/codex/v1/data") do(
239313
resp: HttpResponseRef
240314
) -> RestApiResponse:

0 commit comments

Comments
 (0)