Kafka Tutorial (Part 3): Using the Kafka Message Queue from C#
  Following on from the previous post on installing Kafka, my Kafka cluster is at 192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092, so that cluster is used directly for the demos below.
  First create a project. The demos use a console app (.NET Core 3.1). Then install the Confluent.Kafka package via NuGet.
  The package page shows the address of the Confluent.Kafka source repository; take a look if you are interested.
  Publishing messages
  Let's go straight to a demo:
static void Main(string[] args)
{
    ProducerConfig config = new ProducerConfig();
    config.BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
    var builder = new ProducerBuilder<string, object>(config);
    builder.SetValueSerializer(new KafkaConverter());//set the serializer
    var producer = builder.Build();
    producer.Produce("test", new Message<string, object>() { Key = "Test", Value = "hello world" });
    Console.ReadKey();
}
  After running the code above, you can see the message with the Kafka Tool utility mentioned in the previous post.
  1. Publishing requires a producer object, built with the ProducerBuilder<,> class. It has two generic parameters: the first is the type of the routing key, the second the type of the message. In practice we mostly use ProducerBuilder<string, object> or ProducerBuilder<string, string>.
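  Incidentally, when messages don't need a routing key, the library's built-in Null type can serve as the key type. A minimal sketch, reusing the config from the demo above (string values use the built-in UTF-8 serializer, so no SetValueSerializer call is needed):
var builder = new ProducerBuilder<Null, string>(config);
var producer = builder.Build();
producer.Produce("test", new Message<Null, string>() { Value = "hello world" });//no Key is set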
  2. ProducerBuilder<string, object> takes a configuration argument when instantiated. The configuration is a collection (IEnumerable<KeyValuePair<string, string>>), and ProducerConfig is simply a type that implements that collection interface. Older versions of Confluent.Kafka had no ProducerConfig type; a Dictionary<string, string> was used to build the ProducerBuilder<string, object> instead. The demo above could equally be written as:
static void Main(string[] args)
{
    Dictionary<string, string> config = new Dictionary<string, string>();
    config["bootstrap.servers"] = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
    var builder = new ProducerBuilder<string, object>(config);
    builder.SetValueSerializer(new KafkaConverter());//set the serializer
    var producer = builder.Build();
    producer.Produce("test", new Message<string, object>() { Key = "Test", Value = "hello world" });
    Console.ReadKey();
}
  The two forms have exactly the same effect: a ProducerConfig also ultimately produces a collection of KeyValuePair<string, string>, with every ProducerConfig property mapping to a corresponding key. For example, the BootstrapServers property above maps to bootstrap.servers, the Kafka cluster address, with multiple addresses separated by commas.
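  You can verify this mapping by enumerating the config object directly; a small sketch:
ProducerConfig config = new ProducerConfig();
config.BootstrapServers = "192.168.209.133:9092";
foreach (var kv in config)
{
    Console.WriteLine($"{kv.Key} = {kv.Value}");//prints: bootstrap.servers = 192.168.209.133:9092
}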
  Other settings are described in the official configuration documentation.
  3. Confluent.Kafka also requires a serializer type implementing ISerializer<TValue> or IAsyncSerializer<TValue>, such as the KafkaConverter in the demo above:
public class KafkaConverter : ISerializer<object>
{
    ///<summary>
    ///Serialize the data into bytes
    ///</summary>
    ///<param name="data"></param>
    ///<param name="context"></param>
    ///<returns></returns>
    public byte[] Serialize(object data, SerializationContext context)
    {
        var json = JsonConvert.SerializeObject(data);
        return Encoding.UTF8.GetBytes(json);
    }
}
  Here I serialize to JSON, which requires installing Newtonsoft.Json via NuGet.
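  If you prefer the asynchronous interface, SetValueSerializer also accepts an IAsyncSerializer<TValue>. A sketch of the same JSON converter in async form (the class name KafkaAsyncConverter is mine):
public class KafkaAsyncConverter : IAsyncSerializer<object>
{
    ///<summary>
    ///Serialize the data into bytes, Task-based variant
    ///</summary>
    public Task<byte[]> SerializeAsync(object data, SerializationContext context)
    {
        var json = JsonConvert.SerializeObject(data);
        return Task.FromResult(Encoding.UTF8.GetBytes(json));
    }
}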
  4. Messages are published with the Produce method, which has several overloads plus a few async variants. The first parameter is the topic; to target a specific partition, pass a TopicPartition object instead. The second parameter is the message, of type Message<TKey, TValue>: Key is the routing key and Value is our message, which is serialized through the ISerializer<TValue> before being sent to Kafka. The third parameter is an Action<DeliveryReport<TKey, TValue>> delegate, invoked asynchronously; it is effectively the delivery result notification.
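  For illustration, here is a sketch of both the callback style and the awaitable style (the partition number and the handler body are illustrative):
//target a specific partition and observe the delivery result via the callback
var topicPartition = new TopicPartition("test", new Partition(0));
producer.Produce(topicPartition, new Message<string, object>() { Key = "Test", Value = "hello world" }, report =>
{
    //runs asynchronously once the broker acknowledges (or rejects) the message
    Console.WriteLine(report.Error.IsError ? $"failed: {report.Error.Reason}" : $"delivered at offset {report.Offset}");
});
//or await the delivery result with the async variant (inside an async method)
var result = await producer.ProduceAsync("test", new Message<string, object>() { Key = "Test", Value = "hello world" });
Console.WriteLine($"delivered to {result.TopicPartitionOffset}");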
  Consuming messages
  Let's go straight to a demo:
static void Main(string[] args)
{
    ConsumerConfig config = new ConsumerConfig();
    config.BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
    config.GroupId = "group.1";
    config.AutoOffsetReset = AutoOffsetReset.Earliest;
    config.EnableAutoCommit = false;
    var builder = new ConsumerBuilder<string, object>(config);
    builder.SetValueDeserializer(new KafkaConverter());//set the deserializer
    var consumer = builder.Build();
    consumer.Subscribe("test");//subscribe to a topic with Subscribe
    //consumer.Assign(new TopicPartition("test", new Partition(1)));//use Assign to consume from a specific partition
    while (true)
    {
        var result = consumer.Consume();
        Console.WriteLine($"receive message:{result.Message.Value}");
        consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
    }
}
  1. As with publishing, the consumer is built via a ConsumerBuilder<,> object, and there is likewise a ConsumerConfig configuration type. It too did not exist in older versions, where a Dictionary<string, string> was used instead; the example above is equivalent to:
static void Main(string[] args)
{
    Dictionary<string, string> config = new Dictionary<string, string>();
    config["bootstrap.servers"] = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
    config["group.id"] = "group.1";
    config["auto.offset.reset"] = "earliest";
    config["enable.auto.commit"] = "false";
    var builder = new ConsumerBuilder<string, object>(config);
    builder.SetValueDeserializer(new KafkaConverter());//set the deserializer
    var consumer = builder.Build();
    consumer.Subscribe("test");//subscribe to a topic with Subscribe
    //consumer.Assign(new TopicPartition("test", new Partition(1)));//use Assign to consume from a specific partition
    while (true)
    {
        var result = consumer.Consume();
        Console.WriteLine($"receive message:{result.Message.Value}");
        consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
    }
}
  In fact, like ProducerConfig, it is a KeyValuePair<string, string> collection, and each of its properties maps to a corresponding key. Other settings are described in the official configuration documentation.
  A quick note on the settings used in this example:
  BootstrapServers: the Kafka cluster address; separate multiple addresses with commas.
  GroupId: the consumer's group. Note that Kafka consumes messages by group: a message is delivered to only one consumer within a given group. Also, as a rule, consumers in the same group should implement the same logic.
  EnableAutoCommit: whether to auto-commit. If true, a message counts as consumed as soon as the consumer receives it; we can set it to false and commit manually after our processing logic completes.
  AutoOffsetReset: the offset-reset behavior, i.e. Kafka's read-start policy. The default is Latest; the options are Latest, Earliest, and Error. Personally I recommend Earliest.
  A little more on the AutoOffsetReset setting:
  Latest: where a partition has a committed offset, consume from that offset; where it has none, consume only data newly produced in that partition.
  Earliest: where a partition has a committed offset, consume from that offset; where it has none, consume from the beginning.
  Error: if every partition of the topic has a committed offset, consume from those offsets; if even one partition lacks a committed offset, throw an exception.
  That may sound a little confusing, so here is an example.
  When a consumer connects to Kafka, where should it start consuming?
  First, recall that Kafka consumers consume as a group, and Kafka records how far each group has read in each partition: the offset.
  When a consumer connects to consume messages, if its group has consumed and committed offsets before (i.e. offset records exist), it resumes from those offsets; on this point Latest, Earliest, and Error all behave the same.
  But if the consumer belongs to a new group (i.e. no offset records exist), the three settings diverge:
  Latest: consume only messages produced after the moment of connecting to Kafka; earlier messages are not consumed. This is the default behavior.
  Earliest: consume from the smallest available offset (the very beginning, if all messages are still retained); that is, messages published before the consumer connected are consumed too.
  Error: simply and brutally throw an exception.
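  In code, the choice is a single property; a sketch annotated with the behavior for a group that has no committed offsets yet:
ConsumerConfig config = new ConsumerConfig();
config.GroupId = "group.new";//assume a brand-new group with no committed offsets
//AutoOffsetReset.Latest: only messages produced after connecting (the default)
//AutoOffsetReset.Earliest: start from the smallest available offset, i.e. read the history
//AutoOffsetReset.Error: throw an exception instead of picking a start position
config.AutoOffsetReset = AutoOffsetReset.Earliest;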
  2. Producing requires serialization, so consuming requires deserialization: we must supply a type implementing the IDeserializer<TValue> interface. The example above uses JSON:
public class KafkaConverter : IDeserializer<object>
{
    ///<summary>
    ///Deserialize the bytes back into an object
    ///</summary>
    ///<param name="data"></param>
    ///<param name="context"></param>
    ///<returns></returns>
    public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return null;
        var json = Encoding.UTF8.GetString(data.ToArray());
        try
        {
            return JsonConvert.DeserializeObject(json);
        }
        catch
        {
            return json;
        }
    }
}
  3. Kafka is a publish/subscribe message queue, and Confluent.Kafka provides two subscription methods, Subscribe and Assign (see the sketch after this list):
  Subscribe: subscribe to messages from one or more topics.
  Assign: subscribe to messages from specific partitions of one or more topics.
  In addition, Confluent.Kafka provides the two corresponding unsubscription methods, Unsubscribe and Unassign.
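  A quick sketch of the four calls (the topic names and partition number are illustrative):
consumer.Subscribe(new[] { "test", "test2" });//group-managed assignment across one or more topics
consumer.Assign(new TopicPartitionOffset("test", new Partition(1), Offset.Beginning));//take a fixed partition, optionally from a given offset
consumer.Unsubscribe();//undo Subscribe
consumer.Unassign();//undo Assign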
  4. Messages are fetched with the Consume method, which returns a ConsumeResult<TKey, TValue> object. The message we want is inside it, along with the offset and other details.
  Also note that Consume blocks the current thread until a message is available to consume, or the call times out.
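  Besides the parameterless form, there are overloads that bound the wait; a sketch:
//block for at most two seconds; returns null if no message arrived in time
var result = consumer.Consume(TimeSpan.FromSeconds(2));
if (result != null)
{
    Console.WriteLine($"receive message:{result.Message.Value}");
}
//or block until a message arrives or the token is cancelled (throws OperationCanceledException on cancel)
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var result2 = consumer.Consume(cts.Token);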
  5. If the consumer was created with EnableAutoCommit=false, remember: we must call the Commit method manually.
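  If committing after every message is too chatty, one common variation is to commit every N messages; a sketch (the batch size of 100 and the Handle method are hypothetical):
int uncommitted = 0;
while (true)
{
    var result = consumer.Consume();
    Handle(result.Message.Value);//your own processing logic
    if (++uncommitted >= 100)
    {
        consumer.Commit(result);//commits everything consumed up to and including this result
        uncommitted = 0;
    }
}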
  A complete demo
  As mentioned above, producing needs an object implementing the serialization interface and consuming needs one implementing the deserialization interface. It is best to implement both on the same class, giving a complete implementation:
public class KafkaConverter : ISerializer<object>, IDeserializer<object>
{
    ///<summary>
    ///Serialize the data into bytes
    ///</summary>
    ///<param name="data"></param>
    ///<param name="context"></param>
    ///<returns></returns>
    public byte[] Serialize(object data, SerializationContext context)
    {
        var json = JsonConvert.SerializeObject(data);
        return Encoding.UTF8.GetBytes(json);
    }
    ///<summary>
    ///Deserialize the bytes back into an object
    ///</summary>
    ///<param name="data"></param>
    ///<param name="context"></param>
    ///<returns></returns>
    public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return null;
        var json = Encoding.UTF8.GetString(data.ToArray());
        try
        {
            return JsonConvert.DeserializeObject(json);
        }
        catch
        {
            return json;
        }
    }
}
  A complete demo then looks like this:
static void Main(string[] args)
{
    var bootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
    var group1 = "group.1";
    var group2 = "group.2";
    var topic = "test";
    new Thread(() =>
    {
        ConsumerConfig config = new ConsumerConfig();
        config.BootstrapServers = bootstrapServers;
        config.GroupId = group1;
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.EnableAutoCommit = false;
        var builder = new ConsumerBuilder<string, object>(config);
        builder.SetValueDeserializer(new KafkaConverter());//set the deserializer
        var consumer = builder.Build();
        //consumer.Subscribe(topic);//subscribe to a topic with Subscribe
        consumer.Assign(new TopicPartition(topic, new Partition(0)));//use Assign to consume from a specific partition
        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"{group1} receive message:{result.Message.Value}");
            consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
        }
    }).Start();
    new Thread(() =>
    {
        ConsumerConfig config = new ConsumerConfig();
        config.BootstrapServers = bootstrapServers;
        config.GroupId = group2;
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.EnableAutoCommit = false;
        var builder = new ConsumerBuilder<string, object>(config);
        builder.SetValueDeserializer(new KafkaConverter());//set the deserializer
        var consumer = builder.Build();
        //consumer.Subscribe(topic);//subscribe to a topic with Subscribe
        consumer.Assign(new TopicPartition(topic, new Partition(1)));//use Assign to consume from a specific partition
        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"{group2} receive message:{result.Message.Value}");
            consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
        }
    }).Start();
    int index = 0;
    //named producerConfig/producerBuilder so they don't collide with the config/builder locals declared inside the lambdas above
    ProducerConfig producerConfig = new ProducerConfig();
    producerConfig.BootstrapServers = bootstrapServers;
    var producerBuilder = new ProducerBuilder<string, object>(producerConfig);
    producerBuilder.SetValueSerializer(new KafkaConverter());//set the serializer
    var producer = producerBuilder.Build();
    while (true)
    {
        Console.Write("Enter a message: ");
        var line = Console.ReadLine();
        int partition = index % 3;
        var topicPartition = new TopicPartition(topic, new Partition(partition));
        producer.Produce(topicPartition, new Message<string, object>() { Key = "Test", Value = line });
        index++;
    }
}
  A simple wrapper
  Here is a simple wrapper that surfaces a few commonly used settings for convenience. As before, install the Confluent.Kafka and Newtonsoft.Json packages via NuGet. The classes are as follows:
public abstract class KafkaBaseOptions
{
    ///<summary>
    ///Server addresses
    ///</summary>
    public string[] BootstrapServers { get; set; }
}
public class KafkaConsumer : IDisposable
{
    ConsumerBuilder<string, object> builder;
    List<IConsumer<string, object>> consumers;
    bool disposed = false;
    ///<summary>
    ///Kafka server nodes
    ///</summary>
    public string BootstrapServers { get; private set; }
    ///<summary>
    ///Consumer group
    ///</summary>
    public string GroupId { get; private set; }
    ///<summary>
    ///Whether auto commit is enabled (enable.auto.commit)
    ///</summary>
    public bool EnableAutoCommit { get; set; } = false;
    ///<summary>
    ///Error event
    ///</summary>
    public event Action<object, Exception> ErrorHandler;
    ///<summary>
    ///Statistics event
    ///</summary>
    public event Action<object, string> StatisticsHandler;
    ///<summary>
    ///Log event
    ///</summary>
    public event Action<object, KafkaLogMessage> LogHandler;
    public KafkaConsumer(string groupId, params string[] bootstrapServers)
    {
        if (bootstrapServers == null || bootstrapServers.Length == 0)
        {
            throw new Exception("at least one server must be assigned");
        }
        this.GroupId = groupId;
        this.BootstrapServers = string.Join(",", bootstrapServers);
    }
    #region Private
    ///<summary>
    ///Create the consumer builder
    ///</summary>
    private void CreateConsumerBuilder()
    {
        if (disposed)
        {
            throw new ObjectDisposedException(nameof(KafkaConsumer));
        }
        if (builder == null)
        {
            lock (this)
            {
                if (builder == null)
                {
                    ConsumerConfig config = new ConsumerConfig();
                    config.BootstrapServers = BootstrapServers;
                    config.GroupId = GroupId;
                    config.AutoOffsetReset = AutoOffsetReset.Earliest;
                    config.EnableAutoCommit = EnableAutoCommit;
                    //config.EnableAutoOffsetStore = true;
                    //config.IsolationLevel = IsolationLevel.ReadCommitted;
                    //config.MaxPollIntervalMs = 10000;
                    //List<KeyValuePair<string, string>> config = new List<KeyValuePair<string, string>>();
                    //config.Add(new KeyValuePair<string, string>("bootstrap.servers", BootstrapServers));
                    //config.Add(new KeyValuePair<string, string>("group.id", GroupId));
                    //config.Add(new KeyValuePair<string, string>("auto.offset.reset", "earliest"));
                    //config.Add(new KeyValuePair<string, string>("enable.auto.commit", EnableAutoCommit.ToString().ToLower()));
                    //config.Add(new KeyValuePair<string, string>("max.poll.interval.ms", "10000"));
                    //config.Add(new KeyValuePair<string, string>("session.timeout.ms", "10000"));
                    //config.Add(new KeyValuePair<string, string>("isolation.level", "read_uncommitted"));
                    builder = new ConsumerBuilder<string, object>(config);
                    Action<Delegate, object> tryCatchWrap = (@delegate, arg) =>
                    {
                        try
                        {
                            @delegate?.DynamicInvoke(arg);
                        }
                        catch { }
                    };
                    builder.SetErrorHandler((p, e) => tryCatchWrap(ErrorHandler, new Exception(e.Reason)));
                    builder.SetStatisticsHandler((p, e) => tryCatchWrap(StatisticsHandler, e));
                    builder.SetLogHandler((p, e) => tryCatchWrap(LogHandler, new KafkaLogMessage(e)));
                    builder.SetValueDeserializer(new KafkaConverter());
                }
            }
        }
    }
    ///<summary>
    ///Handle messages internally
    ///</summary>
    ///<param name="consumer"></param>
    ///<param name="cancellationToken"></param>
    ///<param name="action"></param>
    private void InternalListen(IConsumer<string, object> consumer, CancellationToken cancellationToken, Action<RecieveResult> action)
    {
        try
        {
            var result = consumer.Consume(cancellationToken);
            if (!cancellationToken.IsCancellationRequested)
            {
                CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                if (!EnableAutoCommit && result != null)
                {
                    cancellationTokenSource.Token.Register(() =>
                    {