Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apps/consumers/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ TELEGRAM_BOT_TOKEN=your_telegram_bot_token
ANTICAPTURE_GRAPHQL_ENDPOINT=https://api-gateway/graphql
BLOCKFUL_API_TOKEN=
SUBSCRIPTION_SERVER_URL=http://localhost:3001
API_PORT=3000
API_PORT=3000

# OpenClaw consumer (optional β€” omit to disable)
# OPENCLAW_WEBHOOK_URL=https://your-openclaw-gateway/webhook/notifications
# OPENCLAW_API_KEY=your_api_key
26 changes: 25 additions & 1 deletion apps/consumers/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AxiosInstance } from 'axios';
import { TelegramBotService } from './services/bot/telegram-bot.service';
import { SlackBotService } from './services/bot/slack-bot.service';
import { OpenClawBotService } from './services/bot/openclaw-bot.service';
import { SlackDAOService } from './services/dao/slack-dao.service';
import { SlackWalletService } from './services/wallet/slack-wallet.service';
import { TelegramDAOService } from './services/dao/telegram-dao.service';
Expand All @@ -12,12 +13,15 @@ import { SubscriptionAPIService } from './services/subscription-api.service';
import { RabbitMQNotificationConsumerService } from './services/rabbitmq-notification-consumer.service';
import { TelegramClientInterface } from './interfaces/telegram-client.interface';
import { SlackClientInterface } from './interfaces/slack-client.interface';
import { OpenClawClientInterface } from './interfaces/openclaw-client.interface';

