kafka消息-----定时清理
最近项目中使用 Kafka 时需要定时清理消息。我们知道 Kafka 自带周期性清理消息的机制,但是项目中往往因为数据量较大,需要手动控制分区内已消费消息的清理。
此处使用的是反编译得到的程序,具体几个方法已标出。
个人清理想法:大致流程是根据 topic 获取所有分区,然后遍历每个分区的消费者偏移量并存入 map;存入时先比较偏移量大小,只保留该分区所有消费者中最小的偏移量,
然后删除该偏移量之前的数据。
下面的反编译程序实际上没有做最小偏移量识别,而是直接使用分区最新的(末尾)偏移量:创建一个消费者并将其 offset 定位到末尾(seekToEnd),然后删掉该分区该 offset 之前的消息。
具体的大家可根据自己的要求自行修改。
package com.atguigu.kafkaclean;
import org.springframework.stereotype.*;
import t.annotation.*;
import com.ankki.kafkaclient.mapper.*;
import javax.annotation.*;
import java.time.format.*;
import com.ankki.kafkaclient.utils.*;
import urrent.*;
import org.apache.kafka.clients.admin.*;
import org.springframework.scheduling.annotation.*;
import com.sumer.*;
import org.apache.sumer.*;
import org.apache.kafkamon.*;
import com.del.*;
import java.time.*;
import org.slf4j.*;
import java.util.*;
/**
 * Spring-scheduled task that asks the Kafka admin API to delete records from a
 * configured, comma-separated list of topics.
 *
 * <p>Visible flow: for each topic, an offset is obtained per partition and a
 * {@code RecordsToDelete.beforeOffset(...)} request is queued for that partition;
 * all requests are then submitted in one {@code adminClient.deleteRecords(...)} call.
 *
 * <p>NOTE(review): this listing is decompiler output that was further damaged during
 * text extraction. Many expressions are truncated (missing receivers or method-name
 * prefixes, one statement cut off mid-line), so the class does not compile as shown.
 * The comments below describe only what the surviving tokens establish; anything
 * else is marked as an assumption to confirm against the original sources.
 *
 * <p>NOTE(review): {@code @Slf4j} is present together with an explicit {@code log}
 * field. Presumably the field is the Lombok-generated one made visible by the
 * decompiler; keeping both in real source would declare {@code log} twice - confirm.
 */
