Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""rename credit_history ratio column to price

Revision ID: e1f2a3b4c5d6
Revises: d0e1f2a3b4c5
Create Date: 2025-12-01 00:00:00.000000

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e1f2a3b4c5d6'
down_revision = 'd0e1f2a3b4c5'
branch_labels = None
depends_on = None


def upgrade() -> None:
# First, add the new price column
op.add_column('credit_history', sa.Column('price', sa.DECIMAL(), nullable=True))

# Add the new bonus_amount column
op.add_column('credit_history', sa.Column('bonus_amount', sa.BigInteger(), nullable=True))

# Transform data: price calculation depends on payment token
# Taking into account that bonus_ratio = 1.2
# For ALEPH token: ratio = (1 / price) * bonus_ratio, therefore price = bonus_ratio / ratio
# For other tokens: price = 1/ratio
# Only update rows where payment_method is NOT 'credit_expense' or 'credit_transfer'
# and where ratio is not null and not zero
connection = op.get_bind()
connection.execute(sa.text("""
UPDATE credit_history
SET price = CASE
WHEN token = 'ALEPH' THEN ROUND(1.2 / ratio, 18)
ELSE ROUND(1.0 / ratio, 18)
END
WHERE ratio IS NOT NULL
AND ratio != 0
AND (payment_method IS NULL
OR (payment_method != 'credit_expense' AND payment_method != 'credit_transfer'))
"""))

# Update bonus_amount for ALEPH token records
connection.execute(sa.text("""
UPDATE credit_history
SET bonus_amount = TRUNC(amount / 1.2)
WHERE token = 'ALEPH'
AND amount IS NOT NULL
"""))

# Drop the old ratio column
op.drop_column('credit_history', 'ratio')


def downgrade() -> None:
# Add back the ratio column
op.add_column('credit_history', sa.Column('ratio', sa.DECIMAL(), nullable=True))

# Transform data back: reverse price calculation depends on payment token
# For ALEPH token: price = 1.2 / ratio, therefore ratio = 1.2 / price
# For other tokens: ratio = 1/price
connection = op.get_bind()
connection.execute(sa.text("""
UPDATE credit_history
SET ratio = CASE
WHEN token = 'ALEPH' THEN ROUND(1.2 / price, 18)
ELSE ROUND(1.0 / price, 18)
END
WHERE price IS NOT NULL
AND price != 0
AND (payment_method IS NULL
OR (payment_method != 'credit_expense' AND payment_method != 'credit_transfer'))
"""))

# Drop the price column
op.drop_column('credit_history', 'price')

