Skip to content

Add support for sse #48

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ The Elasticsearch MCP Server supports configuration options to connect to your E

| Environment Variable | Description | Required |
|---------------------|-------------|----------|
| `SSE_ADDR` | Enable SSE and set the ADDR | No |
| `ES_URL` | Your Elasticsearch instance URL | Yes |
| `ES_API_KEY` | Elasticsearch API key for authentication | No |
| `ES_USERNAME` | Elasticsearch username for basic authentication | No |
Expand Down
124 changes: 92 additions & 32 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,24 @@
* SPDX-License-Identifier: Apache-2.0
*/

import express, { Request, Response } from "express";
import morgan from "morgan";
import { z } from "zod";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Client, estypes, ClientOptions } from "@elastic/elasticsearch";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { Client, ClientOptions, estypes } from "@elastic/elasticsearch";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import fs from "fs";

// Configuration schema with auth options
const ConfigSchema = z
.object({
sse_addr: z
.string()
.trim()
.optional()
.describe("Address for SSE server (optional)"),

url: z
.string()
.trim()
Expand Down Expand Up @@ -65,13 +74,13 @@ const ConfigSchema = z
message:
"Either ES_API_KEY or both ES_USERNAME and ES_PASSWORD must be provided, or no auth for local development",
path: ["username", "password"],
}
},
);

type ElasticsearchConfig = z.infer<typeof ConfigSchema>;

export async function createElasticsearchMcpServer(
config: ElasticsearchConfig
config: ElasticsearchConfig,
) {
const validatedConfig = ConfigSchema.parse(config);
const { url, apiKey, username, password, caCert } = validatedConfig;
Expand All @@ -96,7 +105,7 @@ export async function createElasticsearchMcpServer(
console.error(
`Failed to read certificate file: ${
error instanceof Error ? error.message : String(error)
}`
}`,
);
}
}
Expand Down Expand Up @@ -140,7 +149,7 @@ export async function createElasticsearchMcpServer(
console.error(
`Failed to list indices: ${
error instanceof Error ? error.message : String(error)
}`
}`,
);
return {
content: [
Expand All @@ -153,7 +162,7 @@ export async function createElasticsearchMcpServer(
],
};
}
}
},
);

// Tool 2: Get mappings for an index
Expand Down Expand Up @@ -181,19 +190,21 @@ export async function createElasticsearchMcpServer(
},
{
type: "text" as const,
text: `Mappings for index ${index}: ${JSON.stringify(
mappingResponse[index]?.mappings || {},
null,
2
)}`,
text: `Mappings for index ${index}: ${
JSON.stringify(
mappingResponse[index]?.mappings || {},
null,
2,
)
}`,
},
],
};
} catch (error) {
console.error(
`Failed to get mappings: ${
error instanceof Error ? error.message : String(error)
}`
}`,
);
return {
content: [
Expand All @@ -206,7 +217,7 @@ export async function createElasticsearchMcpServer(
],
};
}
}
},
);

// Tool 3: Search an index with simplified parameters
Expand All @@ -233,10 +244,10 @@ export async function createElasticsearchMcpServer(
},
{
message: "queryBody must be a valid Elasticsearch query DSL object",
}
},
)
.describe(
"Complete Elasticsearch query DSL object that can include query, size, from, sort, etc."
"Complete Elasticsearch query DSL object that can include query, size, from, sort, etc.",
),
},
async ({ index, queryBody }) => {
Expand All @@ -257,9 +268,11 @@ export async function createElasticsearchMcpServer(
if (indexMappings.properties) {
const textFields: Record<string, estypes.SearchHighlightField> = {};

for (const [fieldName, fieldData] of Object.entries(
indexMappings.properties
)) {
for (
const [fieldName, fieldData] of Object.entries(
indexMappings.properties,
)
) {
if (fieldData.type === "text" || "dense_vector" in fieldData) {
textFields[fieldName] = {};
}
Expand All @@ -285,9 +298,11 @@ export async function createElasticsearchMcpServer(

for (const [field, highlights] of Object.entries(highlightedFields)) {
if (highlights && highlights.length > 0) {
content += `${field} (highlighted): ${highlights.join(
" ... "
)}\n`;
content += `${field} (highlighted): ${
highlights.join(
" ... ",
)
}\n`;
}
}

Expand Down Expand Up @@ -319,7 +334,7 @@ export async function createElasticsearchMcpServer(
console.error(
`Search failed: ${
error instanceof Error ? error.message : String(error)
}`
}`,
);
return {
content: [
Expand All @@ -332,7 +347,7 @@ export async function createElasticsearchMcpServer(
],
};
}
}
},
);

// Tool 4: Get shard information
Expand Down Expand Up @@ -383,7 +398,7 @@ export async function createElasticsearchMcpServer(
console.error(
`Failed to get shard information: ${
error instanceof Error ? error.message : String(error)
}`
}`,
);
return {
content: [
Expand All @@ -396,36 +411,81 @@ export async function createElasticsearchMcpServer(
],
};
}
}
},
);

return server;
}

const config: ElasticsearchConfig = {
sse_addr: process.env.SSE_ADDR || "",
url: process.env.ES_URL || "",
apiKey: process.env.ES_API_KEY || "",
username: process.env.ES_USERNAME || "",
password: process.env.ES_PASSWORD || "",
caCert: process.env.ES_CA_CERT || "",
};

async function create_sse_server(server: any) {
const app = express();

// Use morgan to log every request
app.use(morgan("combined"));

const transports: { [sessionId: string]: SSEServerTransport } = {};

app.get("/sse", async (_: Request, res: Response) => {
const transport = new SSEServerTransport("/messages", res);
transports[transport.sessionId] = transport;
res.on("close", () => {
delete transports[transport.sessionId];
});
await server.connect(transport);
});

app.post("/messages", async (req: Request, res: Response) => {
const sessionId = req.query.sessionId as string;
const transport = transports[sessionId];
if (transport) {
await transport.handlePostMessage(req, res);
} else {
res.status(400).send("No transport found for sessionId");
}
});

const sseAddr = process.env.SSE_ADDR || "127.0.0.1:3000";
const [host, port] = sseAddr.split(":");
if (!port) {
console.error("Invalid SSE_ADDR format. Expected 'host:port'.");
process.exit(1);
}

app.listen(Number(port), host, () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest we need a way to handle termination signals in SSE mode gracefully by stopping the SSE server properly and disconnect any active SSEServerTransport instances and clean up the transports object.

console.info(`SSE server started on: ${host}:${port}`);
});
}

async function main() {
const transport = new StdioServerTransport();
console.info("Starting Elasticsearch MCP server...");
const server = await createElasticsearchMcpServer(config);

await server.connect(transport);
if (process.env.SSE_ADDR) {
await create_sse_server(server);
} else {
const transport = new StdioServerTransport();
await server.connect(transport);

process.on("SIGINT", async () => {
await server.close();
process.exit(0);
});
process.on("SIGINT", async () => {
await server.close();
process.exit(0);
});
}
}

main().catch((error) => {
console.error(
"Server error:",
error instanceof Error ? error.message : String(error)
error instanceof Error ? error.message : String(error),
);
process.exit(1);
});
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
},
"dependencies": {
"@elastic/elasticsearch": "^8.17.1",
"@modelcontextprotocol/sdk": "1.9.0"
"@modelcontextprotocol/sdk": "1.9.0",
"@types/express": "^5.0.1",
"@types/morgan": "^1.9.9",
"express": "^5.1.0",
"morgan": "^1.10.0"
},
"engines": {
"node": ">=18"
Expand Down
Loading