Releases: confluentinc/confluent-kafka-python
v1.3.0
Confluent's Python client for Apache Kafka
confluent-kafka-python is based on librdkafka v1.3.0, see the librdkafka v1.3.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.
This is a feature release adding support for KIP-392 Fetch from follower, allowing a consumer to fetch messages from the closest replica to increase throughput and reduce cost.
Features
- KIP-392 - Fetch messages from closest replica / follower (by @mhowlett)
- Python 3.8 binary wheel support for OSX and Linux. Windows Python 3.8 binary wheels are not currently available.
Enhancements
- New example using python3 and asyncio (by @mhowlett)
- Add warnings for inconsistent security configuration.
- Optimizations to hdr histogram (stats) rollover.
- Print compression type per message-set when debug=msg
- Various doc fixes, updates and enhancements (@edenhill , @mhowlett)
Fixes
- Fix crash when new topic is not created (@mostafa Razavi, #725)
- Fix stringer/repr for SerializerError class (@ferozed, #675)
- Fix consumer_lag in stats when consuming from broker versions <0.11.0.0 (regression in librdkafka v1.2.0).
- Properly handle new Kafka-framed SASL GSSAPI frame semantics on Windows (#2542). This bug was introduced in v1.2.0 and broke GSSAPI authentication on Windows.
- Fix msgq (re)insertion code to avoid O(N^2) insert sort operations on retry (#2508). The msgq insert code now properly handles interleaved and overlapping message range inserts, which may occur during Producer retries for high-throughput applications.
- Fix producer insert msgq regression in v1.2.1 (#2450).
- Upgrade builtin lz4 to 1.9.2 (CVE-2019-17543, #2598).
- Don't trigger error when broker hostname changes (#2591).
- Less strict message.max.bytes check for individual messages (#993).
- Don't call timespec_get() on OSX (since it was removed in recent XCode) (by @maparent).
- LZ4 is available from ProduceRequest 0, not 3 (fixes assert in #2480).
- Address 12 code issues identified by Coverity static code analysis.
v1.2.0
Confluent's Python client for Apache Kafka
confluent-kafka-python is based on librdkafka v1.2.0, see the librdkafka v1.2.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.
- Transaction aware consumer (isolation.level=read_committed) implemented by @mhowlett.
- Sub-millisecond buffering (linger.ms) on the producer.
- Improved authentication errors (KIP-152)
Consumer-side transaction support
This release adds consumer-side support for transactions.
In previous releases, the consumer always delivered all messages to the application, even those in aborted or not yet committed transactions. In this release, the consumer will by default skip messages in aborted transactions.
This is controlled through the new isolation.level configuration property, which defaults to read_committed (only read committed messages, filtering out messages from aborted and not-yet-committed transactions). To consume all messages, including those from aborted transactions, set the property to read_uncommitted to get the behaviour of previous releases.
For consumers in read_committed mode, the end of a partition is now defined to be the offset of the last message of a successfully committed transaction (referred to as the 'Last Stable Offset').
For non-transactional messages there is no change from previous releases: they will always be read, but a consumer will not advance into a not-yet-committed transaction on the partition.
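As an illustration only (not taken from the release notes), here is a minimal sketch of a consumer that explicitly requests the new default isolation level; the broker address, group id and topic name are placeholders:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'group.id': 'example-group',            # placeholder group id
    'auto.offset.reset': 'earliest',
    # Default in this release; set to 'read_uncommitted' for the old behaviour.
    'isolation.level': 'read_committed',
})
consumer.subscribe(['transactions-topic'])  # placeholder topic name

msg = consumer.poll(1.0)
if msg is not None and msg.error() is None:
    # Only messages from committed transactions (plus non-transactional
    # messages) are delivered in read_committed mode.
    print(msg.value())

consumer.close()
```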
Upgrade considerations
The linger.ms default was changed from 0 to 0.5 ms to promote some level of batching even with default settings.
New configuration properties
- Consumer property isolation.level=read_committed ensures the consumer will only read messages from successfully committed producer transactions. The default is read_committed. To get the previous behaviour, set the property to read_uncommitted, which will read all messages produced to a topic, regardless of whether the message was part of an aborted or not-yet-committed transaction.
Enhancements
- Cache FastAvro schema for improved Avro Serialization/Deserialization (@BBM89, #627)
- Protocol decoding optimization, increasing consume performance.
- Add CachedSchemaRegistry docs (@lowercase24, #495)
Fixes
General:
- Rate limit IO-based queue wakeups to linger.ms; this reduces CPU load and lock contention for high-throughput producer applications (#2509)
- SSL: Use only hostname (not port) when valid broker hostname (by Hunter Jacksson)
- SSL: Ignore OpenSSL cert verification results if enable.ssl.certificate.verification=false (@salisbury-espinosa)
- SASL Kerberos/GSSAPI: don't treat kinit ECHILD errors as errors (@hannip)
- Refresh broker list metadata even if no topics to refresh (#2476)
- Correct AdminClient doc (@lowercase24, #653)
- Update Avro example to be compliant with csh (@andreyferriyan, #668)
- Correct Avro example typo (@AkhilGNair, #598)
Consumer:
- Make pause|resume() synchronous, ensuring that a subsequent poll() will not return messages for the paused partitions.
- Consumer doc fixes (@hrchu, #646, #648)
Producer:
- Fix message timeout handling for leader-less partitions.
- message.timeout.ms=0 is now accepted even if linger.ms > 0 (by Jeff Snyder)
v1.1.0
Confluent's Python client for Apache Kafka
confluent-kafka-python is based on librdkafka v1.1.0, see the librdkafka v1.1.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.
- In-memory SSL certificates (PEM, DER, PKCS#12) support (by @noahdav at Microsoft)
- Use Windows Root/CA SSL Certificate Store (by @noahdav at Microsoft)
- ssl.endpoint.identification.algorithm=https (off by default) to validate that the broker hostname matches the certificate. Requires OpenSSL >= 1.0.2 (included with wheel installations).
- Improved GSSAPI/Kerberos ticket refresh
- Confluent monitoring interceptor package bumped to v0.11.1 (#634)
Upgrade considerations
- Windows SSL users will no longer need to specify a CA certificate file/directory (ssl.ca.location); librdkafka will load the CA certs by default from the Windows Root Certificate Store.
- SSL peer (broker) certificate verification is now enabled by default (disable with enable.ssl.certificate.verification=false).
- %{broker.name} is no longer supported in sasl.kerberos.kinit.cmd since kinit refresh is no longer executed per broker, but per client instance.
SSL
New configuration properties:
- ssl.key.pem - client's private key as a string in PEM format
- ssl.certificate.pem - client's public key as a string in PEM format
- enable.ssl.certificate.verification - enable (default) / disable OpenSSL's builtin broker certificate verification
- ssl.endpoint.identification.algorithm - verify the broker's hostname against its certificate (disabled by default)
- Add new rd_kafka_conf_set_ssl_cert() to pass PKCS#12, DER or PEM certs in (binary) memory form to the configuration object
- The private key data is now securely cleared from memory after last use
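A minimal sketch (not from the release notes) of supplying in-memory PEM material through the Python client's configuration dict; the broker address and the local certificate/key file names are placeholders, and the PEM contents could equally come from a secrets store:

```python
from confluent_kafka import Producer

# Placeholder files; any source of PEM-formatted strings works.
private_key_pem = open('client.key.pem').read()
certificate_pem = open('client.cert.pem').read()

producer = Producer({
    'bootstrap.servers': 'broker1:9093',   # placeholder TLS listener
    'security.protocol': 'ssl',
    'ssl.key.pem': private_key_pem,        # private key as a PEM string
    'ssl.certificate.pem': certificate_pem,  # public key as a PEM string
    # Broker certificate verification is enabled by default in this release;
    # disable only for testing:
    # 'enable.ssl.certificate.verification': False,
})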
Enhancements
Fixes
- SASL GSSAPI/Kerberos: Don't run kinit refresh for each broker, just per client instance.
- SASL GSSAPI/Kerberos: Changed sasl.kerberos.kinit.cmd to first attempt ticket refresh, then acquire.
- SASL: Proper locking on broker name acquisition.
- Consumer: max.poll.interval.ms now correctly handles blocking poll calls, allowing a longer poll timeout than the max poll interval.
- configure: Fix libzstd static lib detection
- PyTest pinned to latest version supporting python 2 (#634)
v1.0.1
Confluent's Python client for Apache Kafka
confluent-kafka-python is based on librdkafka v1.0.1, see the librdkafka v1.0.1 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.
v1.0.1 is a maintenance release with the following fixes:
- Fix consumer stall when broker connection goes down (issue #2266 introduced in v1.0.0)
- Fix AdminAPI memory leak when broker does not support request (@souradeep100, #2314)
- SR client: Don't disable cert verification if no ssl.ca.location set (#578)
- Treat ECONNRESET as standard Disconnects (#2291)
- OpenSSL version bump to 1.0.2s
- Update/fix protocol error response codes (@benesch)
- Update Consumer get_watermark_offsets docstring (@hrchu, #572)
- Update Consumer subscribe docstring to include on_assign and on_revoke args (@hrchu, #571)
- Update delivery report string formatting (@hrchu, #575)
- Update logging configuration code example document (@soxofaan, #579)
- Implement environment markers to fix poetry (@fishman, #583)
v1.0.0
Confluent's Python client for Apache Kafka v1.0.0
confluent-kafka-python is based on librdkafka v1.0.0, see the librdkafka v1.0.0 release notes for a complete list of changes, enhancements, fixes and upgrade considerations.
v1.0.0 is a major feature release:
- Idempotent producer - guaranteed ordering, exactly-once producing support.
- Sparse/on-demand connections - connections are no longer maintained to all brokers in the cluster.
- KIP-62 - max.poll.interval.ms support in the Consumer.
This release also changes configuration defaults and deprecates a set of configuration properties; make sure to read the Upgrade considerations section below.
Upgrade considerations (IMPORTANT)
Configuration default changes
The following configuration properties have changed default values, which
may require application changes:
- acks (alias request.required.acks) now defaults to all: wait for all in-sync replica brokers to ack. The previous default, 1, only waited for an ack from the partition leader. This change places a greater emphasis on durability at a slight cost to latency. It is not recommended that you lower this value unless latency takes a higher precedence than data durability in your application.
- broker.version.fallback now defaults to 0.10, previously 0.9. broker.version.fallback.ms now defaults to 0. Users on Apache Kafka <0.10 must set api.version.request=false and broker.version.fallback=.. to their broker version. For users >=0.10 there is no longer any need to specify any of these properties.
- enable.partition.eof now defaults to false. KafkaError._PARTITION_EOF was previously emitted by default to signify the consumer has reached the end of a partition. Applications which rely on this behavior must now explicitly set enable.partition.eof=true if this behavior is required. This change simplifies the more common case where consumer applications consume in an endless loop.
group.id is now required for Python consumers.
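For illustration only, a minimal sketch of client configurations that spell out the changed v1.0.0 defaults; the broker address, group id and values shown are placeholders (the values are the new defaults, written out explicitly):

```python
from confluent_kafka import Producer, Consumer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'acks': 'all',                          # new default; was 1 in earlier releases
})

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',            # now required
    # Re-enable end-of-partition events only if your application needs them:
    'enable.partition.eof': False,          # new default
})
```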
Deprecated configuration properties
The following configuration properties have been deprecated. Use of any deprecated configuration property will result in a warning when the client instance is created. The deprecated configuration properties will be removed in a future release.
librdkafka:
- offset.store.method=file is deprecated.
- offset.store.path is deprecated.
- offset.store.sync.interval.ms is deprecated.
- produce.offset.report is no longer used. Offsets are always reported.
- queuing.strategy was an experimental property that is now deprecated.
- reconnect.backoff.jitter.ms is no longer used, see reconnect.backoff.ms and reconnect.backoff.max.ms.
- socket.blocking.max.ms is no longer used.
- topic.metadata.refresh.fast.cnt is no longer used.
confluent_kafka:
- default.topic.config is deprecated.
- CachedSchemaRegistryClient: url was str, now a conf dict with all application config properties.
Idempotent Producer
This release adds support for Idempotent Producer, providing exactly-once
producing and guaranteed ordering of messages.
Enabling idempotence is as simple as setting the enable.idempotence
configuration property to true.
There are no required application changes, but it is recommended to add
support for the newly introduced fatal errors that will be triggered when the idempotent producer encounters an unrecoverable error that would break the ordering or duplication guarantees.
See Idempotent Producer in the manual and the Exactly once semantics blog post for more information.
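A minimal sketch (not from the release notes) of enabling idempotence; the broker address and topic name are placeholders, and the delivery callback is only illustrative:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'enable.idempotence': True,             # exactly-once producing and guaranteed ordering
})

def on_delivery(err, msg):
    # Per-message delivery errors arrive here; see the Idempotent Producer
    # docs for how fatal errors are reported to the application.
    if err is not None:
        print('Delivery failed: {}'.format(err))

producer.produce('example-topic', b'payload', on_delivery=on_delivery)  # placeholder topic
producer.flush()
```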
Sparse connections
In previous releases librdkafka would maintain open connections to all
brokers in the cluster and the bootstrap servers.
With this release librdkafka now connects to a single bootstrap server
to retrieve the full broker list, and then connects to the brokers
it needs to communicate with: partition leaders, group coordinators, etc.
For large scale deployments this greatly reduces the number of connections
between clients and brokers, and avoids the repeated idle connection closes
for unused connections.
Sparse connections are on by default (recommended setting); the old
behavior of connecting to all brokers in the cluster can be re-enabled
by setting enable.sparse.connections=false.
See Sparse connections in the manual for more information.
Original issue librdkafka #825.
KIP-62 - max.poll.interval.ms is enforced
This release adds support for max.poll.interval.ms (KIP-62), which requires
the application to call consumer.poll() at least every max.poll.interval.ms.
Failure to do so will make the consumer automatically leave the group, causing a group rebalance;
the consumer will not rejoin the group until the application has called ..poll() again, triggering yet another group rebalance.
max.poll.interval.ms is set to 5 minutes by default.
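For illustration, a minimal sketch of a poll loop that respects the interval; the broker address, group id, topic and the handle_message() function are placeholders:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'group.id': 'example-group',            # placeholder group id
    'max.poll.interval.ms': 300000,         # 5 minutes, the default
})
consumer.subscribe(['example-topic'])       # placeholder topic

while True:
    msg = consumer.poll(timeout=1.0)        # each poll() call also serves as the liveness signal
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    # Keep per-message processing comfortably under max.poll.interval.ms.
    handle_message(msg)                     # hypothetical processing function
```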
Enhancements
- OpenSSL version bumped to 1.0.2r
- AvroProducer now supports encoding with fastavro (#492)
- Simplify CachedSchemaRegistryClient configuration with a configuration dict for application configs
- Add Delete Schema support to CachedSchemaRegistryClient
- CachedSchemaRegistryClient now supports HTTP Basic Auth (#440)
- MessageSerializer now supports specifying reader schema (#470)
Fixes
- Fix crash when calling Consumer.consume without setting group.id (now required)
- CachedSchemaRegistryClient handles get_compatibility properly
Build/installation/tooling
- Integration tests moved to docker-compose to aid in cluster set-up/tear-down
- Runner script ./tests/run.sh added to simplify unit and integration test execution
v0.11.6
See librdkafka v0.11.6 release notes for enhancements and fixes in librdkafka.
New Features
- Confluent Monitoring Interceptors are now included with Linux and OSX binary wheel distributions. (#464)
- Experimental binary wheel distributions for Windows environments. (#451)
Enhancements
- OpenSSL version bump to 1.0.2p. (#437)
- Topic configurations have been moved into the global configuration dictionary to simplify configuration. The property default.topic.config has been deprecated and will be removed in 1.0, but still takes precedence over topic configuration specified in the global configuration dictionary. (#446)
Fixes
v0.11.5
Admin Client support
v0.11.5 is a feature release that adds support for the Kafka Admin API (KIP-4).
Admin API
This release adds support for the Admin API, enabling applications and users to perform administrative Kafka tasks programmatically:
- Create topics - specifying partition count, replication factor and topic configuration.
- Delete topics - delete topics in cluster.
- Create partitions - extend a topic with additional partitions.
- Alter configuration - set, modify or delete configuration for any Kafka resource (topic, broker, ..).
- Describe configuration - view configuration for any Kafka resource.
The API closely follows the Java Admin API:
def example_create_topics(a, topics):
    new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in topics]
    # Call create_topics to asynchronously create topics
    fs = a.create_topics(new_topics)

    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))

Additional examples can be found in examples/adminapi
Enhancements
- Schema Registry HTTPS support with TLS client auth added (#90)
- Metadata API list_topics() added (#161, @tbsaunde, @stephan-hof)
- Expose librdkafka built-in partitioner options directly (#396)
- Callback based throttle event handling; throttle_cb (#237) (#377)
- Added Unicode support for header values (#382)
- OpenSSL version bump to 1.0.2o (#410)
- Avro documentation added to the docs (#382)
- Python 3.7 support (#382)
- Allow passing headers as both list(tuples) and dict() (#355)
- Support for legacy setuptool's install_requires (#399)
Fixes
- Release GIL before making blocking calls (#412)
- Prevent application config dict mutation (#412)
- Intercept plugin configurations to ensure proper ordering (#404)
- test_compatibility() now returns False instead of None when unable to check compatibility (#372, @Enether)
- Fix invocation of SchemaParseException (#376)
- Fix call ordering to avoid callback crash on implicit close (#265)
- Fix memory leaks in generic client setters (#382)
- Fix AvroProducer/AvroConsumer key/value identity check (#342)
- Correct Producer.produce documentation to use correct time unit of seconds (#384) (#385)
- Fix KafkaError refcounting which could lead to memory leaks (#382)
v0.11.4
Simplified installation
This release adds binary wheels containing all required dependencies (librdkafka, openssl, zlib, etc) for Linux and OSX.
Should these wheels not work on your platform, please file an issue outlining what is failing, and then use the previous method of installing librdkafka manually followed by pip install --no-binary all confluent-kafka
Message header support
Support for Kafka message headers has been added (requires broker version >= v0.11.0).
When producing messages, simply provide a list of (key, value) tuples as headers=:

myproducer.produce(topic, 'A message payload', headers=[('hdr1', 'val1'), ('another', 'one'), ('hdr1', 'duplicates are supported and ordering is retained')])

Message headers are returned as a list of tuples for consumed messages:
msg = myconsumer.poll(1)
if msg is not None and not msg.error():
    headers = msg.headers()
    if headers is not None:
        # convert to dict, collapsing duplicate header keys
        headers_dict = dict(headers)

Enhancements
- Message header support (@johnistan)
- Added Consumer.seek()
- Added consumer.pause/resume support (closes #120, @dangra)
- Added Consumer.store_offsets() API (#245, @ctrochalakis)
- Support for passing librdkafka logs to the standard logging module (see the logger kwarg in constructors) (#148); a sketch follows this list
- Enable produce.offset.report by default (#266) (#267)
- Expose offsets_for_times consumer method. closes #224 (#268, @johnistan)
- Add batch consume() API (closes #252, #282, @tburmeister)
- Add hash func for UnionSchema (#228, @fyndiq)
- Use schemaless reader to handle complex schema (#251, @fpietka)
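A minimal sketch (not from the release notes) of routing librdkafka logs into the standard logging module via the logger kwarg; the broker address and group id are placeholders:

```python
import logging

from confluent_kafka import Consumer

logger = logging.getLogger('kafka.consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
logger.addHandler(handler)

consumer = Consumer({'bootstrap.servers': 'localhost:9092',  # placeholder broker address
                     'group.id': 'example-group',            # placeholder group id
                     'debug': 'consumer'},                   # optional: verbose librdkafka logs
                    logger=logger)
```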
Fixes
- Fix librdkafka install command for macOS (#281, @vkroz)
- Constructors now support both dict and kwargs
- Add __version__ to __init__.py (@mrocklin)
- Messages could be leaked & lost if exception raised from callback triggered by poll()
- Make Consumer.commit(.., asynchronous=False) return offset commit results (see the sketch after this list)
- Raise runtime error if accessing consumer after consumer close (#262, @johnistan)
- Pass py.test arguments from tox (@ctrochalakis)
- Rename async kwargs to asynchronous (async will continue working until the 1.0 API bump)
v0.11.0
This is a minimal librdkafka version-synchronized release of the Python client.
Changes:
- Handle null/None values during deserialization
- Allow passing a custom schema registry instance.
- None conf values are now converted to NULL rather than the string "None" (#133)
- Fix memory leaks when certain exceptions were raised.
- Handle delivery.report.only.error in Python (#84)
- Proper use of Message error string on Producer (#129)
- Now Flake8 clean