@Slf4j
@Component
@Configuration
@EnableScheduling
public class SaticScheduleTask
{
// Assigned in the static initializer at the bottom of the class.
private static final Logger log;
// Declared but not referenced by name in the visible code; configureTasks() compares
// against the literal "yes" instead (presumably the constant was inlined by the compiler).
private static final String SASL_VALUE = "yes";
// Comma-separated topic names; split on "," in configureTasks(). Loaded from "cleanTopics".
private static final String TOPIC;
// Value written to the "bootstrap.servers" client property. Loaded from "bootstrapServers".
private static final String KAFKA_SERVER_ADDRR;
// Mapper used by recordSystemLog() to insert the audit row.
@Resource
private SysLogMapper sysLogMapper;
/**
 * Entry point of the cleanup job; the schedule comes from the {@code cleanTime}
 * property placeholder in the cron expression.
 *
 * <p>Builds client properties, opens an {@code AdminClient} in try-with-resources,
 * collects one {@code RecordsToDelete} per topic partition, and submits them via
 * {@code deleteRecords}. Any {@code Exception} raised inside the try block is caught
 * by the outer catch and passed to a logging call; nothing is rethrown.
 */
@Scheduled(cron = "${cleanTime}")
public void configureTasks() {
// Log message (decoded): "scheduled cleanup task execution time: {}".
// NOTE(review): "w()" is a truncated call; presumably a current date-time "now()" - confirm.
log.info("\u6267\u884c\u5b9a\u65f6\u6e05\u7406\u4efb\u52a1\u65f6\u95f4: {}", (w().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
final Properties props = new Properties();
// JAAS settings are added only when the "openSasl" property matches "yes".
// NOTE(review): the comparison and the property-lookup receiver are truncated here.
if ("yes".Property("openSasl"))) {
KafkaUtils.jaasConfig(props);
}
// Stores the broker list under "bootstrap.servers".
// NOTE(review): the leading tokens on this line are extraction residue, not Java.
spring framework表达式assign((Hashtable<String, String>)props).put("bootstrap.servers", SaticScheduleTask.KAFKA_SERVER_ADDRR);
// NOTE(review): "ate(props)" is truncated; presumably an AdminClient factory call - confirm.
try (final AdminClient adminClient = ate(props)) {
final String[] topics = SaticScheduleTask.TOPIC.split(",");
// Accumulates one delete request per partition across all topics.
final Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new HashMap<TopicPartition, RecordsToDelete>(16);
DeleteRecordsResult deleteRecordsResult = null;
for (final String topic : topics) {
// partition id -> offset for this topic.
// NOTE(review): truncated call; presumably getPartitionsForTopic(topic) defined below.
final Map<Integer, Long> partitionInfoMap = PartitionsForTopic(topic);
// NOTE(review): the iterated expression and the key/value accessors are truncated;
// presumably partitionInfoMap.entrySet() with entry2.getKey()/getValue() - confirm.
for (final Map.Entry<Integer, Long> entry2 : Set()) {
final TopicPartition topicPartition = new TopicPartition(topic, (Key());
// Requests deletion of every record before the stored offset for this partition.
final RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset((Value());
recordsToDeleteMap.put(topicPartition, recordsToDelete);
}
}
// Log message (decoded): "cleanup set: {}".
SaticScheduleTask.log.debug("\u6e05\u7406\u96c6\u5408\uff1a{}", (String());
deleteRecordsResult = adminClient.deleteRecords((Map)recordsToDeleteMap);
// Per-partition futures carrying the result of the delete request.
final Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = (Map<TopicPartition, KafkaFuture<DeletedRecords>>)deleteRecordsResult.lowWatermarks();
// NOTE(review): from here to the "});" below, the listing is the remains of a lambda
// whose header is missing (presumably an iteration over lowWatermarks). "ex" and "e"
// are decompiler-synthesized locals: the catch binds "ex2" but assigns the never-set "ex".
final Exception ex;
Exception e;
try {
// Log message (decoded): "cleanup info topic: {} partition: {} latest offset: {} result: {}".
// NOTE(review): this statement is cut off mid-argument-list in the listing.
SaticScheduleTask.log.info("\u6e05\u7406\u6570\u636e\u4fe1\u606f \u4e3b\u9898\uff1a{} \u5206\u533a\uff1a{} \u6700\u65b0\u504f\u79fb\u91cf\uff1a{} \u7684\u7ed3\u679c\uff1a{}", new Object[] { Key().topic(), Key }
catch (InterruptedException | ExecutionException ex2) {
e = ex;
// Log message (decoded): "exception while fetching Kafka cleanup result: ".
// NOTE(review): the logger receiver is missing. The visible handler only logs; it does
// not restore the thread's interrupt flag after InterruptedException.
("\u83b7\u53d6kafka\u6e05\u7406\u6807\u8bb0\u7ed3\u679c\u5f02\u5e38\uff1a", (Throwable)e);
}
return;
});
// Log message (decoded): "scheduled cleanup of consumed data succeeded".
SaticScheduleTask.log.debug("\u5b9a\u65f6\u6e05\u7406\u5df2\u6d88\u8d39\u7684\u6570\u636e\u6210\u529f");
}
catch (Exception e2) {
// Log message (decoded): "exception during scheduled Kafka cleanup: ".
// NOTE(review): the logger receiver is missing on this line as well.
("kafka\u5b9a\u65f6\u6267\u884c\u6e05\u7406\u6807\u8bb0\u5f02\u5e38\uff1a", (Throwable)e2);
}
}
/**
 * Creates the consumer used to look up partition offsets.
 *
 * @return a new {@code Consumer<Long, String>}
 */
private Consumer<Long, String> createConsumer() {
// NOTE(review): truncated constructor call; presumably a Kafka consumer built from a
// "...sumerConfigs()" helper whose owner is not visible here - confirm.
final Consumer<Long, String> consumer = (Consumer<Long, String>)new sumerConfigs());
return consumer;
}
/**
 * Returns, for every partition of {@code topic}, the consumer position obtained
 * after seeking to the end of the assigned partitions.
 *
 * <p>Because the position is read after {@code seekToEnd}, the caller's
 * {@code beforeOffset(...)} request targets everything up to that position, not the
 * lowest offset committed by consumers.
 *
 * <p>NOTE(review): no {@code close()} call on the consumer is visible in this method,
 * so the consumer appears to be leaked on every invocation - confirm.
 *
 * @param topic topic whose partitions are inspected
 * @return map of partition id to the position read after {@code seekToEnd}
 */
private Map<Integer, Long> getPartitionsForTopic(final String topic) {
final Map<Integer, Long> partitionInfoMap = new HashMap<Integer, Long>(16);
// NOTE(review): "ateConsumer()" is truncated; presumably this.createConsumer().
final Consumer<Long, String> consumer = ateConsumer();
final Collection<PartitionInfo> partitionInfos = (Collection<PartitionInfo>)consumer.partitionsFor(topic);
final List<TopicPartition> tp = new ArrayList<TopicPartition>();
// Decompiler-synthesized captures with no visible initializer; presumably aliases of
// tp, consumer and partitionInfoMap respectively - confirm.
final List<TopicPartition> list;
final Consumer consumer2;
final Map<Integer, Long> map;
partitionInfos.forEach(str -> {
// The list grows by one partition per iteration, and assign/seekToEnd are re-issued on
// the whole accumulated list each time rather than once after the loop.
list.add(new TopicPartition(topic, str.partition()));
consumer2.assign((Collection)list);
consumer2.seekToEnd((Collection)list);
// Record the position for the partition handled in this iteration.
map.put(str.partition(), consumer2.position(new TopicPartition(topic, str.partition())));
return;
});
return partitionInfoMap;
}
/**
 * Inserts a {@code SysLog} row describing the outcome of a cleanup run.
 *
 * <p>No call to this method appears in the visible class.
 *
 * @param value the string {@code "true"} selects the success branch; any other value
 *              (including {@code null}) selects the failure branch
 */
private void recordSystemLog(final String value) {
final SysLog systemLog = new SysLog();
// NOTE(review): truncated setter call; it passes an epoch-millisecond value, presumably
// the current time - the setter name is not recoverable from this listing.
systemLog.w().toEpochMilli());
systemLog.setLogType((Byte)2);
if ("true".equals(value)) {
// Success: status 1, level 2. Description (decoded): "scheduled cleanup of consumed data succeeded!".
systemLog.setLogStatus((Byte)1);
systemLog.setLogLevel((Byte)2);
systemLog.setLogDesc("\u5b9a\u65f6\u6e05\u7406\u5df2\u6d88\u8d39\u7684\u6570\u636e\u6210\u529f\uff01"); }
else {
// Failure: status 0, level 1. Description (decoded): "scheduled cleanup of consumed data failed!".
systemLog.setLogStatus((Byte)0);
systemLog.setLogLevel((Byte)1);
systemLog.setLogDesc("\u5b9a\u65f6\u6e05\u7406\u5df2\u6d88\u8d39\u7684\u6570\u636e\u5931\u8d25\uff01"); }
this.sysLogMapper.insertSelective(systemLog);
}
// Static initializer: sets the logger and loads the topic list and broker address from
// the "cleanTopics" and "bootstrapServers" properties.
// NOTE(review): all three right-hand sides are truncated; the logger factory and the
// property-lookup class are not visible in this listing.
static {
log = Logger((Class)SaticScheduleTask.class);
TOPIC = Property("cleanTopics");
KAFKA_SERVER_ADDRR = Property("bootstrapServers");
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论