Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 54 additions & 49 deletions packages/api/src/controllers/Identities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,63 +100,68 @@ export class Identities {

@Post("update")
public async updateIdentities(req: Request, res: Response) {
const count = await prisma.project.count({
where: { email: { not: null } },
});

for (let i = 0; i < count; i += 99) {
const dbIdentities = await prisma.project.findMany({
try {
const count = await prisma.project.count({
where: { email: { not: null } },
select: { id: true, email: true },
skip: i,
take: 99,
});

const awsIdentities = await getIdentities(dbIdentities.map((i) => i.email as string));

for (const identity of awsIdentities) {
const projectId = dbIdentities.find((i) => i.email?.endsWith(identity.email));

const project = await ProjectService.id(projectId?.id as string);
for (let i = 0; i < count; i += 99) {
try {
const dbIdentities = await prisma.project.findMany({
where: { email: { not: null } },
select: { id: true, email: true },
skip: i,
take: 99,
});

if (identity.status === "Failed") {
signale.info(`Restarting verification for ${identity.email}`);
try {
void verifyIdentity(identity.email);
} catch (e) {
// @ts-ignore
if (e.Code === "Throttling") {
signale.warn("Throttling detected, waiting 5 seconds");
await new Promise((r) => setTimeout(r, 5000));
const awsIdentities = await getIdentities(dbIdentities.map((i) => i.email as string));

for (const identity of awsIdentities) {
try {
const projectId = dbIdentities.find((i) => i.email?.endsWith(identity.email));

const project = await ProjectService.id(projectId?.id as string);

if (identity.status === "Failed") {
signale.info(`Restarting verification for ${identity.email}`);
void verifyIdentity(identity.email);
}

await prisma.project.update({
where: { id: projectId?.id as string },
data: { verified: identity.status === "Success" },
});

if (project && !project.verified && identity.status === "Success") {
signale.success(`Successfully verified ${identity.email}`);
void ses.setIdentityFeedbackForwardingEnabled({
Identity: identity.email,
ForwardingEnabled: false,
});

await redis.del(Keys.Project.id(project.id));
await redis.del(Keys.Project.secret(project.secret));
await redis.del(Keys.Project.public(project.public));
}

if (project?.verified && identity.status !== "Success") {
await redis.del(Keys.Project.id(project.id));
await redis.del(Keys.Project.secret(project.secret));
await redis.del(Keys.Project.public(project.public));
}
} catch (identityError) {
signale.error(`Error processing identity ${identity.email}:`, identityError);
}
}
}

await prisma.project.update({
where: { id: projectId?.id as string },
data: { verified: identity.status === "Success" },
});

if (project && !project.verified && identity.status === "Success") {
signale.success(`Successfully verified ${identity.email}`);
void ses.setIdentityFeedbackForwardingEnabled({
Identity: identity.email,
ForwardingEnabled: false,
});

await redis.del(Keys.Project.id(project.id));
await redis.del(Keys.Project.secret(project.secret));
await redis.del(Keys.Project.public(project.public));
}

if (project?.verified && identity.status !== "Success") {
await redis.del(Keys.Project.id(project.id));
await redis.del(Keys.Project.secret(project.secret));
await redis.del(Keys.Project.public(project.public));
} catch (batchError) {
signale.error(`Error processing batch ${i}-${i + 99}:`, batchError);
}
}
}

return res.status(200).json({ success: true });
return res.status(200).json({ success: true });
} catch (error) {
signale.error("Error in updateIdentities:", error);
return res.status(500).json({ success: false, error: "Internal server error" });
}
}
}
227 changes: 119 additions & 108 deletions packages/api/src/controllers/Tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,125 +10,136 @@ import { ProjectService } from "../services/ProjectService";
export class Tasks {
@Post()
public async handleTasks(req: Request, res: Response) {
// Get all tasks with a runBy data in the past
const tasks = await prisma.task.findMany({
where: { runBy: { lte: new Date() } },
orderBy: { runBy: "asc" },
include: {
action: { include: { template: true, notevents: true } },
campaign: true,
contact: true,
},
});

for (const task of tasks) {
const { action, campaign, contact } = task;

const project = await ProjectService.id(contact.projectId);

// If the project does not exist or is disabled, delete all tasks
if (!project) {
await prisma.task.deleteMany({
where: {
contact: {
projectId: contact.projectId,
},
},
});
continue;
}

let subject = "";
let body = "";

let email = "";
let name = "";

if (action) {
const { template, notevents } = action;
try {
// Get all tasks with a runBy data in the past
const tasks = await prisma.task.findMany({
where: { runBy: { lte: new Date() } },
orderBy: { runBy: "asc" },
include: {
action: { include: { template: true, notevents: true } },
campaign: true,
contact: true,
},
});

if (notevents.length > 0) {
const triggers = await ContactService.triggers(contact.id);
if (notevents.some((e) => triggers.some((t) => t.contactId === contact.id && t.eventId === e.id))) {
await prisma.task.delete({ where: { id: task.id } });
for (const task of tasks) {
try {
const { action, campaign, contact } = task;

const project = await ProjectService.id(contact.projectId);

// If the project does not exist or is disabled, delete all tasks
if (!project) {
await prisma.task.deleteMany({
where: {
contact: {
projectId: contact.projectId,
},
},
});
continue;
}
}

email = project.verified && project.email ? template.email ?? project.email : "[email protected]";
name = template.from ?? project.from ?? project.name;

({ subject, body } = EmailService.format({
subject: template.subject,
body: template.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
},
}));
} else if (campaign) {
email = project.verified && project.email ? campaign.email ?? project.email : "[email protected]";
name = campaign.from ?? project.from ?? project.name;

({ subject, body } = EmailService.format({
subject: campaign.subject,
body: campaign.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
},
}));
}
let subject = "";
let body = "";

let email = "";
let name = "";

if (action) {
const { template, notevents } = action;

if (notevents.length > 0) {
const triggers = await ContactService.triggers(contact.id);
if (notevents.some((e) => triggers.some((t) => t.contactId === contact.id && t.eventId === e.id))) {
await prisma.task.delete({ where: { id: task.id } });
continue;
}
}

email = project.verified && project.email ? template.email ?? project.email : "[email protected]";
name = template.from ?? project.from ?? project.name;

({ subject, body } = EmailService.format({
subject: template.subject,
body: template.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
},
}));
} else if (campaign) {
email = project.verified && project.email ? campaign.email ?? project.email : "[email protected]";
name = campaign.from ?? project.from ?? project.name;

({ subject, body } = EmailService.format({
subject: campaign.subject,
body: campaign.body,
data: {
plunk_id: contact.id,
plunk_email: contact.email,
...JSON.parse(contact.data ?? "{}"),
},
}));
}

const { messageId } = await EmailService.send({
from: {
name,
email,
},
to: [contact.email],
content: {
subject,
html: EmailService.compile({
content: body,
footer: {
unsubscribe: campaign ? true : !!action && action.template.type === "MARKETING",
},
contact: {
id: contact.id,
const { messageId } = await EmailService.send({
from: {
name,
email,
},
project: {
name: project.name,
to: [contact.email],
content: {
subject,
html: EmailService.compile({
content: body,
footer: {
unsubscribe: campaign ? true : !!action && action.template.type === "MARKETING",
},
contact: {
id: contact.id,
},
project: {
name: project.name,
},
isHtml: (campaign && campaign.style === "HTML") ?? (!!action && action.template.style === "HTML"),
}),
},
isHtml: (campaign && campaign.style === "HTML") ?? (!!action && action.template.style === "HTML"),
}),
},
});
});

const emailData: {
messageId: string;
contactId: string;
actionId?: string;
campaignId?: string;
} = {
messageId,
contactId: contact.id,
};

if (action) {
emailData.actionId = action.id;
} else if (campaign) {
emailData.campaignId = campaign.id;
}

const emailData: {
messageId: string;
contactId: string;
actionId?: string;
campaignId?: string;
} = {
messageId,
contactId: contact.id,
};

if (action) {
emailData.actionId = action.id;
} else if (campaign) {
emailData.campaignId = campaign.id;
}
await prisma.email.create({ data: emailData });

await prisma.email.create({ data: emailData });
await prisma.task.delete({ where: { id: task.id } });

await prisma.task.delete({ where: { id: task.id } });
signale.success(`Task completed for ${contact.email} from ${project.name}`);
} catch (taskError) {
signale.error(`Error processing task for ${task.contact.email}:`, taskError);
// Optionally, you can delete the problematic task or mark it as failed
// await prisma.task.delete({ where: { id: task.id } });
}
}

signale.success(`Task completed for ${contact.email} from ${project.name}`);
return res.status(200).json({ success: true });
} catch (error) {
signale.error("Error in handleTasks:", error);
return res.status(500).json({ success: false, error: "Internal server error" });
}

return res.status(200).json({ success: true });
}
}