# Drop the bonus_amount column
op.drop_column('credit_history', 'bonus_amount')
85 changes: 73 additions & 12 deletions src/aleph/db/accessors/balances.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ def _bulk_insert_credit_history(
cursor = conn.cursor()

# Column specification for credit history
# Include the new message_timestamp field
copy_columns = "address, amount, credit_ref, credit_index, message_timestamp, last_update, ratio, tx_hash, expiration_date, token, chain, origin, provider, origin_ref, payment_method"
# Include the new message_timestamp and bonus_amount fields
copy_columns = "address, amount, credit_ref, credit_index, message_timestamp, last_update, price, bonus_amount, tx_hash, expiration_date, token, chain, origin, provider, origin_ref, payment_method"

csv_credit_history = StringIO("\n".join(csv_rows))
cursor.copy_expert(
Expand Down Expand Up @@ -453,7 +453,7 @@ def update_credit_balances_distribution(
"""
Updates credit balances for distribution messages (aleph_credit_distribution).

Distribution messages include all fields like ratio, tx_hash, provider,
Distribution messages include all fields like price, bonus_amount, tx_hash, provider,
payment_method, token, chain, and expiration_date.
"""

Expand All @@ -463,7 +463,7 @@ def update_credit_balances_distribution(
for index, credit_entry in enumerate(credits_list):
address = credit_entry["address"]
amount = abs(int(credit_entry["amount"]))
ratio = Decimal(credit_entry["ratio"])
price = Decimal(credit_entry["price"])
tx_hash = credit_entry["tx_hash"]
provider = credit_entry["provider"]

Expand All @@ -472,6 +472,7 @@ def update_credit_balances_distribution(
origin = credit_entry.get("origin", "")
origin_ref = credit_entry.get("ref", "")
payment_method = credit_entry.get("payment_method", "")
bonus_amount = credit_entry.get("bonus_amount", "")

# Convert expiration timestamp to datetime

Expand All @@ -482,7 +483,7 @@ def update_credit_balances_distribution(
)

csv_rows.append(
f"{address};{amount};{message_hash};{index};{message_timestamp};{last_update};{ratio};{tx_hash};{expiration_date or ''};{token};{chain};{origin};{provider};{origin_ref};{payment_method}"
f"{address};{amount};{message_hash};{index};{message_timestamp};{last_update};{price};{bonus_amount or ''};{tx_hash};{expiration_date or ''};{token};{chain};{origin};{provider};{origin_ref};{payment_method}"
)

_bulk_insert_credit_history(session, csv_rows)
Expand All @@ -500,7 +501,7 @@ def update_credit_balances_expense(
Expense messages have negative amounts and can include:
- execution_id (mapped to origin)
- node_id (mapped to tx_hash)
- price (mapped to ratio)
- price (mapped to price)
- time (skipped for now)
- ref (mapped to origin_ref)
"""
Expand All @@ -516,11 +517,11 @@ def update_credit_balances_expense(
# Map new fields
origin = credit_entry.get("execution_id", "")
tx_hash = credit_entry.get("node_id", "")
ratio = credit_entry.get("price", "")
price = credit_entry.get("price", "")
# Skip time field for now

csv_rows.append(
f"{address};{amount};{message_hash};{index};{message_timestamp};{last_update};{ratio};{tx_hash};;;;{origin};ALEPH;{origin_ref};credit_expense"
f"{address};{amount};{message_hash};{index};{message_timestamp};{last_update};{price};;{tx_hash};;;;{origin};ALEPH;{origin_ref};credit_expense"
)

_bulk_insert_credit_history(session, csv_rows)
Expand Down Expand Up @@ -562,15 +563,15 @@ def update_credit_balances_transfer(

# Add positive entry for recipient (origin = sender, provider = ALEPH, payment_method = credit_transfer)
csv_rows.append(
f"{recipient_address};{amount};{message_hash};{index};{message_timestamp};{last_update};;;{expiration_date or ''};;;{sender_address};ALEPH;;credit_transfer"
f"{recipient_address};{amount};{message_hash};{index};{message_timestamp};{last_update};;;;{expiration_date or ''};;;{sender_address};ALEPH;;credit_transfer"
)
index += 1

# Add negative entry for sender (unless sender is in whitelisted addresses)
# (origin = recipient, provider = ALEPH, payment_method = credit_transfer)
if sender_address not in whitelisted_addresses:
csv_rows.append(
f"{sender_address};{-amount};{message_hash};{index};{message_timestamp};{last_update};;;;;;{recipient_address};ALEPH;;credit_transfer"
f"{sender_address};{-amount};{message_hash};{index};{message_timestamp};{last_update};;;;;;;{recipient_address};ALEPH;;credit_transfer"
)
index += 1

Expand Down Expand Up @@ -602,6 +603,13 @@ def get_address_credit_history(
address: str,
page: int = 1,
pagination: int = 0,
tx_hash: Optional[str] = None,
token: Optional[str] = None,
chain: Optional[str] = None,
provider: Optional[str] = None,
origin: Optional[str] = None,
origin_ref: Optional[str] = None,
payment_method: Optional[str] = None,
) -> Sequence[AlephCreditHistoryDb]:
"""
Get paginated credit history entries for a specific address, ordered from newest to oldest.
Expand All @@ -611,6 +619,13 @@ def get_address_credit_history(
address: Address to get credit history for
page: Page number (starts at 1)
pagination: Number of entries per page (0 for all entries)
tx_hash: Filter by transaction hash
token: Filter by token
chain: Filter by chain
provider: Filter by provider
origin: Filter by origin
origin_ref: Filter by origin reference
payment_method: Filter by payment method

Returns:
List of credit history entries ordered by message_timestamp desc
Expand All @@ -621,6 +636,22 @@ def get_address_credit_history(
.order_by(AlephCreditHistoryDb.message_timestamp.desc())
)

# Apply filters
if tx_hash is not None:
query = query.where(AlephCreditHistoryDb.tx_hash == tx_hash)
if token is not None:
query = query.where(AlephCreditHistoryDb.token == token)
if chain is not None:
query = query.where(AlephCreditHistoryDb.chain == chain)
if provider is not None:
query = query.where(AlephCreditHistoryDb.provider == provider)
if origin is not None:
query = query.where(AlephCreditHistoryDb.origin == origin)
if origin_ref is not None:
query = query.where(AlephCreditHistoryDb.origin_ref == origin_ref)
if payment_method is not None:
query = query.where(AlephCreditHistoryDb.payment_method == payment_method)

if pagination > 0:
query = query.offset((page - 1) * pagination).limit(pagination)

Expand All @@ -630,21 +661,51 @@ def get_address_credit_history(
def count_address_credit_history(
session: DbSession,
address: str,
tx_hash: Optional[str] = None,
token: Optional[str] = None,
chain: Optional[str] = None,
provider: Optional[str] = None,
origin: Optional[str] = None,
origin_ref: Optional[str] = None,
payment_method: Optional[str] = None,
) -> int:
"""
Count total credit history entries for a specific address.
Count total credit history entries for a specific address with optional filters.

Args:
session: Database session
address: Address to count credit history for
tx_hash: Filter by transaction hash
token: Filter by token
chain: Filter by chain
provider: Filter by provider
origin: Filter by origin
origin_ref: Filter by origin reference
payment_method: Filter by payment method

Returns:
Total number of credit history entries for the address
Total number of credit history entries for the address matching the filters
"""
query = select(func.count(AlephCreditHistoryDb.credit_ref)).where(
AlephCreditHistoryDb.address == address
)

# Apply filters
if tx_hash is not None:
query = query.where(AlephCreditHistoryDb.tx_hash == tx_hash)
if token is not None:
query = query.where(AlephCreditHistoryDb.token == token)
if chain is not None:
query = query.where(AlephCreditHistoryDb.chain == chain)
if provider is not None:
query = query.where(AlephCreditHistoryDb.provider == provider)
if origin is not None:
query = query.where(AlephCreditHistoryDb.origin == origin)
if origin_ref is not None:
query = query.where(AlephCreditHistoryDb.origin_ref == origin_ref)
if payment_method is not None:
query = query.where(AlephCreditHistoryDb.payment_method == payment_method)

return session.execute(query).scalar_one()


Expand Down
3 changes: 2 additions & 1 deletion src/aleph/db/models/balances.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class AlephCreditHistoryDb(Base):

address: str = Column(String, nullable=False, index=True)
amount: int = Column(BigInteger, nullable=False)
ratio: Optional[Decimal] = Column(DECIMAL, nullable=True)
price: Optional[Decimal] = Column(DECIMAL, nullable=True)
bonus_amount: Optional[int] = Column(BigInteger, nullable=True)
tx_hash: Optional[str] = Column(String, nullable=True)
token: Optional[str] = Column(String, nullable=True)
chain: Optional[str] = Column(String, nullable=True)
Expand Down
16 changes: 15 additions & 1 deletion src/aleph/schemas/api/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,27 @@ class GetAccountCreditHistoryQueryParams(BaseModel):
page: int = Field(
default=DEFAULT_PAGE, ge=1, description="Offset in pages. Starts at 1."
)
tx_hash: Optional[str] = Field(
default=None, description="Filter by transaction hash"
)
token: Optional[str] = Field(default=None, description="Filter by token")
chain: Optional[str] = Field(default=None, description="Filter by chain")
provider: Optional[str] = Field(default=None, description="Filter by provider")
origin: Optional[str] = Field(default=None, description="Filter by origin")
origin_ref: Optional[str] = Field(
default=None, description="Filter by origin reference"
)
payment_method: Optional[str] = Field(
default=None, description="Filter by payment method"
)


class CreditHistoryResponseItem(BaseModel):
model_config = ConfigDict(from_attributes=True)

amount: int
ratio: Optional[Decimal] = None
price: Optional[Decimal] = None
bonus_amount: Optional[int] = None
tx_hash: Optional[str] = None
token: Optional[str] = None
chain: Optional[str] = None
Expand Down
22 changes: 20 additions & 2 deletions src/aleph/web/controllers/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,37 @@ async def get_account_credit_history(request: web.Request) -> web.Response:
address=address,
page=query_params.page,
pagination=query_params.pagination,
tx_hash=query_params.tx_hash,
token=query_params.token,
chain=query_params.chain,
provider=query_params.provider,
origin=query_params.origin,
origin_ref=query_params.origin_ref,
payment_method=query_params.payment_method,
)

if not credit_history_entries:
raise web.HTTPNotFound(text="No credit history found for this address")

total_entries = count_address_credit_history(session=session, address=address)
total_entries = count_address_credit_history(
session=session,
address=address,
tx_hash=query_params.tx_hash,
token=query_params.token,
chain=query_params.chain,
provider=query_params.provider,
origin=query_params.origin,
origin_ref=query_params.origin_ref,
payment_method=query_params.payment_method,
)

# Convert to response items
history_adapter = TypeAdapter(list[CreditHistoryResponseItem])
credit_history_list = [
{
"amount": entry.amount,
"ratio": entry.ratio,
"price": entry.price,
"bonus_amount": entry.bonus_amount,
"tx_hash": entry.tx_hash,
"token": entry.token,
"chain": entry.chain,
Expand Down
Loading
Loading