Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [Circuit Breaker](community-plugins/circuit-breaker.md)
* [SendRemoteFile](community-plugins/sendremotefile.md)
* [RFC 7234 Cache](community-plugins/cache.md)
* [AMQP1](community-plugins/amqp1.md)

## 📡 App Server

Expand Down
283 changes: 283 additions & 0 deletions community-plugins/amqp1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
# Jobs — AMQP 1.0 Driver

AMQP 1.0 driver for RoadRunner provides unified support for both **Azure Service Bus** and **RabbitMQ** using the pure `github.com/Azure/go-amqp` library. It implements the standardized AMQP 1.0 protocol for better interoperability and broker-agnostic messaging.

{% hint style="warning" %}
This is a third-party plugin and isn't included by default. See the "Building RoadRunner with AMQP1" section for more information.
{% endhint %}

## Features

* **Pure AMQP 1.0 Implementation**: Uses `github.com/Azure/go-amqp` v1.4.0 for standardized protocol support
* **Dual Broker Support**: Works with both Azure Service Bus and RabbitMQ (with AMQP 1.0 plugin)
* **Automatic Broker Detection**: Identifies Azure Service Bus vs RabbitMQ automatically
* **Unified Configuration**: Same configuration format works with both brokers
* **TLS/SSL Support**: Automatic encryption for Azure Service Bus, configurable for RabbitMQ
* **Connection Resilience**: Built-in retry mechanisms and connection management
* **Distributed Tracing**: OpenTelemetry integration for observability
* **Container-based Architecture**: AMQP 1.0 sessions and links for efficient messaging
* **Event-driven Design**: Asynchronous message processing capabilities

## Building RoadRunner with AMQP1

To include the AMQP1 driver in your RoadRunner build, you need to configure it in your velox.toml build configuration.

{% code title="velox.toml" %}

```toml
[roadrunner]
ref = "master"

[debug]
enabled = false

[log]
level = "debug"
mode = "production"

# optional, needed only to download RR once (per version)
[github.token]
token = "${GITHUB_TOKEN}"

# Include the AMQP1 plugin
[plugins.amqp1]
tag = "master"
module_name = "github.com/ammadfa/amqp1"

# Include jobs plugin (required dependency)
[plugins.jobs]
tag = "latest"
module_name = "github.com/roadrunner-server/jobs/v5"

# Other common plugins
[plugins.server]
tag = "latest"
module_name = "github.com/roadrunner-server/server/v5"

[plugins.http]
tag = "latest"
module_name = "github.com/roadrunner-server/http/v5"
```

{% endcode %}

More info about customizing RR with your own plugins: [link](../customization/plugin.md)

## Configuration

### Azure Service Bus Configuration

For Azure Service Bus, use the `amqps://` protocol with your connection string:

{% code title=".rr.yaml" %}

```yaml
amqp1:
addr: "amqps://YOUR_POLICY_NAME:YOUR_ACCESS_KEY@YOUR_NAMESPACE.servicebus.windows.net:5671/"
container_id: "roadrunner-jobs-azure"

jobs:
consume: ["azure-queue"]

pipelines:
azure-queue:
driver: amqp1
config:
queue: "test-queue" # Must exist in Azure Service Bus
prefetch: 10
priority: 1
durable: false
exclusive: false
```

{% endcode %}

**Azure Service Bus Requirements:**
- Queue must be pre-created in Azure portal or via Azure CLI
- Uses Shared Access Key authentication
- TLS is automatically enabled with `amqps://` protocol
- Routing occurs directly to queue (no exchanges)

### RabbitMQ Configuration

For RabbitMQ, ensure the AMQP 1.0 plugin is enabled:

{% code title=".rr.yaml" %}

```yaml
amqp1:
addr: "amqp://username:password@rabbitmq:5672/"
container_id: "roadrunner-jobs-rabbitmq"

jobs:
consume: ["rabbit-queue"]

pipelines:
rabbit-queue:
driver: amqp1
config:
queue: "test-queue"
routing_key: "test-queue"
exchange: "test-queue" # Use default exchange
exchange_type: "direct" # informational; configure server-side
prefetch: 10
priority: 10
durable: true
exclusive: false
```

{% endcode %}

**RabbitMQ Requirements:**
- Enable AMQP 1.0 plugin: `rabbitmq-plugins enable rabbitmq_amqp1_0`
- Queues and exchanges must be created ahead of time
- Supports exchange-based routing; ensure bindings are configured server-side
- For delayed messages, enable: `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`

### TLS Configuration

For secure connections, configure TLS settings:

{% code title=".rr.yaml" %}

```yaml
amqp1:
addr: "amqps://guest:[email protected]:5671/"
container_id: "roadrunner-secure"
tls:
cert: "/path/to/cert.pem"
key: "/path/to/key.pem"
root_ca: "/path/to/ca.pem"
insecure_skip_verify: false
```

{% endcode %}

### Advanced Pipeline Configuration

{% code title=".rr.yaml" %}

```yaml
jobs:
pipelines:
# Azure Service Bus optimized pipeline
priority-orders:
driver: amqp1
config:
queue: "priority-orders"
prefetch: 50
priority: 5

# RabbitMQ with topic exchange
events-pipeline:
driver: amqp1
config:
queue: "events-queue"
exchange: "events-exchange"
exchange_type: "topic"
routing_key: "events.#"
prefetch: 25
priority: 3
durable: true
exclusive: false
```
Comment on lines +161 to +183
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

🧩 Analysis chain

YAML validity: fields look plausible; please lint to be safe.

Run markdownlint and a YAML linter on the added snippets per repo guidelines.


🏁 Script executed:

#!/bin/bash
# Lint Markdown (markdownlint config assumed in repo)
npx markdownlint-cli2 '**/*.md'

