From 1ca49eb0b12da2db678b8381a340db98408b3128 Mon Sep 17 00:00:00 2001 From: oh-alban Date: Fri, 9 Jan 2026 21:57:39 -0500 Subject: [PATCH] Feat : Add ClickHouse Backend Support * Chores : Add comprehensive AI coding instructions for Cryptofeed * Feature : Add ClickHouse backend support - Implement ClickHouse backend for all cryptofeed data types - Add TradeClickHouse, TickerClickHouse, BookClickHouse, CandlesClickHouse, etc. - Support authenticated channels (OrderInfo, Fills, Transactions, Balances) - Include comprehensive SQL schema with optimized table structures - Add demo_clickhouse.py example with all supported data types - Update setup.py with clickhouse-connect dependency - Add documentation in docs/clickhouse.md - Update README.md and INSTALL.md to list ClickHouse backend - Update CHANGES.md for version 2.4.2 --------- Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> Co-authored-by: oh-alban --- .github/copilot-instructions.md | 118 ++++++++++ CHANGES.md | 4 + INSTALL.md | 10 +- LICENSE | 7 +- README.md | 29 ++- cryptofeed/backends/clickhouse.py | 380 ++++++++++++++++++++++++++++++ docs/backends/clickhouse.md | 188 +++++++++++++++ docs/callbacks.md | 5 +- examples/clickhouse_tables.sql | 244 +++++++++++++++++++ examples/demo_clickhouse.py | 309 ++++++++++++++++++++++++ setup.py | 2 + 11 files changed, 1275 insertions(+), 21 deletions(-) create mode 100644 .github/copilot-instructions.md create mode 100644 cryptofeed/backends/clickhouse.py create mode 100644 docs/backends/clickhouse.md create mode 100644 examples/clickhouse_tables.sql create mode 100644 examples/demo_clickhouse.py diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 000000000..e078d28f8 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,118 @@ +# Cryptofeed AI Coding Instructions + +## Project Overview +Cryptofeed is a normalized cryptocurrency exchange feed handler library that aggregates real-time market data (trades, order books, tickers) from 40+ exchanges via WebSocket and REST APIs, exposing them through a unified callback-based interface. + +## Architecture + +### Core Components +- **FeedHandler** (`cryptofeed/feedhandler.py`): Main orchestrator managing multiple feeds, async event loop, and lifecycle (startup/shutdown signals) +- **Feed** (`cryptofeed/feed.py`): Per-exchange handler inheriting from Exchange, manages channel subscriptions, reconnection logic, and message parsing +- **Exchange** (`cryptofeed/exchange.py`): Base class defining interface: `websocket_channels`, `rest_endpoints`, symbol mapping, order validation +- **Connection** (`cryptofeed/connection.py`): HTTP/WebSocket client abstractions (`HTTPSync`, `HTTPAsyncConn`, `WSAsyncConn`) with raw data collection hooks +- **Types** (`cryptofeed/types.pyx`): Cython-compiled data classes (Trade, Ticker, OrderBook, Candle, etc.) for performance-critical paths; use `Decimal` for prices + +### Data Flow +1. FeedHandler adds feeds → Feed initializes symbol mapping → WebSocket/REST connections open +2. Messages arrive → Feed's channel-specific parser (e.g., `parse_l2_book`) deserializes and normalizes +3. Normalized objects (Trade, Ticker) passed to user-provided async/sync callbacks via Callback wrapper +4. Optional backends (Kafka, Redis, Postgres, etc.) persist data + +## Key Patterns & Conventions + +### Exchange Implementation +- **Inherit from Feed, not Exchange directly** (Feed extends Exchange) +- Define class attributes: + - `id = 'EXCHANGE_NAME'` (from `defines.py`) + - `websocket_endpoints = [WebsocketEndpoint(...)]` / `rest_endpoints = [RestEndpoint(...)]` + - `websocket_channels = {TRADES: 'channel_name', L2_BOOK: '...'}` (map cryptofeed types to exchange channel names) +- Implement `_parse_symbol_data(cls, data: dict) -> Tuple[Dict, Dict]` to normalize exchange symbols to Symbol objects +- Implement channel parsers (e.g., `async def _parse_trades(self, msg, receipt_timestamp)`) that deserialize exchange JSON into cryptofeed types +- Use `self.normalized_symbol_mapping` (str → Symbol) for lookups + +### Symbol & Type System +- Use `Symbol` objects (not strings) for type safety: `Symbol(base_asset, quote_asset, type=SPOT|FUTURES|PERPETUAL, expiry_date=None)` +- All prices/amounts are `Decimal`, never float (for precision) +- Timestamp normalization: exchanges often send milliseconds; normalize to seconds with `cls.timestamp_normalize()` + +### Callbacks & Async Patterns +- Callbacks are user functions called with signature: `callback(data_object, receipt_timestamp)` +- Can be `async def` or sync; wrapper in `Callback` handles execution in executor if needed +- Lightweight callbacks preferred; FeedHandler is paused during callback execution + +### Configuration +- Config loaded from `config.yaml` or passed as dict to FeedHandler/Exchange constructors +- API keys stored under exchange IDs in lowercase (e.g., `config['binance']` with `.key_id`, `.key_secret`, `.key_passphrase`) +- Supports sandbox mode via `sandbox=True` parameter + +## Developer Workflow + +### Testing +- Run unit tests: `pytest tests/unit/` +- Integration tests with live API: `pytest tests/integration/` (requires API keys) +- No formal test suite runner; use pytest directly + +### Code Style & Linting +- **Type annotations required** on public methods (see existing exchanges for examples) +- Prefer long lines (130+ chars) over line wrapping; isort for import formatting +- Run `flake8` before PRs to catch style issues +- Import order: stdlib → third-party → `cryptofeed` + +### Adding an Exchange +1. Create `cryptofeed/exchanges/new_exchange.py` +2. Implement Feed subclass with required attributes and parsers +3. Add sample data file in `sample_data/` for testing (format: `EXCHANGE_NAME.0`, `EXCHANGE_NAME.ws.1.0`) +4. Update `cryptofeed/exchanges/__init__.py` to export the class +5. Reference API docs in PR; include link in README + +### Cython Types +- `types.pyx` compiled with `cythonize` in setup.py; edit as `.pyx`, not `.py` +- Use `@cython.freelist(n)` for frequently allocated types (Trade, Candle) +- Compile with: `python setup.py build_ext --inplace` + +### Backend Implementation +"Backends" in cryptofeed are not just database connectors—they're **callback wrappers** (middleware) that can aggregate, transform, persist, or format data from any channel. They follow two patterns: + +**Pattern 1: Data Persistence** (e.g., Kafka, Postgres, Redis) +- Inherit from a base backend class or implement async callback interface +- Implement `async def __call__(self, data, receipt_timestamp)` to receive normalized objects (Trade, Ticker, OrderBook, etc.) +- Batch/buffer writes for efficiency; handle connection lifecycle (start/stop) +- Example: `cryptofeed/backends/kafka.py` buffers trades and flushes to Kafka + +**Pattern 2: Data Transformation/Aggregation** (e.g., OHLCV, Throttle, Renko) +- Inherit from `AggregateCallback` in `cryptofeed/backends/aggregate.py` +- Wrap and chain to other handlers: `self.handler(data, receipt_timestamp)` +- Aggregate across time windows or price movements before forwarding +- Example: `Throttle` allows 1 callback per time window; `OHLCV` aggregates trades into OHLCV bars + +**Adding a Backend:** +1. Create `cryptofeed/backends/my_backend.py` +2. Implement class with `async def __call__(self, data, receipt_timestamp)` +3. Optional: add `start()` and `stop()` lifecycle methods for resource management +4. Pass to FeedHandler: `fh.add_feed(Exchange(..., callbacks={TRADES: MyBackend(...), TICKER: MyBackend(...)}))` +5. Backend receives each data type independently; use `hasattr(data, 'attribute')` to handle multiple types if needed + +### Import Structure +- Exchange-specific symbols mapped through `cryptofeed/defines.py` (BINANCE, KRAKEN, etc.) +- Channel constants: L2_BOOK, L3_BOOK, TRADES, TICKER, FUNDING, LIQUIDATIONS, CANDLES, etc. +- Callback types in `callback.py` (TradeCallback, TickerCallback, etc.) + +## Cross-Component Communication +- **Symbol resolution**: Feed queries Symbols registry (singleton) to map exchange symbols ↔ normalized formats +- **Raw data collection**: Optional raw_data_callback on Connection class intercepts ALL HTTP/WebSocket traffic for debugging +- **Backends**: Decouple data persistence; passed as callbacks (e.g., KafkaBackend implements async write logic) +- **NBBO**: Synthetic feed aggregating best bids/asks across multiple exchanges (see `nbbo.py` and `examples/demo_nbbo.py`) + +## Common Pitfalls +- Forgetting `async def` on parsers that call I/O operations +- Using float for prices instead of Decimal → precision loss +- Not normalizing exchange timestamps to UTC seconds +- Missing `checkpoint_validation` or `cross_check` flags for exchanges with weak message ordering +- Hardcoding depth limits instead of checking exchange's `valid_depths` parameter + +## Quick Reference Files +- Channel definitions: `cryptofeed/defines.py` +- Callback signatures: `cryptofeed/callback.py` +- Example integration: `examples/demo.py`, `examples/demo_binance_authenticated.py` +- Configuration template: `config.yaml` +- Documentation index: `docs/README.md` diff --git a/CHANGES.md b/CHANGES.md index 81c6c4174..1b32b8f3d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ ## Changelog +### 2.4.2 (TBD) + * Feature: ClickHouse backend support for storing real-time cryptocurrency data + * Update: Added copilot-instructions.md + ### 2.4.1 (2025-02-08) * Update: Added `is_data_json` to `write()` in `HTTPSync` from `connection.py` to support JSON payloads (#1071) * Bugfix: Handle empty nextFundingRate in OKX diff --git a/INSTALL.md b/INSTALL.md index 8236bece8..d676dc769 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -31,6 +31,11 @@ for the exhaustive list of these *extra* dependencies. pip install --user --upgrade cryptofeed[arctic] +* ClickHouse backend + To install Cryptofeed along with [ClickHouse](https://clickhouse.com/) in one bundle: + + pip install --user --upgrade cryptofeed[clickhouse] + * Google Cloud Pub / Sub backend pip install --user --upgrade cryptofeed[gcp_pubsub] @@ -65,8 +70,9 @@ for the exhaustive list of these *extra* dependencies. pip install --user --upgrade cryptofeed[zmq] If you have a problem with the installation/hacking of Cryptofeed, you are welcome to: -* open a new issue: https://github.com/bmoscon/cryptofeed/issues/ + +* open a new issue: * join us on Slack: [cryptofeed-dev.slack.com](https://join.slack.com/t/cryptofeed-dev/shared_invite/enQtNjY4ODIwODA1MzQ3LTIzMzY3Y2YxMGVhNmQ4YzFhYTc3ODU1MjQ5MDdmY2QyZjdhMGU5ZDFhZDlmMmYzOTUzOTdkYTZiOGUwNGIzYTk) -* or on GitHub Discussion: https://github.com/bmoscon/cryptofeed/discussions +* or on GitHub Discussion: Your Pull Requests are also welcome, even for minor changes. diff --git a/LICENSE b/LICENSE index 1c0cdb930..fbe9aaa17 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,6 @@ -Copyright (C) 2017-2025 Bryant Moscon - bmoscon@gmail.com +# License + +Copyright (C) 2017-2025 Bryant Moscon - Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to @@ -17,7 +19,7 @@ Copyright (C) 2017-2025 Bryant Moscon - bmoscon@gmail.com 3. The end-user documentation included with the redistribution, if any, must include the following acknowledgment: "This product includes software - developed by Bryant Moscon (http://www.bryantmoscon.com/)", in the same + developed by Bryant Moscon ()", in the same place and form as other third-party acknowledgments. Alternately, this acknowledgment may appear in the software itself, in the same form and location as other such third-party acknowledgments. @@ -27,7 +29,6 @@ Copyright (C) 2017-2025 Bryant Moscon - bmoscon@gmail.com other dealings in this Software without prior written authorization from the author. - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL diff --git a/README.md b/README.md index f501fb70a..19c8e558e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # Cryptocurrency Exchange Feed Handler + [![License](https://img.shields.io/badge/license-XFree86-blue.svg)](LICENSE) ![Python](https://img.shields.io/badge/Python-3.8+-green.svg) [![PyPi](https://img.shields.io/badge/PyPi-cryptofeed-brightgreen.svg)](https://pypi.python.org/pypi/cryptofeed) @@ -37,7 +38,7 @@ Handles multiple cryptocurrency exchange data feeds and returns normalized and s * [Huobi](https://www.hbg.com/) * [Huobi DM](https://www.huobi.com/en-us/markets/hb_dm/) * Huobi Swap (Coin-M and USDT-M) -* [Independent Reserve](https://www.independentreserve.com/) +* [Independent Reserve](https://www.independentreserve.com/) * [Kraken](https://www.kraken.com/) * [Kraken Futures](https://futures.kraken.com/) * [KuCoin](https://www.kucoin.com/) @@ -48,7 +49,6 @@ Handles multiple cryptocurrency exchange data feeds and returns normalized and s * [ProBit](https://www.probit.com/) * [Upbit](https://sg.upbit.com/home) - ## Basic Usage Create a FeedHandler object and add subscriptions. For the various data channels that an exchange supports, you can supply callbacks for data events, or use provided backends (described below) to handle the data for you. Start the feed handler and you're done! @@ -76,10 +76,8 @@ fh.run() Please see the [examples](https://github.com/bmoscon/cryptofeed/tree/master/examples) for more code samples and the [documentation](https://github.com/bmoscon/cryptofeed/blob/master/docs/README.md) for more information about the library usage. - For an example of a containerized application using cryptofeed to store data to a backend, please see [Cryptostore](https://github.com/bmoscon/cryptostore). - ## National Best Bid/Offer (NBBO) Cryptofeed also provides a synthetic [NBBO](examples/demo_nbbo.py) (National Best Bid/Offer) feed that aggregates the best bids and asks from the user specified feeds. @@ -123,18 +121,19 @@ Cryptofeed supports the following channels from exchanges: * BALANCES - Updates on wallet funds * FILLS - User's executed trades - ## Backends Cryptofeed supports `backend` callbacks that will write directly to storage or other interfaces. Supported Backends: + * Redis (Streams and Sorted Sets) * [Arctic](https://github.com/manahl/arctic) * ZeroMQ * UDP Sockets * TCP Sockets * Unix Domain Sockets +* [ClickHouse](https://clickhouse.com/) * [InfluxDB v2](https://github.com/influxdata/influxdb) * MongoDB * Kafka @@ -144,36 +143,40 @@ Supported Backends: * GCP Pub/Sub * [QuestDB](https://questdb.io/) - ## Installation **Note:** cryptofeed requires Python 3.8+ Cryptofeed can be installed from PyPi. (It's recommended that you install in a virtual environment of your choosing). - pip install cryptofeed +```bash +pip install cryptofeed +``` Cryptofeed has optional dependencies, depending on the backends used. You can install them individually, or all at once. To install Cryptofeed along with all its optional dependencies in one bundle: - pip install cryptofeed[all] +```bash +pip install cryptofeed[all] +``` If you wish to clone the repository and install from source, run this command from the root of the cloned repository. - python setup.py install +```bash +python setup.py install +``` Alternatively, you can install in 'edit' mode (also called development mode): - python setup.py develop +```bash +python setup.py develop +``` See more discussion of package installation in [INSTALL.md](https://github.com/bmoscon/cryptofeed/blob/master/INSTALL.md). - - ## Rest API Cryptofeed supports some REST interfaces for retrieving real-time and historical data, as well as order placement and account management. These are integrated into the exchange classes directly. You can view the supported methods by calling the `info()` method on any exchange. The methods for interacting with the exchange RET endpoints exist in two flavors, the synchronous methods (suffixed with `_sync`) as well as the asynchronous which can be utilized with asyncio. For more information see the [documentation](docs/rest.md). - ## Future Work There are a lot of planned features, new exchanges, etc planned! If you'd like to discuss ongoing development, please join the [discord](https://discord.gg/zaBYaGAYfR) or open a thread in the [discussions](https://github.com/bmoscon/cryptofeed/discussions) in GitHub. diff --git a/cryptofeed/backends/clickhouse.py b/cryptofeed/backends/clickhouse.py new file mode 100644 index 000000000..3ff13b06a --- /dev/null +++ b/cryptofeed/backends/clickhouse.py @@ -0,0 +1,380 @@ +''' +Copyright (C) 2017-2025 Bryant Moscon - bmoscon@gmail.com + +Please see the LICENSE file for the terms and conditions +associated with this software. +''' +import asyncio +from collections import defaultdict +from datetime import datetime as dt +import logging +from typing import Tuple + +import clickhouse_connect + +LOG = logging.getLogger('feedhandler') +from yapic import json + +from cryptofeed.backends.backend import BackendBookCallback, BackendCallback, BackendQueue +from cryptofeed.defines import CANDLES, FUNDING, OPEN_INTEREST, TICKER, TRADES, LIQUIDATIONS, INDEX, ORDER_INFO, FILLS, TRANSACTIONS, BALANCES + + +class ClickHouseCallback(BackendQueue): + def __init__(self, host='127.0.0.1', port=8123, user='default', password='', db='default', table=None, custom_columns: dict = None, none_to=None, numeric_type=float, **kwargs): + """ + host: str + ClickHouse server host address + port: int + ClickHouse HTTP port (default: 8123) + user: str + The name of the database user for authentication + password: str + Password for authentication + db: str + The name of the database to use + table: str + Table name to insert into. Defaults to default_table that should be specified in child class + custom_columns: dict + A dictionary which maps Cryptofeed's data type fields to ClickHouse's table column names + Can be a subset of Cryptofeed's available fields (see the cdefs listed under each data type in types.pyx) + """ + self.client = None + self.table = table if table else self.default_table + self.custom_columns = custom_columns + self.numeric_type = numeric_type + self.none_to = none_to + self.user = user + self.db = db + self.password = password + self.host = host + self.port = port + self.running = True + + def _get_client(self): + if self.client is None: + self.client = clickhouse_connect.get_client( + host=self.host, + port=self.port, + username=self.user, + password=self.password, + database=self.db + ) + return self.client + + def format(self, data: Tuple): + """ + Format data tuple into a list suitable for ClickHouse batch insert + Returns: list of values in correct order for default table schema + """ + feed = data[0] + symbol = data[1] + timestamp = data[2] + receipt_timestamp = data[3] + data_dict = data[4] + + return [timestamp, receipt_timestamp, feed, symbol, json.dumps(data_dict)] + + def _custom_format(self, data: Tuple): + """ + Format data using custom column mapping + """ + d = { + **data[4], + **{ + 'exchange': data[0], + 'symbol': data[1], + 'timestamp': data[2], + 'receipt': data[3], + } + } + + # Cross-ref data dict with user column names from custom_columns dict + return [d.get(field, None) for field in self.custom_columns.keys()] + + async def writer(self): + while self.running: + async with self.read_queue() as updates: + if len(updates) > 0: + batch = [] + for data in updates: + ts = dt.utcfromtimestamp(data['timestamp']) if data['timestamp'] else None + rts = dt.utcfromtimestamp(data['receipt_timestamp']) + batch.append((data['exchange'], data['symbol'], ts, rts, data)) + await self.write_batch(batch) + + async def write_batch(self, updates: list): + client = self._get_client() + data_rows = [self.format(u) for u in updates] + + if self.custom_columns: + columns = list(self.custom_columns.values()) + else: + columns = None + + try: + # Use run_in_executor to avoid blocking the event loop when multiprocessing is disabled + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: client.insert(self.table, data_rows, column_names=columns)) + except Exception as e: + # Log error but continue processing + LOG.error("ClickHouse insert error: %s", e) + + +class TradeClickHouse(ClickHouseCallback, BackendCallback): + default_table = TRADES + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + data_dict['side'], + float(data_dict['amount']), + float(data_dict['price']), + data_dict.get('id'), + data_dict.get('type') + ] + + +class FundingClickHouse(ClickHouseCallback, BackendCallback): + default_table = FUNDING + + def format(self, data: Tuple): + if self.custom_columns: + if data[4]['next_funding_time']: + data[4]['next_funding_time'] = dt.utcfromtimestamp(data[4]['next_funding_time']) + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + next_funding = dt.utcfromtimestamp(data_dict['next_funding_time']) if data_dict['next_funding_time'] else None + return [ + timestamp, + receipt, + exchange, + symbol, + float(data_dict['mark_price']) if data_dict['mark_price'] else None, + float(data_dict['rate']), + next_funding, + float(data_dict['predicted_rate']) if data_dict['predicted_rate'] else None + ] + + +class TickerClickHouse(ClickHouseCallback, BackendCallback): + default_table = TICKER + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + float(data_dict['bid']), + float(data_dict['ask']) + ] + + +class OpenInterestClickHouse(ClickHouseCallback, BackendCallback): + default_table = OPEN_INTEREST + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + float(data_dict['open_interest']) + ] + + +class IndexClickHouse(ClickHouseCallback, BackendCallback): + default_table = INDEX + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + float(data_dict['price']) + ] + + +class LiquidationsClickHouse(ClickHouseCallback, BackendCallback): + default_table = LIQUIDATIONS + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + data_dict['side'], + float(data_dict['quantity']), + float(data_dict['price']), + data_dict['id'], + data_dict['status'] + ] + + +class CandlesClickHouse(ClickHouseCallback, BackendCallback): + default_table = CANDLES + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + dt.utcfromtimestamp(data_dict['start']), + dt.utcfromtimestamp(data_dict['stop']), + data_dict['interval'], + float(data_dict['open']), + float(data_dict['high']), + float(data_dict['low']), + float(data_dict['close']), + float(data_dict['volume']), + data_dict.get('closed') + ] + + +class BookClickHouse(ClickHouseCallback, BackendBookCallback): + default_table = 'book' + + def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs): + self.snapshots_only = snapshots_only + self.snapshot_interval = snapshot_interval + self.snapshot_count = defaultdict(int) + super().__init__(*args, **kwargs) + + def format(self, data: Tuple): + if self.custom_columns: + if 'book' in data[4]: + data[4]['data'] = json.dumps({'snapshot': data[4]['book']}) + else: + data[4]['data'] = json.dumps({'delta': data[4]['delta']}) + return self._custom_format(data) + else: + feed = data[0] + symbol = data[1] + timestamp = data[2] + receipt_timestamp = data[3] + data_dict = data[4] + + if 'delta' in data_dict: + book_data = json.dumps({'delta': data_dict['delta']}) + else: + book_data = json.dumps({'snapshot': data_dict['book']}) + + return [timestamp, receipt_timestamp, feed, symbol, book_data] + + +class OrderInfoClickHouse(ClickHouseCallback, BackendCallback): + default_table = ORDER_INFO + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + data_dict['id'], + data_dict.get('client_order_id'), + data_dict['side'], + data_dict['status'], + data_dict['type'], + float(data_dict['price']) if data_dict['price'] else None, + float(data_dict['amount']), + float(data_dict['remaining']) if data_dict['remaining'] else None, + data_dict.get('account') + ] + + +class FillsClickHouse(ClickHouseCallback, BackendCallback): + default_table = FILLS + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + symbol, + float(data_dict['price']), + float(data_dict['amount']), + data_dict['side'], + data_dict.get('fee'), + data_dict['id'], + data_dict['order_id'], + data_dict.get('liquidity'), + data_dict['type'], + data_dict.get('account') + ] + + +class TransactionsClickHouse(ClickHouseCallback, BackendCallback): + default_table = TRANSACTIONS + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + data_dict['currency'], + data_dict['type'], + data_dict['status'], + float(data_dict['amount']) + ] + + +class BalancesClickHouse(ClickHouseCallback, BackendCallback): + default_table = BALANCES + + def format(self, data: Tuple): + if self.custom_columns: + return self._custom_format(data) + else: + exchange, symbol, timestamp, receipt, data_dict = data + return [ + timestamp, + receipt, + exchange, + data_dict['currency'], + float(data_dict['balance']), + float(data_dict.get('reserved', 0)) + ] diff --git a/docs/backends/clickhouse.md b/docs/backends/clickhouse.md new file mode 100644 index 000000000..1731772ef --- /dev/null +++ b/docs/backends/clickhouse.md @@ -0,0 +1,188 @@ +# ClickHouse Backend for Cryptofeed + +This backend enables storing real-time cryptocurrency market data in [ClickHouse](https://clickhouse.com/), a high-performance column-oriented database management system designed for analytics. + +## Why ClickHouse? + +ClickHouse is ideal for storing time-series data from cryptocurrency exchanges because: + +- **Column-oriented storage**: Optimized for analytical queries on large datasets +- **High compression**: Typical compression ratios of 10-15x reduce storage costs +- **Real-time analytics**: Generate reports on billions of rows with subsecond latency +- **Time-series optimized**: Built-in functions for time-based aggregations +- **Horizontal scaling**: Add nodes to scale write and query throughput + +## Quick Start + +Install Cryptofeed with ClickHouse support + +```bash +pip install cryptofeed[clickhouse] +``` + +Create the dataabase + +```bash +clickhouse-client --query "CREATE DATABASE IF NOT EXISTS cryptofeed" +``` + +Create tables using the provided schema + +```bash +clickhouse-client --database=cryptofeed < examples/clickhouse_tables.sql +``` + +## Usage + +```python +from cryptofeed import FeedHandler +from cryptofeed.backends.clickhouse import TradeClickHouse, BookClickHouse +from cryptofeed.defines import TRADES, L2_BOOK +from cryptofeed.exchanges import Coinbase + +def main(): + clickhouse_config = { + 'host': '127.0.0.1', + 'port': 8123, # HTTP interface port + 'user': 'default', + 'password': '', + 'db': 'cryptofeed' + } + + f = FeedHandler() + f.add_feed(Coinbase( + channels=[TRADES, L2_BOOK], + symbols=['BTC-USD'], + callbacks={ + TRADES: TradeClickHouse(**clickhouse_config), + L2_BOOK: BookClickHouse(**clickhouse_config, snapshots_only=True) + } + )) + f.run() + +if __name__ == '__main__': + main() +``` + +## Supported Data Types + +All cryptofeed data types are supported: + +- **Market Data**: `TradeClickHouse`, `TickerClickHouse`, `BookClickHouse`, `CandlesClickHouse`, `FundingClickHouse`, `OpenInterestClickHouse`, `LiquidationsClickHouse`, `IndexClickHouse` +- **Authenticated**: `OrderInfoClickHouse`, `FillsClickHouse`, `TransactionsClickHouse`, `BalancesClickHouse` + +## Schema Design + +The default schema uses ClickHouse best practices: + +- **MergeTree engine**: Optimized for inserts and range queries +- **Partitioning by month**: `PARTITION BY toYYYYMM(timestamp)` enables efficient data management +- **Sorting key**: `ORDER BY (exchange, symbol, timestamp)` optimizes queries filtering by these columns +- **DateTime64(3)**: Millisecond precision timestamps + +## Custom Columns + +You can customize column names and selectively store fields: + +```python +custom_columns = { + 'symbol': 'instrument', + 'price': 'trade_price', + 'amount': 'volume', + 'side': 'direction' +} + +TradeClickHouse(**clickhouse_config, custom_columns=custom_columns) +``` + +## Example Queries + +### Recent trades for BTC-USD + +```sql +SELECT timestamp, exchange, price, amount, side +FROM trades +WHERE symbol = 'BTC-USD' +ORDER BY timestamp DESC +LIMIT 100; +``` + +### Volume by exchange and hour + +```sql +SELECT + toStartOfHour(timestamp) AS hour, + exchange, + sum(amount) AS volume, + count() AS trade_count +FROM trades +WHERE symbol = 'BTC-USD' + AND timestamp >= now() - INTERVAL 24 HOUR +GROUP BY hour, exchange +ORDER BY hour DESC, exchange; +``` + +### Price OHLCV aggregation (1-minute candles) + +```sql +SELECT + toStartOfMinute(timestamp) AS minute, + exchange, + symbol, + argMin(price, timestamp) AS open, + max(price) AS high, + min(price) AS low, + argMax(price, timestamp) AS close, + sum(amount) AS volume, + count() AS trades +FROM trades +WHERE symbol = 'BTC-USD' + AND timestamp >= now() - INTERVAL 1 HOUR +GROUP BY minute, exchange, symbol +ORDER BY minute DESC; +``` + +### Spread analysis across exchanges + +```sql +SELECT + timestamp, + exchange, + symbol, + ask - bid AS spread, + (ask - bid) / bid * 100 AS spread_pct +FROM ticker +WHERE symbol = 'BTC-USD' + AND timestamp >= now() - INTERVAL 1 HOUR +ORDER BY timestamp DESC; +``` + +## Performance Tips + +1. **Batch inserts**: The backend automatically batches writes for efficiency +2. **Use materialized views**: Pre-aggregate common queries (see examples in clickhouse_tables.sql) +3. **Partition management**: Drop old partitions to manage storage: `ALTER TABLE trades DROP PARTITION '202401'` +4. **Compression codecs**: Add compression for specific columns: `price Float64 CODEC(Delta, ZSTD)` +5. **Projections**: Create alternative sort orders for different query patterns + +## Monitoring + +View table sizes and compression: + +```sql +SELECT + table, + formatReadableSize(sum(bytes)) AS size, + formatReadableSize(sum(bytes_on_disk)) AS compressed, + sum(rows) AS rows +FROM system.parts +WHERE database = 'cryptofeed' + AND active +GROUP BY table; +``` + +## See Also + +- Full example: [examples/demo_clickhouse.py](../examples/demo_clickhouse.py) +- Table schemas: [examples/clickhouse_tables.sql](../examples/clickhouse_tables.sql) +- ClickHouse documentation: diff --git a/docs/callbacks.md b/docs/callbacks.md index 90574deeb..4e4e57139 100644 --- a/docs/callbacks.md +++ b/docs/callbacks.md @@ -26,10 +26,9 @@ It's important to note that if your choose to use the raw callbacks and your cal Every callback has the same signature, two positional arguments, the data object and the receipt timestamp. The data object differs by data type. The data objects are defined in [types.pyx](../cryptofeed/types.pyx) - ### Backends -The backends are defined [here](../cryptofeed/backends/). Currently the following are supported: +The backends are defined [in this folder](../cryptofeed/backends/). Currently the following are supported: * Arctic * ElasticSearch @@ -46,7 +45,7 @@ The backends are defined [here](../cryptofeed/backends/). Currently the followin * VictoriaMetrics * ZMQ -There are also a handful of wrappers defined [here](../cryptofeed/backends/aggregate.py) that can be used in conjunction with these and raw callbacks to convert data to OHLCV, throttle data, etc. +There are also a handful of wrappers defined [here](../cryptofeed/backends/aggregate.py) that can be used in conjunction with these and raw callbacks to convert data to OHLCV, throttle data, etc. ### Performance Considerations diff --git a/examples/clickhouse_tables.sql b/examples/clickhouse_tables.sql new file mode 100644 index 000000000..4ba22ff1c --- /dev/null +++ b/examples/clickhouse_tables.sql @@ -0,0 +1,244 @@ +-- ClickHouse Table Creation SQL for Cryptofeed +-- Execute these commands to create the necessary tables for storing cryptofeed data +-- +-- First, create the database: +-- clickhouse-client --query "CREATE DATABASE IF NOT EXISTS cryptofeed" +-- +-- Then run this script: +-- clickhouse-client --database=cryptofeed < clickhouse_tables.sql + +-- Trades table +CREATE TABLE IF NOT EXISTS trades +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + side String, + amount Float64, + price Float64, + id Nullable(String), + type Nullable(String) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Ticker table +CREATE TABLE IF NOT EXISTS ticker +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + bid Float64, + ask Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Book table (stores order book snapshots and deltas as JSON) +CREATE TABLE IF NOT EXISTS book +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + data String +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Funding table +CREATE TABLE IF NOT EXISTS funding +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + mark_price Nullable(Float64), + rate Float64, + next_funding_time Nullable(DateTime64(3)), + predicted_rate Nullable(Float64) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Open Interest table +CREATE TABLE IF NOT EXISTS open_interest +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + open_interest Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Liquidations table +CREATE TABLE IF NOT EXISTS liquidations +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + side String, + quantity Float64, + price Float64, + id String, + status String +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Candles table (OHLCV data) +CREATE TABLE IF NOT EXISTS candles +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + start DateTime64(3), + stop DateTime64(3), + interval String, + open Float64, + high Float64, + low Float64, + close Float64, + volume Float64, + closed Nullable(Bool) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, start) +SETTINGS index_granularity = 8192; + +-- Index table +CREATE TABLE IF NOT EXISTS index +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + price Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Order Info table (authenticated channel) +CREATE TABLE IF NOT EXISTS order_info +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + id String, + client_order_id Nullable(String), + side String, + status String, + type String, + price Nullable(Float64), + amount Float64, + remaining Nullable(Float64), + account Nullable(String) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Fills table (authenticated channel - executed trades) +CREATE TABLE IF NOT EXISTS fills +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + price Float64, + amount Float64, + side String, + fee Nullable(String), + id String, + order_id String, + liquidity Nullable(String), + type String, + account Nullable(String) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Transactions table (authenticated channel - deposits/withdrawals) +CREATE TABLE IF NOT EXISTS transactions +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + currency String, + type String, + status String, + amount Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, timestamp) +SETTINGS index_granularity = 8192; + +-- Balances table (authenticated channel) +CREATE TABLE IF NOT EXISTS balances +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + currency String, + balance Float64, + reserved Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, currency, timestamp) +SETTINGS index_granularity = 8192; + +-- Optional: Create materialized views for common queries +-- Example: Aggregate trade volume by minute +CREATE MATERIALIZED VIEW IF NOT EXISTS trades_by_minute +ENGINE = SummingMergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, minute) +AS SELECT + exchange, + symbol, + toStartOfMinute(timestamp) AS minute, + sum(amount) AS total_volume, + count() AS trade_count +FROM trades +GROUP BY exchange, symbol, minute; + +-- Example: Latest ticker prices (keeps only the most recent record) +CREATE MATERIALIZED VIEW IF NOT EXISTS latest_ticker +ENGINE = ReplacingMergeTree(receipt_timestamp) +PARTITION BY exchange +ORDER BY (exchange, symbol) +AS SELECT + exchange, + symbol, + timestamp, + receipt_timestamp, + bid, + ask +FROM ticker; diff --git a/examples/demo_clickhouse.py b/examples/demo_clickhouse.py new file mode 100644 index 000000000..ae560356e --- /dev/null +++ b/examples/demo_clickhouse.py @@ -0,0 +1,309 @@ +''' +Copyright (C) 2017-2026 Bryant Moscon - bmoscon@gmail.com + +Please see the LICENSE file for the terms and conditions +associated with this software. +''' +from cryptofeed import FeedHandler +from cryptofeed.backends.clickhouse import ( + TradeClickHouse, TickerClickHouse, BookClickHouse, + CandlesClickHouse, FundingClickHouse, OpenInterestClickHouse +) +from cryptofeed.defines import CANDLES, FUNDING, L2_BOOK, OPEN_INTEREST, TRADES, TICKER +from cryptofeed.exchanges import Bitfinex, Bitmex, Coinbase, Gemini, Binance + + +def main(): + """ + Before running this example, you need to: + 1. Install ClickHouse: https://clickhouse.com/docs/en/install + 2. Install the ClickHouse Python client: pip install clickhouse-connect + 3. Create the database: clickhouse-client --query "CREATE DATABASE IF NOT EXISTS cryptofeed" + 4. Create the tables using the SQL schema below + """ + + # Configure ClickHouse connection + clickhouse_config = { + 'host': '127.0.0.1', + 'port': 8123, + 'user': 'default', + 'password': '', + 'db': 'cryptofeed' + } + + config = { + 'log': {'filename': 'clickhouse-demo.log', 'level': 'INFO'}, + 'backend_multiprocessing': True + } + + f = FeedHandler(config=config) + + # Add feeds with ClickHouse backends + f.add_feed(Bitmex( + channels=[TRADES, FUNDING, OPEN_INTEREST], + symbols=['BTC-USD-PERP'], + callbacks={ + TRADES: TradeClickHouse(**clickhouse_config), + FUNDING: FundingClickHouse(**clickhouse_config), + OPEN_INTEREST: OpenInterestClickHouse(**clickhouse_config) + } + )) + + f.add_feed(Bitfinex( + channels=[TRADES], + symbols=['BTC-USD'], + callbacks={TRADES: TradeClickHouse(**clickhouse_config)} + )) + + f.add_feed(Coinbase( + config=config, + channels=[TRADES, TICKER], + symbols=['BTC-USD'], + callbacks={ + TRADES: TradeClickHouse(**clickhouse_config), + TICKER: TickerClickHouse(**clickhouse_config) + } + )) + + f.add_feed(Coinbase( + channels=[L2_BOOK], + symbols=['BTC-USD'], + callbacks={L2_BOOK: BookClickHouse(**clickhouse_config, snapshots_only=True)} + )) + + f.add_feed(Gemini( + symbols=['BTC-USD'], + callbacks={TRADES: TradeClickHouse(**clickhouse_config)} + )) + + f.add_feed(Binance( + candle_closed_only=True, + symbols=['BTC-USDT'], + channels=[CANDLES], + callbacks={CANDLES: CandlesClickHouse(**clickhouse_config)} + )) + + f.add_feed(Binance( + max_depth=10, + symbols=['BTC-USDT'], + channels=[L2_BOOK], + callbacks={L2_BOOK: BookClickHouse(**clickhouse_config, snapshots_only=True)} + )) + + f.run() + + +if __name__ == '__main__': + main() + + +""" +ClickHouse Table Creation SQL: + +-- Trades table +CREATE TABLE IF NOT EXISTS cryptofeed.trades +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + side String, + amount Float64, + price Float64, + id Nullable(String), + type Nullable(String) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Ticker table +CREATE TABLE IF NOT EXISTS cryptofeed.ticker +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + bid Float64, + ask Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Book table +CREATE TABLE IF NOT EXISTS cryptofeed.book +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + data String +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Funding table +CREATE TABLE IF NOT EXISTS cryptofeed.funding +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + mark_price Nullable(Float64), + rate Float64, + next_funding_time Nullable(DateTime64(3)), + predicted_rate Nullable(Float64) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Open Interest table +CREATE TABLE IF NOT EXISTS cryptofeed.open_interest +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + open_interest Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Liquidations table +CREATE TABLE IF NOT EXISTS cryptofeed.liquidations +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + side String, + quantity Float64, + price Float64, + id String, + status String +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Candles table +CREATE TABLE IF NOT EXISTS cryptofeed.candles +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + start DateTime64(3), + stop DateTime64(3), + interval String, + open Float64, + high Float64, + low Float64, + close Float64, + volume Float64, + closed Nullable(Bool) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, start) +SETTINGS index_granularity = 8192; + +-- Index table +CREATE TABLE IF NOT EXISTS cryptofeed.index +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + price Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Order Info table +CREATE TABLE IF NOT EXISTS cryptofeed.order_info +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + id String, + client_order_id Nullable(String), + side String, + status String, + type String, + price Nullable(Float64), + amount Float64, + remaining Nullable(Float64), + account Nullable(String) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Fills table +CREATE TABLE IF NOT EXISTS cryptofeed.fills +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + symbol String, + price Float64, + amount Float64, + side String, + fee Nullable(String), + id String, + order_id String, + liquidity Nullable(String), + type String, + account Nullable(String) +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, symbol, timestamp) +SETTINGS index_granularity = 8192; + +-- Transactions table +CREATE TABLE IF NOT EXISTS cryptofeed.transactions +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + currency String, + type String, + status String, + amount Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, timestamp) +SETTINGS index_granularity = 8192; + +-- Balances table +CREATE TABLE IF NOT EXISTS cryptofeed.balances +( + timestamp DateTime64(3), + receipt_timestamp DateTime64(3), + exchange String, + currency String, + balance Float64, + reserved Float64 +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(timestamp) +ORDER BY (exchange, currency, timestamp) +SETTINGS index_granularity = 8192; +""" diff --git a/setup.py b/setup.py index e4a39a1db..2ae14615b 100644 --- a/setup.py +++ b/setup.py @@ -87,6 +87,7 @@ def run_tests(self): ], extras_require={ "arctic": ["arctic", "pandas"], + "clickhouse": ["clickhouse-connect>=0.6.0"], "gcp_pubsub": ["google_cloud_pubsub>=2.4.1", "gcloud_aio_pubsub"], "kafka": ["aiokafka>=0.7.0"], "mongo": ["motor"], @@ -97,6 +98,7 @@ def run_tests(self): "zmq": ["pyzmq"], "all": [ "arctic", + "clickhouse-connect>=0.6.0", "google_cloud_pubsub>=2.4.1", "gcloud_aio_pubsub", "aiokafka>=0.7.0",