Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better, faster, more debugging info #56

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/main/scala/goatrodeo/omnibor/Builder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ object Builder {
None,
store = storage,
purlOut = purlOut,
parentScope = ParentScope.forAndWith(toProcess.main, None),
blockList = blockGitoids,
keepRunning = () => !dead_?,
atEnd = (parent, _) => {
Expand All @@ -166,7 +167,9 @@ object Builder {
.toDouble
val itemsPerMinute = itemsPerSecond * 60.0d
val left = totalItems.toDouble - updatedCnt.toDouble
val remainingDuration = Duration.ZERO.plusSeconds((left / itemsPerSecond).round)
val remainingDuration = Duration.ZERO.plusSeconds(
(left / itemsPerSecond).round
)
f" Items/minute ${itemsPerMinute.round}, est remaining ${remainingDuration}"
} else ""
logger.info(
Expand Down Expand Up @@ -293,7 +296,8 @@ object Builder {
)
}
updatedAlias
}
},
item => f"Inserting Merkle Tree of ${itemId}"
)
// and update the Item with the AliasFrom
data.write(
Expand All @@ -307,7 +311,8 @@ object Builder {
val toAdd = (EdgeType.aliasFrom, tree)
if (item.connections.contains(toAdd)) item
else item.copy(connections = item.connections + toAdd)
}
},
item => f"Updating Merkle Tree alias for ${itemId}"
)
}

Expand Down
91 changes: 39 additions & 52 deletions src/main/scala/goatrodeo/omnibor/Item.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,51 +53,32 @@ case class Item(
myHash < thatHash
}

/**
* Builds a list of items that are referenced from this item. The references
* are of types `AliasFrom`, `BuiltFrom`, and `ContainedBy`
*
* The resulting `Item`s should be updated in the store
*/
def buildListOfReferencesForAliasFromBuiltFromContainedBy(): Vector[Item] = {
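/** Builds a list of the items that are referenced from this item. The
* references are of types `AliasFrom`, `BuiltFrom`, and `ContainedBy`.
*
* As declared by the return type, each entry is a `(String, String)` pair
* rather than a full `Item`: for every matching edge, the reverse edge type
* (`EdgeType.aliasTo`, `EdgeType.buildsTo`, or `EdgeType.contains`) is paired
* with the identifier of the referenced item.
*
* The referenced items should be updated in the store
*/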
def buildListOfReferencesForAliasFromBuiltFromContainedBy()
: Vector[(String, String)] = {
for {
edge <- this.connections.toVector
toUpdate <- edge match {
case Edge(EdgeType.aliasFrom, connection) => {
Vector(
Item(
identifier = connection,
reference = Item.noopLocationReference,
connections = TreeSet(EdgeType.aliasTo -> this.identifier),
bodyMimeType = None,
body = None
)
(EdgeType.aliasTo -> connection)
)

}

case Edge(EdgeType.builtFrom, connection) => {

Vector(
Item(
identifier = connection,
reference = Item.noopLocationReference,
connections = TreeSet(EdgeType.buildsTo -> identifier),
bodyMimeType = None,
body = None
)
)
Vector(EdgeType.buildsTo -> connection)

}
case Edge(EdgeType.containedBy, connection) => {

Vector(
Item(
identifier = connection,
reference = Item.noopLocationReference,
connections = TreeSet(EdgeType.contains -> identifier),
bodyMimeType = None,
body = None
)
EdgeType.contains -> connection
)
}
case _ => Vector.empty
Expand All @@ -108,30 +89,36 @@ case class Item(
}
}

/**
* Create or update (merge) this `Item` in the store.
*
* The resulting item will be returned. The resulting `Item`
* may be `this` or `this` merged with the item in the store
*
* @param store the `Storage` instance
*
* @return the updated item
*/
def createOrUpdateInStore(store: Storage): Item = {
store.write(identifier, {
case None => this
case Some(other) => this.merge(other)
})
/** Create or update (merge) this `Item` in the store.
*
* The resulting item will be returned. The resulting `Item` may be `this` or
* `this` merged with the item in the store
*
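* @param store
* the `Storage` instance
* @param context
* generates a String describing the call that led to this write; it is
* passed through unchanged to `store.write`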
*
* @return
* the updated item
*/
def createOrUpdateInStore(store: Storage, context: Item => String): Item = {
store.write(
identifier,
{
case None => this
case Some(other) => this.merge(other)
},
context
)
}

/**
* Merge this `Item` with another `Item`
*
* @param other the item to merge with
*
* @return the merged items
*/
/** Merge this `Item` with another `Item`
*
* @param other
* the item to merge with
*
* @return
* the merged items
*/
def merge(other: Item): Item = {

val (body, mime) =
Expand Down
50 changes: 27 additions & 23 deletions src/main/scala/goatrodeo/omnibor/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.concurrent.atomic.AtomicInteger
import java.io.IOException
import com.typesafe.scalalogging.Logger

/** An abstract definition of a GitOID Corpus storage backend
*/
Expand Down Expand Up @@ -82,8 +83,11 @@ trait Storage {
* the path
* @param opr
* computes the item to write, given the item currently stored at the path
* (`None` if there is none)
* @param context
* generates a String describing the context of the call that led to this
* write; used for contention logging
*
* @return the resulting item if merged
*/
def write(path: String, opr: Option[Item] => Item): Item
def write(path: String, opr: Option[Item] => Item, context: Item => String): Item

/** Write data to the path
*
Expand Down Expand Up @@ -115,30 +119,16 @@ trait Storage {

def contains(identifier: String): Boolean

/**
* return only the keys that start with "gitoid:blob:sha256:"
/** return only the keys that start with "gitoid:blob:sha256:"
*
* @return
* the subset of `keys()` whose value starts with "gitoid:blob:sha256:"
*/
def gitoidKeys(): Set[GitOID] = keys().filter(_.startsWith("gitoid:blob:sha256:"))
def gitoidKeys(): Set[GitOID] =
keys().filter(_.startsWith("gitoid:blob:sha256:"))

def destDirectory(): Option[File]
}

// trait StorageReader {
// def read(path: GitOID): Option[String]
// }

// trait BulkStorageReader {
// def bulkRead(
// paths: Set[GitOID],
// known: Map[GitOID, Option[Item]],
// totalBytes: Long = 0
// ): Map[GitOID, Option[Item]]
// }



/** Can the filenames be listed?
*/
trait ListFileNames extends Storage {
Expand Down Expand Up @@ -194,8 +184,10 @@ class MemStorage(val targetDir: Option[File])
extends Storage
with ListFileNames {

private val logger = Logger(getClass())

override def contains(identifier: String): Boolean = db.get().contains(identifier)
override def contains(identifier: String): Boolean =
db.get().contains(identifier)

override def destDirectory(): Option[File] = targetDir

Expand Down Expand Up @@ -229,8 +221,9 @@ class MemStorage(val targetDir: Option[File])
ret
}

override def write(path: String, opr: Option[Item] => Item): Item = {
val lock = sync.synchronized {

override def write(path: String, opr: Option[Item] => Item, context: Item => String): Item = {
val (lock, waiters) = sync.synchronized {
val theLock = Option(locks.get(path)) match {
case Some(lock) => lock
case None => {
Expand All @@ -240,13 +233,24 @@ class MemStorage(val targetDir: Option[File])
}
}

theLock.incrementAndGet()
theLock
// how many threads are waiting on this row?
val waiters = theLock.incrementAndGet()

theLock -> waiters
}

val contentionThreshold = 4


try {
lock.synchronized {
val current = read(path)
val updated = opr(current)
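// log contention only when the waiter count is at least contentionThreshold
// AND an exact multiple of it (contentionThreshold, 2 * contentionThreshold, ...),
// so the message is not emitted for every intermediate count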
if (waiters >= contentionThreshold && waiters % contentionThreshold == 0) {
logger.info(f"Lock contention for ${path} waiting ${waiters} context ${context(updated)}")
}

dbSync.synchronized {
val newVal = db.get() + (path -> updated)
db.set(newVal)
Expand Down
Loading