The documentation you are viewing is for Dapr v1.5 which is an older version of Dapr. For up-to-date documentation, see the latest version.

Apache Kafka

关于Apache Kafka pubsub组件的详细文档

配置

要设置Apache Kafka pubsub,请创建一个pubsub.kafka类型的组件。 请参阅本指南,了解如何创建和应用 pubsub 配置。 有关使用 secretKeyRef的详细信息,请参阅有关如何在组件中引用Secret指南

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authRequired # Required.
    value: "true"
  - name: saslUsername # Required if authRequired is `true`.
    value: "adminuser"
  - name: saslPassword # Required if authRequired is `true`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0

元数据字段规范

字段 必填 详情 示例
brokers Y 逗号分隔的kafka broker列表. "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093"
consumerGroup N 监听 kafka 消费者组。 发布到主题的每条记录都会传递给订阅该主题的每个消费者组中的一个消费者。 "group1"
clientID N 用户提供的字符串,随每个请求一起发送到 Kafka 代理,用于日志记录、调试和审计目的。 默认为 "sarama" "my-dapr-app"
authRequired Y 启用 SASL 对 Kafka broker 的身份验证。 "true", "false"
saslUsername N 用于身份验证的 SASL 用户名。 仅当 authRequired 设置为 "true"时才需要。 "adminuser"
saslPassword N 用于身份验证的 SASL 密码。 可以用secretKeyRef引用 Secret。 仅当 authRequired 设置为 "true"时才需要。 "", "KeFg23!"
initialOffset N 如果以前未提交任何偏移量,则要使用的初始偏移量。 应为"newest"或"oldest”。 默认为"newest”。 "oldest"
maxMessageBytes N 单条Kafka消息允许的最大消息的字节大小。 默认值为 1024。 2048
consumeRetryInterval N 尝试消费主题时重试的间隔。 将不带后缀的数字视为毫秒。 默认值为 100ms。 200ms
version N Kafka 集群版本。 默认值为 2.0.0.0 0.10.2.0
caCert N 证书颁发机构证书,使用 TLS 时需要。 可以用secretKeyRef来引用密钥。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientCert N 客户端证书,使用 TLS 时需要。 可以用secretKeyRef来引用密钥。 "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
clientKey N 客户端密钥,使用 TLS 时需要。 可以用secretKeyRef来引用密钥。 "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"
skipVerify N 跳过 TLS 验证,不建议在生产中使用。 默认值为 "false" "true", "false"

使用 TLS 通信

要配置使用 TLS 通信,需配置并确保 Kafka broker支持凭证。 前提条件包括certficate authority certificateca issued client certificateclient private key。 下面是配置为使用 TLS 的 Kafka pubsub 组件的示例:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authRequired # Required.
    value: "true"
  - name: saslUsername # Required if authRequired is `true`.
    value: "adminuser"
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
  - name: saslPassword # Required if authRequired is `true`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: caCert # Certificate authority certificate.
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: clientCert # Client certificate.
    secretKeyRef:
      name: kafka-tls
      key: clientCert
  - name: clientKey # Client key.
    secretKeyRef:
      name: kafka-tls
      key: clientKey
auth:
  secretStore: <SECRET_STORE_NAME>

上面的 secretKeyRef 引用了一个 kubernetes secrets 存储 来访问 tls 信息。 访问此处 ,了解有关如何配置密钥存储组件的详细信息。

每次调用的元数据字段

分区键

当调用Kafka 发布/订阅时,可以通过在请求url中使用metadata查询参数来提供一个可选的分区键。

参数名是partitionKey

示例:

curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

创建 Kafka 实例


你可以使用这个Docker镜像在本地运行Kafka。 要在没有Docker的情况下运行,请参阅入门指南


To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.

相关链接