-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpino-mq.js
133 lines (116 loc) · 3.25 KB
/
pino-mq.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env node
/* eslint-disable no-console */
'use strict';
const path = require('path');
// const { PassThrough } = require('stream');
const fs = require('fs');
const split = require('split2');
const pump = require('pump');
const nopt = require('nopt');
const through = require('through2');
const pkgInfo = require('./package.json');
const defaultOptions = {
type: 'RABBITMQ',
uri: null,
exchange: '',
queue: null,
queuePattern: null,
queueMap: null,
fields: null,
config: null,
};
const longOptions = {
type: ['RABBITMQ'],
uri: String,
exchange: String,
queue: String,
queuePattern: String,
fields: String,
config: String,
help: Boolean,
version: Boolean,
generateConfig: Boolean,
};
const shortOptions = {
t: '--type',
u: '--uri',
e: '--exchange',
q: '--queue',
qp: '--queuePattern',
f: '--fields',
c: '--config',
h: '--help',
v: '--version',
g: '--generateConfig',
};
const argv = nopt(longOptions, shortOptions, process.argv);
const configOptions = Object.assign({}, defaultOptions, argv);
if (configOptions.version) {
console.log(pkgInfo.version);
process.exit(0);
}
if (configOptions.help) {
console.log(fs.readFileSync(path.join(__dirname, 'help.txt'), 'utf8'));
process.exit(0);
}
if (configOptions.generateConfig) {
const cfgSample = JSON.stringify({
type: 'RABBITMQ',
uri: 'amqp://guest:guest@localhost/',
exchange: '',
queue: 'pino-mq',
queuePattern: null,
queueMap: null,
fields: [],
}, null, ' ');
fs.writeFileSync('pino-mq.json', cfgSample);
console.log('Configuration is written in file "pino-mq.json"');
console.log('You can use now:');
console.log('\n\nnode script.js | pino-mq -c pino-mq.json\n\n');
process.exit(0);
}
if (configOptions.fields && (configOptions.fields.length !== 0)) {
configOptions.fields = configOptions.fields.split(',');
}
if (configOptions.config !== null) {
try {
// eslint-disable-next-line import/no-dynamic-require, global-require
const cfgFile = require(path.resolve(configOptions.config));
Object.keys(configOptions).map((key) => {
if (configOptions[key]) {
return null;
}
if (cfgFile[key]) {
configOptions[key] = cfgFile[key];
}
return null;
});
} catch (e) {
console.log(`Error loading config file: ${e.message}`);
process.exit(1);
}
}
if (configOptions.uri === null) {
console.log('You must specify connection uri');
process.exit(1);
}
// eslint-disable-next-line import/no-dynamic-require
const getMqTransport = require(path.join(__dirname, 'index')).getTransport;
const t = getMqTransport({
type: configOptions.type,
transportParams: {
uri: configOptions.uri,
},
exchange: configOptions.exchange,
queue: configOptions.queue,
queuePattern: configOptions.queuePattern,
queueMap: configOptions.queueMap,
fields: configOptions.fields,
});
process.stdin.on('close', t.close.bind(t));
process.on('SIGINT', t.close.bind(t));
process.on('SIGTERM', t.close.bind(t));
pump(
process.stdin,
split(JSON.parse),
through.obj(t.write.bind(t), t.close.bind(t)));