export class App {
private telegramBotService: TelegramBotService;
private slackBotService: SlackBotService;
private openclawBotService?: OpenClawBotService;
private rabbitmqTelegramConsumerService?: RabbitMQNotificationConsumerService<TelegramBotService>;
private rabbitmqSlackConsumerService?: RabbitMQNotificationConsumerService<SlackBotService>;
private rabbitmqOpenClawConsumerService?: RabbitMQNotificationConsumerService<OpenClawBotService>;
private rabbitmqUrl: string;

constructor(
Expand All @@ -26,7 +30,8 @@ export class App {
rabbitmqUrl: string,
ensResolver: EnsResolverService,
telegramClient: TelegramClientInterface,
slackClient: SlackClientInterface
slackClient: SlackClientInterface,
openclawClient?: OpenClawClientInterface
) {
const subscriptionApi = new SubscriptionAPIService(subscriptionServerUrl);
const anticaptureClient = new AnticaptureClient(httpClient);
Expand All @@ -53,6 +58,12 @@ export class App {
slackDaoService,
slackWalletService
);

// OpenClaw consumer β€” only initialized when webhook URL is configured
if (openclawClient) {
this.openclawBotService = new OpenClawBotService(openclawClient);
}

this.rabbitmqUrl = rabbitmqUrl;
}

Expand All @@ -70,6 +81,16 @@ export class App {
'slack'
);
console.log('βœ… Slack consumer connected to RabbitMQ');

// OpenClaw consumer β€” only connect to RabbitMQ when configured
if (this.openclawBotService) {
this.rabbitmqOpenClawConsumerService = await RabbitMQNotificationConsumerService.create(
this.rabbitmqUrl,
this.openclawBotService,
'openclaw'
);
console.log('βœ… OpenClaw consumer connected to RabbitMQ');
}

this.telegramBotService.launch();
this.slackBotService.launch();
Expand All @@ -84,6 +105,9 @@ export class App {
if (this.rabbitmqSlackConsumerService) {
await this.rabbitmqSlackConsumerService.stop();
}
if (this.rabbitmqOpenClawConsumerService) {
await this.rabbitmqOpenClawConsumerService.stop();
}
this.telegramBotService.stop('SIGINT');
this.slackBotService.stop('SIGINT');
}
Expand Down
64 changes: 64 additions & 0 deletions apps/consumers/src/clients/openclaw.client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* OpenClaw Client Implementation
* Sends notifications to an OpenClaw agent via webhook HTTP POST.
*
* The webhook endpoint receives JSON payloads and forwards them
* to the configured OpenClaw agent session (e.g., CRA for governance analysis).
*/

import axios, { AxiosInstance } from 'axios';
import { OpenClawClientInterface } from '../interfaces/openclaw-client.interface';

export class OpenClawClient implements OpenClawClientInterface {
private httpClient: AxiosInstance;
private ready: boolean = false;

/**
* @param webhookUrl The OpenClaw webhook endpoint URL
* @param apiKey Optional API key for authentication
*/
constructor(
private readonly webhookUrl: string,
private readonly apiKey?: string
) {
this.httpClient = axios.create({
baseURL: this.webhookUrl,
headers: {
'Content-Type': 'application/json',
...(this.apiKey && { Authorization: `Bearer ${this.apiKey}` }),
},
timeout: 30000,
});
this.ready = true;
}

async sendMessage(
message: string,
metadata?: Record<string, any>
): Promise<string> {
const payload = {
message,
timestamp: new Date().toISOString(),
...(metadata && { metadata }),
};

try {
const response = await this.httpClient.post('', payload);
const responseId =
response.data?.id || response.data?.messageId || `openclaw-${Date.now()}`;
console.log(`[OpenClaw] Message sent successfully: ${responseId}`);
return responseId;
} catch (error: any) {
const status = error.response?.status || 'unknown';
const detail = error.response?.data?.message || error.message;
console.error(
`[OpenClaw] Failed to send message (status=${status}): ${detail}`
);
throw new Error(`OpenClaw webhook failed: ${detail}`);
}
}

isReady(): boolean {
return this.ready;
}
}
7 changes: 6 additions & 1 deletion apps/consumers/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const envSchema = z.object({
RABBITMQ_URL: z.string().url(),
PORT: z.coerce.number().positive().optional().default(3002),
RPC_URL: z.string().optional(),
// OpenClaw consumer (optional β€” omit to disable)
OPENCLAW_WEBHOOK_URL: z.string().url().optional(),
OPENCLAW_API_KEY: z.string().optional(),
});

export function loadConfig() {
Expand All @@ -30,6 +33,8 @@ export function loadConfig() {
subscriptionServerUrl: env.SUBSCRIPTION_SERVER_URL,
rabbitmqUrl: env.RABBITMQ_URL,
port: env.PORT,
rpcUrl: env.RPC_URL
rpcUrl: env.RPC_URL,
openclawWebhookUrl: env.OPENCLAW_WEBHOOK_URL,
openclawApiKey: env.OPENCLAW_API_KEY,
} as const;
}
15 changes: 14 additions & 1 deletion apps/consumers/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { loadConfig } from './config/env';
import { EnsResolverService } from './services/ens-resolver.service';
import { TelegramClient } from './clients/telegram.client';
import { SlackClient } from './clients/slack.client';
import { OpenClawClient } from './clients/openclaw.client';

const config = loadConfig();

Expand All @@ -32,6 +33,17 @@ const slackClient = new SlackClient(
config.port
);

// Create OpenClaw client (optional β€” only if webhook URL is configured)
const openclawClient = config.openclawWebhookUrl
? new OpenClawClient(config.openclawWebhookUrl, config.openclawApiKey)
: undefined;

if (openclawClient) {
console.log('🦞 OpenClaw client configured');
} else {
console.log('⚠️ OpenClaw webhook not configured β€” consumer will run in noop mode');
}

// Create and start the application
const app = new App(
config.subscriptionServerUrl,
Expand All @@ -46,7 +58,8 @@ const app = new App(
config.rabbitmqUrl,
ensResolver,
telegramClient,
slackClient
slackClient,
openclawClient
);

(async () => {
Expand Down
20 changes: 20 additions & 0 deletions apps/consumers/src/interfaces/openclaw-client.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Interface for OpenClaw client operations
* Abstracts the OpenClaw webhook/API to allow for testing and different implementations
*/

export interface OpenClawClientInterface {
/**
* Send a message to the OpenClaw agent
* @param message The message text
* @param metadata Optional metadata to include
* @returns Response identifier
*/
sendMessage(message: string, metadata?: Record<string, any>): Promise<string>;

/**
* Check if the client is connected and ready
* @returns true if client is operational
*/
isReady(): boolean;
}
43 changes: 43 additions & 0 deletions apps/consumers/src/services/bot/openclaw-bot.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* OpenClaw Bot Service
* Handles notification delivery to an OpenClaw agent (e.g., CRA for governance voting).
* Implements BotServiceInterface following the same pattern as Telegram and Slack services.
*
* Unlike Telegram/Slack, this service is notification-only β€” no interactive commands.
*/

import { NotificationPayload } from '../../interfaces/notification.interface';
import { BotServiceInterface } from '../../interfaces/bot-service.interface';
import { OpenClawClientInterface } from '../../interfaces/openclaw-client.interface';

export class OpenClawBotService implements BotServiceInterface {
constructor(private readonly openclawClient: OpenClawClientInterface) {}

/**
* Send a notification to the OpenClaw agent
* Forwards the notification message along with structured metadata
* that the receiving agent can parse for analysis.
*/
async sendNotification(payload: NotificationPayload): Promise<string> {
const metadata: Record<string, any> = {
channel: payload.channel,
userId: payload.userId,
};

// Forward structured metadata when available
if (payload.metadata?.triggerType) metadata.triggerType = payload.metadata.triggerType;
if (payload.metadata?.addresses) metadata.addresses = payload.metadata.addresses;
if (payload.metadata?.transaction) metadata.transaction = payload.metadata.transaction;
if (payload.metadata?.buttons) metadata.buttons = payload.metadata.buttons;

const responseId = await this.openclawClient.sendMessage(
payload.message,
metadata
);

console.log(
`[OpenClaw] Notification delivered for user ${payload.userId}: ${responseId}`
);
return responseId;
}
}
1 change: 1 addition & 0 deletions apps/dispatcher/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class App {
const notificationFactory = new NotificationClientFactory();
notificationFactory.addClient('telegram', new RabbitMQNotificationService(this.publisher));
notificationFactory.addClient('slack', new RabbitMQNotificationService(this.publisher));
notificationFactory.addClient('openclaw', new RabbitMQNotificationService(this.publisher));
const triggerProcessorService = new TriggerProcessorService();

triggerProcessorService.addHandler(
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ services:
RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672
TOKEN_ENCRYPTION_KEY: ${TOKEN_ENCRYPTION_KEY}
API_PORT_SLACK: 3002
# OpenClaw consumer (optional)
OPENCLAW_WEBHOOK_URL: ${OPENCLAW_WEBHOOK_URL:-}
OPENCLAW_API_KEY: ${OPENCLAW_API_KEY:-}
depends_on:
subscription-server:
condition: service_started
Expand Down
92 changes: 92 additions & 0 deletions docs/openclaw-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# OpenClaw Consumer

A third notification consumer alongside Telegram and Slack that delivers DAO governance proposal notifications to an [OpenClaw](https://openclaw.ai) agent via webhook.

## Purpose

When a new proposal is detected in any monitored DAO, the OpenClaw consumer forwards the notification to a configured OpenClaw agent (e.g., a governance research agent) that can then:

- Analyze the proposal
- Draft voting rationale
- Trigger downstream governance workflows

## Architecture

The OpenClaw consumer follows the exact same pattern as the Telegram and Slack consumers:

```
Logic System β†’ RabbitMQ β†’ Dispatcher β†’ notifications.openclaw.* β†’ OpenClaw Consumer β†’ Webhook POST
```

### New Files

| File | Description |
|------|-------------|
| `apps/consumers/src/interfaces/openclaw-client.interface.ts` | Client interface (mirrors `telegram-client.interface.ts`) |
| `apps/consumers/src/clients/openclaw.client.ts` | HTTP webhook client |
| `apps/consumers/src/services/bot/openclaw-bot.service.ts` | Bot service implementing `BotServiceInterface` (notification-only, no interactive commands) |

### Modified Files

| File | Change |
|------|--------|
| `apps/consumers/src/app.ts` | Wired OpenClaw as third consumer |
| `apps/consumers/src/index.ts` | Creates `OpenClawClient` from env config |
| `apps/consumers/src/config/env.ts` | Added `OPENCLAW_WEBHOOK_URL` + `OPENCLAW_API_KEY` (both optional) |
| `apps/consumers/example.env` | Documented new env vars |
| `apps/dispatcher/src/app.ts` | Added `openclaw` channel to notification factory |
| `docker-compose.yml` | Passes env vars to consumers container |

## Configuration

### Environment Variables

| Variable | Required | Description |
|----------|----------|-------------|
| `OPENCLAW_WEBHOOK_URL` | No | The webhook endpoint URL for the OpenClaw agent. If omitted, the consumer runs in noop mode (no impact on Telegram/Slack). |
| `OPENCLAW_API_KEY` | Yes (when URL is set) | API key sent as `Authorization: Bearer <key>` header. The receiving endpoint rejects requests without a valid key. |

### Webhook Payload

The consumer sends `POST` requests to `OPENCLAW_WEBHOOK_URL` with the following JSON body:

```json
{
"message": "πŸ› New Proposal in UNI\n\n**Title:** Example Proposal\n...",
"source": "anticapture-notification-system",
"timestamp": "2026-03-20T18:00:00.000Z",
"metadata": {
"triggerType": "newProposal",
"addresses": { "proposer": "0x..." },
"transaction": { "hash": "0x...", "chainId": 1 },
"buttons": [{ "text": "View Proposal", "url": "https://..." }]
}
}
```

## Database Setup (Required for Production)

After deploying the code changes, an OpenClaw user must be registered in the subscription database for notifications to be routed. Use the subscription server API:

```bash
# Register the OpenClaw user and subscribe to each DAO
# Replace $SUBSCRIPTION_SERVER_URL with the production URL

for DAO in AAVE NOUNS GTC OBOL COMP ENS SCR UNI SHU; do
curl -X POST "$SUBSCRIPTION_SERVER_URL/subscriptions/$DAO" \
-H "Content-Type: application/json" \
-d '{
"channel": "openclaw",
"channel_user_id": "cra",
"is_active": true
}'
done
```

This creates a user with `channel: "openclaw"` and subscribes it to all monitored DAOs. The dispatcher will then route `notifications.openclaw.*` messages for this subscriber.

## Graceful Degradation

- If `OPENCLAW_WEBHOOK_URL` is **not set**: the consumer starts in noop mode. RabbitMQ messages on `notifications.openclaw.*` are consumed and silently acknowledged. Zero impact on existing Telegram/Slack consumers.
- If the **webhook endpoint is unreachable**: the consumer logs the error. The notification is marked as failed (not re-queued). Other consumers are unaffected.
- If **no openclaw user exists in the DB**: the dispatcher simply has no subscribers for the `openclaw` channel and sends nothing to the queue. No errors.
Loading