Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
Update SNAPSHOT to 3.1.4
Browse files Browse the repository at this point in the history
  • Loading branch information
spring-builds committed Sep 21, 2021
1 parent 265b881 commit 593cf44
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 17 deletions.
60 changes: 53 additions & 7 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ Note that this property is only applicable for pollable consumers.
Default: not set.
resetOffsets::
Whether to reset offsets on the consumer to the value provided by startOffset.
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
See <<reset-offsets>> for more information about this property.
+
Default: `false`.
Expand Down Expand Up @@ -337,6 +337,11 @@ Usually needed if you want to synchronize another transaction with the Kafka tra
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
txCommitRecovered::
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default.
Setting this property to `false` suppresses committing the offset of recovered record.
+
Default: true.

[[reset-offsets]]
==== Resetting Offsets
Expand All @@ -363,7 +368,7 @@ Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will per

IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.

For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets: true` is ignored.
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets` should not be set to `true`, otherwise, that will cause an error.

==== Consuming Batches

Expand Down Expand Up @@ -506,11 +511,11 @@ Default: `false`

In this section, we show the use of the preceding properties for specific scenarios.

===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement

This example illustrates how one may manually acknowledge offsets in a consumer application.

This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
Use the corresponding input channel name for your example.

[source]
Expand Down Expand Up @@ -622,6 +627,47 @@ Usually, applications may use principals that do not have administrative rights
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.

====== Multi-binder configuration and JAAS

When connecting to multiple clusters in which each one requires separate JAAS configuration, then set the JAAS configuration using the property `sasl.jaas.config`.
When this property is present in the applicaiton, it takes precedence over the other strategies mentioned above.
See this https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients[KIP-85] for more details.

For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:

```
spring.cloud.stream:
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
```

Note that both the Kafka clusters, and the `sasl.jaas.config` values for each of them are different in the above configuration.

See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/kafka-multi-binder-jaas[sample application] for more details on how to setup and run such an application.

[[pause-resume]]
===== Example: Pausing and Resuming the Consumer

Expand Down Expand Up @@ -774,10 +820,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
====

[[rebalance-listener]]
=== Using a KafkaRebalanceListener
=== Using a KafkaBindingRebalanceListener

Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.

====
[source, java]
Expand Down Expand Up @@ -830,7 +876,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
you can implement the following customizers.

* ConsusumerConfigCustomizer
* ConsumerConfigCustomizer
* ProducerConfigCustomizer

Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
Expand Down
2 changes: 1 addition & 1 deletion docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>
Expand Down
1 change: 0 additions & 1 deletion docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|spring.cloud.stream.function.batch-mode | `false` |
|spring.cloud.stream.function.bindings | |
|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>3.0.4-SNAPSHOT</version>
<version>3.0.4</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.6.10</spring-kafka.version>
<spring-integration-kafka.version>5.4.10</spring-integration-kafka.version>
<kafka.version>2.6.2</kafka.version>
<spring-cloud-schema-registry.version>1.1.4-SNAPSHOT</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.4-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-schema-registry.version>1.1.4</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.4</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
Expand Down
2 changes: 1 addition & 1 deletion spring-cloud-starter-stream-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
Expand Down
2 changes: 1 addition & 1 deletion spring-cloud-stream-binder-kafka-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>
Expand Down
2 changes: 1 addition & 1 deletion spring-cloud-stream-binder-kafka-streams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion spring-cloud-stream-binder-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>

<dependencies>
Expand Down

0 comments on commit 593cf44

Please sign in to comment.