Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/controllers/paymentController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export const onPaymentApproval = async (req: Request, res: Response) => {
const approvedPayment = await processPaymentApproval(paymentId, currentUser);
return res.status(200).json(approvedPayment);
} catch (error) {
logger.error(`Failed to approve Pi payment for paymentID ${ paymentId }:`, error);
logger.error(`Failed to approve Pi payment for piPaymentID ${ paymentId }:`, error);
return res.status(500).json({
success: false,
message: 'An error occurred while approving Pi payment; please try again later'
Expand Down
20 changes: 13 additions & 7 deletions src/cron/index.ts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll work on tidying up the cron job setup so both your A2U process and the Sanction bot can run in parallel.

Copy link
Collaborator Author

@adisa39 adisa39 Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! That will be nice, thanks.

I don’t know we are still using the cron job for sanction regions implementation, they sections were all commented out before I started working on it.

Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import schedule from "node-schedule";
import { runSanctionBot } from "./jobs/sanctionBot.job";
// import { runSanctionBot } from "./jobs/sanctionBot.job";
import logger from "../config/loggingConfig";
import processNextJob from "./jobs/a2uJobWorker";

export const scheduleCronJobs = () => {
logger.info("Initializing scheduled cron jobs...");

// Run the Sanction Bot job daily at 22:00 UTC
const sanctionJobTime = '0 0 22 * * *';
// const sanctionJobTime = '0 0 22 * * *';

schedule.scheduleJob(sanctionJobTime, async () => {
logger.info('🕒 Sanction Bot job triggered (22:00 UTC).');
// Drain A2U payment queue every 5 min
const a2uPaymentJobTime = '0 */5 * * * *'; // Every 5 minutes

schedule.scheduleJob(a2uPaymentJobTime, async () => {
logger.info('🕒 A2U payment worker job triggered at 5min.');

try {
await runSanctionBot();
logger.info("✅ Sanction Bot job completed successfully.");
// await runSanctionBot();
await processNextJob();
logger.info("✅ A2U payment worker job completed successfully.");
// logger.info("✅ Sanction Bot job completed successfully.");
} catch (error) {
logger.error("❌ Sanction Bot job failed:", error);
logger.error("❌ A2U payment worker job failed:", error);
}
});

Expand Down
76 changes: 76 additions & 0 deletions src/cron/jobs/a2uJobWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logger from "../../config/loggingConfig";
import A2UPaymentQueue from "../../models/A2UPaymentQueue";
import { createA2UPayment } from "../../services/payment.service";

// workers/mongodbA2UWorker.ts
async function processNextJob(): Promise<void> {
const now = new Date();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declared but not used.

const MAXATTEMPT = 3

const threeDaysAgo = new Date();
threeDaysAgo.setDate(threeDaysAgo.getDate() - 3);

const job = await A2UPaymentQueue.findOneAndUpdate(
{
$or: [
{ status: 'pending' },
{ status: 'failed' },
{
status: 'batching',
last_a2u_date: { $lte: threeDaysAgo }
}
],
attempts: { $lt: 3 }
},
{
status: 'processing',
$inc: { attempts: 1 },
updatedAt: new Date(),
},
{
sort: { updatedAt: 1 },
new: true,
}
);


if (!job) return;

const { sellerPiUid, amount, xRef_ids, _id, attempts, memo, last_a2u_date } = job;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memo and last_a2u_date is not being used.

Copy link
Collaborator Author

@adisa39 adisa39 Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it can be considered in the enqueue function, it’s is not used because of the payment batching for gas saver, that is multiple payment from different buyers with different memo descriptions.


try {
logger.info(`[→] Attempt ${attempts}/${MAXATTEMPT} for ${sellerPiUid}`);

await createA2UPayment({
sellerPiUid: sellerPiUid,
amount: amount.toString(),
memo: "A2U payment",
xRefIds: xRef_ids
})

await A2UPaymentQueue.findByIdAndUpdate(_id, {
status: 'completed',
updatedAt: new Date(),
last_a2u_date: new Date(),
last_error: null,
});

console.log(`[✔] A2U payment completed for ${sellerPiUid}`);
} catch (err: any) {

const willRetry = attempts < MAXATTEMPT;

await A2UPaymentQueue.findByIdAndUpdate(_id, {
status: willRetry ? 'pending' : 'failed',
last_error: err.message,
updatedAt: new Date(),
});

logger.error(`[✘] A2U payment failed for ${sellerPiUid}: ${err.message}`);
if (!willRetry) {
logger.info(`[⚠️] Job permanently failed after ${attempts} attempts.`);
}
}
}

export default processNextJob;
116 changes: 58 additions & 58 deletions src/cron/jobs/sanctionBot.job.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,67 @@
import Seller from "../../models/Seller";
import { SanctionedSellerStatus } from "../../types";
import { getAllSanctionedRegions } from "../../services/admin/report.service";
import {
createBulkPreRestrictionOperation,
createGeoQueries
} from "../utils/geoUtils";
import {
getSellersToEvaluate,
processSellersGeocoding,
processSanctionedSellers,
processUnsanctionedSellers
} from "../utils/sanctionUtils";
import logger from "../../config/loggingConfig";
// import Seller from "../../models/Seller";
// import { SanctionedSellerStatus } from "../../types";
// import { getAllSanctionedRegions } from "../../services/admin/report.service";
// import {
// createBulkPreRestrictionOperation,
// createGeoQueries
// } from "../utils/geoUtils";
// import {
// getSellersToEvaluate,
// processSellersGeocoding,
// processSanctionedSellers,
// processUnsanctionedSellers
// } from "../utils/sanctionUtils";
// import logger from "../../config/loggingConfig";

export async function runSanctionBot(): Promise<void> {
logger.info('Sanction Bot cron job started.');
// export async function runSanctionBot(): Promise<void> {
// logger.info('Sanction Bot cron job started.');

try {
/* Step 1: Reset the 'isPreRestricted' field to 'false' for all sellers
This clears any pre-existing restrictions before applying new ones. */
await Seller.updateMany({}, { isPreRestricted: false }).exec();
logger.info('Reset [isPreRestricted] for all sellers.');
// try {
// /* Step 1: Reset the 'isPreRestricted' field to 'false' for all sellers
// This clears any pre-existing restrictions before applying new ones. */
// await Seller.updateMany({}, { isPreRestricted: false }).exec();
// logger.info('Reset [isPreRestricted] for all sellers.');

/* Step 2: Get the list of all sanctioned regions */
const sanctionedRegions = await getAllSanctionedRegions();
// If no sanctioned regions are found, log the info and exit the job
if (!sanctionedRegions.length) {
logger.info('No sanctioned regions found. Exiting job.');
return;
}
// /* Step 2: Get the list of all sanctioned regions */
// const sanctionedRegions = await getAllSanctionedRegions();
// // If no sanctioned regions are found, log the info and exit the job
// if (!sanctionedRegions.length) {
// logger.info('No sanctioned regions found. Exiting job.');
// return;
// }

/* Step 3: Create geo-based queries and identify sellers to evaluate */
const geoQueries = createGeoQueries(sanctionedRegions);
const sellersToEvaluate = await getSellersToEvaluate(geoQueries);
logger.info(`Evaluating ${sellersToEvaluate.length} sellers flagged or currently Restricted.`);
// /* Step 3: Create geo-based queries and identify sellers to evaluate */
// const geoQueries = createGeoQueries(sanctionedRegions);
// const sellersToEvaluate = await getSellersToEvaluate(geoQueries);
// logger.info(`Evaluating ${sellersToEvaluate.length} sellers flagged or currently Restricted.`);

/* Step 4: Create the bulk update operations to mark sellers as pre-restricted */
const bulkPreRestrictionOps = createBulkPreRestrictionOperation(sellersToEvaluate);
if (bulkPreRestrictionOps.length > 0) {
await Seller.bulkWrite(bulkPreRestrictionOps);
logger.info(`Marked ${bulkPreRestrictionOps.length} sellers as Pre-Restricted`)
}
// /* Step 4: Create the bulk update operations to mark sellers as pre-restricted */
// const bulkPreRestrictionOps = createBulkPreRestrictionOperation(sellersToEvaluate);
// if (bulkPreRestrictionOps.length > 0) {
// await Seller.bulkWrite(bulkPreRestrictionOps);
// logger.info(`Marked ${bulkPreRestrictionOps.length} sellers as Pre-Restricted`)
// }

/* Step 5: Retrieve all sellers who are marked as pre-restricted */
const preRestrictedSellers = await Seller.find({isPreRestricted: true}).exec();
logger.info(`${preRestrictedSellers.length} sellers are Pre-Restricted`);
// /* Step 5: Retrieve all sellers who are marked as pre-restricted */
// const preRestrictedSellers = await Seller.find({isPreRestricted: true}).exec();
// logger.info(`${preRestrictedSellers.length} sellers are Pre-Restricted`);

/* Step 6: Process geocoding validation */
const results: SanctionedSellerStatus[] = await processSellersGeocoding(
preRestrictedSellers,
sanctionedRegions
);
const inZone = results.filter(r => r.isSanctionedRegion);
const outOfZone = results.filter(r => !r.isSanctionedRegion);
// /* Step 6: Process geocoding validation */
// const results: SanctionedSellerStatus[] = await processSellersGeocoding(
// preRestrictedSellers,
// sanctionedRegions
// );
// const inZone = results.filter(r => r.isSanctionedRegion);
// const outOfZone = results.filter(r => !r.isSanctionedRegion);

/* Step 7: Apply restrictions of in-zone sellers or restoration of out-zone sellers */
await processSanctionedSellers(inZone);
await processUnsanctionedSellers(outOfZone);
// /* Step 7: Apply restrictions of in-zone sellers or restoration of out-zone sellers */
// await processSanctionedSellers(inZone);
// await processUnsanctionedSellers(outOfZone);

/* Step 8: Clean up temp pre-restriction flags */
await Seller.updateMany({isPreRestricted: true}, {isPreRestricted: false}).exec();
logger.info('SanctionBot job completed.');
} catch (error) {
logger.error('Error in Sanction Bot cron job:', error);
}
}
// /* Step 8: Clean up temp pre-restriction flags */
// await Seller.updateMany({isPreRestricted: true}, {isPreRestricted: false}).exec();
// logger.info('SanctionBot job completed.');
// } catch (error) {
// logger.error('Error in Sanction Bot cron job:', error);
// }
// }
59 changes: 41 additions & 18 deletions src/helpers/payment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import {
createPayment,
completePayment,
createPaymentCrossReference,
createA2UPayment,
cancelPayment
cancelPayment,
getxRefByOrderId
} from '../services/payment.service';
import { IUser, NewOrder, PaymentDataType, PaymentDTO, PaymentInfo } from '../types';
import logger from '../config/loggingConfig';
import { enqueuePayment } from '../utils/queues/queue';

function buildPaymentData(
piPaymentId: string,
Expand Down Expand Up @@ -137,9 +138,28 @@ export const processIncompletePayment = async (payment: PaymentInfo) => {

// If the completed payment was for a buyer checkout, update the associated order
if (updatedPayment?.payment_type === PaymentType.BuyerCheckout) {
await updatePaidOrder(updatedPayment._id as string);
const updatedOrder = await updatePaidOrder(updatedPayment._id as string);
logger.warn("Old order found and updated");
}

// update the payment cross-reference if it exists else create a new one
const xRef = getxRefByOrderId(updatedOrder._id as string);

if (!xRef) {
logger.warn("No existing payment cross-reference found, creating a new one");
const xRefData = {
orderId: updatedOrder._id as string,
u2aPaymentId: updatedPayment._id as string,
u2uStatus: U2UPaymentStatus.U2ACompleted,
u2aCompletedAt: new Date(),
a2uPaymentId: null,
sellerId: updatedPayment.user_id.toString(),
}
const newXref = await createPaymentCrossReference(xRefData);

// Enqueue the payment for further processing (e.g., A2U payment)
await enqueuePayment(newXref?._id.toString(), updatedOrder?.seller_id.toString(), updatedOrder.total_amount.toString(), updatedPayment.memo);
}
}

// Notify the Pi Platform that the payment is complete
await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid });
Expand Down Expand Up @@ -224,16 +244,31 @@ export const processPaymentCompletion = async (paymentId: string, txid: string)
if (completedPayment?.payment_type === PaymentType.BuyerCheckout) {
// Update the associated order's status to paid
const order = await updatePaidOrder(completedPayment._id as string);

if (!order) {
logger.error("Failed to update order to paid status");
throw new Error("Failed to update order to paid status");
}
logger.info("Order record updated to paid");

// Save cross-reference for U2U payment tracking
const u2uRefData = {
u2aPaymentId: completedPayment._id as string,
u2uStatus: U2UPaymentStatus.U2ACompleted,
orderId: order._id as string,
u2aCompletedAt: new Date(),
a2uPaymentId: null,
};
await createPaymentCrossReference(order._id as string, u2uRefData);
logger.info("U2U cross-reference created", u2uRefData);
const xRef = await createPaymentCrossReference(u2uRefData);
logger.info("U2U cross-reference created", u2uRefData);

if (!xRef) {
logger.error("Failed to create U2U cross-reference");
throw new Error("Failed to create U2U cross-reference");
}

// Enqueue the payment for further processing (e.g., A2U payment)
await enqueuePayment(xRef?._id.toString(), order?.seller_id.toString(), order.total_amount.toString(), completedPayment.memo);

// Notify Pi Platform of successful completion
const completedPiPayment = await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid });
Expand All @@ -244,18 +279,6 @@ export const processPaymentCompletion = async (paymentId: string, txid: string)

logger.info("Payment marked completed on Pi blockchain", completedPiPayment.status);

const paymentMemo = completedPiPayment.data.memo as string;

// Start A2U (App-to-User) payment to the seller
await createA2UPayment({
sellerId: order.seller_id.toString(),
amount: order.total_amount.toString(),
buyerId: order.buyer_id.toString(),
paymentType: PaymentType.BuyerCheckout,
orderId: order._id as string,
memo: paymentMemo
});

} else if (completedPayment?.payment_type === PaymentType.Membership) {
// Notify Pi platform for membership payment completion
await platformAPIClient.post(`/v2/payments/${ paymentId }/complete`, { txid });
Expand Down
4 changes: 1 addition & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const startServer = async () => {
});
}



logger.info("Server setup initiated.");
} catch (error) {
logger.error('Server failed to initialize:', error);
Expand All @@ -37,6 +35,6 @@ const startServer = async () => {
// Start the server setup process
startServer();
// TODO: Remove cron job; Start the scheduled cron job(s)
// scheduleCronJobs();
scheduleCronJobs();

export default app;
23 changes: 23 additions & 0 deletions src/models/A2UPaymentQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import mongoose, { Schema } from "mongoose";
import { IA2UJob } from "../types";

const A2UPaymentQueueSchema = new Schema<IA2UJob>(
Copy link
Member

@swoocn swoocn Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @adisa39 - I noticed that this DB schema doesn’t align with the PaymentQueue model outlined in the schema documentation @ https://docs.google.com/document/d/1i9JjD3veU4RmZXEiD_D9j7XvRIcZm9koJ_DFDITe1H0/edit?usp=sharing

It would have been helpful to get your feedback during the schema design phase or possibly during implementation. Just checking — was there a reason you disagreed with the proposed model, or was this an oversight? Thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @swoocn - please note that various approaches have been tried before arriving at this implementation. So my thought is to raised this once execution is done and solid.

The proposed model has some repeated fields that can be accessed from other collections using foreign keys. More importantly batching of payments from different buyers with different payment details (payer_pi_username, u2a_blockchain_id, description etc.) cannot go with this feature implementation.

Please let me know your thoughts on this.
Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood; no problem @adisa39!
I will make a note of this in the card and adjust the DB collection schema accordingly. 👍

{
sellerPiUid: { type: String, required: true },
amount: { type: Number, required: true },
Copy link
Member

@swoocn swoocn Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't amount field be Types.Decimal128 to maintain data type consistency?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type usually difficult to handle in the BE/FE logics

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha! I can see how that may be the case. Let's stick to just Number for now then.

xRef_ids: [{ type: String, required: true }],
memo: { type: String, require: true },
status: {
type: String,
enum: ['pending', 'processing', 'completed', 'failed', 'batching'],
default: 'pending',
},
last_a2u_date: { type: Date, default: null },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @adisa39 - could you clarify the difference between the last_a2u_date timestamp and the updatedAt timestamp in the context of A2U queue payment processing? Thanks, in advance!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last_a2u_date is used in the payment batching to determine the date of the next A2U payment for sellers with their gas saver turned on (3days from last payout). While the updatedAt keeps tracks of payment status changes while determining its position on the queue.

Copy link
Member

@swoocn swoocn Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then perhaps it would make sense to rename the field to something like last_batched_a2u_date or last-gas-saver-a2u-date? Thoughts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary, since it’s applicable to all records and sellers with their gas saver turned off can also turned it on

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about last_a2u_payout_date @adisa39?

attempts: { type: Number, default: 0 },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change the attempts field to num_retries as per the DB schema document?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

last_error: { type: String, default: null }
},
{ timestamps: true }
);

const A2UPaymentQueue = mongoose.model<IA2UJob>('A2UPaymentQueue', A2UPaymentQueueSchema);
export default A2UPaymentQueue;
Loading
Loading