The documentation you are viewing is for Dapr v1.5 which is an older version of Dapr. For up-to-date documentation, see the latest version.
Apache Kafka
配置
要设置Apache Kafka pubsub,请创建一个pubsub.kafka
类型的组件。 请参阅本指南,了解如何创建和应用 pubsub 配置。 有关使用 secretKeyRef
的详细信息,请参阅有关如何在组件中引用Secret指南。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
namespace: default
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # Required. Kafka broker connection setting
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # Optional. Used for input bindings.
value: "group1"
- name: clientID # Optional. Used as client tracing ID by Kafka brokers.
value: "my-dapr-app-id"
- name: authRequired # Required.
value: "true"
- name: saslUsername # Required if authRequired is `true`.
value: "adminuser"
- name: saslPassword # Required if authRequired is `true`.
secretKeyRef:
name: kafka-secrets
key: saslPasswordSecret
- name: maxMessageBytes # Optional.
value: 1024
- name: consumeRetryInterval # Optional.
value: 200ms
- name: version # Optional.
value: 0.10.2.0
元数据字段规范
字段 | 必填 | 详情 | 示例 |
---|---|---|---|
brokers | Y | 逗号分隔的kafka broker列表. | "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093" |
consumerGroup | N | 监听 kafka 消费者组。 发布到主题的每条记录都会传递给订阅该主题的每个消费者组中的一个消费者。 | "group1" |
clientID | N | 用户提供的字符串,随每个请求一起发送到 Kafka 代理,用于日志记录、调试和审计目的。 默认为 "sarama" 。 |
"my-dapr-app" |
authRequired | Y | 启用 SASL 对 Kafka broker 的身份验证。 | "true" , "false" |
saslUsername | N | 用于身份验证的 SASL 用户名。 仅当 authRequired 设置为 "true" 时才需要。 |
"adminuser" |
saslPassword | N | 用于身份验证的 SASL 密码。 可以用secretKeyRef 来引用 Secret。 仅当 authRequired 设置为 "true" 时才需要。 |
"" , "KeFg23!" |
initialOffset | N | 如果以前未提交任何偏移量,则要使用的初始偏移量。 应为"newest"或"oldest”。 默认为"newest”。 | "oldest" |
maxMessageBytes | N | 单条Kafka消息允许的最大消息的字节大小。 默认值为 1024。 | 2048 |
consumeRetryInterval | N | 尝试消费主题时重试的间隔。 将不带后缀的数字视为毫秒。 默认值为 100ms。 | 200ms |
version | N | Kafka 集群版本。 默认值为 2.0.0.0 | 0.10.2.0 |
caCert | N | 证书颁发机构证书,使用 TLS 时需要。 可以用secretKeyRef 来引用密钥。 |
"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
clientCert | N | 客户端证书,使用 TLS 时需要。 可以用secretKeyRef 来引用密钥。 |
"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----" |
clientKey | N | 客户端密钥,使用 TLS 时需要。 可以用secretKeyRef 来引用密钥。 |
"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----" |
skipVerify | N | 跳过 TLS 验证,不建议在生产中使用。 默认值为 "false" |
"true" , "false" |
使用 TLS 通信
要配置使用 TLS 通信,需配置并确保 Kafka broker支持凭证。 前提条件包括certficate authority certificate
、ca issued client certificate
、client private key
。 下面是配置为使用 TLS 的 Kafka pubsub 组件的示例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
namespace: default
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # Required. Kafka broker connection setting
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: consumerGroup # Optional. Used for input bindings.
value: "group1"
- name: clientID # Optional. Used as client tracing ID by Kafka brokers.
value: "my-dapr-app-id"
- name: authRequired # Required.
value: "true"
- name: saslUsername # Required if authRequired is `true`.
value: "adminuser"
- name: consumeRetryInterval # Optional.
value: 200ms
- name: version # Optional.
value: 0.10.2.0
- name: saslPassword # Required if authRequired is `true`.
secretKeyRef:
name: kafka-secrets
key: saslPasswordSecret
- name: maxMessageBytes # Optional.
value: 1024
- name: caCert # Certificate authority certificate.
secretKeyRef:
name: kafka-tls
key: caCert
- name: clientCert # Client certificate.
secretKeyRef:
name: kafka-tls
key: clientCert
- name: clientKey # Client key.
secretKeyRef:
name: kafka-tls
key: clientKey
auth:
secretStore: <SECRET_STORE_NAME>
上面的 secretKeyRef
引用了一个 kubernetes secrets 存储 来访问 tls 信息。 访问此处 ,了解有关如何配置密钥存储组件的详细信息。
每次调用的元数据字段
分区键
当调用Kafka 发布/订阅时,可以通过在请求url中使用metadata
查询参数来提供一个可选的分区键。
参数名是partitionKey
。
示例:
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
创建 Kafka 实例
To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.
相关链接
- Dapr组件的基本格式
- 阅读 本指南,了解配置 发布/订阅组件的说明
- 发布/订阅构建块
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.