Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.digitalasset.canton.SynchronizerAlias
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.topology.SynchronizerId
import com.digitalasset.canton.topology.PhysicalSynchronizerId
import com.digitalasset.canton.topology.admin.grpc.TopologyStoreId
import com.digitalasset.canton.tracing.TraceContext
import io.circe.Json
import io.grpc.{Status, StatusRuntimeException}
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.actor.ActorSystem
Expand All @@ -23,7 +24,7 @@ import org.lfdecentralizedtrust.splice.sv.LocalSynchronizerNode
import org.lfdecentralizedtrust.splice.util.BackupDump

import java.nio.file.Paths
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.{Failure, Success}

/** As taking a topology snapshot is not a cheap operation, we limit this trigger to produce at most one snapshot per day.
Expand All @@ -46,12 +47,12 @@ class PeriodicTopologySnapshotTrigger(
task: PeriodicTaskTrigger.PeriodicTask
)(implicit traceContext: TraceContext): Future[TaskOutcome] = {
participantAdminConnection
.getSynchronizerId(synchronizerAlias)
.getPhysicalSynchronizerId(synchronizerAlias)
.transformWith {
case Failure(s: StatusRuntimeException) if s.getStatus.getCode == Status.Code.NOT_FOUND =>
Future.successful(TaskNoop)
case Failure(e) => Future.failed(e)
case Success(synchronizerId) =>
case Success(physicalSynchronizerId) =>
val now = clock.now
val utcDate = now.toInstant.toString.split("T").head
val folderName = s"topology_snapshot_${now.toInstant}"
Expand All @@ -63,7 +64,7 @@ class PeriodicTopologySnapshotTrigger(
folderName,
now,
utcDate,
synchronizerId,
physicalSynchronizerId,
)
else Future.successful(TaskSuccess("Today's topology snapshot already exists."))
} yield res
Expand All @@ -83,7 +84,7 @@ class PeriodicTopologySnapshotTrigger(
folderName: String,
now: CantonTimestamp,
utcDate: String,
synchronizerId: SynchronizerId,
physicalSynchronizerId: PhysicalSynchronizerId,
)(implicit
traceContext: TraceContext,
esf: ExecutionSequencerFactory,
Expand All @@ -110,13 +111,16 @@ class PeriodicTopologySnapshotTrigger(
authorizedStore <- sequencerAdminConnection.exportAuthorizedStoreSnapshot(sequencerId.uid)
// list a summary of the transactions state at the time of the snapshot to validate further imports
summary <- sequencerAdminConnection.getTopologyTransactionsSummary(
TopologyStoreId.Synchronizer(synchronizerId),
TopologyStoreId.Synchronizer(physicalSynchronizerId.logical),
clock.now,
)
// we create a single metadata file to store the counts of the different transactions alongside the sequencerId
metadata = summary.map(e =>
(e._1.code, e._2.toString)
) + ("sequencerId" -> sequencerId.toProtoPrimitive)
metadataMap = summary.map(e => (e._1.code, e._2.toString)) +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this feels like a pretty odd CSV file — instead of having headers at the top, each row is a key-value pair. How do you feel about making it a JSON file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can make it JSON.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :)

("sequencerId" -> sequencerId.toProtoPrimitive) +
("physicalSynchronizerId" -> physicalSynchronizerId.toProtoPrimitive)
metadataJson = Json
.obj(metadataMap.map { case (k, v) => k -> Json.fromString(v) }.toSeq*)
.spaces2
_ <- Future {
blocking {
val fileDesc =
Expand All @@ -132,7 +136,7 @@ class PeriodicTopologySnapshotTrigger(
BackupDump.write(
config.location,
Paths.get(s"$folderName/metadata"),
metadata.toString(),
metadataJson,
loggerFactory,
),
)
Expand Down
Loading