Skip to content

Commit

Permalink
[FLINK-35177] Fix DataGen Connector documentation (apache#24692)
Browse files Browse the repository at this point in the history
* [FLINK-35177] Fix DataGen Connector documentation

* [FLINK-35177] Consolidate code between example and documentation
  • Loading branch information
morozov authored Aug 28, 2024
1 parent 8c116de commit bc3afba
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 14 deletions.
7 changes: 3 additions & 4 deletions docs/content.zh/docs/connectors/datastream/datagen.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,16 @@ Rate Limiting
-----

`DataGeneratorSource` has built-in support for rate limiting. The following code will produce a stream of
`Long` values at the overall source rate (across all source subtasks) not exceeding 100 events per second.
`String` values at the overall source rate (across all source subtasks) not exceeding 100 events per second.

```java
GeneratorFunction<Long, Long> generatorFunction = index -> index;
double recordsPerSecond = 100;
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;

DataGeneratorSource<String> source =
new DataGeneratorSource<>(
generatorFunction,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(recordsPerSecond),
RateLimiterStrategy.perSecond(100),
Types.STRING);
```

Expand Down
7 changes: 3 additions & 4 deletions docs/content/docs/connectors/datastream/datagen.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,16 @@ Rate Limiting
-----

`DataGeneratorSource` has built-in support for rate limiting. The following code will produce a stream of
`Long` values at the overall source rate (across all source subtasks) not exceeding 100 events per second.
`String` values at the overall source rate (across all source subtasks) not exceeding 100 events per second.

```java
GeneratorFunction<Long, Long> generatorFunction = index -> index;
double recordsPerSecond = 100;
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;

DataGeneratorSource<String> source =
new DataGeneratorSource<>(
generatorFunction,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(recordsPerSecond),
RateLimiterStrategy.perSecond(100),
Types.STRING);
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@
*
* <p>This source has built-in support for rate limiting. The following code will produce an
* effectively unbounded (Long.MAX_VALUE from practical perspective will never be reached) stream of
* Long values at the overall source rate (across all source subtasks) of 100 events per second.
* String values at the overall source rate (across all source subtasks) of 100 events per second.
*
* <pre>{@code
* GeneratorFunction<Long, Long> generatorFunction = index -> index;
* GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
*
* DataGeneratorSource<String> source =
* new DataGeneratorSource<>(
* generatorFunctionStateless,
* generatorFunction,
* Long.MAX_VALUE,
* RateLimiterStrategy.perSecond(100),
* Types.STRING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public static void main(String[] args) throws Exception {

GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;

DataGeneratorSource<String> generatorSource =
DataGeneratorSource<String> source =
new DataGeneratorSource<>(
generatorFunction,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(4),
RateLimiterStrategy.perSecond(100),
Types.STRING);

DataStreamSource<String> streamSource =
env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data Generator");
streamSource.print();

env.execute("Data Generator Source Example");
Expand Down

0 comments on commit bc3afba

Please sign in to comment.