Skip to content

Commit 97e8c80

Browse files
committed
Implement Push Publisher
1 parent 743bc68 commit 97e8c80

File tree

2 files changed

+230
-0
lines changed

2 files changed

+230
-0
lines changed

src/NotificationManager.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import io from '@pm2/io'
2+
import admin from 'firebase-admin'
3+
4+
import { ApiKey } from './models'
5+
6+
import BatchResponse = admin.messaging.BatchResponse
7+
8+
const successCounter = io.counter({
9+
id: 'notifications:success:total',
10+
name: 'Total Successful Notifications'
11+
})
12+
const failureCounter = io.counter({
13+
id: 'notifications:failure:total',
14+
name: 'Total Failed Notifications'
15+
})
16+
17+
export const createNotificationManager = async (
18+
apiKey: ApiKey | string
19+
): Promise<admin.app.App> => {
20+
if (typeof apiKey === 'string') apiKey = await ApiKey.fetch(apiKey)
21+
22+
const name = `app:${apiKey.appId}`
23+
let app: admin.app.App
24+
try {
25+
app = admin.app(name)
26+
} catch (err) {
27+
app = admin.initializeApp(
28+
{
29+
credential: admin.credential.cert(apiKey.adminsdk)
30+
},
31+
name
32+
)
33+
}
34+
return app
35+
}
36+
37+
export const sendNotification = async (
38+
app: admin.app.App,
39+
title: string,
40+
body: string,
41+
tokens: string[],
42+
data = {}
43+
): Promise<BatchResponse> => {
44+
const message: admin.messaging.MulticastMessage = {
45+
notification: {
46+
title,
47+
body
48+
},
49+
data,
50+
tokens
51+
}
52+
53+
try {
54+
const response = await app.messaging().sendMulticast(message)
55+
56+
successCounter.inc(response.successCount)
57+
failureCounter.inc(response.failureCount)
58+
59+
return response
60+
} catch (err) {
61+
console.error(JSON.stringify(err, null, 2))
62+
throw err
63+
}
64+
}

