Skip to content

Add support for NATS JetStream as a transport #2299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

joeriddles
Copy link

@joeriddles joeriddles commented May 16, 2025

closes #2103

Implement support for NATS using its built-in persistence layer JetStream.

Note

This pull request is still a work-in-progress, but I would love any feedback while it's a draft as this is my first contribution to kombu.

TODO:

  • Add docs
  • Add unit tests
  • Add integration tests

Example

You can test using either:

  • Installing the NATS server locally and running it with JetStream enabled: nats-server --js
  • Using the demo NATS server at demo.nats.io

The transport can be test using examples/nats_receive.py and examples/nats_send.py:

  1. Start the local NATS server (optional)
  2. Run python -m examples.nats_receive (append --demo if using the demo server)
  3. In another window, run python -m examples.nats_send (append --demo if using the demo server)

In the receive window, you should see something like:

Received message: 'hello world'
  properties:
{   'body_encoding': 'base64',
    'delivery_info': {'exchange': 'exchange', 'routing_key': 'messages'},
    'delivery_mode': 2,
    'delivery_tag': 'fa01f1a9-f666-46a0-af71-35e5164e1ad9',
    'priority': 0}
  delivery_info:
{'exchange': 'exchange', 'routing_key': 'messages'}

@joeriddles joeriddles mentioned this pull request May 16, 2025
@auvipy auvipy requested review from Copilot and auvipy May 17, 2025 08:16
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Adds initial support for NATS JetStream as a Kombu transport, including dependency, transport registration, and usage examples.

  • Adds nats-py extra requirement
  • Registers the nats transport in Kombu’s transport registry
  • Provides example scripts for sending and receiving messages via NATS JetStream

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

File Description
requirements/extras/nats.txt Pins the nats-py[nkeys] dependency
kombu/transport/init.py Registers 'nats' transport for JetStream
examples/nats_send.py Example for publishing messages to JetStream
examples/nats_receive.py Example for consuming messages from JetStream
Comments suppressed due to low confidence (1)

kombu/transport/init.py:48

  • [nitpick] The transport key 'nats' is generic and could conflict with other NATS-based transports; consider renaming it to 'nats_jetstream' to clearly distinguish this implementation.
'nats': 'kombu.transport.nats_jetstream:Transport',

@@ -0,0 +1 @@
nats-py[nkeys]==2.9.0
Copy link
Preview

Copilot AI May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinning the dependency to an exact patch version may prevent receiving important bug fixes and improvements; consider specifying a range (e.g., >=2.9.0,<3.0.0).

Suggested change
nats-py[nkeys]==2.9.0
nats-py[nkeys]>=2.9.0,<3.0.0

Copilot uses AI. Check for mistakes.

Comment on lines +17 to +22

exchange = Exchange("exchange", "direct", durable=False)
msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages")


with Connection(f"nats://{server}:4222", transport_options={
Copy link
Preview

Copilot AI May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Hardcoding the default NATS port (4222) limits flexibility; consider making the port configurable via CLI flag or environment variable.

Suggested change
exchange = Exchange("exchange", "direct", durable=False)
msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages")
with Connection(f"nats://{server}:4222", transport_options={
# Resolve the port: CLI flag > environment variable > default
port = 4222 # Default NATS port
if len(sys.argv) > 2 and sys.argv[2].startswith("--port="):
port = int(sys.argv[2].split("=")[1])
else:
port = int(os.getenv("NATS_PORT", port))
exchange = Exchange("exchange", "direct", durable=False)
msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages")
with Connection(f"nats://{server}:{port}", transport_options={

Copilot uses AI. Check for mistakes.

Copy link
Member

@Nusnus Nusnus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ping me when the PR is ready for review. Thank you for working on this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

NATS support
2 participants