[FLINK-18449][table sql/api] Kafka topic discovery & partition discove… #12908
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.
Automated Checks: last check on commit 1a4e5ee (Wed Jul 15 13:11:58 UTC 2020). Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
@wuchong CC
        Pattern pattern,
        Properties properties,
        DeserializationSchema<RowData> deserializationSchema) {
@Override
protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-       Pattern pattern,
+       Pattern topicPattern,
        Properties properties,
        DeserializationSchema<RowData> deserializationSchema) {
        DataType outputDataType,
        @Nullable List<String> topics,
        @Nullable Pattern topicPattern,
        Properties properties,
        DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
        StartupMode startupMode,
        Map<KafkaTopicPartition, Long> specificStartupOffsets,
        long startupTimestampMillis) {
        Pattern topicPattern,
        Properties properties,
        DeserializationSchema<RowData> deserializationSchema);
<td>optional for source (use 'topic' instead if not set)</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Topic pattern from which the table is read. It will use the input value to build a regex expression to discover matched topics.</td>
wuchong
Jul 27, 2020
Member
The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.
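The matching behaviour described above can be illustrated with plain `java.util.regex`. This is a minimal sketch, not Flink code: the topic names are made-up examples, and the semantics assumed here are a full-name match of each topic against the pattern.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {
    public static void main(String[] args) {
        // Hypothetical topic names; the consumer subscribes to every topic
        // whose full name matches the configured regular expression.
        List<String> topics = List.of("orders-2020", "orders-2021", "payments", "orders_staging");
        Pattern topicPattern = Pattern.compile("orders-\\d+");

        List<String> matched = topics.stream()
                .filter(t -> topicPattern.matcher(t).matches())
                .collect(Collectors.toList());

        System.out.println(matched); // [orders-2020, orders-2021]
    }
}
```

Note that `matches()` requires the whole topic name to match, so `orders_staging` is not subscribed even though it contains "orders".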
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
        String.valueOf(tableOptions
                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                .map(val -> val.toMillis())

        ));
} else {
    throw new ValidationException(String.format(
            errorMessageTemp, "topic-list", tableOptions.get(TOPIC)
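The hunk above turns the configured discovery interval into a millisecond string property. A self-contained sketch of that conversion, using only the JDK: the option lookup is stood in by a plain `Optional<Duration>`, and the key is the literal that appears in this PR's test changes.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

public class DiscoveryIntervalDemo {
    // Literal seen in this PR's test hunk; in Flink it is
    // FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.
    static final String KEY = "flink.partition-discovery.interval-millis";

    // Stand-in for the table-option lookup: the real code reads an optional
    // Duration from ReadableConfig; here it is passed in directly.
    static Properties withDiscoveryInterval(Optional<Duration> interval) {
        Properties properties = new Properties();
        interval.map(Duration::toMillis)
                .ifPresent(millis -> properties.setProperty(KEY, String.valueOf(millis)));
        return properties;
    }

    public static void main(String[] args) {
        Properties p = withDiscoveryInterval(Optional.of(Duration.ofSeconds(1)));
        System.out.println(p.getProperty(KEY)); // 1000
    }
}
```

When the option is absent the property is simply not set, which leaves partition discovery disabled by default.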
@@ -139,10 +152,12 @@ public void testTableSource() {
        Thread.currentThread().getContextClassLoader());

// Test scan source equals
KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000");
private static boolean isSingleTopic(ReadableConfig tableOptions) {
    // Option 'topic-pattern' is regarded as multi-topics.
    return tableOptions.getOptional(TOPIC).isPresent() && tableOptions.get(TOPIC).split(",").length == 1;
wuchong
Aug 4, 2020
Member
The community recommends using a List ConfigOption for list values; the framework will handle the parsing. This also changes the separator to ;, but that is more aligned with the other list options. You can declare a List ConfigOption like this:
public static final ConfigOption<List<String>> TOPIC = ConfigOptions
        .key("topic")
        .stringType()
        .asList()
        .noDefaultValue()
        .withDescription("...");
Then you can call return tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false); here.
Sorry for the late reminder.
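The suggested single-topic check can be sketched with only the JDK. This is an illustration, not Flink code: `parseTopics` stands in for the framework's list parsing (splitting on the `;` separator mentioned above), and `isSingleTopic` mirrors the suggested expression.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TopicListDemo {
    // Stand-in for the framework's list parsing: a List-typed ConfigOption
    // splits the raw string on ';'.
    static List<String> parseTopics(String raw) {
        return Arrays.asList(raw.split(";"));
    }

    // Mirrors the suggested check:
    // tableOptions.getOptional(TOPIC).map(t -> t.size() == 1).orElse(false)
    static boolean isSingleTopic(Optional<List<String>> topics) {
        return topics.map(t -> t.size() == 1).orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(isSingleTopic(Optional.of(parseTopics("orders"))));          // true
        System.out.println(isSingleTopic(Optional.of(parseTopics("orders;payments")))); // false
        System.out.println(isSingleTopic(Optional.empty()));                            // false
    }
}
```

Returning `false` for an empty Optional keeps the 'topic-pattern' case (no 'topic' option set) on the multi-topic path, as the review comment intends.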
Btw, could you add an integration test for this?
…partition discovery for Kafka source in Table API. This closes #12908
LGTM. Will merge once the build passes.
        @Nullable List<String> topics,
        @Nullable Pattern topicPattern,
wuchong
Aug 19, 2020
Member
Currently, it is very verbose to pass these two parameters around together here and there. An improvement would be to use KafkaTopicsDescriptor, but that can be another issue in the future.
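The idea behind that suggestion can be sketched as follows: bundle the mutually exclusive "fixed topic list" and "topic pattern" into one object so they are not passed around as two nullable parameters. Flink already ships a `KafkaTopicsDescriptor` class; this stand-alone version is illustrative only and does not reproduce its actual API.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicsDescriptorDemo {
    static final class TopicsDescriptor {
        private final List<String> fixedTopics; // null when a pattern is used
        private final Pattern topicPattern;     // null when fixed topics are used

        TopicsDescriptor(List<String> fixedTopics, Pattern topicPattern) {
            // Enforce that exactly one of the two subscription modes is set,
            // replacing the implicit @Nullable contract of the two parameters.
            if ((fixedTopics == null) == (topicPattern == null)) {
                throw new IllegalArgumentException(
                        "Exactly one of fixedTopics and topicPattern must be set");
            }
            this.fixedTopics = fixedTopics;
            this.topicPattern = topicPattern;
        }

        boolean isFixedTopics() {
            return fixedTopics != null;
        }
    }

    public static void main(String[] args) {
        TopicsDescriptor fixed = new TopicsDescriptor(List.of("orders"), null);
        TopicsDescriptor byPattern = new TopicsDescriptor(null, Pattern.compile("orders-\\d+"));
        System.out.println(fixed.isFixedTopics());     // true
        System.out.println(byPattern.isFixedTopics()); // false
    }
}
```

Centralizing the exclusivity check in one constructor also removes the need to re-validate the pair at every call site.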
…ry dynamically in table api
What is the purpose of the change
Enable Kafka connector topic discovery & partition discovery in the Table API.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation