10 changes: 10 additions & 0 deletions .changelog/1229.feature.md
@@ -0,0 +1,10 @@
runtime/accounts: Include first-activity in API response

Adds a `first_activity` field to runtime accounts showing when the account
was first seen in a transaction. To backfill historical data, enable the
`first_activity_backfill` analyzer:

```yaml
analyzers:
first_activity_backfill: {}
```
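Backfill progress can be inspected via the cursor table added by migration 53 below; a minimal probe (the cursor columns are NULL until the first batch completes):

```sql
SELECT last_runtime, last_address
FROM analysis.runtime_accounts_first_activity_backfill_state
WHERE id = 1;
```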
97 changes: 97 additions & 0 deletions analyzer/queries/queries.go
@@ -658,6 +658,103 @@ var (
ON CONFLICT (runtime, address) DO UPDATE
SET num_txs = EXCLUDED.num_txs`

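// Sets first_activity for an account, keeping the earliest known timestamp on
// conflict. COALESCE covers pre-existing rows whose first_activity is NULL.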
RuntimeAccountFirstActivityUpsert = `
INSERT INTO chain.runtime_accounts AS accounts (runtime, address, first_activity)
VALUES ($1, $2, $3)
ON CONFLICT (runtime, address) DO UPDATE
SET first_activity = LEAST(COALESCE(accounts.first_activity, EXCLUDED.first_activity), EXCLUDED.first_activity)`

// Recomputes first_activity for all runtime accounts in runtime $1 up to height $2.
// Intended for use after fast-sync.
RuntimeAccountFirstActivityRecompute = `
WITH first_rounds AS (
SELECT runtime, account_address, MIN(tx_round) AS first_round
FROM chain.runtime_related_transactions
WHERE runtime = $1::runtime AND tx_round <= $2::bigint
GROUP BY runtime, account_address
)
INSERT INTO chain.runtime_accounts AS accts (runtime, address, first_activity)
SELECT DISTINCT ON (fr.runtime, fr.account_address) fr.runtime, fr.account_address, rt.timestamp
FROM first_rounds fr
JOIN chain.runtime_transactions rt ON rt.runtime = fr.runtime AND rt.round = fr.first_round
ON CONFLICT (runtime, address) DO UPDATE
SET first_activity = LEAST(COALESCE(accts.first_activity, EXCLUDED.first_activity), EXCLUDED.first_activity)`

// Fetches the current backfill cursor position.
RuntimeAccountFirstActivityBackfillGetCursor = `
SELECT last_runtime, last_address
FROM analysis.runtime_accounts_first_activity_backfill_state
WHERE id = 1`

// Fetches the first batch of accounts with their first activity timestamps (when cursor is NULL).
// Batches the timestamp lookup for efficiency.
RuntimeAccountFirstActivityBackfillAccountsStart = `
WITH accounts AS (
SELECT runtime, address, first_activity
FROM chain.runtime_accounts
ORDER BY runtime, address
LIMIT $1
),
first_rounds AS (
SELECT rrt.runtime, rrt.account_address, MIN(rrt.tx_round) AS first_round
FROM chain.runtime_related_transactions rrt
JOIN accounts a ON a.runtime = rrt.runtime AND a.address = rrt.account_address
GROUP BY rrt.runtime, rrt.account_address
)
SELECT a.runtime, a.address, a.first_activity, rt.timestamp AS computed_first_activity
FROM accounts a
LEFT JOIN first_rounds fr ON fr.runtime = a.runtime AND fr.account_address = a.address
LEFT JOIN chain.runtime_transactions rt ON rt.runtime = fr.runtime AND rt.round = fr.first_round
ORDER BY a.runtime, a.address`

// Fetches the next batch of accounts with their first activity timestamps, starting after the cursor.
// Batches the timestamp lookup for efficiency.
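// The row-value comparison (runtime, address) > ($1, $2) implements keyset
// pagination over the account key, so each batch resumes exactly after the
// last account processed in the previous one.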
RuntimeAccountFirstActivityBackfillAccountsNext = `
WITH accounts AS (
SELECT runtime, address, first_activity
FROM chain.runtime_accounts
WHERE (runtime, address) > ($1, $2)
ORDER BY runtime, address
LIMIT $3
),
first_rounds AS (
SELECT rrt.runtime, rrt.account_address, MIN(rrt.tx_round) AS first_round
FROM chain.runtime_related_transactions rrt
JOIN accounts a ON a.runtime = rrt.runtime AND a.address = rrt.account_address
GROUP BY rrt.runtime, rrt.account_address
)
SELECT a.runtime, a.address, a.first_activity, rt.timestamp AS computed_first_activity
FROM accounts a
LEFT JOIN first_rounds fr ON fr.runtime = a.runtime AND fr.account_address = a.address
LEFT JOIN chain.runtime_transactions rt ON rt.runtime = fr.runtime AND rt.round = fr.first_round
ORDER BY a.runtime, a.address`

// Updates first_activity for a specific account.
RuntimeAccountFirstActivityBackfillUpdate = `
UPDATE chain.runtime_accounts
SET first_activity = LEAST(COALESCE(first_activity, $3), $3)
WHERE runtime = $1 AND address = $2`

// Updates the backfill cursor position.
RuntimeAccountFirstActivityBackfillUpdateCursor = `
UPDATE analysis.runtime_accounts_first_activity_backfill_state
SET last_runtime = $1, last_address = $2
WHERE id = 1`

// Checks if there are more accounts to process (when cursor is NULL).
RuntimeAccountFirstActivityBackfillHasMoreStart = `
SELECT CASE WHEN EXISTS (
SELECT 1 FROM chain.runtime_accounts LIMIT 1
) THEN 1 ELSE 0 END`

// Checks if there are more accounts to process after the current cursor.
RuntimeAccountFirstActivityBackfillHasMoreNext = `
SELECT CASE WHEN EXISTS (
SELECT 1 FROM chain.runtime_accounts
WHERE (runtime, address) > ($1, $2)
LIMIT 1
) THEN 1 ELSE 0 END`

RuntimeAccountTotalSentUpsert = `
INSERT INTO chain.runtime_accounts as accounts (runtime, address, total_sent)
VALUES ($1, $2, $3)
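The earliest-timestamp merge used by the upserts above can be checked in isolation; a quick sketch with hypothetical literals:

```sql
-- No existing value: the incoming timestamp is taken.
SELECT LEAST(COALESCE(NULL::timestamptz, '2024-06-01'), '2024-06-01');          -- 2024-06-01
-- An earlier existing value wins over the incoming one.
SELECT LEAST(COALESCE('2024-01-01'::timestamptz, '2024-06-01'), '2024-06-01');  -- 2024-01-01
```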
150 changes: 150 additions & 0 deletions analyzer/runtime/firstactivitybackfill/first_activity_backfill.go
@@ -0,0 +1,150 @@
// Package firstactivitybackfill implements the first_activity backfill analyzer.
// This analyzer backfills the first_activity field for runtime accounts that
// were created before the first_activity tracking was added.
package firstactivitybackfill

import (
"context"
"fmt"
"time"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/item"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/common"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
)

const (
analyzerPrefix = "first_activity_backfill"
defaultBatchSize = 100
)

type processor struct {
target storage.TargetStorage
logger *log.Logger

// Cursor state for the current batch.
lastRuntime *common.Runtime
lastAddress *string
}

var _ item.ItemProcessor[*accountItem] = (*processor)(nil)

type accountItem struct {
Runtime common.Runtime
Address string
ComputedFirstActivity *time.Time // The computed first activity timestamp (from batched lookup).
IsLast bool // True if this is the last item in the batch.
}

func NewAnalyzer(
cfg config.ItemBasedAnalyzerConfig,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
if cfg.BatchSize == 0 {
cfg.BatchSize = defaultBatchSize
}
logger = logger.With("analyzer", analyzerPrefix)
p := &processor{
target: target,
logger: logger,
}
return item.NewAnalyzer[*accountItem](
analyzerPrefix,
cfg,
p,
target,
logger,
)
}

func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*accountItem, error) {
// Fetch current cursor position.
var lastRuntime *common.Runtime
var lastAddress *string
err := p.target.QueryRow(ctx, queries.RuntimeAccountFirstActivityBackfillGetCursor).Scan(&lastRuntime, &lastAddress)
if err != nil {
return nil, fmt.Errorf("fetching cursor: %w", err)
}

// Query the next batch of accounts (from the start when no cursor is set yet).
// Separate start/next queries keep the planner on the (runtime, address) index.
var rows storage.QueryResults
if lastRuntime == nil || lastAddress == nil {
rows, err = p.target.Query(ctx, queries.RuntimeAccountFirstActivityBackfillAccountsStart, limit)
} else {
rows, err = p.target.Query(ctx, queries.RuntimeAccountFirstActivityBackfillAccountsNext, *lastRuntime, *lastAddress, limit)
}
if err != nil {
return nil, fmt.Errorf("querying accounts: %w", err)
}
defer rows.Close()

var items []*accountItem
for rows.Next() {
var it accountItem
var firstActivity *time.Time // Scanned but unused; kept to match query columns.
if err := rows.Scan(&it.Runtime, &it.Address, &firstActivity, &it.ComputedFirstActivity); err != nil {
return nil, fmt.Errorf("scanning account: %w", err)
}
items = append(items, &it)
}

// Mark the last item and store cursor for update.
if len(items) > 0 {
items[len(items)-1].IsLast = true
p.lastRuntime = &items[len(items)-1].Runtime
p.lastAddress = &items[len(items)-1].Address
}

return items, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, item *accountItem) error {
// Update first_activity using the pre-computed timestamp from the batched query.
// The UPDATE query uses LEAST to ensure we keep the earlier timestamp if one exists.
if item.ComputedFirstActivity != nil {
batch.Queue(
queries.RuntimeAccountFirstActivityBackfillUpdate,
item.Runtime,
item.Address,
item.ComputedFirstActivity,
)
}

// If this is the last item in the batch, update the cursor.
if item.IsLast && p.lastRuntime != nil && p.lastAddress != nil {
batch.Queue(
queries.RuntimeAccountFirstActivityBackfillUpdateCursor,
*p.lastRuntime,
*p.lastAddress,
)
}

return nil
}

func (p *processor) QueueLength(ctx context.Context) (int, error) {
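// Reports queue length as a 0/1 existence probe rather than an exact count of
// remaining accounts; an exact COUNT(*) would scan the table on every call.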
// Fetch current cursor position.
var lastRuntime *common.Runtime
var lastAddress *string
if err := p.target.QueryRow(ctx, queries.RuntimeAccountFirstActivityBackfillGetCursor).Scan(&lastRuntime, &lastAddress); err != nil {
return 0, fmt.Errorf("fetching cursor: %w", err)
}

// Check whether any accounts remain past the cursor (or at all, when no cursor
// is set yet). Use separate queries to ensure index usage.
var hasMore int
var err error
if lastRuntime == nil || lastAddress == nil {
err = p.target.QueryRow(ctx, queries.RuntimeAccountFirstActivityBackfillHasMoreStart).Scan(&hasMore)
} else {
err = p.target.QueryRow(ctx, queries.RuntimeAccountFirstActivityBackfillHasMoreNext, *lastRuntime, *lastAddress).Scan(&hasMore)
}
if err != nil {
return 0, fmt.Errorf("checking remaining work: %w", err)
}
return hasMore, nil
}
4 changes: 4 additions & 0 deletions analyzer/runtime/runtime.go
@@ -262,6 +262,9 @@ func (m *processor) FinalizeFastSync(ctx context.Context, lastFastSyncHeight int
m.logger.Info("recomputing total_received for every account")
batch.Queue(queries.RuntimeAccountTotalReceivedRecompute, m.runtime, lastFastSyncHeight, nativeTokenSymbol(m.sdkPT))

m.logger.Info("recomputing first_activity for every account")
batch.Queue(queries.RuntimeAccountFirstActivityRecompute, m.runtime, lastFastSyncHeight)

m.logger.Info("recomputing gas_for_calling for every contract")
batch.Queue(queries.RuntimeAccountGasForCallingRecompute, m.runtime, lastFastSyncHeight)

@@ -505,6 +508,7 @@ func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data *BlockData) {
// DB deadlocks with as few as 2 parallel analyzers.
// We recalculate the number of transactions for all accounts at the end of fast-sync, by aggregating the tx data.
batch.Queue(queries.RuntimeAccountNumTxsUpsert, m.runtime, addr, 1)
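// The upsert keeps the earliest timestamp via LEAST, so re-processing a block
// cannot move first_activity later.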
batch.Queue(queries.RuntimeAccountFirstActivityUpsert, m.runtime, addr, data.Header.Timestamp)
}
}
m.queueTransactionInsert(batch, data.Header.Round, data.Header.Timestamp, transactionData)
5 changes: 5 additions & 0 deletions api/spec/v1.yaml
@@ -3762,6 +3762,11 @@ components:
allOf: [$ref: '#/components/schemas/RuntimeEvmBalance']
stats:
allOf: [$ref: '#/components/schemas/AccountStats']
first_activity:
type: string
format: date-time
description: The timestamp of the first transaction involving this account.
example: *iso_timestamp_1

RuntimeStatus:
type: object
6 changes: 6 additions & 0 deletions cmd/analyzer/analyzer.go
@@ -35,6 +35,7 @@ import (
roflinstance "github.com/oasisprotocol/nexus/analyzer/rofl/instance_transactions"
"github.com/oasisprotocol/nexus/analyzer/roflmarket"
"github.com/oasisprotocol/nexus/analyzer/runtime"
"github.com/oasisprotocol/nexus/analyzer/runtime/firstactivitybackfill"
"github.com/oasisprotocol/nexus/analyzer/util"
"github.com/oasisprotocol/nexus/analyzer/validatorstakinghistory"
"github.com/oasisprotocol/nexus/cache/httpproxy"
@@ -633,6 +634,11 @@ func NewService(ctx context.Context, cfg *config.AnalysisConfig, logger *log.Log
return nebyprices.NewAnalyzer(common.RuntimeSapphire, cfg.Analyzers.SapphireNebyPrices, dbClient, logger)
})
}
if cfg.Analyzers.FirstActivityBackfill != nil {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
return firstactivitybackfill.NewAnalyzer(*cfg.Analyzers.FirstActivityBackfill, dbClient, logger)
})
}
if cfg.Analyzers.MetadataRegistry != nil {
analyzers, err = addAnalyzer(analyzers, err, "" /*syncTag*/, func() (A, error) {
return metadata_registry.NewAnalyzer(*cfg.Analyzers.MetadataRegistry, dbClient, logger)
2 changes: 2 additions & 0 deletions config/config.go
@@ -175,6 +175,8 @@ type AnalyzersList struct {
PontusxDevAbi *EvmAbiAnalyzerConfig `koanf:"evm_abi_pontusx_dev"`
SapphireNebyPrices *NebyPricesConfig `koanf:"neby_prices_sapphire"`

FirstActivityBackfill *ItemBasedAnalyzerConfig `koanf:"first_activity_backfill"`

MetadataRegistry *MetadataRegistryConfig `koanf:"metadata_registry"`
ValidatorStakingHistory *ValidatorStakingHistoryConfig `koanf:"validator_staking_history"`
NodeStats *NodeStatsConfig `koanf:"node_stats"`
1 change: 1 addition & 0 deletions storage/client/client.go
@@ -2045,6 +2045,7 @@ func (c *StorageClient) RuntimeAccount(ctx context.Context, runtime common.Runti
&a.Stats.TotalSent,
&a.Stats.TotalReceived,
&a.Stats.NumTxns,
&a.FirstActivity,
)

switch err {
2 changes: 1 addition & 1 deletion storage/client/queries/queries.go
@@ -877,7 +877,7 @@ const (

RuntimeAccountStats = `
SELECT
total_sent, total_received, num_txs
total_sent, total_received, num_txs, first_activity
FROM chain.runtime_accounts
WHERE
(runtime = $1) AND
17 changes: 17 additions & 0 deletions storage/migrations/53_runtime_accounts_first_activity.up.sql
@@ -0,0 +1,17 @@
-- Add first_activity column to runtime_accounts.
ALTER TABLE chain.runtime_accounts ADD COLUMN first_activity TIMESTAMP WITH TIME ZONE;

-- State table for tracking backfill progress.
-- Stores the cursor position (last processed runtime, address).
CREATE TABLE analysis.runtime_accounts_first_activity_backfill_state (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1), -- singleton row
last_runtime runtime,
last_address oasis_addr
);

-- Initialize with NULL cursor (start from beginning).
INSERT INTO analysis.runtime_accounts_first_activity_backfill_state (id) VALUES (1);

-- Re-apply grants for new table.
GRANT SELECT ON ALL TABLES IN SCHEMA analysis TO PUBLIC;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA analysis TO PUBLIC;
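The state table is a deliberate singleton: the primary key together with `CHECK (id = 1)` rejects any second row. For illustration only, a hypothetical violating insert:

```sql
-- Fails the check constraint; only the row with id = 1 may exist.
INSERT INTO analysis.runtime_accounts_first_activity_backfill_state (id) VALUES (2);
```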
1 change: 1 addition & 0 deletions tests/e2e_regression/damask/e2e_config_2.yml
@@ -20,6 +20,7 @@ analysis:
# evm_contract_code_emerald: { stop_if_queue_empty_for: 1s } # Disabled: emerald block analyzer is disabled in phase 1
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, especially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
first_activity_backfill: { stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
metadata_registry: { interval: 5s, stop_if_queue_empty_for: 1s, repository_branch: "nexus-e2e", mock_logo_urls: true }
# Some non-block analyzers are not tested in e2e regressions.
@@ -1 +1 @@
runtime,address,num_txs,gas_for_calling,total_sent,total_received
runtime,address,num_txs,gas_for_calling,total_sent,total_received,first_activity
1 change: 1 addition & 0 deletions tests/e2e_regression/eden/e2e_config_2.yml
@@ -21,6 +21,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, especially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
first_activity_backfill: { stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
metadata_registry: { interval: 5s, stop_if_queue_empty_for: 1s, repository_branch: "nexus-e2e", mock_logo_urls: true }
# Some non-block analyzers are not tested in e2e regressions.