From 9f7986f420a70b513cddf1291d20df4176ef806a Mon Sep 17 00:00:00 2001 From: adisa39 Date: Wed, 25 Jun 2025 18:28:29 +0100 Subject: [PATCH 1/6] revert user/seller id reference changes --- src/helpers/gasSaver.ts | 129 +++++++++++++ src/helpers/payment.ts | 53 +++-- src/models/Seller.ts | 5 + src/models/enums/paymentDirection.ts | 4 + src/services/payment.service.ts | 276 ++++++++++++++------------- src/types.ts | 19 +- src/utils/queues/queue.ts | 38 ++++ 7 files changed, 370 insertions(+), 154 deletions(-) create mode 100644 src/helpers/gasSaver.ts create mode 100644 src/models/enums/paymentDirection.ts create mode 100644 src/utils/queues/queue.ts diff --git a/src/helpers/gasSaver.ts b/src/helpers/gasSaver.ts new file mode 100644 index 00000000..09ee4316 --- /dev/null +++ b/src/helpers/gasSaver.ts @@ -0,0 +1,129 @@ +import PaymentCrossReference from '../models/PaymentCrossReference'; +import { U2UPaymentStatus } from '../models/enums/u2uPaymentStatus'; +import Order from '../models/Order'; +import pi from '../config/platformAPIclient'; +import logger from '../config/loggingConfig'; +import Payment from '../models/Payment'; +import { PaymentDirection } from '../models/enums/paymentDirection'; +import Seller from '../models/Seller'; +import { IOrder } from '../types'; +import { createA2UPayment, updatePaymentCrossReference } from '../services/payment.service'; +import path from 'path'; + +const GAS_FEE = 0.01; +const THREE_DAYS_AGO = new Date(Date.now() - 3 * 24 * 60 * 60 * 1000); + +const computeRevenue = ( + xRefId: string, + sellerPiUid: string, + amount: string, + paymentList: { xRef: string[]; sellerPiUid: string; amount: string }[] +): void => { + const existing = paymentList.find(p => p.sellerPiUid === sellerPiUid); + if (existing) { + existing.amount = ( + parseFloat(existing.amount) + parseFloat(amount) + ).toFixed(3); + existing.xRef.push(xRefId); + } else { + paymentList.push({ + xRef: [xRefId], + sellerPiUid, + amount: parseFloat(amount).toFixed(2), + }); + } +}; + +export const gasSaver = async (): Promise<{ xRef: string[]; sellerPiUid: string; amount: string }[]> => { + try { + const paymentList: { xRef: string[]; sellerPiUid: string; amount: string }[] = []; + + // 1️⃣ Fetch all U2ACompleted or A2UFailed refs + const xRefs = await PaymentCrossReference.find({ + a2u_payment_id: null, + $or: [ + { u2u_status: U2UPaymentStatus.U2ACompleted }, + { u2u_status: U2UPaymentStatus.A2UFailed } + ] + }) + .populate<{ order_id: {_id:String, total_amount:string, seller_id: {seller_id:string, gas_saver:boolean}} }>({ + path: 'order_id', + select: 'total_amount seller_id', + populate: { + path: 'seller_id', + model: 'Seller', // ensure this matches your actual Seller model name + select: 'seller_id gas_saver' + } + }) + .lean() + .exec(); + + logger.info(`Fetched ${xRefs.length} payment cross-references for gas saver processing`); + + // 2️⃣ Get latest A2U per seller via aggregation (1 DB query) + const latestA2uPerSeller = await PaymentCrossReference.aggregate([ + { $match: { a2u_payment_id: { $ne: null } } }, + { + $group: { + _id: '$seller_id', + latestDate: { $max: '$a2u_completed_at' } + } + } + ]); + + const sellerLatestMap = new Map(); + for (const entry of latestA2uPerSeller) { + sellerLatestMap.set(entry._id?.toString(), entry.latestDate); + } + + // 2️⃣ Build the batched paymentList + for (const ref of xRefs) { + const order = ref.order_id; + const seller = order?.seller_id + + if (!order || !seller || !seller.seller_id) { + logger.warn(`Missing order/seller for ref ${ref._id}`); + continue; + } + + const orderId = order._id.toString(); + const sellerPiUid = seller.seller_id; + const isGasSaver = seller.gas_saver; + + // query to get latest A2U date for each unique seller_id + const latestA2uDate = sellerLatestMap.get(sellerPiUid) || ref.u2a_completed_at; + const isWithin3Days = latestA2uDate && new Date(latestA2uDate) >= THREE_DAYS_AGO; + + const netAmount = parseFloat(order.total_amount.toString()) - GAS_FEE; + if (netAmount <= 0) { + logger.warn(`Order ${order._id.toString()} net amount is less than or equal to zero; skipping.`); + continue; + } + + // apply gasSaver logic + if (isGasSaver) { + + if (isWithin3Days) { + computeRevenue(ref._id.toString(), sellerPiUid, netAmount.toFixed(4), paymentList); + } else { + logger.info(`Skipping gas saver for seller ${sellerPiUid} due to recent A2U activity.`); + continue; + } + + } else { + paymentList.push({ + xRef: [ref._id.toString()], + sellerPiUid: sellerPiUid, + amount: netAmount.toFixed(4), + }) + } + } + logger.info(`Computed ${paymentList.length} payment batches for processing`); + + return paymentList + + } catch (err: any) { + logger.error('Error in gasSaver:', err); + return []; + } +}; diff --git a/src/helpers/payment.ts b/src/helpers/payment.ts index 541c96bf..01fba28d 100644 --- a/src/helpers/payment.ts +++ b/src/helpers/payment.ts @@ -11,7 +11,9 @@ import { completePayment, createPaymentCrossReference, createA2UPayment, - cancelPayment + cancelPayment, + updatePaymentCrossReference, + getxRefByOrderId } from '../services/payment.service'; import { cancelOrder, @@ -20,7 +22,7 @@ import { } from '../services/order.service'; import { IUser, NewOrder, PaymentDataType, PaymentDTO, PaymentInfo } from '../types'; import logger from '../config/loggingConfig'; -import { onIncompletePaymentFound } from '../controllers/paymentController'; +import { drainQueue } from '../utils/queues/queue'; function buildPaymentData( piPaymentId: string, @@ -139,9 +141,25 @@ export const processIncompletePayment = async (payment: PaymentInfo) => { // If the completed payment was for a buyer checkout, update the associated order if (updatedPayment?.payment_type === PaymentType.BuyerCheckout) { - await updatePaidOrder(updatedPayment._id as string); + const updatedOrder = await updatePaidOrder(updatedPayment._id as string); logger.warn("Old order found and updated"); - } + + // update the payment cross-reference if it exists else create a new one + const xRef = getxRefByOrderId(updatedOrder._id as string); + + if (!xRef) { + logger.warn("No existing payment cross-reference found, creating a new one"); + const xRefData = { + orderId: updatedOrder._id as string, + u2aPaymentId: updatedPayment._id as string, + u2uStatus: U2UPaymentStatus.U2ACompleted, + u2aCompletedAt: new Date(), + a2uPaymentId: null, + sellerId: updatedPayment.user_id.toString(), + } + await createPaymentCrossReference(xRefData); + } + } // Notify the Pi Platform that the payment is complete await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); @@ -232,10 +250,12 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) const u2uRefData = { u2aPaymentId: completedPayment._id as string, u2uStatus: U2UPaymentStatus.U2ACompleted, + orderId: order._id as string, + u2aCompletedAt: new Date(), a2uPaymentId: null, }; - await createPaymentCrossReference(order._id as string, u2uRefData); - logger.info("U2U cross-reference created", u2uRefData); + await createPaymentCrossReference(u2uRefData); + logger.info("U2U cross-reference created", u2uRefData); // Notify Pi Platform of successful completion const completedPiPayment = await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); @@ -246,17 +266,20 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) logger.info("Payment marked completed on Pi blockchain", completedPiPayment.status); - const payentMemo = completedPiPayment.data.memo as string + await drainQueue(); // Start A2U (App-to-User) payment to the seller - await createA2UPayment({ - sellerId: order.seller_id.toString(), - amount: order.total_amount.toString(), - buyerId: order.buyer_id.toString(), - paymentType: PaymentType.BuyerCheckout, - orderId: order._id as string, - memo: payentMemo - }); + // await createA2UPayment({ + // sellerId: order.seller_id.toString(), + // amount: order.total_amount.toString(), + // buyerId: order.buyer_id.toString(), + // paymentType: PaymentType.BuyerCheckout, + // xRefId: xRef._id as string, + // memo: payentMemo + // }); + + // const paymentList = await gasSaver() + // logger.info('Gas saver payment list:', paymentList); } else if (completedPayment?.payment_type === PaymentType.Membership) { // Notify Pi platform for membership payment completion diff --git a/src/models/Seller.ts b/src/models/Seller.ts index fe9f5c96..16495d52 100644 --- a/src/models/Seller.ts +++ b/src/models/Seller.ts @@ -75,6 +75,11 @@ const sellerSchema = new Schema( type: Boolean, default: false, required: false + }, + gas_saver: { + type: Boolean, + default: true, + required: false } }, { timestamps: true } // Adds timestamps to track creation and update times diff --git a/src/models/enums/paymentDirection.ts b/src/models/enums/paymentDirection.ts new file mode 100644 index 00000000..f9e7c637 --- /dev/null +++ b/src/models/enums/paymentDirection.ts @@ -0,0 +1,4 @@ +export enum PaymentDirection { + A2U = 'a2u', + U2A = 'u2a' +} diff --git a/src/services/payment.service.ts b/src/services/payment.service.ts index 34c6ce45..28bbd99e 100644 --- a/src/services/payment.service.ts +++ b/src/services/payment.service.ts @@ -13,6 +13,9 @@ import { PaymentDTO, } from "../types"; import logger from "../config/loggingConfig"; +import { PaymentDirection } from "../models/enums/paymentDirection"; +import { dir } from "console"; +import { PaymentType } from "../models/enums/paymentType"; export const createPayment = async (paymentData: NewPayment): Promise => { try { @@ -59,162 +62,63 @@ export const completePayment = async ( }; export const createPaymentCrossReference = async ( - orderId: string, refData: U2URefDataType ): Promise => { try { const newRef = new PaymentCrossReference({ - order_id: orderId, + order_id: refData.orderId, u2a_payment_id: refData.u2aPaymentId, + u2u_status: refData.u2uStatus, u2a_completed_at: new Date(), a2u_payment_id: null }); return await newRef.save(); } catch (error: any) { - logger.error(`Failed to create Payment xRef for orderID ${orderId}: ${error.message}`); + logger.error(`Failed to create Payment xRef for orderID ${refData.orderId}: ${error.message}`); throw error; } }; +export const getxRefByOrderId = async (orderId: string) => { + try { + const existingXRef = await PaymentCrossReference.findOne({order_id: orderId}).lean().exec() + return existingXRef + } catch (error:any) { + logger.error("Can't find xRef with id: ", orderId) + } +} + export const updatePaymentCrossReference = async ( - orderId: string, + xRefIds: string[], refData: U2URefDataType -): Promise => { +): Promise => { try { - const updatedRef = await PaymentCrossReference.findOneAndUpdate( - { order_id: orderId }, + const result = await PaymentCrossReference.updateMany( + { _id: { $in: xRefIds } }, { - a2u_payment_id: refData.a2uPaymentId, - a2u_completed_at: new Date(), - u2u_status: refData.u2uStatus - }, - { new: true } - ).lean().exec(); + $set: { + u2a_payment_id: refData.u2aPaymentId, + a2u_payment_id: refData.a2uPaymentId, + a2u_completed_at: new Date(), + u2u_status: refData.u2uStatus, + }, + } + ).exec(); - if (!updatedRef) { - logger.error(`Failed to update Payment xRef for orderID ${orderId}`); - throw new Error('Failed to update Payment xRef'); + if (result.modifiedCount === 0) { + logger.warn(`No PaymentCrossReference updated for IDs: ${xRefIds.join(', ')}`); + } else { + logger.info(`Updated ${result.modifiedCount} PaymentCrossReferences.`); } - return updatedRef; + return result.modifiedCount; } catch (error: any) { - logger.error(`Failed to update Payment xRef for orderID ${orderId}: ${error.message}`); + logger.error(`Failed to update PaymentCrossReferences: ${error.message}`); throw error; } }; -export const createA2UPayment = async (a2uPaymentData: A2UPaymentDataType): Promise => { - try { - /* Step 1: Subtract gas fee from original amount to calculate net transfer payment amount */ - const gasFee = 0.01; - const newAmount = parseFloat(a2uPaymentData.amount) - gasFee; - logger.info('Adjusted A2U payment amount: ', { newAmount }); - if (newAmount <= 0) { - logger.error(`Invalid A2U payment amount ${ newAmount }; must be > 0 after gas fee deduction`); - throw new Error('Invalid A2U payment amount'); - } - - /* Step 2: Get seller's Pi UID using the seller's _id field */ - const existingSeller = await Seller.findById(a2uPaymentData.sellerId) - .select('seller_id -_id') // Include seller_id, exclude _id - .exec(); - - if (!existingSeller?.seller_id) { - logger.error(`Failed to find seller with ID ${ a2uPaymentData.sellerId }`); - throw new Error('Failed to find seller; no record found'); - } - - /* Step 3: Create a Pi blockchain payment request using seller's Pi UID */ - const a2uData = { - amount: newAmount, - memo: a2uPaymentData.memo, - metadata: { direction: "A2U", orderId: a2uPaymentData.orderId, sellerId: a2uPaymentData.sellerId, buyerId: a2uPaymentData.buyerId }, - uid: existingSeller?.seller_id as string, - }; - - const paymentId = await pi.createPayment(a2uData); - logger.debug('Payment ID: ', { paymentId }); - if (!paymentId) { - logger.error(`Failed to create A2U Pi payment for UID ${ existingSeller.seller_id }`); - throw new Error('Failed to create A2U Pi payment'); - } - - /* Step 4: Save the new A2U payment in the DB collection */ - const newPayment = await createPayment({ - piPaymentId: paymentId, - userId: a2uPaymentData.buyerId as string, - amount: newAmount.toString(), - memo: a2uPaymentData.memo, - paymentType: a2uPaymentData.paymentType - }); - logger.info('New A2U payment record created'); - if (!newPayment) { - logger.error(`Failed to create A2U payment DB record with Payment ID ${ paymentId }`); - throw new Error('Failed to create A2U payment DB record'); - } - - /* Step 5: Submit the Pi payment to finalize the blockchain transaction */ - const txid = await pi.submitPayment(paymentId); - if (!txid) { - logger.error(`Failed to submit A2U Pi payment with Payment ID ${ paymentId }`); - throw new Error('Failed to submit A2U Pi payment'); - } - logger.info('Transaction ID: ', { txid }); - - /* Step 6: Mark the payment record as completed */ - const updatedPayment = await completePayment(paymentId, txid); - if (!updatedPayment) { - logger.error(`Failed to complete A2U payment DB record with Payment ID ${ paymentId } + Txn ID ${ txid }`); - throw new Error('Failed to complete A2U payment DB record'); - } - logger.info('Marked A2U payment record as completed'); - - /* Step 7: Update the Payment xRef with A2U completion status */ - const u2uRefData = { - u2uStatus: U2UPaymentStatus.A2UCompleted, - a2uPaymentId: updatedPayment?._id as string, - } - const u2uRef = await updatePaymentCrossReference(a2uPaymentData.orderId, u2uRefData); - if (!u2uRef) { - logger.error(`Failed to update Payment xRef with A2U Payment ID ${ updatedPayment?._id }`); - throw new Error('Failed to update Payment xRef with A2U payment data'); - } - - logger.info('updated Payment xRef record', u2uRef); - - /* Step 8: Mark the payment as complete in the Pi blockchain (final confirmation) */ - const completedPiPayment = await pi.completePayment(paymentId, txid); - if (!completedPiPayment) { - logger.error(`Failed to complete A2U Pi payment with Payment ID ${ paymentId } + Txn ID ${ txid }`); - throw new Error('Failed to complete A2U Pi payment transaction'); - } - - logger.info(`A2U payment process completed successfully for Order ID ${ a2uPaymentData.orderId }`); - return updatedPayment; - } catch (error: any) { - if (axios.isAxiosError(error)) { - logger.error(`Axios error during A2U payment: ${error.message}`, { - status: error.response?.status, - data: error.response?.data, - config: error.config, - }); - } else { - logger.error(`Failed to create A2U payment for Order ID ${a2uPaymentData.orderId}:`, { - message: error.message, - stack: error.stack, - }); - } - - // Handle cancellation of the payment if it was created but not completed - const {incomplete_server_payments} = await getIncompleteServerPayments(); - logger.info("found incomplete server payments", incomplete_server_payments); - if (incomplete_server_payments && incomplete_server_payments.length > 0) { - await completeServerPayment(incomplete_server_payments); - } - return null; - } -}; export const getIncompleteServerPayments = async (): Promise => { try { @@ -240,7 +144,7 @@ export const completeServerPayment = async (serverPayments: PaymentDTO[]): Promi for (const payment of serverPayments) { let transaction = payment.transaction || null; const piPaymentId = payment.identifier; - const metadata = payment.metadata as { orderId: string; sellerId: string; buyerId: string }; + const metadata = payment.metadata as { xRefId: string; sellerId: string; buyerId: string }; if (!piPaymentId) { logger.error('Missing Pi payment ID'); @@ -266,13 +170,13 @@ export const completeServerPayment = async (serverPayments: PaymentDTO[]): Promi logger.info(`Marked A2U payment as completed for ${piPaymentId}`); // Update U2U payment cross reference - const u2uRef = await updatePaymentCrossReference(metadata?.orderId, { + const u2uRef = await updatePaymentCrossReference([metadata?.xRefId], { u2uStatus: U2UPaymentStatus.A2UCompleted, a2uPaymentId: updatedPayment._id as string, }); if (!u2uRef) { - throw new Error(`Failed to update payment cross reference for order ${metadata.orderId}`); + throw new Error(`Failed to update payment cross reference for ${metadata.xRefId}`); } logger.info('Updated U2U reference record', u2uRef); @@ -283,15 +187,15 @@ export const completeServerPayment = async (serverPayments: PaymentDTO[]): Promi throw new Error(`Failed to confirm Pi payment on blockchain for ${piPaymentId}`); } - logger.info(`✅ A2U payment process completed for Order ID: ${metadata.orderId}`); + logger.info(`✅ A2U payment process completed for xRef ID: ${metadata.xRefId}`); } catch (error: any) { if (axios.isAxiosError(error)) { - logger.error(`Axios error during A2U payment for order ${metadata.orderId || 'unknown'}: ${error.message}`, { + logger.error(`Axios error during A2U payment for xRef ${metadata.xRefId || 'unknown'}: ${error.message}`, { status: error.response?.status, data: error.response?.data, }); } else { - logger.error(`❌ Error completing server payment for Order ID ${metadata.orderId || 'unknown'}: ${error.message}`); + logger.error(`❌ Error completing server payment for xRef ID ${metadata.xRefId || 'unknown'}: ${error.message}`); } } } @@ -319,7 +223,7 @@ export const cancelPayment = async (piPaymentId: string): Promise => { + try { + const a2uData = { + amount: parseFloat(a2uPaymentData.amount), + memo: a2uPaymentData.memo, + metadata: { direction: "A2U", sellerId: a2uPaymentData.sellerId }, + uid: a2uPaymentData.sellerId as string, + }; + + const paymentId = await pi.createPayment(a2uData); + logger.debug('Payment ID: ', { paymentId }); + if (!paymentId) { + logger.error(`Failed to create A2U Pi payment for UID ${ a2uPaymentData.sellerId }`); + throw new Error('Failed to create A2U Pi payment'); + } + + /* Step 5: Submit the Pi payment to finalize the blockchain transaction */ + const txid = await pi.submitPayment(paymentId); + if (!txid) { + logger.error(`Failed to submit A2U Pi payment with Payment ID ${ paymentId }`); + throw new Error('Failed to submit A2U Pi payment'); + } + logger.info('Transaction ID: ', { txid }); + + // get xRef for each xRefIds + for (const refId of a2uPaymentData.xRefIds) { + + const xRef = await PaymentCrossReference.findById(refId) + .populate<{u2a_payment_id: {memo:string}}>({path:'u2a_payment_id', model: 'Payment', select: 'memo'}) + .lean() + .exec(); + + const completeda2uPayment = await Payment.create({ + pi_payment_id: paymentId, + user_id: a2uPaymentData.sellerId, + amount: a2uPaymentData.amount, + memo: xRef?.u2a_payment_id.memo as string, + txid: txid, + direction: PaymentDirection.A2U, + payment_type: PaymentType.BuyerCheckout, + paid: true, + cancelled: false, + }) + + const xRefData = { + u2uStatus: U2UPaymentStatus.A2UCompleted, + a2uPaymentId: completeda2uPayment?._id as string, + }; + + const updatedRef = await PaymentCrossReference.findByIdAndUpdate( + refId, + { + $set: { + a2u_payment_id: completeda2uPayment?._id as string, + a2u_completed_at: new Date(), + u2u_status: xRefData.u2uStatus, + error_message: '', + } + } + ).lean() + .exec(); + + logger.info('updated Payment xRef record', updatedRef?._id.toString()); + + } + + const completedPiPayment = await pi.completePayment(paymentId, txid); + if (!completedPiPayment) { + logger.error(`Failed to complete A2U Pi payment with Payment ID ${ paymentId } + Txn ID ${ txid }`); + throw new Error('Failed to complete A2U Pi payment transaction'); + } + + logger.info(`A2U payment process completed successfully for xRef ID ${ a2uPaymentData.xRefIds }`); + return paymentId; + + } catch (error: any) { + if (axios.isAxiosError(error)) { + logger.error(`Axios error during A2U payment: ${error.message}`, { + status: error.response?.status, + data: error.response?.data, + config: error.config, + }); + } else { + logger.error(`Failed to create A2U payment for Order ID ${a2uPaymentData.xRefIds}:`, { + message: error.message, + stack: error.stack, + }); + } + + // Handle cancellation of the payment if it was created but not completed + const {incomplete_server_payments} = await getIncompleteServerPayments(); + logger.info("found incomplete server payments", incomplete_server_payments); + if (incomplete_server_payments && incomplete_server_payments.length > 0) { + await completeServerPayment(incomplete_server_payments); + } + return null; + } }; \ No newline at end of file diff --git a/src/types.ts b/src/types.ts index 06f8f177..850210d1 100644 --- a/src/types.ts +++ b/src/types.ts @@ -74,6 +74,7 @@ export interface ISeller extends Document { fulfillment_description?: string; pre_restriction_seller_type?: SellerType | null; isPreRestricted: boolean; + gas_saver: boolean; }; // Combined interface representing a seller with selected user settings @@ -178,6 +179,19 @@ export type OrderPaymentMetadataType = { // ======================== // PAYMENT MODELS // ======================== + + +export interface IA2UJob extends Document { + sellerPiUid: string; + amount: number; + orders: string[]; + memo: string, + status: 'pending' | 'processing' | 'completed' | 'failed'; + attempts: number; + last_error?: string; + createdAt: Date; + updatedAt: Date; +} export interface IPayment extends Document { user_id: Types.ObjectId; amount: Types.Decimal128; @@ -231,15 +245,14 @@ export interface NewPayment { export interface U2URefDataType { u2aPaymentId?: string, u2uStatus: U2UPaymentStatus, + orderId?: string, a2uPaymentId: string | null, }; export interface A2UPaymentDataType { sellerId: string, amount: string, - buyerId: string, - paymentType: PaymentType, - orderId: string, + xRefIds: string[], memo: string }; diff --git a/src/utils/queues/queue.ts b/src/utils/queues/queue.ts new file mode 100644 index 00000000..612d0b7e --- /dev/null +++ b/src/utils/queues/queue.ts @@ -0,0 +1,38 @@ + +import logger from "../../config/loggingConfig"; +import { gasSaver } from "../../helpers/gasSaver"; +import { createA2UPayment } from "../../services/payment.service"; + +function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +export async function drainQueue(): Promise { + try { + const paymentList = await gasSaver(); + const IS_EMPTY_LIST = paymentList.length === 0; + let delay = 10000; // 3 second delay + + if (IS_EMPTY_LIST) { + logger.info("No payments to process in the queue."); + delay = 60000 * 60 * 60 * 4; // 4 hours delay + } + + for (const payment of paymentList) { + await createA2UPayment({ + sellerId: payment.sellerPiUid, + amount: payment.amount, + xRefIds: payment.xRef, + memo: "A2U payment for checkout", + + }); + + await sleep(delay); + } + + + }catch (error) { + logger.error('Failed to drain queue:', error); + throw error; + } +} \ No newline at end of file From e942963d6d5f813c439447869cf4c2d15ea1bab3 Mon Sep 17 00:00:00 2001 From: adisa39 Date: Fri, 4 Jul 2025 12:13:19 +0100 Subject: [PATCH 2/6] include A2UPaymentQueue collection and customized payment queque logics with cron-job scheduler --- src/controllers/paymentController.ts | 2 +- src/cron/index.ts | 12 +++- src/cron/jobs/a2uJobWorker.ts | 76 ++++++++++++++++++++++++ src/cron/utils/dropTable.ts | 24 ++++++++ src/helpers/gasSaver.ts | 2 +- src/helpers/payment.ts | 6 +- src/index.ts | 5 +- src/models/A2UPaymentQueue.ts | 23 ++++++++ src/models/Payment.ts | 7 +++ src/services/payment.service.ts | 9 +-- src/types.ts | 7 ++- src/utils/queues/a2uQueue.ts | 19 ++++++ src/utils/queues/queue.ts | 88 ++++++++++++++++++++-------- 13 files changed, 241 insertions(+), 39 deletions(-) create mode 100644 src/cron/jobs/a2uJobWorker.ts create mode 100644 src/cron/utils/dropTable.ts create mode 100644 src/models/A2UPaymentQueue.ts create mode 100644 src/utils/queues/a2uQueue.ts diff --git a/src/controllers/paymentController.ts b/src/controllers/paymentController.ts index 82cdff35..e9441ce7 100644 --- a/src/controllers/paymentController.ts +++ b/src/controllers/paymentController.ts @@ -32,7 +32,7 @@ export const onPaymentApproval = async (req: Request, res: Response) => { const approvedPayment = await processPaymentApproval(paymentId, currentUser); return res.status(200).json(approvedPayment); } catch (error) { - logger.error(`Failed to approve Pi payment for paymentID ${ paymentId }:`, error); + logger.error(`Failed to approve Pi payment for piPaymentID ${ paymentId }:`, error); return res.status(500).json({ success: false, message: 'An error occurred while approving Pi payment; please try again later' diff --git a/src/cron/index.ts b/src/cron/index.ts index f65f8073..9b119f5c 100644 --- a/src/cron/index.ts +++ b/src/cron/index.ts @@ -1,6 +1,7 @@ import schedule from "node-schedule"; import { runSanctionBot } from "./jobs/sanctionBot.job"; import logger from "../config/loggingConfig"; +import processNextJob from "./jobs/a2uJobWorker"; export const scheduleCronJobs = () => { logger.info("Initializing scheduled cron jobs..."); @@ -8,12 +9,17 @@ export const scheduleCronJobs = () => { // Run the Sanction Bot job daily at 22:00 UTC const sanctionJobTime = '0 0 22 * * *'; - schedule.scheduleJob(sanctionJobTime, async () => { + // Run drain payment queue every 5 min + const a2uPaymentJobTime = '0 */5 * * * *'; // Every 5 minutes + + schedule.scheduleJob(a2uPaymentJobTime, async () => { logger.info('🕒 Sanction Bot job triggered (22:00 UTC).'); try { - await runSanctionBot(); - logger.info("✅ Sanction Bot job completed successfully."); + // await runSanctionBot(); + await processNextJob(); + logger.info("✅ A2U payment Bot job completed successfully."); + // logger.info("✅ Sanction Bot job completed successfully."); } catch (error) { logger.error("❌ Sanction Bot job failed:", error); } diff --git a/src/cron/jobs/a2uJobWorker.ts b/src/cron/jobs/a2uJobWorker.ts new file mode 100644 index 00000000..4618f379 --- /dev/null +++ b/src/cron/jobs/a2uJobWorker.ts @@ -0,0 +1,76 @@ +import logger from "../../config/loggingConfig"; +import A2UPaymentQueue from "../../models/A2UPaymentQueue"; +import { createA2UPayment } from "../../services/payment.service"; + +// workers/mongodbA2UWorker.ts +async function processNextJob(): Promise { + const now = new Date(); + const MAXATTEMPT = 3 + + const threeDaysAgo = new Date(); + threeDaysAgo.setDate(threeDaysAgo.getDate() - 3); + + const job = await A2UPaymentQueue.findOneAndUpdate( + { + $or: [ + { status: 'pending' }, + { status: 'failed' }, + { + status: 'batching', + last_a2u_date: { $lte: threeDaysAgo } + } + ], + attempts: { $lt: 3 } + }, + { + status: 'processing', + $inc: { attempts: 1 }, + updatedAt: new Date(), + }, + { + sort: { updatedAt: 1 }, + new: true, + } + ); + + + if (!job) return; + + const { sellerPiUid, amount, xRef_ids, _id, attempts, memo, last_a2u_date } = job; + + try { + logger.info(`[→] Attempt ${attempts}/${MAXATTEMPT} for ${sellerPiUid}`); + + await createA2UPayment({ + sellerPiUid: sellerPiUid, + amount: amount.toString(), + memo: "A2U payment", + xRefIds: xRef_ids + }) + + await A2UPaymentQueue.findByIdAndUpdate(_id, { + status: 'completed', + updatedAt: new Date(), + last_a2u_date: new Date(), + last_error: null, + }); + + console.log(`[✔] A2U payment completed for ${sellerPiUid}`); + } catch (err: any) { + + const willRetry = attempts < MAXATTEMPT; + + await A2UPaymentQueue.findByIdAndUpdate(_id, { + status: willRetry ? 'pending' : 'failed', + last_error: err.message, + updatedAt: new Date(), + }); + + logger.error(`[✘] A2U payment failed for ${sellerPiUid}: ${err.message}`); + if (!willRetry) { + logger.info(`[⚠️] Job permanently failed after ${attempts} attempts.`); + } + } +} + +export default processNextJob; diff --git a/src/cron/utils/dropTable.ts b/src/cron/utils/dropTable.ts new file mode 100644 index 00000000..95c89c13 --- /dev/null +++ b/src/cron/utils/dropTable.ts @@ -0,0 +1,24 @@ +import mongoose from 'mongoose'; +import logger from '../../config/loggingConfig'; + +async function dropCollection(collectionName:string) { + try { + + if (!mongoose.connection.db) { + throw new Error('Database connection not established.'); + } + + const collections = await mongoose.connection.db.listCollections({ name: collectionName }).toArray(); + + if (collections.length > 0) { + await mongoose.connection.db.dropCollection(collectionName); + logger.info(`${collectionName} collection dropped.`); + } else { + logger.info('Collection does not exist.'); + } + } catch (err) { + console.error('Error dropping collection:', err); + } +} + +export default dropCollection; diff --git a/src/helpers/gasSaver.ts b/src/helpers/gasSaver.ts index 09ee4316..083892ba 100644 --- a/src/helpers/gasSaver.ts +++ b/src/helpers/gasSaver.ts @@ -48,7 +48,7 @@ export const gasSaver = async (): Promise<{ xRef: string[]; sellerPiUid: string; }) .populate<{ order_id: {_id:String, total_amount:string, seller_id: {seller_id:string, gas_saver:boolean}} }>({ path: 'order_id', - select: 'total_amount seller_id', + select: '_id total_amount seller_id', populate: { path: 'seller_id', model: 'Seller', // ensure this matches your actual Seller model name diff --git a/src/helpers/payment.ts b/src/helpers/payment.ts index 01fba28d..bb86d178 100644 --- a/src/helpers/payment.ts +++ b/src/helpers/payment.ts @@ -22,7 +22,7 @@ import { } from '../services/order.service'; import { IUser, NewOrder, PaymentDataType, PaymentDTO, PaymentInfo } from '../types'; import logger from '../config/loggingConfig'; -import { drainQueue } from '../utils/queues/queue'; +import { enqueuePayment } from '../utils/queues/queue'; function buildPaymentData( piPaymentId: string, @@ -254,7 +254,7 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) u2aCompletedAt: new Date(), a2uPaymentId: null, }; - await createPaymentCrossReference(u2uRefData); + const xRef = await createPaymentCrossReference(u2uRefData); logger.info("U2U cross-reference created", u2uRefData); // Notify Pi Platform of successful completion @@ -266,7 +266,7 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) logger.info("Payment marked completed on Pi blockchain", completedPiPayment.status); - await drainQueue(); + await enqueuePayment(xRef?._id.toString(), order?.seller_id.toString(), order.total_amount.toString(), completedPayment.memo); // Start A2U (App-to-User) payment to the seller // await createA2UPayment({ diff --git a/src/index.ts b/src/index.ts index 31004262..ff3078cc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,6 +6,7 @@ import { connectDB } from "./config/dbConnection"; import app from "./utils/app"; import { env } from "./utils/env"; import logger from "./config/loggingConfig"; +// import dropCollection from "./cron/utils/dropTable"; dotenv.config(); @@ -26,7 +27,7 @@ const startServer = async () => { }); } - + // dropCollection("payments"); logger.info("Server setup initiated."); } catch (error) { @@ -37,6 +38,6 @@ const startServer = async () => { // Start the server setup process startServer(); // TODO: Remove cron job; Start the scheduled cron job(s) -// scheduleCronJobs(); +scheduleCronJobs(); export default app; \ No newline at end of file diff --git a/src/models/A2UPaymentQueue.ts b/src/models/A2UPaymentQueue.ts new file mode 100644 index 00000000..833f4ba7 --- /dev/null +++ b/src/models/A2UPaymentQueue.ts @@ -0,0 +1,23 @@ +import mongoose, { Schema } from "mongoose"; +import { IA2UJob } from "../types"; + +const A2UPaymentQueueSchema = new Schema( + { + sellerPiUid: { type: String, required: true }, + amount: { type: Number, required: true }, + xRef_ids: [{ type: String, required: true }], + memo: { type: String, require: true }, + status: { + type: String, + enum: ['pending', 'processing', 'completed', 'failed', 'batching'], + default: 'pending', + }, + last_a2u_date: { type: Date, default: null }, + attempts: { type: Number, default: 0 }, + last_error: { type: String, default: null } + }, + { timestamps: true } +); + +const A2UPaymentQueue = mongoose.model('A2UPaymentQueue', A2UPaymentQueueSchema); +export default A2UPaymentQueue; \ No newline at end of file diff --git a/src/models/Payment.ts b/src/models/Payment.ts index 4d98c102..603c5a96 100644 --- a/src/models/Payment.ts +++ b/src/models/Payment.ts @@ -2,6 +2,7 @@ import mongoose, { Schema, SchemaTypes, Types } from "mongoose"; import { IPayment } from "../types"; import { PaymentType } from "./enums/paymentType"; +import { PaymentDirection } from "./enums/paymentDirection"; const paymentSchema = new Schema( { @@ -40,6 +41,12 @@ const paymentSchema = new Schema( required: false, default: "" }, + direction: { + type: String, + enum: Object.values(PaymentDirection).filter(value => typeof value === 'string'), + required: false, + default: PaymentDirection.U2A + }, payment_type: { type: String, enum: Object.values(PaymentType).filter(value => typeof value === 'string'), diff --git a/src/services/payment.service.ts b/src/services/payment.service.ts index 28bbd99e..56c70263 100644 --- a/src/services/payment.service.ts +++ b/src/services/payment.service.ts @@ -243,14 +243,14 @@ export const createA2UPayment = async (a2uPaymentData: A2UPaymentDataType): Prom const a2uData = { amount: parseFloat(a2uPaymentData.amount), memo: a2uPaymentData.memo, - metadata: { direction: "A2U", sellerId: a2uPaymentData.sellerId }, - uid: a2uPaymentData.sellerId as string, + metadata: { direction: "A2U", sellerId: a2uPaymentData.sellerPiUid }, + uid: a2uPaymentData.sellerPiUid as string, }; const paymentId = await pi.createPayment(a2uData); logger.debug('Payment ID: ', { paymentId }); if (!paymentId) { - logger.error(`Failed to create A2U Pi payment for UID ${ a2uPaymentData.sellerId }`); + logger.error(`Failed to create A2U Pi payment for UID ${ a2uPaymentData.sellerPiUid }`); throw new Error('Failed to create A2U Pi payment'); } @@ -267,12 +267,13 @@ export const createA2UPayment = async (a2uPaymentData: A2UPaymentDataType): Prom const xRef = await PaymentCrossReference.findById(refId) .populate<{u2a_payment_id: {memo:string}}>({path:'u2a_payment_id', model: 'Payment', select: 'memo'}) + .populate<{order_id: {seller_id:string}}>({path:'order_id', model: 'Order', select: 'seller_id'}) .lean() .exec(); const completeda2uPayment = await Payment.create({ pi_payment_id: paymentId, - user_id: a2uPaymentData.sellerId, + user_id: xRef?.order_id.seller_id, amount: a2uPaymentData.amount, memo: xRef?.u2a_payment_id.memo as string, txid: txid, diff --git a/src/types.ts b/src/types.ts index 850210d1..1dff5fe9 100644 --- a/src/types.ts +++ b/src/types.ts @@ -10,6 +10,7 @@ import { OrderItemStatusType } from "./models/enums/orderItemStatusType"; import { PaymentType } from "./models/enums/paymentType"; import { U2UPaymentStatus } from "./models/enums/u2uPaymentStatus"; import { RestrictedArea } from "./models/enums/restrictedArea"; +import { PaymentDirection } from "./models/enums/paymentDirection"; // ======================== // USER MODELS @@ -184,9 +185,10 @@ export type OrderPaymentMetadataType = { export interface IA2UJob extends Document { sellerPiUid: string; amount: number; - orders: string[]; + xRef_ids: string[]; memo: string, status: 'pending' | 'processing' | 'completed' | 'failed'; + last_a2u_date: Date, attempts: number; last_error?: string; createdAt: Date; @@ -199,6 +201,7 @@ export interface IPayment extends Document { memo: string; pi_payment_id: string; txid?: string; + direction: PaymentDirection; payment_type: PaymentType; cancelled: boolean; createdAt: Date; @@ -250,7 +253,7 @@ export interface U2URefDataType { }; export interface A2UPaymentDataType { - sellerId: string, + sellerPiUid: string, amount: string, xRefIds: string[], memo: string diff --git a/src/utils/queues/a2uQueue.ts b/src/utils/queues/a2uQueue.ts new file mode 100644 index 00000000..08fe78fa --- /dev/null +++ b/src/utils/queues/a2uQueue.ts @@ -0,0 +1,19 @@ +// // queues/a2uQueue.ts +// import { Queue, Worker, QueueScheduler, Job } from 'bullmq'; +// import IORedis from 'ioredis'; + +// const connection = new IORedis(); // configure host/port/password as needed + +// // Scheduler handles stalled jobs, retries, etc. +// new QueueScheduler('a2u-payments', { connection }); + +// // This is the queue you will add jobs to: +// export const a2uQueue = new Queue('a2u-payments', { +// connection, +// defaultJobOptions: { +// removeOnComplete: true, +// removeOnFail: false, +// attempts: 3, // retry up to 3 times +// backoff: { type: 'exponential', delay: 5000 }, +// }, +// }); diff --git a/src/utils/queues/queue.ts b/src/utils/queues/queue.ts index 612d0b7e..d7e97499 100644 --- a/src/utils/queues/queue.ts +++ b/src/utils/queues/queue.ts @@ -1,38 +1,80 @@ import logger from "../../config/loggingConfig"; -import { gasSaver } from "../../helpers/gasSaver"; -import { createA2UPayment } from "../../services/payment.service"; +import A2UPaymentQueue from "../../models/A2UPaymentQueue"; +import Seller from "../../models/Seller"; -function sleep(ms: number) { - return new Promise(resolve => setTimeout(resolve, ms)); -} + const GAS_FEE = 0.01; -export async function drainQueue(): Promise { +const batchSellerRevenue = async ( + xRefId: string, + sellerPiUid: string, + amount: string, +): Promise => { try { - const paymentList = await gasSaver(); - const IS_EMPTY_LIST = paymentList.length === 0; - let delay = 10000; // 3 second delay + const onQueuePayment = await A2UPaymentQueue.findOne({ sellerPiUid, status:"batching", last_a2u_date: null }).exec(); + if (!onQueuePayment) { + const newAmount = parseFloat(amount) - GAS_FEE; + await A2UPaymentQueue.create({ + xRef_ids: [xRefId], + sellerPiUid, + amount: newAmount.toFixed(4), + status: "batching", + memo: "Map of Pi Payment for Order", + }); + logger.info("new payment added to queue for seller with ID: ", sellerPiUid) + return; + } - if (IS_EMPTY_LIST) { - logger.info("No payments to process in the queue."); - delay = 60000 * 60 * 60 * 4; // 4 hours delay + const updatedQueue = await A2UPaymentQueue.findOneAndUpdate( + { sellerPiUid }, + { + $inc: { amount: parseFloat(amount) }, + $push: { xRef_ids: xRefId }, + }, + { new: true } + ).exec(); + + if (!updatedQueue) { + logger.error(`Failed to update payment queue for seller: ${sellerPiUid}`); + throw new Error(`Failed to update payment queue for seller: ${sellerPiUid}`); } - for (const payment of paymentList) { - await createA2UPayment({ - sellerId: payment.sellerPiUid, - amount: payment.amount, - xRefIds: payment.xRef, - memo: "A2U payment for checkout", + logger.info(`Updated payment queue for seller: ${sellerPiUid}, new amount: ${updatedQueue.amount}`); + return - }); + } catch (error:any) { + logger.error("failed to enque payment") + } + +} - await sleep(delay); +export const enqueuePayment = async ( + xRefId: string, + sellerId: string, + amount: string, + memo:string +) => { + try { + // check if seller gas saver is on + const seller = await Seller.findById( sellerId ).lean().exec(); + + // check and compute seller revenue for gas saver + if (seller?.gas_saver) { + batchSellerRevenue(xRefId, seller.seller_id, amount); + return } + const newAmount = parseFloat(amount) - GAS_FEE; + await A2UPaymentQueue.create({ + xRef_ids: [xRefId], + sellerPiUid: seller?.seller_id, + amount: newAmount.toFixed(4), + status: "pending", + memo: memo, + }); + logger.info("new payment added to queue for seller with ID: ", {sellerId}) + return; + }catch(error:any){ - }catch (error) { - logger.error('Failed to drain queue:', error); - throw error; } } \ No newline at end of file From d6daa8a8a55261a8762da18ef4a13663e4a4c3f3 Mon Sep 17 00:00:00 2001 From: adisa39 Date: Sat, 5 Jul 2025 16:59:56 +0100 Subject: [PATCH 3/6] modify seller registration logics to include gas saver field and added script to populate existing sellers with default gas saver --- src/models/Seller.ts | 2 +- src/services/seller.service.ts | 3 ++- .../migrationScripts/applyGasSaverDefault.ts | 23 +++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 src/utils/migrationScripts/applyGasSaverDefault.ts diff --git a/src/models/Seller.ts b/src/models/Seller.ts index 16495d52..c2ae2ed2 100644 --- a/src/models/Seller.ts +++ b/src/models/Seller.ts @@ -79,7 +79,7 @@ const sellerSchema = new Schema( gas_saver: { type: Boolean, default: true, - required: false + required: true } }, { timestamps: true } // Adds timestamps to track creation and update times diff --git a/src/services/seller.service.ts b/src/services/seller.service.ts index d5244dae..0edc2660 100644 --- a/src/services/seller.service.ts +++ b/src/services/seller.service.ts @@ -214,7 +214,8 @@ export const registerOrUpdateSeller = async (authUser: IUser, formData: any): Pr sell_map_center: sellMapCenter, order_online_enabled_pref: formData.order_online_enabled_pref || existingSeller?.order_online_enabled_pref || false, fulfillment_method: formData.fulfillment_method || existingSeller?.fulfillment_method || FulfillmentType.CollectionByBuyer, - fulfillment_description: formData.fulfillment_description || existingSeller?.fulfillment_description || '' + fulfillment_description: formData.fulfillment_description || existingSeller?.fulfillment_description || '', + gas_saver: formData.gas_saver || true }; // Update existing seller or create a new one diff --git a/src/utils/migrationScripts/applyGasSaverDefault.ts b/src/utils/migrationScripts/applyGasSaverDefault.ts new file mode 100644 index 00000000..55c0c2fb --- /dev/null +++ b/src/utils/migrationScripts/applyGasSaverDefault.ts @@ -0,0 +1,23 @@ +import mongoose from 'mongoose'; +import Seller from '../../models/Seller'; +import dotenv from 'dotenv'; +import logger from '../../config/loggingConfig'; + +dotenv.config(); // if using .env for DB URI + + +// RUN SCRIPT ONLY ONCE FOR DB MIGRATION +async function applyGasSaverDefault() { + await mongoose.connect(process.env.MONGODB_URL || 'your_mongo_uri_here'); + + const result = await Seller.updateMany( + { gas_saver: { $exists: false } }, + { $set: { gas_saver: true } } + ); + + logger.info(`Updated ${result.modifiedCount} seller(s) with default gas_saver=true`); + + await mongoose.disconnect(); +} + +applyGasSaverDefault().catch(console.error); From 9750702d8182894064cd505d71a6d15a297f764a Mon Sep 17 00:00:00 2001 From: adisa39 Date: Sat, 5 Jul 2025 17:30:13 +0100 Subject: [PATCH 4/6] clean up this branch --- src/cron/index.ts | 12 +-- src/cron/jobs/sanctionBot.job.ts | 116 +++++++++++++-------------- src/cron/utils/dropTable.ts | 24 ------ src/helpers/gasSaver.ts | 129 ------------------------------- src/helpers/payment.ts | 41 ++++------ src/index.ts | 3 - src/utils/queues/a2uQueue.ts | 19 ----- src/utils/queues/queue.ts | 2 +- 8 files changed, 80 insertions(+), 266 deletions(-) delete mode 100644 src/cron/utils/dropTable.ts delete mode 100644 src/helpers/gasSaver.ts delete mode 100644 src/utils/queues/a2uQueue.ts diff --git a/src/cron/index.ts b/src/cron/index.ts index 9b119f5c..e42ae9c8 100644 --- a/src/cron/index.ts +++ b/src/cron/index.ts @@ -1,5 +1,5 @@ import schedule from "node-schedule"; -import { runSanctionBot } from "./jobs/sanctionBot.job"; +// import { runSanctionBot } from "./jobs/sanctionBot.job"; import logger from "../config/loggingConfig"; import processNextJob from "./jobs/a2uJobWorker"; @@ -7,21 +7,21 @@ export const scheduleCronJobs = () => { logger.info("Initializing scheduled cron jobs..."); // Run the Sanction Bot job daily at 22:00 UTC - const sanctionJobTime = '0 0 22 * * *'; + // const sanctionJobTime = '0 0 22 * * *'; - // Run drain payment queue every 5 min + // Drain A2U payment queue every 5 min const a2uPaymentJobTime = '0 */5 * * * *'; // Every 5 minutes schedule.scheduleJob(a2uPaymentJobTime, async () => { - logger.info('🕒 Sanction Bot job triggered (22:00 UTC).'); + logger.info('🕒 A2U payment worker job triggered at 5min.'); try { // await runSanctionBot(); await processNextJob(); - logger.info("✅ A2U payment Bot job completed successfully."); + logger.info("✅ A2U payment worker job completed successfully."); // logger.info("✅ Sanction Bot job completed successfully."); } catch (error) { - logger.error("❌ Sanction Bot job failed:", error); + logger.error("❌ A2U payment worker job failed:", error); } }); diff --git a/src/cron/jobs/sanctionBot.job.ts b/src/cron/jobs/sanctionBot.job.ts index 6ad96a14..c62177f0 100644 --- a/src/cron/jobs/sanctionBot.job.ts +++ b/src/cron/jobs/sanctionBot.job.ts @@ -1,67 +1,67 @@ -import Seller from "../../models/Seller"; -import { SanctionedSellerStatus } from "../../types"; -import { getAllSanctionedRegions } from "../../services/admin/report.service"; -import { - createBulkPreRestrictionOperation, - createGeoQueries -} from "../utils/geoUtils"; -import { - getSellersToEvaluate, - processSellersGeocoding, - processSanctionedSellers, - processUnsanctionedSellers -} from "../utils/sanctionUtils"; -import logger from "../../config/loggingConfig"; +// import Seller from "../../models/Seller"; +// import { SanctionedSellerStatus } from "../../types"; +// import { getAllSanctionedRegions } from "../../services/admin/report.service"; +// import { +// createBulkPreRestrictionOperation, +// createGeoQueries +// } from "../utils/geoUtils"; +// import { +// getSellersToEvaluate, +// processSellersGeocoding, +// processSanctionedSellers, +// processUnsanctionedSellers +// } from "../utils/sanctionUtils"; +// import logger from "../../config/loggingConfig"; -export async function runSanctionBot(): Promise { - logger.info('Sanction Bot cron job started.'); +// export async function runSanctionBot(): Promise { +// logger.info('Sanction Bot cron job started.'); - try { - /* Step 1: Reset the 'isPreRestricted' field to 'false' for all sellers - This clears any pre-existing restrictions before applying new ones. */ - await Seller.updateMany({}, { isPreRestricted: false }).exec(); - logger.info('Reset [isPreRestricted] for all sellers.'); +// try { +// /* Step 1: Reset the 'isPreRestricted' field to 'false' for all sellers +// This clears any pre-existing restrictions before applying new ones. */ +// await Seller.updateMany({}, { isPreRestricted: false }).exec(); +// logger.info('Reset [isPreRestricted] for all sellers.'); - /* Step 2: Get the list of all sanctioned regions */ - const sanctionedRegions = await getAllSanctionedRegions(); - // If no sanctioned regions are found, log the info and exit the job - if (!sanctionedRegions.length) { - logger.info('No sanctioned regions found. Exiting job.'); - return; - } +// /* Step 2: Get the list of all sanctioned regions */ +// const sanctionedRegions = await getAllSanctionedRegions(); +// // If no sanctioned regions are found, log the info and exit the job +// if (!sanctionedRegions.length) { +// logger.info('No sanctioned regions found. Exiting job.'); +// return; +// } - /* Step 3: Create geo-based queries and identify sellers to evaluate */ - const geoQueries = createGeoQueries(sanctionedRegions); - const sellersToEvaluate = await getSellersToEvaluate(geoQueries); - logger.info(`Evaluating ${sellersToEvaluate.length} sellers flagged or currently Restricted.`); +// /* Step 3: Create geo-based queries and identify sellers to evaluate */ +// const geoQueries = createGeoQueries(sanctionedRegions); +// const sellersToEvaluate = await getSellersToEvaluate(geoQueries); +// logger.info(`Evaluating ${sellersToEvaluate.length} sellers flagged or currently Restricted.`); - /* Step 4: Create the bulk update operations to mark sellers as pre-restricted */ - const bulkPreRestrictionOps = createBulkPreRestrictionOperation(sellersToEvaluate); - if (bulkPreRestrictionOps.length > 0) { - await Seller.bulkWrite(bulkPreRestrictionOps); - logger.info(`Marked ${bulkPreRestrictionOps.length} sellers as Pre-Restricted`) - } +// /* Step 4: Create the bulk update operations to mark sellers as pre-restricted */ +// const bulkPreRestrictionOps = createBulkPreRestrictionOperation(sellersToEvaluate); +// if (bulkPreRestrictionOps.length > 0) { +// await Seller.bulkWrite(bulkPreRestrictionOps); +// logger.info(`Marked ${bulkPreRestrictionOps.length} sellers as Pre-Restricted`) +// } - /* Step 5: Retrieve all sellers who are marked as pre-restricted */ - const preRestrictedSellers = await Seller.find({isPreRestricted: true}).exec(); - logger.info(`${preRestrictedSellers.length} sellers are Pre-Restricted`); +// /* Step 5: Retrieve all sellers who are marked as pre-restricted */ +// const preRestrictedSellers = await Seller.find({isPreRestricted: true}).exec(); +// logger.info(`${preRestrictedSellers.length} sellers are Pre-Restricted`); - /* Step 6: Process geocoding validation */ - const results: SanctionedSellerStatus[] = await processSellersGeocoding( - preRestrictedSellers, - sanctionedRegions - ); - const inZone = results.filter(r => r.isSanctionedRegion); - const outOfZone = results.filter(r => !r.isSanctionedRegion); +// /* Step 6: Process geocoding validation */ +// const results: SanctionedSellerStatus[] = await processSellersGeocoding( +// preRestrictedSellers, +// sanctionedRegions +// ); +// const inZone = results.filter(r => r.isSanctionedRegion); +// const outOfZone = results.filter(r => !r.isSanctionedRegion); - /* Step 7: Apply restrictions of in-zone sellers or restoration of out-zone sellers */ - await processSanctionedSellers(inZone); - await processUnsanctionedSellers(outOfZone); +// /* Step 7: Apply restrictions of in-zone sellers or restoration of out-zone sellers */ +// await processSanctionedSellers(inZone); +// await processUnsanctionedSellers(outOfZone); - /* Step 8: Clean up temp pre-restriction flags */ - await Seller.updateMany({isPreRestricted: true}, {isPreRestricted: false}).exec(); - logger.info('SanctionBot job completed.'); - } catch (error) { - logger.error('Error in Sanction Bot cron job:', error); - } -} \ No newline at end of file +// /* Step 8: Clean up temp pre-restriction flags */ +// await Seller.updateMany({isPreRestricted: true}, {isPreRestricted: false}).exec(); +// logger.info('SanctionBot job completed.'); +// } catch (error) { +// logger.error('Error in Sanction Bot cron job:', error); +// } +// } \ No newline at end of file diff --git a/src/cron/utils/dropTable.ts b/src/cron/utils/dropTable.ts deleted file mode 100644 index 95c89c13..00000000 --- a/src/cron/utils/dropTable.ts +++ /dev/null @@ -1,24 +0,0 @@ -import mongoose from 'mongoose'; -import logger from '../../config/loggingConfig'; - -async function dropCollection(collectionName:string) { - try { - - if (!mongoose.connection.db) { - throw new Error('Database connection not established.'); - } - - const collections = await mongoose.connection.db.listCollections({ name: collectionName }).toArray(); - - if (collections.length > 0) { - await mongoose.connection.db.dropCollection(collectionName); - logger.info(`${collectionName} collection dropped.`); - } else { - logger.info('Collection does not exist.'); - } - } catch (err) { - console.error('Error dropping collection:', err); - } -} - -export default dropCollection; diff --git a/src/helpers/gasSaver.ts b/src/helpers/gasSaver.ts deleted file mode 100644 index 083892ba..00000000 --- a/src/helpers/gasSaver.ts +++ /dev/null @@ -1,129 +0,0 @@ -import PaymentCrossReference from '../models/PaymentCrossReference'; -import { U2UPaymentStatus } from '../models/enums/u2uPaymentStatus'; -import Order from '../models/Order'; -import pi from '../config/platformAPIclient'; -import logger from '../config/loggingConfig'; -import Payment from '../models/Payment'; -import { PaymentDirection } from '../models/enums/paymentDirection'; -import Seller from '../models/Seller'; -import { IOrder } from '../types'; -import { createA2UPayment, updatePaymentCrossReference } from '../services/payment.service'; -import path from 'path'; - -const GAS_FEE = 0.01; -const THREE_DAYS_AGO = new Date(Date.now() - 3 * 24 * 60 * 60 * 1000); - -const computeRevenue = ( - xRefId: string, - sellerPiUid: string, - amount: string, - paymentList: { xRef: string[]; sellerPiUid: string; amount: string }[] -): void => { - const existing = paymentList.find(p => p.sellerPiUid === sellerPiUid); - if (existing) { - existing.amount = ( - parseFloat(existing.amount) + parseFloat(amount) - ).toFixed(3); - existing.xRef.push(xRefId); - } else { - paymentList.push({ - xRef: [xRefId], - sellerPiUid, - amount: parseFloat(amount).toFixed(2), - }); - } -}; - -export const gasSaver = async (): Promise<{ xRef: string[]; sellerPiUid: string; amount: string }[]> => { - try { - const paymentList: { xRef: string[]; sellerPiUid: string; amount: string }[] = []; - - // 1️⃣ Fetch all U2ACompleted or A2UFailed refs - const xRefs = await PaymentCrossReference.find({ - a2u_payment_id: null, - $or: [ - { u2u_status: U2UPaymentStatus.U2ACompleted }, - { u2u_status: U2UPaymentStatus.A2UFailed } - ] - }) - .populate<{ order_id: {_id:String, total_amount:string, seller_id: {seller_id:string, gas_saver:boolean}} }>({ - path: 'order_id', - select: '_id total_amount seller_id', - populate: { - path: 'seller_id', - model: 'Seller', // ensure this matches your actual Seller model name - select: 'seller_id gas_saver' - } - }) - .lean() - .exec(); - - logger.info(`Fetched ${xRefs.length} payment cross-references for gas saver processing`); - - // 2️⃣ Get latest A2U per seller via aggregation (1 DB query) - const latestA2uPerSeller = await PaymentCrossReference.aggregate([ - { $match: { a2u_payment_id: { $ne: null } } }, - { - $group: { - _id: '$seller_id', - latestDate: { $max: '$a2u_completed_at' } - } - } - ]); - - const sellerLatestMap = new Map(); - for (const entry of latestA2uPerSeller) { - sellerLatestMap.set(entry._id?.toString(), entry.latestDate); - } - - // 2️⃣ Build the batched paymentList - for (const ref of xRefs) { - const order = ref.order_id; - const seller = order?.seller_id - - if (!order || !seller || !seller.seller_id) { - logger.warn(`Missing order/seller for ref ${ref._id}`); - continue; - } - - const orderId = order._id.toString(); - const sellerPiUid = seller.seller_id; - const isGasSaver = seller.gas_saver; - - // query to get latest A2U date for each unique seller_id - const latestA2uDate = sellerLatestMap.get(sellerPiUid) || ref.u2a_completed_at; - const isWithin3Days = latestA2uDate && new Date(latestA2uDate) >= THREE_DAYS_AGO; - - const netAmount = parseFloat(order.total_amount.toString()) - GAS_FEE; - if (netAmount <= 0) { - logger.warn(`Order ${order._id.toString()} net amount is less than or equal to zero; skipping.`); - continue; - } - - // apply gasSaver logic - if (isGasSaver) { - - if (isWithin3Days) { - computeRevenue(ref._id.toString(), sellerPiUid, netAmount.toFixed(4), paymentList); - } else { - logger.info(`Skipping gas saver for seller ${sellerPiUid} due to recent A2U activity.`); - continue; - } - - } else { - paymentList.push({ - xRef: [ref._id.toString()], - sellerPiUid: sellerPiUid, - amount: netAmount.toFixed(4), - }) - } - } - logger.info(`Computed ${paymentList.length} payment batches for processing`); - - return paymentList - - } catch (err: any) { - logger.error('Error in gasSaver:', err); - return []; - } -}; diff --git a/src/helpers/payment.ts b/src/helpers/payment.ts index 19cda304..18b02b84 100644 --- a/src/helpers/payment.ts +++ b/src/helpers/payment.ts @@ -15,9 +15,7 @@ import { createPayment, completePayment, createPaymentCrossReference, - createA2UPayment, cancelPayment, - updatePaymentCrossReference, getxRefByOrderId } from '../services/payment.service'; import { IUser, NewOrder, PaymentDataType, PaymentDTO, PaymentInfo } from '../types'; @@ -149,14 +147,17 @@ export const processIncompletePayment = async (payment: PaymentInfo) => { if (!xRef) { logger.warn("No existing payment cross-reference found, creating a new one"); const xRefData = { - orderId: updatedOrder._id as string, - u2aPaymentId: updatedPayment._id as string, - u2uStatus: U2UPaymentStatus.U2ACompleted, - u2aCompletedAt: new Date(), - a2uPaymentId: null, - sellerId: updatedPayment.user_id.toString(), - } - await createPaymentCrossReference(xRefData); + orderId: updatedOrder._id as string, + u2aPaymentId: updatedPayment._id as string, + u2uStatus: U2UPaymentStatus.U2ACompleted, + u2aCompletedAt: new Date(), + a2uPaymentId: null, + sellerId: updatedPayment.user_id.toString(), + } + const newXref = await createPaymentCrossReference(xRefData); + + // Enqueue the payment for further processing (e.g., A2U payment) + await enqueuePayment(newXref?._id.toString(), updatedOrder?.seller_id.toString(), updatedOrder.total_amount.toString(), updatedPayment.memo); } } @@ -254,7 +255,10 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) a2uPaymentId: null, }; const xRef = await createPaymentCrossReference(u2uRefData); - logger.info("U2U cross-reference created", u2uRefData); + logger.info("U2U cross-reference created", u2uRefData); + + // Enqueue the payment for further processing (e.g., A2U payment) + await enqueuePayment(xRef?._id.toString(), order?.seller_id.toString(), order.total_amount.toString(), completedPayment.memo); // Notify Pi Platform of successful completion const completedPiPayment = await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); @@ -265,21 +269,6 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) logger.info("Payment marked completed on Pi blockchain", completedPiPayment.status); - await enqueuePayment(xRef?._id.toString(), order?.seller_id.toString(), order.total_amount.toString(), completedPayment.memo); - - // Start A2U (App-to-User) payment to the seller - // await createA2UPayment({ - // sellerId: order.seller_id.toString(), - // amount: order.total_amount.toString(), - // buyerId: order.buyer_id.toString(), - // paymentType: PaymentType.BuyerCheckout, - // xRefId: xRef._id as string, - // memo: payentMemo - // }); - - // const paymentList = await gasSaver() - // logger.info('Gas saver payment list:', paymentList); - } else if (completedPayment?.payment_type === PaymentType.Membership) { // Notify Pi platform for membership payment completion await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); diff --git a/src/index.ts b/src/index.ts index ff3078cc..70069f7f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,7 +6,6 @@ import { connectDB } from "./config/dbConnection"; import app from "./utils/app"; import { env } from "./utils/env"; import logger from "./config/loggingConfig"; -// import dropCollection from "./cron/utils/dropTable"; dotenv.config(); @@ -27,8 +26,6 @@ const startServer = async () => { }); } - // dropCollection("payments"); - logger.info("Server setup initiated."); } catch (error) { logger.error('Server failed to initialize:', error); diff --git a/src/utils/queues/a2uQueue.ts b/src/utils/queues/a2uQueue.ts deleted file mode 100644 index 08fe78fa..00000000 --- a/src/utils/queues/a2uQueue.ts +++ /dev/null @@ -1,19 +0,0 @@ -// // queues/a2uQueue.ts -// import { Queue, Worker, QueueScheduler, Job } from 'bullmq'; -// import IORedis from 'ioredis'; - -// const connection = new IORedis(); // configure host/port/password as needed - -// // Scheduler handles stalled jobs, retries, etc. -// new QueueScheduler('a2u-payments', { connection }); - -// // This is the queue you will add jobs to: -// export const a2uQueue = new Queue('a2u-payments', { -// connection, -// defaultJobOptions: { -// removeOnComplete: true, -// removeOnFail: false, -// attempts: 3, // retry up to 3 times -// backoff: { type: 'exponential', delay: 5000 }, -// }, -// }); diff --git a/src/utils/queues/queue.ts b/src/utils/queues/queue.ts index d7e97499..2d2ad8ac 100644 --- a/src/utils/queues/queue.ts +++ b/src/utils/queues/queue.ts @@ -3,7 +3,7 @@ import logger from "../../config/loggingConfig"; import A2UPaymentQueue from "../../models/A2UPaymentQueue"; import Seller from "../../models/Seller"; - const GAS_FEE = 0.01; +const GAS_FEE = 0.01; const batchSellerRevenue = async ( xRefId: string, From 9fad5e216f31fce02d49972c95d6935021f6a4cb Mon Sep 17 00:00:00 2001 From: adisa39 Date: Sat, 5 Jul 2025 17:54:36 +0100 Subject: [PATCH 5/6] include error handling in payment helper logics --- src/helpers/payment.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/helpers/payment.ts b/src/helpers/payment.ts index 18b02b84..75c59c60 100644 --- a/src/helpers/payment.ts +++ b/src/helpers/payment.ts @@ -244,6 +244,11 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) if (completedPayment?.payment_type === PaymentType.BuyerCheckout) { // Update the associated order's status to paid const order = await updatePaidOrder(completedPayment._id as string); + + if (!order) { + logger.error("Failed to update order to paid status"); + throw new Error("Failed to update order to paid status"); + } logger.info("Order record updated to paid"); // Save cross-reference for U2U payment tracking @@ -256,6 +261,11 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) }; const xRef = await createPaymentCrossReference(u2uRefData); logger.info("U2U cross-reference created", u2uRefData); + + if (!xRef) { + logger.error("Failed to create U2U cross-reference"); + throw new Error("Failed to create U2U cross-reference"); + } // Enqueue the payment for further processing (e.g., A2U payment) await enqueuePayment(xRef?._id.toString(), order?.seller_id.toString(), order.total_amount.toString(), completedPayment.memo); From 7ba859707fdff55a2a296c38198f1da735d0cc94 Mon Sep 17 00:00:00 2001 From: adisa39 Date: Sat, 5 Jul 2025 18:08:37 +0100 Subject: [PATCH 6/6] fix payment batching bug for seller with gas aver turned on --- src/utils/queues/queue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/queues/queue.ts b/src/utils/queues/queue.ts index 2d2ad8ac..b25d6394 100644 --- a/src/utils/queues/queue.ts +++ b/src/utils/queues/queue.ts @@ -26,7 +26,7 @@ const batchSellerRevenue = async ( } const updatedQueue = await A2UPaymentQueue.findOneAndUpdate( - { sellerPiUid }, + { sellerPiUid, status:"batching", last_a2u_date: null }, { $inc: { amount: parseFloat(amount) }, $push: { xRef_ids: xRefId },