Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions user_profile/management/commands/sync_sede_members.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def handle(self, *args, **options):
f"{summary.get('update_only_ignored', 0)} non-auth ignored, "
f"{summary.get('soft_removed_skipped', 0)} soft-removed skipped, "
f"{summary.get('soft_removed_reactivated', 0)} soft-removed reactivated, "
f"{summary.get('detail_truth_updated', 0)} detail-truth updated, "
f"{summary['conflicts']} conflicts, "
f"{summary['errors']} errors, "
f"{summary['active_members']} active members, "
Expand Down
229 changes: 182 additions & 47 deletions user_profile/services/sede_mercadopago.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
logger = logging.getLogger(__name__)

ACTIVE_SUBSCRIPTION_STATUSES = {'authorized'}
UPDATE_ONLY_SUBSCRIPTION_STATUSES = {'paused', 'cancelled'}
LAST_NAME_SIMILARITY_THRESHOLD = 0.85
FIRST_NAME_SIMILARITY_THRESHOLD = 0.90
SUBSCRIPTION_PAYMENTS_LOOKBACK_DAYS = 375
Expand Down Expand Up @@ -83,6 +82,48 @@ def _is_active_subscription_status(status):
return str(status or '').strip().lower() in ACTIVE_SUBSCRIPTION_STATUSES


def _sync_duplicate_alias_rows(subscription_id, defaults, match_method=''):
if not subscription_id:
return 0
alias_rows = list(
SedeSubscription.objects.filter(subscription_id__startswith=f'{subscription_id}__dup_')
)
if not alias_rows:
return 0

now = timezone.now()
for alias in alias_rows:
for field, value in defaults.items():
setattr(alias, field, value)
# Keep dup aliases in lockstep with canonical subscription state.
alias.is_soft_removed = False
alias.soft_removed_at = None
if match_method:
alias.matched_via = match_method
alias.synced_at = now
alias.updated_at = now
SedeSubscription.objects.bulk_update(
alias_rows,
fields=[
'plan_id',
'tier_name',
'status',
'payment_method',
'last_payment_date',
'last_payment_amount',
'next_payment_date',
'member_since',
'is_active',
'is_soft_removed',
'soft_removed_at',
'matched_via',
'synced_at',
'updated_at',
],
)
return len(alias_rows)


