(翻译)ApacheKafka官⽅⽂档:接⼝
Apache Kafka 官⽅⽂档:接⼝
Kafka 接⼝
Apache Kafka 引⼊⼀个新的 java 客户端(在 org.apache.kafka.clients 包中),替代⽼的 Scala 客户端,但是为了兼容,将会共存⼀段时间。为了减少依赖,这些客户端都有⼀个独⽴的 jar,⽽旧的 Scala 客户端继续与服务端保留在同个包下。
Kafka 有4 个核⼼ API:
Producer API 允许应⽤程序发送数据流到 kafka 集中的 topic。
Consumer API 允许应⽤程序 从 kafka 集的 topic 中读取数据流。
Streams API 允许从输⼊ topic 转换数据流到输出 topic。
Connect API 通过实现连接器(connector),不断地从⼀些源系统或应⽤程序中拉取数据到 kafka,或从kafka 提交数据到宿系统(sink system)或应⽤程序。
kafka 公开了其所有的功能协议,与语⾔⽆关。只⽤ java 客户端作为 kafka 项⽬的⼀部分进⾏维护,其他的作为开源的项⽬提供,这⾥提供了⾮ java 客户端的列表。
Kafka ⽣产者 API
我们⿎励所有新开发的程序使⽤新的 Java ⽣产者,新的 Java ⽣产者客户端⽐以前的 Scala 的客户端更快、功能更全⾯。通过下⾯的例⼦,引⼊ Maven (可以更改新的版本号)。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
Kafka ⽣产者客户端(0.10.1.1 API)
Kafka 客户端发布 record(消息)到 Kafka 集
新的⽣产者时线程安全的,在线程之间共享单个⽣产者实例,通常单例⽐多个实例要快。
⼀个简单的例⼦,使⽤ producer 发送⼀个有序的 key / value (键值对),放到 java 的main ⽅法⾥就能直接运⾏,
Properties props =new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("acks","all");
props.put("retries",0);
props.put("batch.size",16384);
props.put("linger.ms",1);
props.put("",33554432);
props.put("key.serializer","org.apache.kafkamon.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafkamon.serialization.StringSerializer");
Producer<String, String> producer =new KafkaProducer<>(props);
for(int i =0; i <100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", String(i), String(i)));
producer.close();
⽣产者的缓冲空间池保留尚未发送到服务器的消息,后台 I/O 线程负责将这些消息转换长请求发送到集。如果使⽤后不关闭⽣产者,则会泄漏这些资源。
send()⽅法是异步的,添加消息到缓冲区等待发送,并⽴即返回。⽣产者将单个的消息批量在⼀起发送来提⾼效率。
ack 是判别请求是否为完整的条件(就是判断是不是成功发送了)。我们指定 “all” 将会阻塞消息,这种设置性能最低,但是是最可靠的。
retries,如果请求失败,⽣产者会⾃动重试,我们指定是 0 次,如果启⽤重试,则会有重复消息的可能性。
producer (⽣产者)缓存每个分区未发送的消息。缓存的⼤⼩是通过 batch.size 配置指定的。值较⼤的话将会产⽣更⼤的批。并需要更多的内存(因为每个“活跃”的分区都有⼀个缓冲区)。
时间正则表达式java默认缓冲可⽴即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置 ⼤于 0 。这将指⽰⽣产者请求之前等待⼀段时间,希望更多的消息填补到未满的批中,这类似于 TCP 的算法,例如上⾯的代码段,可能 100 条消息在⼀个请求发送,因为我们设置了linger(逗留)时间为 1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。需要注意的是,在⾼负载下,相近的时间⼀般也会组成批,即使是 = 0。在不处于⾼负载的情况下,如果设置⽐0 ⼤,以少量的延迟代价换取更少的,更有效的请求。
< 控制⽣产者可⽤的缓存总量,如果消息发送速度⽐其传输到服务器快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调⽤将被阻塞,阻塞时间的阙值 设定,之后它将抛出⼀个 TimeoutException。
key.serializer 和 value.serializer ⽰例,将⽤户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使⽤附带的ByteArraySerializaer或StringSerializer处理简单的string或byte类型。
send()
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
异步发送⼀条消息到 topic ,并调⽤ callback(当发送已确认)。
send 是异步的,并且⼀旦消息被保存在 等待发送的消息缓存 中,此⽅法就⽴即返回。这样并⾏发送多条消息⽽不阻塞去等待每⼀条消息的响应。
发送的结果是⼀个 RecordMetadata ,它指定了消息发送的分区,分配的 offset 和消息的时间戳。如果 topic 使⽤的是 CreateTime,则使⽤⽤户提供的时间戳或发送的时间(如果⽤户没有指定消息的时间戳)如果 topic 使⽤的是 LogAppendTime ,则追加消息时,时间戳是 broker 的本地时间。
由于 send 调⽤时异步的,它将为分配消息的此消息的 RecordMetadata 返回⼀个 Future 。如果 future 调⽤ get(),则将阻塞,直到相关请求王城并返回该消息的 metadata,或抛出发送异常。
如果要模拟⼀个简单的阻塞调⽤,你可以调⽤ get()⽅法
byte[] key ="key".getBytes();
byte[] value ="value".getBytes();
ProducerRecord<byte[],byte[]> record =new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
完全⽆阻塞的话,可以利⽤回调参数提供的请求完成时将调⽤的回调通知。
ProducerRecord<byte[],byte[]> record =new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback(){
public void onCompletion(RecordMetadata metadata, Exception e){
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: "+ metadata.offset());
}
});
发送到同⼀分区的消息回调保证⼀定的顺序执⾏,也就是说,在下⾯的例⼦中 callback1 保证执⾏ callback2 之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
注意:callback ⼀般在⽣产者的 I/O 线程中执⾏,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执⾏阻塞或计算昂贵(消耗)的回调,建议在 callback 主体中使⽤⾃⼰的 Executor 来并⾏处理。
Kafka 消费者 API
随着 0.9.0版本,我们已经增加了⼀个新的 Java 消费者替换我们现有的基于 zookeeper 的⾼级和低级消费者。这个客户端还是测试版的质量。为了确保⽤户平滑升级,我们仍然维护旧的 0.8 版本的消费者客户端继续在 0.9 集上⼯作。两个⽼的 0.8 API 的消费者
这个⼼的消费者 API ,清除了 0.8 版本的⾼版本和低版本消费者之间的区别,你可以通过下⾯的 maven,引⼊依赖到你的客户端。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
Kafka 消费者客户端(0.10.0.1 API)
Kafka 客户端从集中消费消息,并透明地处理 kafka 集中出现故障服务器,透明地调节适应集中变化的数据分区。也和服务器交互,平衡均衡消费者。
public class KafkaConsumer<K,V>
extends Object
implements Consumer<K,V>
消费者 TCP 长连接到 broker 来拉取消息。故障导致的消费者关闭失败,将会泄漏这些链接,消费者不是线程安全的,可以查看更多关于Multi-threaded(多线程) 处理的细节。
跨版本兼容性
该客户端可以与 0.10.0 或更新版本的 broker 集进⾏通信,较早的版本可能不⽀持某些功能。例如, 0.10.0 版本 broker 不⽀持offsetForTimes,因为此功能是在版本 0.10.1 中添加的。如果你调⽤ broker 版本不可⽤的 API时,将报UnsupportedVersionException 异常。
偏移量和消费者的位置
kafka 为分区中的每条消息保存⼀个 偏移量(offset),这个偏移量 是该分区中 ⼀条消息的唯⼀标识符。也表⽰消费者在分区的位置。例如,⼀个位置是 5 的消费者(说明已经消费了0到4 的消息),下⼀个接收消息的偏移量为 5 的消息。实际上有两个与消费者相关的 “位置” 概念。
消费者的位置 给出了下⼀条记录的偏移量。它⽐消费者在该分区中看到的最⼤偏移量要⼤⼀个。它在每次消费者在调⽤ poll(long)中接受消息是⾃动增长。
“已提交”的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期⾃动提交偏移量,也可以选择通过调⽤ commit API 来⼿动的控制(如:commitSync 和 commitAsync)。
这个区别是消费者来控制⼀条消息什么时候才被认为是已被消费的,控制权在消费者,下⾯我们进⼀步更详细地讨论
消费者组和主题订阅
Kafka 的消费者组 概念,通过进程池⽠分消息并处理消息。这些进程可以在⽤⼀台机器上运⾏,也可分布到多台机器上,以增加可扩展性和容错性,相同 的消费者将视为同⼀个 消费者组。
分组中的每个消费者都通过 subscribe API 动态的订阅⼀个 topic 列表。kafka 将已订阅topic 的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。因此每个分区恰好地分配1个消费者(⼀个消费者组中)。所有如果⼀个 topic 有4个分区,并且⼀个消费者分组有只有 2 个消费者,那么每个消费者将消费 2个分区。
消费者组的成员是动态维护的:如果⼀个消费者故障。分配给它的分区将重新分配给同⼀个分组中其他的消费者。同样的,如果⼀个新的消费者加⼊到分组,将从现有消费者中移⼀个给它。这被称为 重新平衡分组,并在下⾯更详细地讨论。当新分区添加到订阅的 topic 时,或者当创建与订阅的正则表达式匹配的新 topic 时,也将重新平衡。将通过定时刷新⾃动发现新的分区,并将其分配给分组的成员。
从概念上讲,你可以将消费者分组看做是由多个进程组成的单⼀逻辑订阅者。作为⼀个多订阅系统,Kafka ⽀持对于给定 topic 任何数量的消费者组,⽽不重复。
这是在消息系统中详见的功能的略微概括。所有进程都将是单个消费者分组的⼀部分(类似传统消息
系统中的队列的语义),因此消息传递就像队列⼀样,在组中平衡。与传统的消息系统不同的是,虽然,你可以有多个这样的组。但每个进程都有⾃⼰的消费者组(类似于传统消息系统中 pub-sub 的语义),因此每个进程都会订阅该主题的所有消息。
此外,当分组重新分配⾃动发⽣时,可以通过 ConsumerRealanceListener 通知消费者,这允许他们完成必要的应⽤程序级逻辑,例如状态清除,⼿动偏移提交等。有关更多详细信息,请参阅 Kafka 存储的偏移。
它也允许消费者通过使⽤ assign(Collection)⼿动分配指定分区,如果使⽤⼿动指定分配分区,那么动态分区分配和协调消费者组将失效。
发现消费者故障
订阅⼀组 topic 后,当调⽤ poll (long)时,消费者将⾃动加⼊到组中,只要持续的调⽤ poll,消费者将⼀折保持可⽤,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送⼼跳。如果消费者崩溃或⽆法再 配置的时间内发送⼼跳,则消费者将被视为死亡,并且其分区将被重新分配。
还有⼀种可能,消费可能遇到 “活锁” 的情况,它持续的发送⼼跳,但是没处理。为了预防消费者在这种情况下⼀直持有分区,我们使⽤活跃检测机制。在此基础上,如果你调⽤的 poll 的频率⼤于最⼤间
隔,则客户端将主动地离开组,以便其他消费者接管该分区。发⽣这种情况时,你会看到 offset 提交失败(调⽤ commitSync()引发的CommitFailedException)。这是⼀种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,你必须持续调⽤ poll。
消费者提供两个配置设置来控制 poll 循环:
1. :增⼤poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调⽤ poll(long)返回的消息,通常返回的消息都是⼀批)。
缺点是此值越⼤将会延迟组重新平衡。
2. ds:此设置限制每次调⽤ poll 返回的消息数,这样可以更容易的预测每次poll 间隔要处理的最⼤值。通过调整此值,
可以减少 poll 间隔,减少重新平衡分组的
对于消息处理时间不可预测地的情况,这些选项是不够的。处理这种情况的推荐⽅法是将消息处理移到另⼀个线程中,让消费者继续调⽤poll。但是必须注意确保已提交的 offset 不超过实际位置。另外,你必须禁⽤⾃动提交,并只有在线程完成处理后才为记录⼿动提交偏移量(取决于你)。还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能
⼒⽐拉取消息的慢,那创建新线程将导致你机器内存溢出)。
⽰例
这个消费者 API 提供了灵活性,已涵盖各种消费场景,下⾯是⼀些例⼦来演⽰如何使⽤它们。
⾃动提交偏移量
这是个【⾃动提交偏移量】的简单的 kafka 消费者 API。
Properties props =new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("group.id","test");
props.put("enable.automit","true");
props.put("automit.interval.ms","1000");
props.put("key.deserializer","org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafkamon.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo","bar"));
while(true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
设置 enable.automit,偏移量由 控制⾃动提交的频率。
集是通过配置 bootstrap.servers 指定⼀个或多个 broker。不⽤指定全部的 broker,它将⾃动发现集中的其余的 broker(最好指定多个,万⼀有服务器故障)。
在这个例⼦中,客户端订阅了主题 foo 和 bar 。消费者组叫 test。
broker 通过⼼跳机器⾃动检测 test 组汇总失败的进程,消费者会⾃动 ping 集,告诉进它还活着。只要消费者能够做到这⼀点,它就被认为是活着的,并保留分配给它分区的权利,如果它停⽌⼼跳的时间超过 ,那么就会认为是故障的,它的分区将被分配到别的进程。
这个 deserializer 设置如何把 byte 转成 object 类型,例⼦中,通过指定 string 解析器,我们告诉获取到的消息的 key 和 value 只是简单个 string 类型。
⼿动设置偏移量
不需要定时的提交 offset,可以⾃⼰控制 offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。这个很有⽤的,当消费的消息结合了⼀些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
Properties props =new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("group.id","test");
props.put("enable.automit","false");
props.put("automit.interval.ms","1000");
props.put("session.timeout.ms","30000");
props.put("key.deserializer","org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafkamon.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo","bar"));
final int minBatchSize =200;
List<ConsumerRecord<String, String>> buffer =new ArrayList<>();
while(true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records){
buffer.add(record);
}
if(buffer.size()>= minBatchSize){
insertIntoDb(buffer);
consumermitSync();
buffer.clear();
}
}
在这个例⼦中,我们将消费⼀批消息并将它们存储在内存中。当我们积累⾜够多的消息后,我们在将它们批量插⼊到数据库中。如果我们设置 offset ⾃动提交(之前说的例⼦),消费将被认为是已消费的。这样会出现问题,我们的进程可能在批处理记录之后,但在它们被插⼊到数据库之前失败了。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论