Flink消费kafka的offset设置
1.问题问题简介及背景
在使用Flink自带的Kafka消费API时,我们可以像单纯的使用Kafka消费对象API对其进行相应的属性设置,例如,读取offset的方式、设置offset的方式等。但是,Flink具有checkpoint功能,保存各运算算子的状态,也包括消费kafka时的offset,那这两种情况分别在什么时候起作用呢?
2. Flink checkpoint设置
flink并不依赖kafka或zookeeper保证容错,其保存offset只是为了外部来查询监视kafka数据的消费情况。但其提供了提交消费kafka数据的offset给Kafka或者zookeeper(kafka0.8之前)的配置,因此最终是由kafka自动设置offset,还是由flink的checkpoint机制进行最终的offset设置,取决于开发过程中的相关设置。
配置offset的提交方式取决于是否为job设置开启checkpoint,可以使用env.enableCheckpointing(milliseconds)来设置开启checkpoint。
如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
// checkpoint设置
// 每隔1s进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 设置模式为:exactly_one,仅一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有1s的时间间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// 检查点必须在1s之内完成,或者被丢弃【checkpoint超时时间】
env.getCheckpointConfig().setCheckpointTimeout(1000);
// 同一时间只允许进行一次检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地
// env.setStateBackend(new FsStateBackend("file:///F:/kafkaTool/aaa"));
// Kafka相关设置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop:9091,hadoop:9092,hadoop:9093")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// properties.setProperty("auto.offset.reset", "latest") // 默认是latest,当第一次运行,会读取最近提交的offset
// properties.setProperty("enable.auto.commit", "true") // 默认是true
val flinkKafkaConsumer011 = new FlinkKafkaConsumer011[String]("GMALL_EVENT_0105", new SimpleStringSchema(), properties)
// 设置根据程序checkpoint进行offset提交
flinkKafkaConsumer011.setCommitOffsetsOnCheckpoints(true)
flinkKafkaConsumer011.setStartFromGroupOffsets()
无论是否设置checkpoint,auto.offset.reset都默认为latest,enable.auto.commit都默认为true,第一次启动程序,会根据FlinkKafkaConsumer011对象的设置读取相应的offsets。
- setStartFromEarliest():从消息最开始消费;
- setStartFromLatest():从消息的最后开始消费;
- setStartFromTimestamp(long startupOffsetsTimestamp):从设定的时间戳消费;
- setStartFromGroupOffsets():从kafka保存的消费者组的offsets消费,如果没有,会根据auto.offset.reset设定进行消费(auto.offset.reset默认为latest),也就是说我第一次启动程序的时候从哪里消费也需要考虑;
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets):从特定offset消费;
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggjefj
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13