在C#中使用Kafka客户端需要使用第三方库,如Confluent.Kafka。下面是一个简单的示例,演示如何在C#中使用Confluent.Kafka客户端与Kafka进行交互。 首先,需要使用NuGet安装Confluent.Kafka库: ``` Install-Package Confluent.Kafka ``` 然后可以按照以下示例代码来编写Kafka生产者和消费者: ```csharp using System; using Confluent.Kafka; using System.Threading; class Program { static void Main(string[] args) { // Kafka producer var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; using (var producer = new ProducerBuilder<Null, string>(config).Build()) { producer.Produce("test-topic", new Message<Null, string> { Value = "Hello Kafka" }); producer.Flush(TimeSpan.FromSeconds(10)); Console.WriteLine("Message sent"); } // Kafka consumer var consumerConfig = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "test-group", AutoOffsetReset = AutoOffsetReset.Earliest }; using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()) { consumer.Subscribe("test-topic"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var message = consumer.Consume(cts.Token); Console.WriteLine($"Message received: {message.Value}"); } catch (ConsumeException e) { Console.WriteLine($"Error occurred: {e.Error.Reason}"); } } } catch (OperationCanceledException) { consumer.Close(); } } } } ``` 上面的示例代码中,首先创建了一个Kafka生产者,将一条消息发送到名为"test-topic"的主题中。然后创建了一个Kafka消费者,订阅了同样的主题并循环消费消息。在实际项目中,你需要根据自己的需求来配置Kafka客户端,并实现更复杂的逻辑。 [2024-12-18 13:18:43 | AI写代码神器 | 1537点数解答]