.NET Aspire Apache Kafka component
In this article, you learn how to use the .NET Aspire Apache Kafka client message-broker. The Aspire.Confluent.Kafka
library registers an IProducer<TKey, TValue>
and an IConsumer<TKey, TValue>
in the dependency injection (DI) container for connecting to a Apache Kafka server. It enables corresponding health check, logging and telemetry.
Get started
To get started with the .NET Aspire Apache Kafka component, install the Aspire.Confluent.Kafka NuGet package.
dotnet add package Aspire.Confluent.Kafka
For more information, see dotnet add package or Manage package dependencies in .NET applications.
Example usage
In the Program.cs file of your component-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue>
for use via the dependency injection container. The method takes two generic parameters corresponding to the type of the key and the type of the message to send to the broker. These generic parameters will be used to new an instance of ProducerBuilder<TKey, TValue>
. This method also take connection name parameter.
builder.AddKafkaProducer<string, string>("messaging");
You can then retrieve the IProducer<TKey, TValue>
instance using dependency injection. For example, to retrieve the producer from an IHostedService
:
internal sealed class MyWorker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
App host usage
To model the Kafka resource in the app host, install the Aspire.Hosting.Kafka NuGet package.
dotnet add package Aspire.Hosting.Kafka
In your app host project, register a Kafka container and consume the connection using the following methods:
var builder = DistributedApplication.CreateBuilder(args);
var messaging = builder.AddKafka("messaging");
var myService = builder.AddProject<Projects.MyService>()
.WithReference(messaging);
The WithReference
method configures a connection in the MyService
project named messaging
. In the Program.cs file of MyService
, the Apache Kafka broker connection can be consumed using:
builder.AddKafkaProducer<string, string>("messaging");
or
builder.AddKafkaConsumer<string, string>("messaging");
Configuration
The .NET Aspire Apache Kafka component provides multiple options to configure the connection based on the requirements and conventions of your project.
Use a connection string
When using a connection string from the ConnectionStrings
configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer()
or builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("myConnection");
And then the connection string will be retrieved from the ConnectionStrings
configuration section:
{
"ConnectionStrings": {
"myConnection": "broker:9092"
}
}
The value provided as connection string will be set to the BootstrapServers
property of the produced IProducer<TKey, TValue>
or IConsumer<TKey, TValue>
instance. Refer to BootstrapServers for more information.
Use configuration providers
The .NET Aspire Apache Kafka component supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings
or KafkaConsumerSettings
from configuration by respectively using the Aspire:Confluent:Kafka:Producer
and Aspire.Confluent:Kafka:Consumer
keys. This example appsettings.json configures some of the options:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
The Config
properties of both Aspire:Confluent:Kafka:Producer
and Aspire.Confluent:Kafka:Consumer
configuration sections respectively bind to instances of ProducerConfig
and ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
requires the ClientId
property to be set to let the broker track consumed message offsets.
Use inline delegates
Configuring KafkaProducerSettings
and KafkaConsumerSettings
You can pass the Action<KafkaProducerSettings> configureSettings
delegate to set up some or all the options inline, for example to disable health checks from code:
builder.AddKafkaProducer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
You can configure inline a consumer from code:
builder.AddKafkaConsumer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
Configuring ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
To configure Confluent.Kafka
builders, pass an Action<ProducerBuilder<TKey, TValue>>
(or Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>("messaging", producerBuilder => {
producerBuilder.SetValueSerializer(new MyMessageSerializer());
})
Refer to ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
API documentation for more information.
Health checks
By default, .NET Aspire components enable health checks for all services. For more information, see .NET Aspire components overview.
The .NET Aspire Apache Kafka component handles the following:
- Adds the
Aspire.Confluent.Kafka.Producer
health check when KafkaProducerSettings.DisableHealthChecks isfalse
. - Adds the
Aspire.Confluent.Kafka.Consumer
health check when KafkaConsumerSettings.DisableHealthChecks isfalse
. - Integrates with the
/health
HTTP endpoint, which specifies all registered health checks must pass for app to be considered ready to accept traffic.
Observability and telemetry
.NET Aspire components automatically set up Logging, Tracing, and Metrics configurations, which are sometimes known as the pillars of observability. For more information about component observability and telemetry, see .NET Aspire components overview. Depending on the backing service, some components may only support some of these features. For example, some components support logging and tracing, but not metrics. Telemetry features can also be disabled using the techniques presented in the Configuration section.
Logging
The .NET Aspire Apache Kafka component uses the following log categories:
Aspire.Confluent.Kafka
Tracing
The .NET Aspire Apache Kafka component will emit the following tracing activities using OpenTelemetry:
Aspire.Confluent.Kafka
Metrics
The .NET Aspire Apache Kafka component will emit the following metrics using OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
See also
.NET Aspire
Feedback
https://aka.ms/ContentUserFeedback.
Coming soon: Throughout 2024 we will be phasing out GitHub Issues as the feedback mechanism for content and replacing it with a new feedback system. For more information see:Submit and view feedback for