Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
taskPool: Taskpool

CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet
Expand Down Expand Up @@ -194,8 +194,8 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node"

if not s.taskpool.isNil:
s.taskpool.shutdown()
if not s.taskPool.isNil:
s.taskPool.shutdown()

proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
Expand All @@ -216,16 +216,16 @@ proc new*(

var
cache: CacheStore = nil
taskpool: Taskpool
taskPool: Taskpool

try:
if config.numThreads == ThreadCount(0):
taskpool = Taskpool.new(numThreads = min(countProcessors(), 16))
taskPool = Taskpool.new(numThreads = min(countProcessors(), 16))
else:
taskpool = Taskpool.new(numThreads = int(config.numThreads))
info "Threadpool started", numThreads = taskpool.numThreads
taskPool = Taskpool.new(numThreads = int(config.numThreads))
info "Threadpool started", numThreads = taskPool.numThreads
except CatchableError as exc:
raiseAssert("Failure in taskpool initialization:" & exc.msg)
raiseAssert("Failure in taskPool initialization:" & exc.msg)

if config.cacheSize > 0'nb:
cache = CacheStore.new(cacheSize = config.cacheSize)
Expand Down Expand Up @@ -307,7 +307,7 @@ proc new*(
if config.prover:
let backend =
config.initializeBackend().expect("Unable to create prover backend.")
some Prover.new(store, backend, config.numProofSamples)
some Prover.new(store, backend, config.numProofSamples, taskPool)
else:
none Prover

Expand All @@ -317,7 +317,7 @@ proc new*(
engine = engine,
discovery = discovery,
prover = prover,
taskPool = taskpool,
taskPool = taskPool,
)

restServer = RestServerRef
Expand All @@ -337,5 +337,5 @@ proc new*(
restServer: restServer,
repoStore: repoStore,
maintenance: maintenance,
taskpool: taskpool,
taskPool: taskPool,
)
8 changes: 5 additions & 3 deletions codex/erasure/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ proc encodeData(
return failure("Unable to store block!")
idx.inc(params.steps)

without tree =? CodexTree.init(cids[]), err:
without tree =? (await CodexTree.init(self.taskPool, cids[])), err:
return failure(err)

without treeCid =? tree.rootCid, err:
Expand Down Expand Up @@ -649,7 +649,8 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
without (cids, recoveredIndices) =? (await self.decodeInternal(encoded)), err:
return failure(err)

without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
without tree =?
(await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err:
return failure(err)

without treeCid =? tree.rootCid, err:
Expand Down Expand Up @@ -680,7 +681,8 @@ proc repair*(self: Erasure, encoded: Manifest): Future[?!void] {.async.} =
without (cids, _) =? (await self.decodeInternal(encoded)), err:
return failure(err)

without tree =? CodexTree.init(cids[0 ..< encoded.originalBlocksCount]), err:
without tree =?
(await CodexTree.init(self.taskPool, cids[0 ..< encoded.originalBlocksCount])), err:
return failure(err)

without treeCid =? tree.rootCid, err:
Expand Down
82 changes: 80 additions & 2 deletions codex/merkletree/codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,23 @@
{.push raises: [].}

import std/bitops
import std/sequtils
import std/[atomics, sequtils]

import pkg/questionable
import pkg/questionable/results
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/constantine/hashes
import pkg/taskpools
import pkg/chronos/threadsync
import ../../utils
import ../../rng
import ../../errors
import ../../blocktype

from ../../utils/digest import digestBytes

import ../../utils/uniqueptr

import ../merkletree

export merkletree
Expand All @@ -41,14 +45,16 @@
ByteTree* = MerkleTree[ByteHash, ByteTreeKey]
ByteProof* = MerkleProof[ByteHash, ByteTreeKey]

CodexTreeTask* = MerkleTask[ByteHash, ByteTreeKey]

CodexTree* = ref object of ByteTree
mcodec*: MultiCodec

CodexProof* = ref object of ByteProof
mcodec*: MultiCodec

# CodeHashes is not exported from libp2p
# So we need to recreate it instead of
# So we need to recreate it instead of
proc initMultiHashCodeTable(): Table[MultiCodec, MHash] {.compileTime.} =
for item in HashesList:
result[item.mcodec] = item
Expand Down Expand Up @@ -160,6 +166,54 @@
self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
success self

proc init*(
_: type CodexTree,
tp: Taskpool,
mcodec: MultiCodec = Sha256HashCodec,
leaves: seq[ByteHash],
): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0:
return failure "Empty leaves"

Check warning on line 176 in codex/merkletree/codex/codex.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/codex.nim#L176

Added line #L176 was not covered by tests

let
mhash = ?mcodec.mhash()
compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} =
compress(x, y, key, mhash)
Zero: ByteHash = newSeq[byte](mhash.size)

if mhash.size != leaves[0].len:
return failure "Invalid hash length"

without signal =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")

defer:
signal.close().expect("closing once works")

var tree = CodexTree(compress: compressor, zero: Zero, mcodec: mcodec)

var task =
CodexTreeTask(tree: cast[ptr ByteTree](addr tree), leaves: leaves, signal: signal)

doAssert tp.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"

Check warning on line 199 in codex/merkletree/codex/codex.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/codex.nim#L199

Added line #L199 was not covered by tests

tp.spawn merkleTreeWorker(addr task)

let threadFut = signal.wait()

if err =? catch(await threadFut.join()).errorOption:
?catch(await noCancel threadFut)
if err of CancelledError:
raise (ref CancelledError) err

Check warning on line 208 in codex/merkletree/codex/codex.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/codex.nim#L206-L208

Added lines #L206 - L208 were not covered by tests

if not task.success.load():
return failure("merkle tree task failed")

Check warning on line 212 in codex/merkletree/codex/codex.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/codex.nim#L212

Added line #L212 was not covered by tests
tree.layers = extractValue(task.layers)

success tree

func init*(_: type CodexTree, leaves: openArray[MultiHash]): ?!CodexTree =
if leaves.len == 0:
return failure "Empty leaves"
Expand All @@ -170,6 +224,18 @@

CodexTree.init(mcodec, leaves)

proc init*(
_: type CodexTree, tp: Taskpool, leaves: seq[MultiHash]
): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0:
return failure "Empty leaves"

Check warning on line 231 in codex/merkletree/codex/codex.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/codex.nim#L231

Added line #L231 was not covered by tests

let
mcodec = leaves[0].mcodec
leaves = leaves.mapIt(it.digestBytes)

Check warning on line 236 in codex/merkletree/codex/codex.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/codex/codex.nim#L236

Added line #L236 was not covered by tests
await CodexTree.init(tp, mcodec, leaves)

func init*(_: type CodexTree, leaves: openArray[Cid]): ?!CodexTree =
if leaves.len == 0:
return failure "Empty leaves"
Expand All @@ -180,6 +246,18 @@

CodexTree.init(mcodec, leaves)

proc init*(
_: type CodexTree, tp: Taskpool, leaves: seq[Cid]
): Future[?!CodexTree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0:
return failure("Empty leaves")

let
mcodec = (?leaves[0].mhash.mapFailure).mcodec
leaves = leaves.mapIt((?it.mhash.mapFailure).digestBytes)

await CodexTree.init(tp, mcodec, leaves)

proc fromNodes*(
_: type CodexTree,
mcodec: MultiCodec = Sha256HashCodec,
Expand Down
31 changes: 30 additions & 1 deletion codex/merkletree/merkletree.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@

{.push raises: [].}

import std/bitops
import std/[bitops, atomics]

import pkg/questionable/results
import pkg/taskpools
import pkg/chronos/threadsync

import ../errors
import ../utils/uniqueptr

type
CompressFn*[H, K] = proc(x, y: H, key: K): ?!H {.noSideEffect, raises: [].}
Expand All @@ -30,6 +33,13 @@
compress*: CompressFn[H, K] # compress function
zero*: H # zero value

MerkleTask*[H, K] = object
tree*: ptr MerkleTree[H, K]
leaves*: seq[H]
signal*: ThreadSignalPtr
layers*: UniquePtr[seq[seq[H]]]
success*: Atomic[bool]

func depth*[H, K](self: MerkleTree[H, K]): int =
return self.layers.len - 1

Expand Down Expand Up @@ -151,3 +161,22 @@
ys[halfn] = ?self.compress(xs[n], self.zero, key = key)

success @[@xs] & ?self.merkleTreeWorker(ys, isBottomLayer = false)

proc merkleTreeWorker*[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} =
defer:
discard task[].signal.fireSync()

let res = merkleTreeWorker(task[].tree[], task[].leaves, isBottomLayer = true)

if res.isErr:
task[].success.store(false)
return

Check warning on line 173 in codex/merkletree/merkletree.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/merkletree.nim#L173

Added line #L173 was not covered by tests

var layers = res.get()
var newOuterSeq = newSeq[seq[H]](layers.len)
for i in 0 ..< layers.len:
var isoInner = isolate(layers[i])
newOuterSeq[i] = extract(isoInner)

task[].layers = newUniquePtr(newOuterSeq)
task[].success.store(true)
53 changes: 52 additions & 1 deletion codex/merkletree/poseidon2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@

{.push raises: [].}

import std/sequtils
import std/[sequtils, atomics]

import pkg/poseidon2
import pkg/taskpools
import pkg/chronos/threadsync
import pkg/constantine/math/io/io_fields
import pkg/constantine/platforms/abstractions
import pkg/questionable/results

import ../utils
import ../utils/uniqueptr
import ../rng

import ./merkletree
Expand Down Expand Up @@ -44,6 +47,8 @@
Poseidon2Tree* = MerkleTree[Poseidon2Hash, PoseidonKeysEnum]
Poseidon2Proof* = MerkleProof[Poseidon2Hash, PoseidonKeysEnum]

Poseidon2TreeTask* = MerkleTask[Poseidon2Hash, PoseidonKeysEnum]

proc `$`*(self: Poseidon2Tree): string =
let root = if self.root.isOk: self.root.get.toHex else: "none"
"Poseidon2Tree(" & " root: " & root & ", leavesCount: " & $self.leavesCount &
Expand Down Expand Up @@ -77,9 +82,55 @@
self.layers = ?merkleTreeWorker(self, leaves, isBottomLayer = true)
success self

proc init*(
_: type Poseidon2Tree, tp: Taskpool, leaves: seq[Poseidon2Hash]
): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} =
if leaves.len == 0:
return failure "Empty leaves"

let compressor = proc(
x, y: Poseidon2Hash, key: PoseidonKeysEnum
): ?!Poseidon2Hash {.noSideEffect.} =
success compress(x, y, key.toKey)

without signal =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")

defer:
signal.close().expect("closing once works")

var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero)
var task = Poseidon2TreeTask(
tree: cast[ptr Poseidon2Tree](addr tree), leaves: leaves, signal: signal
)

doAssert tp.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"

Check warning on line 108 in codex/merkletree/poseidon2.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/poseidon2.nim#L108

Added line #L108 was not covered by tests

tp.spawn merkleTreeWorker(addr task)

Check warning on line 111 in codex/merkletree/poseidon2.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/poseidon2.nim#L111

Added line #L111 was not covered by tests
let threadFut = signal.wait()

if err =? catch(await threadFut.join()).errorOption:
?catch(await noCancel threadFut)

Check warning on line 115 in codex/merkletree/poseidon2.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/poseidon2.nim#L115

Added line #L115 was not covered by tests
if err of CancelledError:
raise (ref CancelledError) err

Check warning on line 117 in codex/merkletree/poseidon2.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/poseidon2.nim#L117

Added line #L117 was not covered by tests

if not task.success.load():
return failure("merkle tree task failed")

Check warning on line 121 in codex/merkletree/poseidon2.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/poseidon2.nim#L121

Added line #L121 was not covered by tests
tree.layers = extractValue(task.layers)

success tree

func init*(_: type Poseidon2Tree, leaves: openArray[array[31, byte]]): ?!Poseidon2Tree =
Poseidon2Tree.init(leaves.mapIt(Poseidon2Hash.fromBytes(it)))

proc init*(
_: type Poseidon2Tree, tp: Taskpool, leaves: seq[array[31, byte]]
): Future[?!Poseidon2Tree] {.async: (raises: [CancelledError]).} =
await Poseidon2Tree.init(tp, leaves.mapIt(Poseidon2Hash.fromBytes(it)))

Check warning on line 133 in codex/merkletree/poseidon2.nim

View check run for this annotation

Codecov / codecov/patch

codex/merkletree/poseidon2.nim#L133

Added line #L133 was not covered by tests
proc fromNodes*(
_: type Poseidon2Tree, nodes: openArray[Poseidon2Hash], nleaves: int
): ?!Poseidon2Tree =
Expand Down
Loading
Loading