diff --git a/src/controllers/paymentController.ts b/src/controllers/paymentController.ts index 51e7f326..e05c25dd 100644 --- a/src/controllers/paymentController.ts +++ b/src/controllers/paymentController.ts @@ -30,7 +30,7 @@ export const onPaymentApproval = async (req: Request, res: Response) => { const approvedPayment = await processPaymentApproval(paymentId, currentUser); return res.status(200).json(approvedPayment); } catch (error) { - logger.error(`Failed to approve Pi payment for paymentID ${ paymentId }:`, error); + logger.error(`Failed to approve Pi payment for piPaymentID ${ paymentId }:`, error); return res.status(500).json({ success: false, message: 'An error occurred while approving Pi payment; please try again later' diff --git a/src/cron/index.ts b/src/cron/index.ts index f65f8073..e42ae9c8 100644 --- a/src/cron/index.ts +++ b/src/cron/index.ts @@ -1,21 +1,27 @@ import schedule from "node-schedule"; -import { runSanctionBot } from "./jobs/sanctionBot.job"; +// import { runSanctionBot } from "./jobs/sanctionBot.job"; import logger from "../config/loggingConfig"; +import processNextJob from "./jobs/a2uJobWorker"; export const scheduleCronJobs = () => { logger.info("Initializing scheduled cron jobs..."); // Run the Sanction Bot job daily at 22:00 UTC - const sanctionJobTime = '0 0 22 * * *'; + // const sanctionJobTime = '0 0 22 * * *'; - schedule.scheduleJob(sanctionJobTime, async () => { - logger.info('🕒 Sanction Bot job triggered (22:00 UTC).'); + // Drain A2U payment queue every 5 min + const a2uPaymentJobTime = '0 */5 * * * *'; // Every 5 minutes + + schedule.scheduleJob(a2uPaymentJobTime, async () => { + logger.info('🕒 A2U payment worker job triggered at 5min.'); try { - await runSanctionBot(); - logger.info("✅ Sanction Bot job completed successfully."); + // await runSanctionBot(); + await processNextJob(); + logger.info("✅ A2U payment worker job completed successfully."); + // logger.info("✅ Sanction Bot job completed successfully."); } catch (error) { - logger.error("❌ Sanction Bot job failed:", error); + logger.error("❌ A2U payment worker job failed:", error); } }); diff --git a/src/cron/jobs/a2uJobWorker.ts b/src/cron/jobs/a2uJobWorker.ts new file mode 100644 index 00000000..4618f379 --- /dev/null +++ b/src/cron/jobs/a2uJobWorker.ts @@ -0,0 +1,76 @@ +import logger from "../../config/loggingConfig"; +import A2UPaymentQueue from "../../models/A2UPaymentQueue"; +import { createA2UPayment } from "../../services/payment.service"; + +// workers/mongodbA2UWorker.ts +async function processNextJob(): Promise { + const now = new Date(); + const MAXATTEMPT = 3 + + const threeDaysAgo = new Date(); + threeDaysAgo.setDate(threeDaysAgo.getDate() - 3); + + const job = await A2UPaymentQueue.findOneAndUpdate( + { + $or: [ + { status: 'pending' }, + { status: 'failed' }, + { + status: 'batching', + last_a2u_date: { $lte: threeDaysAgo } + } + ], + attempts: { $lt: 3 } + }, + { + status: 'processing', + $inc: { attempts: 1 }, + updatedAt: new Date(), + }, + { + sort: { updatedAt: 1 }, + new: true, + } + ); + + + if (!job) return; + + const { sellerPiUid, amount, xRef_ids, _id, attempts, memo, last_a2u_date } = job; + + try { + logger.info(`[→] Attempt ${attempts}/${MAXATTEMPT} for ${sellerPiUid}`); + + await createA2UPayment({ + sellerPiUid: sellerPiUid, + amount: amount.toString(), + memo: "A2U payment", + xRefIds: xRef_ids + }) + + await A2UPaymentQueue.findByIdAndUpdate(_id, { + status: 'completed', + updatedAt: new Date(), + last_a2u_date: new Date(), + last_error: null, + }); + + console.log(`[✔] A2U payment completed for ${sellerPiUid}`); + } catch (err: any) { + + const willRetry = attempts < MAXATTEMPT; + + await A2UPaymentQueue.findByIdAndUpdate(_id, { + status: willRetry ? 'pending' : 'failed', + last_error: err.message, + updatedAt: new Date(), + }); + + logger.error(`[✘] A2U payment failed for ${sellerPiUid}: ${err.message}`); + if (!willRetry) { + logger.info(`[⚠️] Job permanently failed after ${attempts} attempts.`); + } + } +} + +export default processNextJob; diff --git a/src/cron/jobs/sanctionBot.job.ts b/src/cron/jobs/sanctionBot.job.ts index 6ad96a14..c62177f0 100644 --- a/src/cron/jobs/sanctionBot.job.ts +++ b/src/cron/jobs/sanctionBot.job.ts @@ -1,67 +1,67 @@ -import Seller from "../../models/Seller"; -import { SanctionedSellerStatus } from "../../types"; -import { getAllSanctionedRegions } from "../../services/admin/report.service"; -import { - createBulkPreRestrictionOperation, - createGeoQueries -} from "../utils/geoUtils"; -import { - getSellersToEvaluate, - processSellersGeocoding, - processSanctionedSellers, - processUnsanctionedSellers -} from "../utils/sanctionUtils"; -import logger from "../../config/loggingConfig"; +// import Seller from "../../models/Seller"; +// import { SanctionedSellerStatus } from "../../types"; +// import { getAllSanctionedRegions } from "../../services/admin/report.service"; +// import { +// createBulkPreRestrictionOperation, +// createGeoQueries +// } from "../utils/geoUtils"; +// import { +// getSellersToEvaluate, +// processSellersGeocoding, +// processSanctionedSellers, +// processUnsanctionedSellers +// } from "../utils/sanctionUtils"; +// import logger from "../../config/loggingConfig"; -export async function runSanctionBot(): Promise { - logger.info('Sanction Bot cron job started.'); +// export async function runSanctionBot(): Promise { +// logger.info('Sanction Bot cron job started.'); - try { - /* Step 1: Reset the 'isPreRestricted' field to 'false' for all sellers - This clears any pre-existing restrictions before applying new ones. */ - await Seller.updateMany({}, { isPreRestricted: false }).exec(); - logger.info('Reset [isPreRestricted] for all sellers.'); +// try { +// /* Step 1: Reset the 'isPreRestricted' field to 'false' for all sellers +// This clears any pre-existing restrictions before applying new ones. */ +// await Seller.updateMany({}, { isPreRestricted: false }).exec(); +// logger.info('Reset [isPreRestricted] for all sellers.'); - /* Step 2: Get the list of all sanctioned regions */ - const sanctionedRegions = await getAllSanctionedRegions(); - // If no sanctioned regions are found, log the info and exit the job - if (!sanctionedRegions.length) { - logger.info('No sanctioned regions found. Exiting job.'); - return; - } +// /* Step 2: Get the list of all sanctioned regions */ +// const sanctionedRegions = await getAllSanctionedRegions(); +// // If no sanctioned regions are found, log the info and exit the job +// if (!sanctionedRegions.length) { +// logger.info('No sanctioned regions found. Exiting job.'); +// return; +// } - /* Step 3: Create geo-based queries and identify sellers to evaluate */ - const geoQueries = createGeoQueries(sanctionedRegions); - const sellersToEvaluate = await getSellersToEvaluate(geoQueries); - logger.info(`Evaluating ${sellersToEvaluate.length} sellers flagged or currently Restricted.`); +// /* Step 3: Create geo-based queries and identify sellers to evaluate */ +// const geoQueries = createGeoQueries(sanctionedRegions); +// const sellersToEvaluate = await getSellersToEvaluate(geoQueries); +// logger.info(`Evaluating ${sellersToEvaluate.length} sellers flagged or currently Restricted.`); - /* Step 4: Create the bulk update operations to mark sellers as pre-restricted */ - const bulkPreRestrictionOps = createBulkPreRestrictionOperation(sellersToEvaluate); - if (bulkPreRestrictionOps.length > 0) { - await Seller.bulkWrite(bulkPreRestrictionOps); - logger.info(`Marked ${bulkPreRestrictionOps.length} sellers as Pre-Restricted`) - } +// /* Step 4: Create the bulk update operations to mark sellers as pre-restricted */ +// const bulkPreRestrictionOps = createBulkPreRestrictionOperation(sellersToEvaluate); +// if (bulkPreRestrictionOps.length > 0) { +// await Seller.bulkWrite(bulkPreRestrictionOps); +// logger.info(`Marked ${bulkPreRestrictionOps.length} sellers as Pre-Restricted`) +// } - /* Step 5: Retrieve all sellers who are marked as pre-restricted */ - const preRestrictedSellers = await Seller.find({isPreRestricted: true}).exec(); - logger.info(`${preRestrictedSellers.length} sellers are Pre-Restricted`); +// /* Step 5: Retrieve all sellers who are marked as pre-restricted */ +// const preRestrictedSellers = await Seller.find({isPreRestricted: true}).exec(); +// logger.info(`${preRestrictedSellers.length} sellers are Pre-Restricted`); - /* Step 6: Process geocoding validation */ - const results: SanctionedSellerStatus[] = await processSellersGeocoding( - preRestrictedSellers, - sanctionedRegions - ); - const inZone = results.filter(r => r.isSanctionedRegion); - const outOfZone = results.filter(r => !r.isSanctionedRegion); +// /* Step 6: Process geocoding validation */ +// const results: SanctionedSellerStatus[] = await processSellersGeocoding( +// preRestrictedSellers, +// sanctionedRegions +// ); +// const inZone = results.filter(r => r.isSanctionedRegion); +// const outOfZone = results.filter(r => !r.isSanctionedRegion); - /* Step 7: Apply restrictions of in-zone sellers or restoration of out-zone sellers */ - await processSanctionedSellers(inZone); - await processUnsanctionedSellers(outOfZone); +// /* Step 7: Apply restrictions of in-zone sellers or restoration of out-zone sellers */ +// await processSanctionedSellers(inZone); +// await processUnsanctionedSellers(outOfZone); - /* Step 8: Clean up temp pre-restriction flags */ - await Seller.updateMany({isPreRestricted: true}, {isPreRestricted: false}).exec(); - logger.info('SanctionBot job completed.'); - } catch (error) { - logger.error('Error in Sanction Bot cron job:', error); - } -} \ No newline at end of file +// /* Step 8: Clean up temp pre-restriction flags */ +// await Seller.updateMany({isPreRestricted: true}, {isPreRestricted: false}).exec(); +// logger.info('SanctionBot job completed.'); +// } catch (error) { +// logger.error('Error in Sanction Bot cron job:', error); +// } +// } \ No newline at end of file diff --git a/src/helpers/payment.ts b/src/helpers/payment.ts index a0f32863..75c59c60 100644 --- a/src/helpers/payment.ts +++ b/src/helpers/payment.ts @@ -15,11 +15,12 @@ import { createPayment, completePayment, createPaymentCrossReference, - createA2UPayment, - cancelPayment + cancelPayment, + getxRefByOrderId } from '../services/payment.service'; import { IUser, NewOrder, PaymentDataType, PaymentDTO, PaymentInfo } from '../types'; import logger from '../config/loggingConfig'; +import { enqueuePayment } from '../utils/queues/queue'; function buildPaymentData( piPaymentId: string, @@ -137,9 +138,28 @@ export const processIncompletePayment = async (payment: PaymentInfo) => { // If the completed payment was for a buyer checkout, update the associated order if (updatedPayment?.payment_type === PaymentType.BuyerCheckout) { - await updatePaidOrder(updatedPayment._id as string); + const updatedOrder = await updatePaidOrder(updatedPayment._id as string); logger.warn("Old order found and updated"); - } + + // update the payment cross-reference if it exists else create a new one + const xRef = getxRefByOrderId(updatedOrder._id as string); + + if (!xRef) { + logger.warn("No existing payment cross-reference found, creating a new one"); + const xRefData = { + orderId: updatedOrder._id as string, + u2aPaymentId: updatedPayment._id as string, + u2uStatus: U2UPaymentStatus.U2ACompleted, + u2aCompletedAt: new Date(), + a2uPaymentId: null, + sellerId: updatedPayment.user_id.toString(), + } + const newXref = await createPaymentCrossReference(xRefData); + + // Enqueue the payment for further processing (e.g., A2U payment) + await enqueuePayment(newXref?._id.toString(), updatedOrder?.seller_id.toString(), updatedOrder.total_amount.toString(), updatedPayment.memo); + } + } // Notify the Pi Platform that the payment is complete await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); @@ -224,16 +244,31 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) if (completedPayment?.payment_type === PaymentType.BuyerCheckout) { // Update the associated order's status to paid const order = await updatePaidOrder(completedPayment._id as string); + + if (!order) { + logger.error("Failed to update order to paid status"); + throw new Error("Failed to update order to paid status"); + } logger.info("Order record updated to paid"); // Save cross-reference for U2U payment tracking const u2uRefData = { u2aPaymentId: completedPayment._id as string, u2uStatus: U2UPaymentStatus.U2ACompleted, + orderId: order._id as string, + u2aCompletedAt: new Date(), a2uPaymentId: null, }; - await createPaymentCrossReference(order._id as string, u2uRefData); - logger.info("U2U cross-reference created", u2uRefData); + const xRef = await createPaymentCrossReference(u2uRefData); + logger.info("U2U cross-reference created", u2uRefData); + + if (!xRef) { + logger.error("Failed to create U2U cross-reference"); + throw new Error("Failed to create U2U cross-reference"); + } + + // Enqueue the payment for further processing (e.g., A2U payment) + await enqueuePayment(xRef?._id.toString(), order?.seller_id.toString(), order.total_amount.toString(), completedPayment.memo); // Notify Pi Platform of successful completion const completedPiPayment = await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); @@ -244,18 +279,6 @@ export const processPaymentCompletion = async (paymentId: string, txid: string) logger.info("Payment marked completed on Pi blockchain", completedPiPayment.status); - const paymentMemo = completedPiPayment.data.memo as string; - - // Start A2U (App-to-User) payment to the seller - await createA2UPayment({ - sellerId: order.seller_id.toString(), - amount: order.total_amount.toString(), - buyerId: order.buyer_id.toString(), - paymentType: PaymentType.BuyerCheckout, - orderId: order._id as string, - memo: paymentMemo - }); - } else if (completedPayment?.payment_type === PaymentType.Membership) { // Notify Pi platform for membership payment completion await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid }); diff --git a/src/index.ts b/src/index.ts index 31004262..70069f7f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,8 +26,6 @@ const startServer = async () => { }); } - - logger.info("Server setup initiated."); } catch (error) { logger.error('Server failed to initialize:', error); @@ -37,6 +35,6 @@ const startServer = async () => { // Start the server setup process startServer(); // TODO: Remove cron job; Start the scheduled cron job(s) -// scheduleCronJobs(); +scheduleCronJobs(); export default app; \ No newline at end of file diff --git a/src/models/A2UPaymentQueue.ts b/src/models/A2UPaymentQueue.ts new file mode 100644 index 00000000..833f4ba7 --- /dev/null +++ b/src/models/A2UPaymentQueue.ts @@ -0,0 +1,23 @@ +import mongoose, { Schema } from "mongoose"; +import { IA2UJob } from "../types"; + +const A2UPaymentQueueSchema = new Schema( + { + sellerPiUid: { type: String, required: true }, + amount: { type: Number, required: true }, + xRef_ids: [{ type: String, required: true }], + memo: { type: String, require: true }, + status: { + type: String, + enum: ['pending', 'processing', 'completed', 'failed', 'batching'], + default: 'pending', + }, + last_a2u_date: { type: Date, default: null }, + attempts: { type: Number, default: 0 }, + last_error: { type: String, default: null } + }, + { timestamps: true } +); + +const A2UPaymentQueue = mongoose.model('A2UPaymentQueue', A2UPaymentQueueSchema); +export default A2UPaymentQueue; \ No newline at end of file diff --git a/src/models/Payment.ts b/src/models/Payment.ts index 28117602..0c5e1495 100644 --- a/src/models/Payment.ts +++ b/src/models/Payment.ts @@ -2,6 +2,7 @@ import mongoose, { Schema, SchemaTypes, Types } from "mongoose"; import { IPayment } from "../types"; import { PaymentType } from "./enums/paymentType"; +import { PaymentDirection } from "./enums/paymentDirection"; const paymentSchema = new Schema( { @@ -39,6 +40,12 @@ const paymentSchema = new Schema( required: false, default: "" }, + direction: { + type: String, + enum: Object.values(PaymentDirection).filter(value => typeof value === 'string'), + required: false, + default: PaymentDirection.U2A + }, payment_type: { type: String, enum: Object.values(PaymentType).filter(value => typeof value === 'string'), diff --git a/src/models/Seller.ts b/src/models/Seller.ts index fe9f5c96..c2ae2ed2 100644 --- a/src/models/Seller.ts +++ b/src/models/Seller.ts @@ -75,6 +75,11 @@ const sellerSchema = new Schema( type: Boolean, default: false, required: false + }, + gas_saver: { + type: Boolean, + default: true, + required: true } }, { timestamps: true } // Adds timestamps to track creation and update times diff --git a/src/models/enums/paymentDirection.ts b/src/models/enums/paymentDirection.ts new file mode 100644 index 00000000..f9e7c637 --- /dev/null +++ b/src/models/enums/paymentDirection.ts @@ -0,0 +1,4 @@ +export enum PaymentDirection { + A2U = 'a2u', + U2A = 'u2a' +} diff --git a/src/services/payment.service.ts b/src/services/payment.service.ts index 350b6d9d..f7879bfc 100644 --- a/src/services/payment.service.ts +++ b/src/services/payment.service.ts @@ -13,6 +13,9 @@ import { PaymentDTO, } from "../types"; import logger from "../config/loggingConfig"; +import { PaymentDirection } from "../models/enums/paymentDirection"; +import { dir } from "console"; +import { PaymentType } from "../models/enums/paymentType"; export const createPayment = async (paymentData: NewPayment): Promise => { try { @@ -59,167 +62,63 @@ export const completePayment = async ( }; export const createPaymentCrossReference = async ( - orderId: string, refData: U2URefDataType ): Promise => { try { const newRef = new PaymentCrossReference({ - order_id: orderId, + order_id: refData.orderId, u2a_payment_id: refData.u2aPaymentId, + u2u_status: refData.u2uStatus, u2a_completed_at: new Date(), a2u_payment_id: null }); return await newRef.save(); } catch (error: any) { - logger.error(`Failed to create Payment xRef for orderID ${ orderId }: ${ error }`); + logger.error(`Failed to create Payment xRef for orderID ${ refData.orderId }: ${ error }`); throw error; } }; +export const getxRefByOrderId = async (orderId: string) => { + try { + const existingXRef = await PaymentCrossReference.findOne({order_id: orderId}).lean().exec() + return existingXRef + } catch (error:any) { + logger.error("Can't find xRef with id: ", orderId) + } +} + export const updatePaymentCrossReference = async ( - orderId: string, + xRefIds: string[], refData: U2URefDataType -): Promise => { +): Promise => { try { - const updatedRef = await PaymentCrossReference.findOneAndUpdate( - { order_id: orderId }, + const result = await PaymentCrossReference.updateMany( + { _id: { $in: xRefIds } }, { - a2u_payment_id: refData.a2uPaymentId, - a2u_completed_at: new Date(), - u2u_status: refData.u2uStatus - }, - { new: true } - ).lean().exec(); + $set: { + u2a_payment_id: refData.u2aPaymentId, + a2u_payment_id: refData.a2uPaymentId, + a2u_completed_at: new Date(), + u2u_status: refData.u2uStatus, + }, + } + ).exec(); - if (!updatedRef) { - logger.error(`No Payment xRef found to update for orderID ${ orderId }`); - throw new Error('No Payment xRef found to update'); + if (result.modifiedCount === 0) { + logger.warn(`No PaymentCrossReference updated for IDs: ${xRefIds.join(', ')}`); + } else { + logger.info(`Updated ${result.modifiedCount} PaymentCrossReferences.`); } - return updatedRef; + return result.modifiedCount; } catch (error: any) { - logger.error(`Failed to update Payment xRef for orderID ${ orderId }: ${ error }`); + logger.error(`Failed to update Payment xRef for orderID ${ refData.orderId }: ${ error }`); throw error; } }; -export const createA2UPayment = async (a2uPaymentData: A2UPaymentDataType): Promise => { - try { - /* Step 1: Subtract gas fee from original amount to calculate net transfer payment amount */ - const gasFee = 0.01; - const newAmount = parseFloat(a2uPaymentData.amount) - gasFee; - logger.info('Adjusted A2U payment amount: ', { newAmount }); - if (newAmount <= 0) { - logger.error(`Invalid A2U payment amount ${ newAmount }; must be > 0 after gas fee deduction`); - throw new Error('Invalid A2U payment amount'); - } - - /* Step 2: Get seller's Pi UID using the seller's _id field */ - const existingSeller = await Seller.findById(a2uPaymentData.sellerId) - .select('seller_id -_id') // Include seller_id, exclude _id - .exec(); - - if (!existingSeller?.seller_id) { - logger.error(`Failed to find seller with ID ${ a2uPaymentData.sellerId }`); - throw new Error('Failed to find seller; no record found'); - } - - /* Step 3: Create a Pi blockchain payment request using seller's Pi UID */ - const a2uData = { - amount: newAmount, - memo: a2uPaymentData.memo, - metadata: { - direction: "A2U", - orderId: a2uPaymentData.orderId, - sellerId: a2uPaymentData.sellerId, - buyerId: a2uPaymentData.buyerId - }, - uid: existingSeller?.seller_id as string, - }; - - const paymentId = await pi.createPayment(a2uData); - logger.debug('Payment ID: ', { paymentId }); - if (!paymentId) { - logger.error(`Failed to create A2U Pi payment for UID ${ existingSeller.seller_id }`); - throw new Error('Failed to create A2U Pi payment'); - } - - /* Step 4: Save the new A2U payment in the DB collection */ - const newPayment = await createPayment({ - piPaymentId: paymentId, - userId: a2uPaymentData.buyerId as string, - amount: newAmount.toString(), - memo: a2uPaymentData.memo, - paymentType: a2uPaymentData.paymentType - }); - logger.info('New A2U payment record created'); - if (!newPayment) { - logger.error(`Failed to create A2U payment DB record with Payment ID ${ paymentId }`); - throw new Error('Failed to create A2U payment DB record'); - } - - /* Step 5: Submit the Pi payment to finalize the blockchain transaction */ - const txid = await pi.submitPayment(paymentId); - if (!txid) { - logger.error(`Failed to submit A2U Pi payment with Payment ID ${ paymentId }`); - throw new Error('Failed to submit A2U Pi payment'); - } - logger.info('Transaction ID: ', { txid }); - - /* Step 6: Mark the payment record as completed */ - const updatedPayment = await completePayment(paymentId, txid); - if (!updatedPayment) { - logger.error(`Failed to complete A2U payment DB record with Payment ID ${ paymentId } + Txn ID ${ txid }`); - throw new Error('Failed to complete A2U payment DB record'); - } - logger.info('Marked A2U payment record as completed'); - - /* Step 7: Update the Payment xRef with A2U completion status */ - const u2uRefData = { - u2uStatus: U2UPaymentStatus.A2UCompleted, - a2uPaymentId: updatedPayment?._id as string, - } - const u2uRef = await updatePaymentCrossReference(a2uPaymentData.orderId, u2uRefData); - if (!u2uRef) { - logger.error(`Failed to update Payment xRef with A2U Payment ID ${ updatedPayment?._id }`); - throw new Error('Failed to update Payment xRef with A2U payment data'); - } - - logger.info('Updated Payment xRef record', u2uRef); - - /* Step 8: Mark the payment as complete in the Pi blockchain (final confirmation) */ - const completedPiPayment = await pi.completePayment(paymentId, txid); - if (!completedPiPayment) { - logger.error(`Failed to complete A2U Pi payment with Payment ID ${ paymentId } + Txn ID ${ txid }`); - throw new Error('Failed to complete A2U Pi payment transaction'); - } - logger.info(`A2U payment process completed successfully for Order ID ${ a2uPaymentData.orderId }`); - return updatedPayment; - - } catch (error: any) { - if (axios.isAxiosError(error)) { - logger.error(`Axios error during A2U payment: ${error}`, { - status: error.response?.status, - data: error.response?.data, - config: error.config, - }); - } else { - logger.error(`Failed to create A2U payment for Order ID ${a2uPaymentData.orderId}:`, { - message: error.message, - stack: error.stack, - }); - } - - // Handle cancellation of the payment if it was created but not completed - const {incomplete_server_payments} = await getIncompleteServerPayments(); - logger.info("Found incomplete server payments", incomplete_server_payments); - if (incomplete_server_payments && incomplete_server_payments.length > 0) { - await completeServerPayment(incomplete_server_payments); - } - return null; - } -}; export const getIncompleteServerPayments = async (): Promise => { try { @@ -245,7 +144,7 @@ export const completeServerPayment = async (serverPayments: PaymentDTO[]): Promi for (const payment of serverPayments) { let transaction = payment.transaction || null; const piPaymentId = payment.identifier; - const metadata = payment.metadata as { orderId: string; sellerId: string; buyerId: string }; + const metadata = payment.metadata as { xRefId: string; sellerId: string; buyerId: string }; if (!piPaymentId) { logger.error(`Missing Pi payment ID for payment: ${JSON.stringify(payment)}`); @@ -271,13 +170,13 @@ export const completeServerPayment = async (serverPayments: PaymentDTO[]): Promi logger.info(`Marked A2U payment as completed for ${piPaymentId}`); // Update U2U payment cross reference - const u2uRef = await updatePaymentCrossReference(metadata?.orderId, { + const u2uRef = await updatePaymentCrossReference([metadata?.xRefId], { u2uStatus: U2UPaymentStatus.A2UCompleted, a2uPaymentId: updatedPayment._id as string, }); if (!u2uRef) { - throw new Error(`Failed to update payment cross reference for order ${metadata.orderId}`); + throw new Error(`Failed to update payment cross reference for ${metadata.xRefId}`); } logger.info('Updated U2U reference record', u2uRef); @@ -288,15 +187,15 @@ export const completeServerPayment = async (serverPayments: PaymentDTO[]): Promi throw new Error(`Failed to confirm Pi payment on blockchain for ${piPaymentId}`); } - logger.info(`✅ A2U payment process completed for Order ID: ${metadata.orderId}`); + logger.info(`✅ A2U payment process completed for xRef ID: ${metadata.xRefId}`); } catch (error: any) { if (axios.isAxiosError(error)) { - logger.error(`Axios error during A2U payment for order ${metadata.orderId || 'unknown'}: ${error.message}`, { + logger.error(`Axios error during A2U payment for xRef ${metadata.xRefId || 'unknown'}: ${error.message}`, { status: error.response?.status, data: error.response?.data, }); } else { - logger.error(`❌ Error completing server payment for Order ID ${metadata.orderId || 'unknown'}: ${error.message}`); + logger.error(`❌ Error completing server payment for xRef ID ${metadata.xRefId || 'unknown'}: ${error.message}`); } } } @@ -323,7 +222,7 @@ export const cancelPayment = async (piPaymentId: string): Promise => { + try { + const a2uData = { + amount: parseFloat(a2uPaymentData.amount), + memo: a2uPaymentData.memo, + metadata: { direction: "A2U", sellerId: a2uPaymentData.sellerPiUid }, + uid: a2uPaymentData.sellerPiUid as string, + }; + + const paymentId = await pi.createPayment(a2uData); + logger.debug('Payment ID: ', { paymentId }); + if (!paymentId) { + logger.error(`Failed to create A2U Pi payment for UID ${ a2uPaymentData.sellerPiUid }`); + throw new Error('Failed to create A2U Pi payment'); + } + + /* Step 5: Submit the Pi payment to finalize the blockchain transaction */ + const txid = await pi.submitPayment(paymentId); + if (!txid) { + logger.error(`Failed to submit A2U Pi payment with Payment ID ${ paymentId }`); + throw new Error('Failed to submit A2U Pi payment'); + } + logger.info('Transaction ID: ', { txid }); + + // get xRef for each xRefIds + for (const refId of a2uPaymentData.xRefIds) { + + const xRef = await PaymentCrossReference.findById(refId) + .populate<{u2a_payment_id: {memo:string}}>({path:'u2a_payment_id', model: 'Payment', select: 'memo'}) + .populate<{order_id: {seller_id:string}}>({path:'order_id', model: 'Order', select: 'seller_id'}) + .lean() + .exec(); + + const completeda2uPayment = await Payment.create({ + pi_payment_id: paymentId, + user_id: xRef?.order_id.seller_id, + amount: a2uPaymentData.amount, + memo: xRef?.u2a_payment_id.memo as string, + txid: txid, + direction: PaymentDirection.A2U, + payment_type: PaymentType.BuyerCheckout, + paid: true, + cancelled: false, + }) + + const xRefData = { + u2uStatus: U2UPaymentStatus.A2UCompleted, + a2uPaymentId: completeda2uPayment?._id as string, + }; + + const updatedRef = await PaymentCrossReference.findByIdAndUpdate( + refId, + { + $set: { + a2u_payment_id: completeda2uPayment?._id as string, + a2u_completed_at: new Date(), + u2u_status: xRefData.u2uStatus, + error_message: '', + } + } + ).lean() + .exec(); + + logger.info('updated Payment xRef record', updatedRef?._id.toString()); + + } + + const completedPiPayment = await pi.completePayment(paymentId, txid); + if (!completedPiPayment) { + logger.error(`Failed to complete A2U Pi payment with Payment ID ${ paymentId } + Txn ID ${ txid }`); + throw new Error('Failed to complete A2U Pi payment transaction'); + } + + logger.info(`A2U payment process completed successfully for xRef ID ${ a2uPaymentData.xRefIds }`); + return paymentId; + + } catch (error: any) { + if (axios.isAxiosError(error)) { + logger.error(`Axios error during A2U payment: ${error.message}`, { + status: error.response?.status, + data: error.response?.data, + config: error.config, + }); + } else { + logger.error(`Failed to create A2U payment for Order ID ${a2uPaymentData.xRefIds}:`, { + message: error.message, + stack: error.stack, + }); + } + + // Handle cancellation of the payment if it was created but not completed + const {incomplete_server_payments} = await getIncompleteServerPayments(); + logger.info("found incomplete server payments", incomplete_server_payments); + if (incomplete_server_payments && incomplete_server_payments.length > 0) { + await completeServerPayment(incomplete_server_payments); + } + return null; + } }; \ No newline at end of file diff --git a/src/services/seller.service.ts b/src/services/seller.service.ts index d5244dae..0edc2660 100644 --- a/src/services/seller.service.ts +++ b/src/services/seller.service.ts @@ -214,7 +214,8 @@ export const registerOrUpdateSeller = async (authUser: IUser, formData: any): Pr sell_map_center: sellMapCenter, order_online_enabled_pref: formData.order_online_enabled_pref || existingSeller?.order_online_enabled_pref || false, fulfillment_method: formData.fulfillment_method || existingSeller?.fulfillment_method || FulfillmentType.CollectionByBuyer, - fulfillment_description: formData.fulfillment_description || existingSeller?.fulfillment_description || '' + fulfillment_description: formData.fulfillment_description || existingSeller?.fulfillment_description || '', + gas_saver: formData.gas_saver || true }; // Update existing seller or create a new one diff --git a/src/types.ts b/src/types.ts index 8431ea14..11252e2f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -10,6 +10,7 @@ import { OrderItemStatusType } from "./models/enums/orderItemStatusType"; import { PaymentType } from "./models/enums/paymentType"; import { U2UPaymentStatus } from "./models/enums/u2uPaymentStatus"; import { RestrictedArea } from "./models/enums/restrictedArea"; +import { PaymentDirection } from "./models/enums/paymentDirection"; // ======================== // USER MODELS @@ -74,6 +75,7 @@ export interface ISeller extends Document { fulfillment_description?: string; pre_restriction_seller_type?: SellerType | null; isPreRestricted: boolean; + gas_saver: boolean; }; // Combined interface representing a seller with selected user settings @@ -178,6 +180,20 @@ export type OrderPaymentMetadataType = { // ======================== // PAYMENT MODELS // ======================== + + +export interface IA2UJob extends Document { + sellerPiUid: string; + amount: number; + xRef_ids: string[]; + memo: string, + status: 'pending' | 'processing' | 'completed' | 'failed'; + last_a2u_date: Date, + attempts: number; + last_error?: string; + createdAt: Date; + updatedAt: Date; +} export interface IPayment extends Document { user_id: Types.ObjectId; amount: Types.Decimal128; @@ -185,6 +201,7 @@ export interface IPayment extends Document { memo: string; pi_payment_id: string; txid?: string; + direction: PaymentDirection; payment_type: PaymentType; cancelled: boolean; createdAt: Date; @@ -231,15 +248,14 @@ export interface NewPayment { export interface U2URefDataType { u2aPaymentId?: string, u2uStatus: U2UPaymentStatus, + orderId?: string, a2uPaymentId: string | null, }; export interface A2UPaymentDataType { - sellerId: string, + sellerPiUid: string, amount: string, - buyerId: string, - paymentType: PaymentType, - orderId: string, + xRefIds: string[], memo: string }; diff --git a/src/utils/migrationScripts/applyGasSaverDefault.ts b/src/utils/migrationScripts/applyGasSaverDefault.ts new file mode 100644 index 00000000..55c0c2fb --- /dev/null +++ b/src/utils/migrationScripts/applyGasSaverDefault.ts @@ -0,0 +1,23 @@ +import mongoose from 'mongoose'; +import Seller from '../../models/Seller'; +import dotenv from 'dotenv'; +import logger from '../../config/loggingConfig'; + +dotenv.config(); // if using .env for DB URI + + +// RUN SCRIPT ONLY ONCE FOR DB MIGRATION +async function applyGasSaverDefault() { + await mongoose.connect(process.env.MONGODB_URL || 'your_mongo_uri_here'); + + const result = await Seller.updateMany( + { gas_saver: { $exists: false } }, + { $set: { gas_saver: true } } + ); + + logger.info(`Updated ${result.modifiedCount} seller(s) with default gas_saver=true`); + + await mongoose.disconnect(); +} + +applyGasSaverDefault().catch(console.error); diff --git a/src/utils/queues/queue.ts b/src/utils/queues/queue.ts new file mode 100644 index 00000000..b25d6394 --- /dev/null +++ b/src/utils/queues/queue.ts @@ -0,0 +1,80 @@ + +import logger from "../../config/loggingConfig"; +import A2UPaymentQueue from "../../models/A2UPaymentQueue"; +import Seller from "../../models/Seller"; + +const GAS_FEE = 0.01; + +const batchSellerRevenue = async ( + xRefId: string, + sellerPiUid: string, + amount: string, +): Promise => { + try { + const onQueuePayment = await A2UPaymentQueue.findOne({ sellerPiUid, status:"batching", last_a2u_date: null }).exec(); + if (!onQueuePayment) { + const newAmount = parseFloat(amount) - GAS_FEE; + await A2UPaymentQueue.create({ + xRef_ids: [xRefId], + sellerPiUid, + amount: newAmount.toFixed(4), + status: "batching", + memo: "Map of Pi Payment for Order", + }); + logger.info("new payment added to queue for seller with ID: ", sellerPiUid) + return; + } + + const updatedQueue = await A2UPaymentQueue.findOneAndUpdate( + { sellerPiUid, status:"batching", last_a2u_date: null }, + { + $inc: { amount: parseFloat(amount) }, + $push: { xRef_ids: xRefId }, + }, + { new: true } + ).exec(); + + if (!updatedQueue) { + logger.error(`Failed to update payment queue for seller: ${sellerPiUid}`); + throw new Error(`Failed to update payment queue for seller: ${sellerPiUid}`); + } + + logger.info(`Updated payment queue for seller: ${sellerPiUid}, new amount: ${updatedQueue.amount}`); + return + + } catch (error:any) { + logger.error("failed to enque payment") + } + +} + +export const enqueuePayment = async ( + xRefId: string, + sellerId: string, + amount: string, + memo:string +) => { + try { + // check if seller gas saver is on + const seller = await Seller.findById( sellerId ).lean().exec(); + + // check and compute seller revenue for gas saver + if (seller?.gas_saver) { + batchSellerRevenue(xRefId, seller.seller_id, amount); + return + } + const newAmount = parseFloat(amount) - GAS_FEE; + await A2UPaymentQueue.create({ + xRef_ids: [xRefId], + sellerPiUid: seller?.seller_id, + amount: newAmount.toFixed(4), + status: "pending", + memo: memo, + }); + logger.info("new payment added to queue for seller with ID: ", {sellerId}) + return; + + }catch(error:any){ + + } +} \ No newline at end of file