forked from Chrono-Tech/middleware-eth-blockprocessor
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
147 lines (116 loc) · 4.43 KB
/
index.js
File metadata and controls
147 lines (116 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
* Copyright 2017–2018, LaborX PTY
* Licensed under the AGPL Version 3 license.
*/
/**
* Middleware service for handling emitted events on chronobank platform
* @module Chronobank/eth-blockprocessor
*/
const mongoose = require('mongoose'),
config = require('./config'),
MasterNodeService = require('./services/MasterNodeService'),
Promise = require('bluebird');
// Make mongoose use the `Promise` bound above (bluebird) for the promises it returns.
mongoose.Promise = Promise;
// Open the default mongoose connection against config.mongo.data.uri.
mongoose.connect(config.mongo.data.uri, {useMongoClient: true});
// Open a second, separate connection against config.mongo.accounts.uri and
// keep it on the mongoose object as `mongoose.accounts`.
mongoose.accounts = mongoose.createConnection(config.mongo.accounts.uri, {useMongoClient: true});
const _ = require('lodash'),
BlockWatchingService = require('./services/blockWatchingService'),
SyncCacheService = require('./services/syncCacheService'),
bunyan = require('bunyan'),
Web3 = require('web3'),
net = require('net'),
amqp = require('amqplib'),
log = bunyan.createLogger({name: 'app'}),
filterTxsByAccountService = require('./services/filterTxsByAccountService');
// Watch both mongo connections (accounts + default): when either one reports
// 'disconnected', log the event and terminate the process with exit code 0.
for (const connection of [mongoose.accounts, mongoose.connection]) {
  connection.on('disconnected', () => {
    log.error('mongo disconnected!');
    process.exit(0);
  });
}
/**
 * Bootstraps the block processor.
 *
 * Steps, in order:
 *  1. builds one web3 instance per URI in `config.web3.providers`;
 *  2. connects to RabbitMQ (`config.rabbit.url`), opens a channel and asserts
 *     the non-durable `events` topic exchange;
 *  3. starts the MasterNodeService on that channel;
 *  4. starts the SyncCacheService and, unless `config.sync.shadow` is set,
 *     waits for its `end` event;
 *  5. starts the BlockWatchingService from the block returned by the cache sync.
 *
 * Every `block` event is published as `<serviceName>_block`, and every
 * transaction returned by `filterTxsByAccountService` is published as
 * `<serviceName>_transaction.<address>` for each address involved.
 *
 * The process exits (code 0) when RabbitMQ is unreachable, when the channel
 * closes, or when either sync service fails with an error whose `code` is 0.
 *
 * @returns {Promise<void>} settles once `blockWatchingService.startSync()` has settled.
 */
const init = async () => {

  const web3s = config.web3.providers.map((providerURI) => {
    // URIs starting with "http" use the HTTP provider; anything else is treated
    // as an IPC path (prefixed with the named-pipe root on Windows).
    const provider = /^http/.test(providerURI) ?
      new Web3.providers.HttpProvider(providerURI) :
      new Web3.providers.IpcProvider(`${/^win/.test(process.platform) ? '\\\\.\\pipe\\' : ''}${providerURI}`, net);

    const web3 = new Web3();
    web3.setProvider(provider);

    // `_.hasIn` (not `_.has`) is required here: `on` is inherited from the
    // connection's prototype chain, and `_.has` only sees own properties, so
    // the handlers below would otherwise never be attached.
    if (_.hasIn(web3, 'currentProvider.connection.on')) {
      // Same reaction for a closed and for a failed connection:
      // wait 5 seconds, then reset the web3 instance.
      const resetAfterDelay = async () => {
        await Promise.delay(5000);
        web3.reset();
      };

      web3.currentProvider.connection.on('end', resetAfterDelay);
      web3.currentProvider.connection.on('error', resetAfterDelay);
    }

    return web3;
  });

  const amqpInstance = await amqp.connect(config.rabbit.url)
    .catch(() => {
      log.error('rabbitmq process has finished!');
      process.exit(0);
    });

  const channel = await amqpInstance.createChannel();

  channel.on('close', () => {
    log.error('rabbitmq process has finished!');
    process.exit(0);
  });

  await channel.assertExchange('events', 'topic', {durable: false});

  const masterNodeService = new MasterNodeService(channel, (msg) => log.info(msg));
  await masterNodeService.start();

  const syncCacheService = new SyncCacheService(web3s);

  /**
   * Publishes one transaction to the `events` exchange once per address,
   * using the routing key `<serviceName>_transaction.<address>`.
   * @param {Array} addresses routing-key suffixes to publish under
   * @param {Object} tx transaction to serialise as the message body
   */
  const publishTx = async (addresses, tx) => {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    const payload = Buffer.from(JSON.stringify(tx));
    for (const address of addresses)
      await channel.publish('events', `${config.rabbit.serviceName}_transaction.${address}`, payload);
  };

  /**
   * Handles a `block` event: announces the block number, then publishes every
   * transaction of the block that passes the account filter.
   */
  const blockEventCallback = async block => {
    log.info(`${block.hash} (${block.number}) added to cache.`);
    await channel.publish('events', `${config.rabbit.serviceName}_block`, Buffer.from(JSON.stringify({block: block.number})));

    const filteredTxs = await filterTxsByAccountService(block.transactions);

    for (const tx of filteredTxs) {
      // Recipient, sender and every log emitter, de-duplicated (`_.union`
      // already returns unique values). `_.map` tolerates a missing `logs`
      // field and avoids shadowing the module-level `log` logger.
      const addresses = _.union([tx.to, tx.from], _.map(tx.logs, 'address'));
      await publishTx(addresses, tx);
    }
  };

  /**
   * Handles a `tx` event from the block watcher: the transaction is published
   * without `blockHash` / `transactionIndex` and with `blockNumber` set to -1.
   */
  const txEventCallback = async tx => {
    const data = await filterTxsByAccountService([tx]);

    for (const filteredTx of data) {
      const addresses = _.uniq([filteredTx.to, filteredTx.from]);
      const pendingTx = _.omit(filteredTx, ['blockHash', 'transactionIndex']);
      pendingTx.blockNumber = -1;
      await publishTx(addresses, pendingTx);
    }
  };

  syncCacheService.events.on('block', blockEventCallback);

  // NOTE(review): for errors whose code is not 0 the error is only logged, so
  // `endBlock` stays undefined and start-up continues - confirm this is intended.
  const endBlock = await syncCacheService.start()
    .catch((err) => {
      if (_.get(err, 'code') === 0) {
        log.info('nodes are down or not synced!');
        process.exit(0);
      }
      log.error(err);
    });

  // In shadow mode continue immediately; otherwise wait for the cache sync to
  // emit `end` before starting the block watcher.
  await new Promise((res) => {
    if (config.sync.shadow)
      return res();

    syncCacheService.events.on('end', () => {
      log.info(`cached the whole blockchain up to block: ${endBlock}`);
      res();
    });
  });

  const blockWatchingService = new BlockWatchingService(web3s, endBlock);
  blockWatchingService.events.on('block', blockEventCallback);
  blockWatchingService.events.on('tx', txEventCallback);

  await blockWatchingService.startSync().catch(err => {
    if (_.get(err, 'code') === 0) {
      log.error('no connections available or blockchain is not synced!');
      process.exit(0);
    }
    // Previously any other failure was swallowed silently; surface it in the log.
    log.error(err);
  });

};
// Start the service as soon as this module is loaded; the export is the
// promise returned by init(), not the init function itself.
module.exports = init();