src/publishers/push.ts

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/**
2+
* A push publisher subscribes to a view that contains all tasks whose
3+
* arrays of {@link ActionEffect}s are all marked as completed. The
4+
* publisher's job is to push notifications to devices based on the
5+
* completed tasks.
6+
*/
7+
8+
import { viewToStream } from 'edge-server-tools'
9+
10+
import {
11+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
12+
tasksPublishing,
13+
updateInProgress
14+
} from '../database/views/couch-tasks'
15+
import {
16+
createNotificationManager,
17+
sendNotification
18+
} from '../NotificationManager'
19+
import { asPushActionData } from '../types/task/ActionData'
20+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
21+
import { ActionEffect } from '../types/task/ActionEffect'
22+
import { Task } from '../types/task/Task'
23+
import {
24+
asTaskDoc,
25+
dbTasks,
26+
logger,
27+
TaskDoc,
28+
wrappedDeleteFromDb
29+
} from '../utils/dbUtils'
30+
31+
/** Used in the retry mechnism for the publisher */
32+
const RETRY_LIMIT = 5
33+
34+
/**
35+
* Begins listening to the 'tasks_publishing' view defined in
36+
* {@link tasksPublishing}. For every new task document received, the
37+
* publisher checks if the action is in progress. If it is, skip the
38+
* processing. If it is not, the publisher will pick up the task by
39+
* executing the push notification action.
40+
*
41+
* If the action is marked as repeatable, the publisher will then mark
42+
* all {@link ActionEffect}s as completed so that 'task_listening'
43+
* view can pick the task up again for processing.
44+
*
45+
* @returns {Promise<number>} 0 if the connection is closed.
46+
*/
47+
export const runPushPublisher = async (): Promise<number> => {
48+
for await (const doc of viewToStream(async params =>
49+
dbTasks.view('tasks_publishing', 'tasks_publishing', params)
50+
)) {
51+
const clean: TaskDoc = asTaskDoc(doc)
52+
const currentTask = clean.doc
53+
if (!canExecute(currentTask)) continue
54+
55+
// Set the action of the task as in progress
56+
// If this process fails, we stop processing the current task
57+
await signalActionStarted(currentTask)
58+
if (currentTask.action.inProgress === false) continue
59+
60+
// Send notification to the devices
61+
await handlePushNotification(currentTask)
62+
63+
// Perform chores after the notification has been sent
64+
await handleActionAfterPushingNotification(currentTask)
65+
66+
// Set the action of the task as not in progress
67+
await finishCurrentTaskAction(currentTask)
68+
}
69+
return 0
70+
}
71+
72+
// -----------------------------------------------------------------------------
73+
// Helper functions
74+
// -----------------------------------------------------------------------------
75+
76+
/**
77+
* Determines if the current task from the view is eliglbe for pushing
78+
* notfications to devices.
79+
* @returns Whether the task is eligible for pushing notifications.
80+
*/
81+
const canExecute = (task: Task): boolean => {
82+
return (
83+
task.action.inProgress == null &&
84+
task.action.repeat == null &&
85+
task.action.inProgress === false
86+
)
87+
}
88+
89+
/**
90+
* By setting the action as in progress, and update that change in the
91+
* database, other publishers will not pick up the task.
92+
*
93+
* If the update fails, we stop processing the current task by setting
94+
* its inProgress flag to false. The {@link execute} function will
95+
* skip this task.
96+
*/
97+
const signalActionStarted = async (task: Task): Promise<void> => {
98+
task.action.inProgress = true
99+
await updateInProgress(task, `${task.userId}:${task.taskId}`).catch(_ => {
100+
task.action.inProgress = false
101+
})
102+
}
103+
104+
/**
105+
* Prepares and sends a push notification to the devices identified by tokenIds.
106+
*/
107+
const handlePushNotification = async (task: Task): Promise<void> => {
108+
const { apiKey, title, body, tokenIds } = asPushActionData(task.action.data)
109+
const notificationManager = await createNotificationManager(apiKey)
110+
await sendNotification(
111+
notificationManager,
112+
title,
113+
body,
114+
tokenIds,
115+
task.action.data.additionalData ?? {}
116+
)
117+
}
118+
119+
/**
120+
* Some actions are repeatable. If the action is repeatable, we mark all
121+
* {@link ActionEffect}s as completed so that 'task_listening' view
122+
* can pick the task up again for processing.
123+
*
124+
* Otherwise, we delete the task from the database.
125+
*/
126+
const handleActionAfterPushingNotification = async (
127+
task: Task
128+
): Promise<void> => {
129+
if (task.action.repeat === true) {
130+
// Reset all action effects as incomplete
131+
task.actionEffects.forEach(actionEffect => {
132+
actionEffect.completed = false
133+
})
134+
} else {
135+
await wrappedDeleteFromDb([task.taskId], task.userId)
136+
}
137+
}
138+
139+
/**
140+
* Setting the action as not in progress, and update that change in the
141+
* database. A retry mechinism is used to minimize the chance for
142+
* leaving a task whose inProgress flag is always true, which prevents
143+
* it from being ever picked up by publishers.
144+
*/
145+
const finishCurrentTaskAction = async (task: Task): Promise<void> => {
146+
// If not a repeatable action, that means the task has been deleted
147+
// from the db. Do nothing.
148+
if (task.action.repeat === false) return
149+
150+
var currentRetry = 0
151+
152+
// Use a while loop to implement a retry mechanism
153+
while (true) {
154+
try {
155+
task.action.inProgress = false
156+
await updateInProgress(task, `${task.userId}:${task.taskId}`)
157+
break
158+
} catch (e) {
159+
if (currentRetry++ > RETRY_LIMIT) {
160+
logger(`Failed to update inProgress flag after ${RETRY_LIMIT} retries`)
161+
break
162+
}
163+
logger(e)
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)