Skip to content

Commit

Permalink
Merge pull request #5080 from FlowFuse/mqtt-topic-metadata
Browse files Browse the repository at this point in the history
Allow topic metadata to be viewed and edited
  • Loading branch information
knolleary authored Feb 7, 2025
2 parents bafc741 + 3a141cb commit 7ce94e9
Show file tree
Hide file tree
Showing 12 changed files with 569 additions and 170 deletions.
36 changes: 35 additions & 1 deletion forge/db/models/MQTTTopicSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,20 @@ module.exports = {
name: 'MQTTTopicSchema',
schema: {
topic: { type: DataTypes.STRING, allowNull: false },
metadata: { type: DataTypes.TEXT, allowNull: true },
metadata: {
type: DataTypes.TEXT,
allowNull: true,
get () {
const rawValue = this.getDataValue('metadata')
if (rawValue === undefined || rawValue === null) {
return rawValue
}
return JSON.parse(rawValue)
},
set (value) {
this.setDataValue('metadata', JSON.stringify(value))
}
},
inferredSchema: { type: DataTypes.TEXT, allowNull: true }
},
indexes: [
Expand Down Expand Up @@ -68,6 +81,27 @@ module.exports = {
topics: rows
}
},
/**
* Get a single topic based on team/broker/topic
*/
get: async (teamId, brokerId, topicId) => {
if (typeof topicId === 'string') {
topicId = M.MQTTTopicSchema.decodeHashid(topicId)
}
if (typeof teamId === 'string') {
teamId = M.Team.decodeHashid(teamId)
}
if (typeof brokerId === 'string') {
brokerId = M.BrokerCredentials.decodeHashid(brokerId)
}
return this.findOne({
where: {
id: topicId,
TeamId: teamId,
BrokerCredentialsId: brokerId
}
})
},
getTeamBroker: async (teamId, pagination = {}, where = {}) => {
if (typeof teamId === 'string') {
teamId = M.Team.decodeHashid(teamId)
Expand Down
2 changes: 1 addition & 1 deletion forge/db/views/MQTTTopicSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module.exports = {
const cleaned = {
id: result.hashid,
topic: result.topic,
metadata: result.metadata ? JSON.parse(result.metadata) : undefined,
metadata: result.metadata || { },
inferredSchema: result.inferredSchema ? JSON.parse(result.inferredSchema) : undefined
}
return cleaned
Expand Down
100 changes: 44 additions & 56 deletions forge/ee/routes/teamBroker/3rdPartyBroker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
module.exports = async function (app) {
app.addHook('preHandler', app.verifySession)

app.addHook('preHandler', async (request, reply) => {
if (request.params.teamId !== undefined || request.params.teamSlug !== undefined) {
// let teamId = request.params.teamId
Expand Down Expand Up @@ -410,7 +408,8 @@ module.exports = async function (app) {
reply.code('401').send({ code: 'unauthorized', error: 'unauthorized' })
}
} else {
reply.code('401').send({ code: 'unauthorized', error: 'unauthorized' })
const hasPermission = app.needsPermission('broker:topics:write')
await hasPermission(request, reply) // hasPermission sends the error response if required which stops the request
}
}
],
Expand All @@ -424,13 +423,6 @@ module.exports = async function (app) {
brokerId: { type: 'string' }
}
},
body: {
type: 'object',
properties: {

},
additionalProperties: true
},
response: {
201: {
type: 'object',
Expand All @@ -456,22 +448,32 @@ module.exports = async function (app) {
// Get the placeholder creds object id used for team brokers
brokerId = app.settings.get('team:broker:creds')
}
const topics = Object.keys(request.body)
topics.forEach(async topic => {
const type = request.body[topic].type
try {
await app.db.models.MQTTTopicSchema.upsert({
topic,
let body = request.body
if (!Array.isArray(body)) {
body = [body]
}
body.forEach(async topicInfo => {
if (topicInfo.topic) {
const topicObj = {
topic: topicInfo.topic,
BrokerCredentialsId: brokerId,
TeamId: teamId,
inferredSchema: JSON.stringify(type)
}, {
fields: ['inferredSchema'],
conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId']
})
} catch (err) {
// reply.status(500).send({ error: 'unknown_erorr', message: err.toString() })
// return
TeamId: teamId
}
if (Object.hasOwn(topicInfo, 'type')) {
topicObj.inferredSchema = JSON.stringify(topicInfo.type)
}
if (Object.hasOwn(topicInfo, 'metadata')) {
topicObj.metadata = topicInfo.metadata
}
try {
await app.db.models.MQTTTopicSchema.upsert(topicObj, {
fields: ['inferredSchema', 'metadata'],
conflictFields: ['topic', 'TeamId', 'BrokerCredentialsId']
})
} catch (err) {
// reply.status(500).send({ error: 'unknown_erorr', message: err.toString() })
// return
}
}
})
reply.status(201).send({})
Expand Down Expand Up @@ -513,30 +515,25 @@ module.exports = async function (app) {
}
}
}, async (request, reply) => {
const topic = await app.db.models.MQTTTopicSchema.byId(request.params.topicId)
let brokerId = request.params.brokerId
if (brokerId === 'team-broker') {
brokerId = app.settings.get('team:broker:creds')
}
const topic = await app.db.models.MQTTTopicSchema.get(request.params.teamId, brokerId, request.params.topicId)
if (topic) {
if (topic.Team.hashid === request.params.teamId) {
topic.metadata = JSON.stringify(request.body.metadata)
if (request.body.metadata) {
topic.metadata = request.body.metadata
await topic.save()
// need view to clean up
const response = topic.toJSON()
if (response.metadata) {
response.metadata = JSON.parse(response.metadata)
}
response.id = response.hashid
delete response.hashid
reply.status(201).send(response)
} else {
reply.code('401').send({ code: 'unauthorized', error: 'unauthorized' })
}
reply.status(201).send(app.db.views.MQTTTopicSchema.clean(topic))
} else {
reply.status(404).send({ code: 'not_found', error: 'not found' })
}
})

/**
* Modify Topic metadata from a 3rd Party Broker
* @name /api/v1/teams/:teamId/broker/:brokerId/topics/:topicId
* Delete a topic entry
* @name /api/v1/teams/:teamId/broker/:brokerId/topics/*
* @static
* @memberof forge.routes.api.team.broker
*/
Expand Down Expand Up @@ -569,23 +566,14 @@ module.exports = async function (app) {
}
}
}, async (request, reply) => {
const topic = await app.db.models.MQTTTopicSchema.byId(request.params.topicId)
let brokerId = request.params.brokerId
if (brokerId === 'team-broker') {
brokerId = app.settings.get('team:broker:creds')
}
const topic = await app.db.models.MQTTTopicSchema.get(request.params.teamId, brokerId, request.params.topicId)
if (topic) {
if (topic.Team.hashid === request.params.teamId) {
// need to check if topic belongs to broker requested
if (request.params.brokerId !== 'team-broker' && topic.BrokerCredentials.hashid === request.params.brokerId) {
await topic.destroy()
reply.status(201).send({})
return
} else if (request.params.brokerId === 'team-broker' && topic.BrokerCredentialsId === app.settings.get('team:broker:creds')) {
await topic.destroy()
reply.status(201).send({})
return
}
reply.code('401').send({ code: 'unauthorized', error: 'unauthorized' })
} else {
reply.code('401').send({ code: 'unauthorized', error: 'unauthorized' })
}
await topic.destroy()
reply.status(201).send({})
} else {
reply.status(404).send({ code: 'not_found', error: 'not found' })
}
Expand Down
2 changes: 0 additions & 2 deletions forge/ee/routes/teamBroker/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
const schemaApi = require('./schema')

module.exports = async function (app) {
app.addHook('preHandler', app.verifySession)

app.addHook('preHandler', async (request, reply) => {
if (request.params.teamId !== undefined || request.params.teamSlug !== undefined) {
// let teamId = request.params.teamId
Expand Down
113 changes: 92 additions & 21 deletions forge/ee/routes/teamBroker/schema.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,50 @@
const YAML = require('yaml')
module.exports = async function (app) {
app.get('/team-broker/schema.yml', async (request, reply) => {
const topics = await app.db.models.MQTTTopicSchema.getTeamBroker(request.team.hashid)
const list = topics.topics.map(t => t.topic)
list.sort()
app.addHook('preHandler', async (request, reply) => {
if (request.params.teamId !== undefined || request.params.teamSlug !== undefined) {
// let teamId = request.params.teamId
if (request.params.teamSlug) {
// If :teamSlug is provided, need to lookup the team to get
// its id for subsequent checks
request.team = await app.db.models.Team.bySlug(request.params.teamSlug)
if (!request.team) {
reply.code(404).send({ code: 'not_found', error: 'Not Found' })
return
}
// teamId = request.team.hashid
}

if (!request.team) {
// For a :teamId route, we can now lookup the full team object
request.team = await app.db.models.Team.byId(request.params.teamId)
if (!request.team) {
reply.code(404).send({ code: 'not_found', error: 'Not Found' })
return
}

const teamType = await request.team.getTeamType()
if (!teamType.getFeatureProperty('teamBroker', false)) {
reply.code(404).send({ code: 'not_found', error: 'Not Found' })
return // eslint-disable-line no-useless-return
}
}

if (request.params.brokerId && request.params.brokerId !== 'team-broker') {
request.broker = await app.db.models.BrokerCredentials.byId(request.params.brokerId)
if (!request.broker) {
reply.code(404).send({ code: 'not_found', error: 'Not Found' })
return // eslint-disable-line no-useless-return
}
}
}
if (!request.teamMembership && request.session.User) {
request.teamMembership = await request.session.User.getTeamMembership(request.team.id)
}
})

app.get('/:brokerId/schema.yml', {
preHandler: app.needsPermission('broker:topics:list')
}, async (request, reply) => {
const schema = {
asyncapi: '3.0.0',
info: {
Expand All @@ -12,33 +53,63 @@ module.exports = async function (app) {
description: 'An auto-generated schema of the topics being used on the team broker'
}
}
// Add the team-broker details

// Figure out the hostname for the team broker
let teamBrokerHost = app.config.broker?.teamBroker?.host
if (!teamBrokerHost) {
// No explict value set, default to broker.${domain}
if (app.config.domain) {
teamBrokerHost = `broker.${app.config.domain}`
let topics
const isTeamBroker = request.params.brokerId === 'team-broker'
if (isTeamBroker) {
schema.info.title = `${request.team.name} Team Broker`
schema.info.description = 'An auto-generated schema of the topics being used on the team broker'
topics = await app.db.models.MQTTTopicSchema.getTeamBroker(request.team.hashid)
// Figure out the hostname for the team broker
let teamBrokerHost = app.config.broker?.teamBroker?.host
if (!teamBrokerHost) {
// No explict value set, default to broker.${domain}
if (app.config.domain) {
teamBrokerHost = `broker.${app.config.domain}`
}
}
}
if (teamBrokerHost) {
if (teamBrokerHost) {
schema.servers = {
'team-broker': {
host: teamBrokerHost,
protocol: 'mqtt',
security: [{
type: 'userPassword'
}]
}
}
}
} else {
schema.info.title = `${request.broker.name}`
schema.info.description = `An auto-generated schema of the topics being used on the '${request.broker.name}' broker`
topics = await app.db.models.MQTTTopicSchema.byBroker(request.broker.id)

schema.servers = {
'team-broker': {
host: teamBrokerHost,
protocol: 'mqtt',
security: [{
[request.broker.name]: {
host: request.broker.host + ':' + request.broker.port,
protocol: 'mqtt'
}
}
if (request.broker.credentials) {
const creds = JSON.parse(request.broker.credentials)
if (creds.username && creds.password) {
schema.servers[request.broker.name].security = [{
type: 'userPassword'
}]
}
}
}

if (list.length > 0) {
const topicList = topics.topics
topicList.sort((A, B) => A.topic.localeCompare(B.topic))
if (topicList.length > 0) {
schema.channels = {}
list.forEach(topic => {
schema.channels[topic] = {
address: topic
topicList.forEach(topicObj => {
schema.channels[topicObj.topic] = {
address: topicObj.topic
}
if (topicObj.metadata?.description) {
schema.channels[topicObj.topic].description = topicObj.metadata?.description
}
})
}
Expand Down
13 changes: 12 additions & 1 deletion frontend/src/api/broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ const getBrokerTopics = (teamId, brokerId) => {
.then(res => res.data)
}

const addBrokerTopic = (teamId, brokerId, payload) => {
return client.post(`/api/v1/teams/${teamId}/brokers/${brokerId}/topics`, payload)
.then(res => res.data)
}
const updateBrokerTopic = (teamId, brokerId, topicId, payload) => {
return client.put(`/api/v1/teams/${teamId}/brokers/${brokerId}/topics/${topicId}`, payload)
.then(res => res.data)
}

export default {
getClients,
getClient,
Expand All @@ -69,5 +78,7 @@ export default {
createBroker,
updateBroker,
deleteBroker,
getBrokerTopics
getBrokerTopics,
addBrokerTopic,
updateBrokerTopic
}
Loading

0 comments on commit 7ce94e9

Please sign in to comment.