# Quick YAML syntax checks (prints offending snippet/line on error)
python - <<'PY'
from ruamel.yaml import YAML
import sys, re
yaml = YAML(typ='safe')
doc, buf = None, []
capture=False
import pathlib
for p in pathlib.Path('community-plugins').glob('amqp1.md'):
    for ln in p.read_text().splitlines():
        if ln.strip().startswith('```yaml'):
            capture=True; buf=[]; continue
        if capture and ln.strip().startswith('```'):
            try:
                yaml.load('\n'.join(buf))
            except Exception as e:
                print(f'YAML block failed in {p}: {e}')
            capture=False; continue
        if capture: buf.append(re.sub(r'^\s{0,3}', '', ln))
PY

Length of output: 56502


Fix markdownlint violations in community-plugins/amqp1.md; YAML fences validate.

markdownlint reported MD004, MD032, MD022, MD059 and MD047 in community-plugins/amqp1.md (unordered list style, missing blank lines around lists/headings, non‑descriptive link text, missing single trailing newline). The YAML fenced blocks parsed successfully with ruamel.yaml. Fix the markdownlint issues and re-run markdownlint-cli2 (and the YAML check).

🤖 Prompt for AI Agents
In community-plugins/amqp1.md around lines 161 to 183, fix the markdownlint
violations: normalize unordered list markers (use a single style, e.g.,
hyphens), add required blank lines above and below lists and headings, replace
non‑descriptive link text with meaningful text, and ensure the file ends with a
single trailing newline; keep the existing YAML fenced blocks intact (they
validate with ruamel.yaml). After making these edits, run markdownlint-cli2 and
the YAML checker to confirm MD004, MD032, MD022, MD059 and MD047 are resolved.


{% endcode %}

## Migration Benefits

The AMQP1 driver provides significant advantages over RabbitMQ-specific implementations:

### From RabbitMQ-specific to Universal
- **Previous**: `github.com/rabbitmq/rabbitmq-amqp-go-client` (RabbitMQ-only)
- **Current**: `github.com/Azure/go-amqp` (Works with any AMQP 1.0 broker)
- **Benefit**: Single codebase supports multiple message brokers

### Protocol Standardization
- **AMQP 1.0 Compliance**: Standardized protocol ensures better interoperability
- **Container Model**: Modern connection architecture with sessions and links
- **Message Format**: Structured AMQP 1.0 messages with application properties

### Cloud-Native Ready
- **Azure Service Bus**: Native support for Microsoft's cloud messaging service
- **Hybrid Deployments**: Use RabbitMQ on-premises and Azure Service Bus in cloud
- **Consistent API**: Same job queue interface regardless of broker

## Implementation Details

### Driver Components

1. **Plugin** (`plugin.go`): Main plugin interface and registration
2. **Driver** (`amqp1jobs/driver.go`): Core driver with pure AMQP 1.0 support
3. **Config** (`amqp1jobs/config.go`): Configuration structure and validation
4. **Item** (`amqp1jobs/item.go`): Message/job item handling and serialization

### Connection Management

The driver uses the AMQP 1.0 container model:

```go
// Create AMQP 1.0 connection
conn, err := amqp.Dial(ctx, addr, &amqp.ConnOptions{
ContainerID: conf.ContainerID,
TLSConfig: tlsConfig,
})

// Session for logical grouping
session, err := conn.NewSession(ctx, nil)

// Receivers and senders for message flow
receiver, err := session.NewReceiver(ctx, queueName, options)
sender, err := session.NewSender(ctx, target, options)
```

### Broker Detection

The driver automatically detects the broker type and adapts message routing:

- **Azure Service Bus**: Direct queue addressing
- **RabbitMQ**: Exchange-based routing with AMQP v2 addressing

### Message Processing

Unified message processing pattern for both brokers:

```go
// Receive messages
msg, err := receiver.Receive(ctx, nil)

// Process job
jobItem := convertFromAMQP1Message(msg)

// Acknowledge based on result
if success {
receiver.AcceptMessage(ctx, msg)
} else {
receiver.RejectMessage(ctx, msg, nil)
}
```

## Troubleshooting

### Common Issues

1. **RabbitMQ AMQP 1.0 Plugin**: Ensure `rabbitmq_amqp1_0` plugin is enabled
2. **Azure Service Bus**: Verify access keys and queue existence
3. **TLS Configuration**: Check certificate paths and validity
4. **Queue Declaration**: Queues must be pre-created for AMQP 1.0

### Debugging

Enable debug logging to troubleshoot connection and message issues:

```yaml
log:
level: "debug"
mode: "development"
```

### Performance Tuning

- **Prefetch**: Adjust based on message processing speed
- **Priority**: Set appropriate job priority levels
- **Connection Pooling**: Use appropriate container IDs for load distribution
1 change: 1 addition & 0 deletions community-plugins/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ Here is a list of community plugins that are available:
| SendRemoteFile | [link](https://github.com/roadrunner-server/sendremotefile)| Used to send a file from a remote endpoint into the response | [docs](./sendremotefile.md) |
| CircuitBreaker | [link](https://github.com/roadrunner-server/circuit-breaker)| Circuit breaker pattern implementation for RoadRunner | [docs](./circuit-breaker.md) |
| RFC 7234 Cache | [link](https://github.com/darkweak/souin/tree/master/plugins/roadrunner)| RFC 7234 Cache implementation for RoadRunner | [docs](https://github.com/darkweak/souin?tab=readme-ov-file#roadrunner-middleware) |
| AMQP1 | [link](https://github.com/ammadfa/amqp1)| AMQP 1.0 driver for Azure Service Bus and RabbitMQ with pure go-amqp library | [docs](./amqp1.md) |

Feel free to make a PR to add your plugin to the list.