def _normalize_frequency_type(value):
mapping = {
'days': 'dia',
Expand Down Expand Up @@ -916,6 +957,25 @@ def _extract_subscription_details(subscription_summary, subscription_detail, pay
}


def _build_subscription_defaults(details, match_method=''):
subscription_id = details.get('subscription_id') or ''
return {
'plan_id': details.get('plan_id') or '',
'tier_name': details.get('tier_name') or '',
'status': details.get('status') or '',
'payment_method': details.get('payment_method') or '',
'last_payment_date': details.get('last_payment_date'),
'last_payment_amount': details.get('last_payment_amount'),
'next_payment_date': details.get('next_payment_date'),
'member_since': details.get('member_since'),
'is_active': (
_is_active_subscription_status(details.get('status'))
or _is_forced_active_subscription(subscription_id)
),
'matched_via': match_method or '',
}


def match_subscription_to_user(
sdk,
subscription_summary,
Expand Down Expand Up @@ -985,33 +1045,18 @@ def match_subscription_to_user(
def apply_subscription_to_profile(profile, details, match_method=''):
subscription_id = details.get('subscription_id') or ''
if not subscription_id:
return
return 0

is_active = (
_is_active_subscription_status(details.get('status'))
or _is_forced_active_subscription(subscription_id)
)
defaults = {
'plan_id': details.get('plan_id') or '',
'tier_name': details.get('tier_name') or '',
'status': details.get('status') or '',
'payment_method': details.get('payment_method') or '',
'last_payment_date': details.get('last_payment_date'),
'last_payment_amount': details.get('last_payment_amount'),
'next_payment_date': details.get('next_payment_date'),
'member_since': details.get('member_since'),
'is_active': is_active,
'matched_via': match_method or '',
'synced_at': timezone.now(),
}
defaults = _build_subscription_defaults(details, match_method=match_method)
defaults['synced_at'] = timezone.now()

existing = SedeSubscription.objects.filter(subscription_id=subscription_id).select_related('profile').first()
if existing:
if existing.is_soft_removed:
incoming_last_payment_date = defaults.get('last_payment_date')
# Soft-removed subscriptions stay excluded unless a new charge appears.
if not incoming_last_payment_date or incoming_last_payment_date == existing.last_payment_date:
return
return 0
existing.is_soft_removed = False
existing.soft_removed_at = None
# Do not replace an existing subscription/user match on periodic sync.
Expand All @@ -1022,13 +1067,14 @@ def apply_subscription_to_profile(profile, details, match_method=''):
if existing.is_soft_removed is False:
update_fields.extend(['is_soft_removed', 'soft_removed_at'])
existing.save(update_fields=update_fields)
return
return _sync_duplicate_alias_rows(subscription_id, defaults, match_method=match_method)

SedeSubscription.objects.create(
subscription_id=subscription_id,
profile=profile,
**defaults,
)
return _sync_duplicate_alias_rows(subscription_id, defaults, match_method=match_method)


def _format_unmatched_message(details):
Expand Down Expand Up @@ -1102,6 +1148,93 @@ def _deactivate_stale_members(active_subscription_ids, log):
return stale_count


def _with_duplicate_alias_ids(subscription_ids):
base_ids = [str(sub_id or '').strip() for sub_id in subscription_ids if sub_id]
if not base_ids:
return set()

expanded = set(base_ids)
for base_id in base_ids:
alias_ids = SedeSubscription.objects.filter(
subscription_id__startswith=f'{base_id}__dup_'
).values_list('subscription_id', flat=True)
expanded.update(alias_ids)
return expanded


def _reconcile_local_subscriptions_with_remote_detail_truth(sdk, log=None):
"""
Final guardrail: local known subscriptions must mirror /preapproval/{id} status.
This prevents stale /search status from leaving active members as inactive.
"""
log = log or logger
canonical_subscriptions = list(
SedeSubscription.objects.filter(is_soft_removed=False)
.exclude(subscription_id__contains='__dup_')
.select_related('profile')
)
if not canonical_subscriptions:
return {'updated': 0, 'active_ids': [], 'errors': 0}

summaries = [{'id': sub.subscription_id} for sub in canonical_subscriptions if sub.subscription_id]
payloads = _prefetch_subscription_remote_map(summaries, log=log, include_payments=False)
now = timezone.now()
updates = []
active_ids = []
errors = 0

for existing_subscription in canonical_subscriptions:
sub_id = existing_subscription.subscription_id
payload = payloads.get(sub_id, {})
if payload.get('error'):
errors += 1
log.warning(' Detail-truth reconciliation failed for %s: %s', sub_id, payload.get('error'))
continue

details = _extract_subscription_details(
{'id': sub_id},
payload.get('detail') or {'id': sub_id},
)
defaults = _build_subscription_defaults(
details,
match_method=existing_subscription.matched_via or 'subscription_id',
)
for field, value in defaults.items():
setattr(existing_subscription, field, value)
existing_subscription.synced_at = now
existing_subscription.updated_at = now
updates.append(existing_subscription)
_sync_duplicate_alias_rows(sub_id, defaults, match_method=defaults['matched_via'])
if existing_subscription.is_active:
active_ids.append(sub_id)

if updates:
SedeSubscription.objects.bulk_update(
updates,
fields=[
'plan_id',
'tier_name',
'status',
'payment_method',
'last_payment_date',
'last_payment_amount',
'next_payment_date',
'member_since',
'is_active',
'matched_via',
'synced_at',
'updated_at',
],
)
if updates or errors:
log.info(
'Detail-truth reconciliation updated %d subscription(s), errors=%d',
len(updates),
errors,
)
return {'updated': len(updates), 'active_ids': active_ids, 'errors': errors}


def _process_subscription(
sdk,
subscription_summary,
Expand Down Expand Up @@ -1426,8 +1559,11 @@ def run_full_sync(log=None):
authorized_subscriptions = [
sub for sub in subscriptions if (sub.get('status') or '').lower() == 'authorized'
]
# Some subscriptions come as non-authorized in /search while /preapproval/{id}
# already reports authorized. Refresh all non-authorized summary rows so known
# local matches are reconciled with detail truth before deactivation.
update_only_subscriptions = [
sub for sub in subscriptions if (sub.get('status') or '').lower() in UPDATE_ONLY_SUBSCRIPTION_STATUSES
sub for sub in subscriptions if (sub.get('status') or '').lower() not in ACTIVE_SUBSCRIPTION_STATUSES
]
total = len(subscriptions)
fetch_duration = time.perf_counter() - fetch_started
Expand Down Expand Up @@ -1490,22 +1626,16 @@ def run_full_sync(log=None):
subscription_summary,
payload.get('detail') or subscription_summary,
)
existing_subscription.plan_id = details.get('plan_id') or ''
existing_subscription.tier_name = details.get('tier_name') or ''
existing_subscription.status = details.get('status') or ''
existing_subscription.payment_method = details.get('payment_method') or ''
existing_subscription.last_payment_date = details.get('last_payment_date')
existing_subscription.last_payment_amount = details.get('last_payment_amount')
existing_subscription.next_payment_date = details.get('next_payment_date')
existing_subscription.member_since = details.get('member_since')
existing_subscription.is_active = (
_is_active_subscription_status(details.get('status'))
or _is_forced_active_subscription(sub_id)
defaults = _build_subscription_defaults(
details,
match_method=existing_subscription.matched_via or 'subscription_id',
)
existing_subscription.matched_via = existing_subscription.matched_via or 'subscription_id'
for field, value in defaults.items():
setattr(existing_subscription, field, value)
existing_subscription.synced_at = now
existing_subscription.updated_at = now
matched_updates.append(existing_subscription)
_sync_duplicate_alias_rows(sub_id, defaults, match_method=defaults['matched_via'])
matched_count += 1
_clear_unmatched_subscription(sub_id)
if existing_subscription.is_active:
Expand Down Expand Up @@ -1677,21 +1807,16 @@ def run_full_sync(log=None):
subscription_summary,
payload.get('detail') or subscription_summary,
)
existing_subscription.plan_id = details.get('plan_id') or ''
existing_subscription.tier_name = details.get('tier_name') or ''
existing_subscription.status = details.get('status') or ''
existing_subscription.payment_method = details.get('payment_method') or ''
existing_subscription.last_payment_date = details.get('last_payment_date')
existing_subscription.last_payment_amount = details.get('last_payment_amount')
existing_subscription.next_payment_date = details.get('next_payment_date')
existing_subscription.member_since = details.get('member_since')
existing_subscription.is_active = (
_is_active_subscription_status(details.get('status'))
or _is_forced_active_subscription(sub_id)
defaults = _build_subscription_defaults(
details,
match_method=existing_subscription.matched_via or 'subscription_id',
)
for field, value in defaults.items():
setattr(existing_subscription, field, value)
existing_subscription.synced_at = now
existing_subscription.updated_at = now
matched_non_auth_updates.append(existing_subscription)
_sync_duplicate_alias_rows(sub_id, defaults, match_method=defaults['matched_via'])
update_only_updated += 1
if existing_subscription.is_active:
active_subscription_ids.append(sub_id)
Expand Down Expand Up @@ -1755,9 +1880,16 @@ def run_full_sync(log=None):
update_only_ignored += len(update_new_unknown)
non_auth_duration = time.perf_counter() - non_auth_started

truth_started = time.perf_counter()
detail_truth_summary = _reconcile_local_subscriptions_with_remote_detail_truth(sdk, log=log)
active_subscription_ids = detail_truth_summary.get('active_ids', active_subscription_ids)
error_count += detail_truth_summary.get('errors', 0)
truth_duration = time.perf_counter() - truth_started

deactivate_started = time.perf_counter()
log.info('Deactivating stale members...')
deactivated = _deactivate_stale_members(active_subscription_ids, log)
active_subscription_ids_for_deactivation = list(_with_duplicate_alias_ids(active_subscription_ids))
deactivated = _deactivate_stale_members(active_subscription_ids_for_deactivation, log)
deactivate_duration = time.perf_counter() - deactivate_started
total_duration = time.perf_counter() - sync_started

Expand All @@ -1776,11 +1908,13 @@ def run_full_sync(log=None):
'update_only_ignored': update_only_ignored,
'soft_removed_skipped': soft_removed_skipped,
'soft_removed_reactivated': soft_removed_reactivated,
'detail_truth_updated': detail_truth_summary.get('updated', 0),
'duration_seconds': round(total_duration, 2),
'timings': {
'fetch_seconds': round(fetch_duration, 2),
'authorized_seconds': round(authorized_duration, 2),
'non_auth_seconds': round(non_auth_duration, 2),
'detail_truth_seconds': round(truth_duration, 2),
'deactivate_seconds': round(deactivate_duration, 2),
},
}
Expand All @@ -1795,11 +1929,12 @@ def run_full_sync(log=None):
log.info('Active members: %d', summary['active_members'])
log.info('Deactivated: %d', summary['deactivated'])
log.info(
'Timing: total=%ss fetch=%ss authorized=%ss non_auth=%ss deactivate=%ss',
'Timing: total=%ss fetch=%ss authorized=%ss non_auth=%ss truth=%ss deactivate=%ss',
summary['duration_seconds'],
summary['timings']['fetch_seconds'],
summary['timings']['authorized_seconds'],
summary['timings']['non_auth_seconds'],
summary['timings']['detail_truth_seconds'],
summary['timings']['deactivate_seconds'],
)
log.info('=' * 80)
Expand Down
Loading