-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaccountProjector.js
161 lines (145 loc) · 5.33 KB
/
accountProjector.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
* @fileoverview MongoDB projector for an account
* @author Joey Whelan <[email protected]>
*/
/*jshint esversion: 6 */
'use strict';
'use esversion 6';
const os = require('os');
const EventStoreClient = require('./eventStoreClient');
const logger = require('./accountProjectorLogger');
const MongoClient = require('mongodb').MongoClient;
const REDIS_PORT = 6379;
const REDIS_HOST = 'localhost';
const STREAM_NAME = 'accountStream';
const CONSUMER_NAME = 'accountProjector:' + os.hostname() + '_' + process.pid;
const CONNECTION_URL = 'mongodb://admin:mongo@localhost:27017';
const DB_NAME = 'accountDB';
const COLLECTION = 'accountCollection';
const READ_INTERVAL = 10000; //10 seconds, interval for checking subscription for new events
const PENDING_INTERVAL = 30000; //30 seconds, interval on checking the pending queue of events
const esClient = new EventStoreClient(REDIS_PORT, REDIS_HOST, READ_INTERVAL);
/**
* Helper function that chains promises to update a MongoDB and submit a Redis Ack for an event
* @param {string} accountId - unique id of account
* @param {string} timestamp - timestamp of the event
* @param {object} db - MongoDB database connection
* @param {object} query - query object for the db update
* @param {object} value - value object for the db update
* @return {promise}
*/
function update(accountId, timestamp, db, query, value, upsert) {
logger.debug(`AccountProjector, update - accountId:${accountId}, timestamp:${timestamp}, query:${JSON.stringify(query)}, value:${JSON.stringify(value)}`);
return db.collection(COLLECTION).updateOne(query, value, {'upsert' : upsert})
.then((result) => {
return esClient.ack(STREAM_NAME, timestamp);
})
.catch(err => {
if (err.code === 11000) { //covers a MongoDB bug. scenario is two more simulatneous attempts at an update with
//upsert = true on the same unique id.
console.log('11000 error');
return update(accountId, timestamp, db, query, value, false);
}
else {
throw err;
}
});
}
/**
* Function called from an event emitter that serves as subscriber to a Redis stream. Each event represents
* a message to a Redis stream for an account. The function determines the event type, updates the fund value accordingly,
* and submits the update to a MongoDB + Ack's the Redis stream for that message.
* @param {array} eventList - array of event objects
* @return none
*/
function eventHandler(eventList) {
logger.debug(`AccountProjector, eventHandler - number of events received:${eventList.length}`);
let connection;
MongoClient.connect(CONNECTION_URL, {'useNewUrlParser' : true})
.then((result) => {
connection = result;
const db = connection.db(DB_NAME);
let promises = [];
eventList.forEach((event) => {
let amount;
switch (event.type) {
case 'create':
amount = 0;
break;
case 'deposit':
amount = event.amount;
break;
case 'withdraw':
amount = -event.amount;
break;
}
//this query filters to the particular account id AND ensures there's not been update made with this
//timestamp twice
let query = {$and: [{'_id':event.id}, {'timestamps':{$not: {$elemMatch:{$eq: event.timestamp}}}}]};
//increment the funds and add the timestamp to the array of completed events
let value = { $inc : {'funds' : amount}, $addToSet : {'timestamps' : event.timestamp}};
promises.push(update(event.id, event.timestamp, db, query, value, true));
});
return Promise.all(promises);
})
.then((results) => {
logger.debug(`AccountProjector, eventHandler - number of events handled:${results.length}`);
})
.catch((err) => {
logger.error(`AccountProjector, eventHandler - ${err}`);
throw err;
})
.finally(_ => {
if (connection) {
connection.close();
}
});
}
/**
* Function for fetching all the events in the pending queue. These would be events that are stuck
* in processing due to some failure. This is called out of a setInterval function with the interval being
* the value of PENDING_INTERVAL
* @return {array} - array of event objects
*/
function processPending() {
//fetch those events that have aged = PENDING_INTERVAL in the pending queue
logger.debug(`AccountProjector, processPending`);
esClient.getPending(STREAM_NAME, CONSUMER_NAME, PENDING_INTERVAL)
.then((eventList) => {
logger.debug(`AccountProjector, processPending - eventList.length:${eventList.length}`);
eventHandler(eventList);
})
.catch((err) => {
logger.error(`AccountProjector, processPending - ${err}`);
throw err;
});
}
/** @desc Account projector */
module.exports = class AccountProjector {
constructor() {
logger.debug(`AccountProjector constructor `);
//sets up a timer to check the pending queue
this._interval = setInterval(processPending, PENDING_INTERVAL);
}
/**
* Function for releasing resources (Redis connection and time interval object
*/
close() {
logger.debug(`AccountProjector.close`);
clearInterval(this._interval);
esClient.close();
}
/**
* Function creates Redis connection through eventStoreClient and set ups an event listener
* for Redis stream events
*/
connect() {
logger.debug(`AccountProjector.connect`);
esClient.connect();
this._emitter = esClient.subscribe(STREAM_NAME, CONSUMER_NAME);
this._emitter.on('event', eventHandler);
}
};
const AccountProjector = require('./accountProjector');
const projector = new AccountProjector();
projector.connect();