Skip to content

Commit

Permalink
swap from postgres to elastic for analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
Dougley committed Feb 7, 2022
1 parent e23fc7c commit 2b91159
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 51 deletions.
80 changes: 73 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"typescript": "^4.5.5"
},
"dependencies": {
"@elastic/elasticsearch": "^7.17.0",
"@sentry/integrations": "^6.16.1",
"@sentry/node": "^6.16.1",
"@thesharks/jagtag-js": "^2.0.0",
Expand Down
19 changes: 19 additions & 0 deletions src/database/migrations/exec/1644249527-remove_analytics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import driver from '../../driver'

export async function up (db: typeof driver): Promise<void> {
await db`DROP TABLE IF EXISTS analytics;`
}

export async function down (db: typeof driver): Promise<void> {
await db`
CREATE TABLE IF NOT EXISTS analytics (
id UUID NOT NULL PRIMARY KEY,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(),
type INTEGER NOT NULL,
guild_id BIGINT,
user_id BIGINT NOT NULL,
data JSONB NOT NULL,
name TEXT NOT NULL
);
`
}
43 changes: 13 additions & 30 deletions src/interactions/base.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
import { randomUUID } from 'crypto'
import { Constants, Interaction, Structures } from 'detritus-client'
import db from '../database/driver'
import { add, flush } from '../utils/elastic'
import { translate } from '../utils/i18n'
import { error } from '../utils/logger'
const { ApplicationCommandTypes, ApplicationCommandOptionTypes, MessageFlags, Permissions } = Constants

export class BaseInteractionCommand<ParsedArgsFinished = Interaction.ParsedArgs> extends Interaction.InteractionCommand<ParsedArgsFinished> {
async onSuccess (context: Interaction.InteractionContext, args: ParsedArgsFinished): Promise<void> {
await db`
INSERT INTO analytics (
id,
timestamp,
type,
guild_id,
user_id,
data,
name
) VALUES (
${randomUUID()},
NOW(),
${context.interaction.type},
${context.guildId ?? 0},
${context.user.id},
${JSON.stringify(args)},
${context.command.name}
);
`
add({
type: 'command',
guildId: context.guildId,
inDm: context.inDm,
userId: context.user.id,
data: {
args,
interaction_id: context.interactionId
},
name: context.command.name
})
await flush()
}

async onDmBlocked (context: Interaction.InteractionContext): Promise<unknown> {
Expand Down Expand Up @@ -84,15 +76,6 @@ export class BaseInteractionCommand<ParsedArgsFinished = Interaction.ParsedArgs>
flags: MessageFlags.EPHEMERAL
})
}

async safeReply (context: Interaction.InteractionContext, message: string | Structures.InteractionEditOrRespond): Promise<any> {
return await context.editOrRespond({
...((typeof message === 'string') ? { content: message } : message),
allowedMentions: {
parse: []
}
})
}
}

export class BaseCommandOption<ParsedArgsFinished = Interaction.ParsedArgs> extends Interaction.InteractionCommandOption<ParsedArgsFinished> {
Expand Down
14 changes: 0 additions & 14 deletions src/jobs/analytics-prune.cluster.ts

This file was deleted.

13 changes: 13 additions & 0 deletions src/jobs/elastic-flush.client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { ScheduledJob } from './base'
import { flush } from '../utils/elastic'

const job = new ScheduledJob('analytics-prune')
// every hour
.setInterval(60 * 60 * 1000)
.setExec(async () => {
await flush()
})

if (process.env.ELASTIC_URL !== undefined) job.start()

export default job
60 changes: 60 additions & 0 deletions src/utils/elastic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Client } from '@elastic/elasticsearch'
import { BulkStats } from '@elastic/elasticsearch/lib/Helpers'
import { format } from 'date-fns'
import discord from '../structures/client'
import { debug, trace, warn } from './logger'

const storage = new Set()

export function add (data: any): number {
data['@timestamp'] = new Date().toISOString()
storage.add(data)
return storage.size
}

export function drop (): number {
storage.clear()
return storage.size
}

export async function * drain (): AsyncGenerator<any> {
for (const data of storage) {
storage.delete(data)
yield data
}
}

export async function flush (): Promise<BulkStats> {
if (process.env.ELASTIC_URL === undefined) throw new Error('Trying to flush to Elasticsearch, but no URL is set.')
debug('Flushing elasticsearch', 'Elastic')
const client = new Client({
node: process.env.ELASTIC_URL,
// auth can be either be a b64 encoded api key, or basic auth incorporated into the URL
...(process.env.ELASTIC_API_KEY !== undefined
? {
auth: {
apiKey: process.env.ELASTIC_API_KEY
}
}
: {})
})
const result = await client.helpers.bulk({
datasource: drain(),
onDocument (doc) {
return {
create: {
_index: `wildbeast-${discord.client.applicationId}-${format(new Date(), 'yyyy-MM-dd')}`
}
}
}
})
debug(`Flushed ${result.successful} analytics documents to Elastic, ${result.failed} failed`, 'Elastic')
trace(result, 'Elastic')
if (result.failed > 0) {
warn(`Failed to flush ${result.failed} analytics documents to Elastic`, 'Elastic')
}
// Persistent connection to elastic isn't necessary, close it
// we recreate the client on the next flush anyway
await client.close()
return result
}

0 comments on commit 2b91159

Please sign in to comment.