Kafka простая аутентификации SASL – PLAIN | inaword

Bootstrap the tutorial environment using Docker Compose.

First, let’s create some directories. We’ll create one for our policy files, a second one for built bundles, and a third
one or the OPA authorizer plugin.

Next, create an OPA policy that allows all requests. You will update this policy later in the tutorial.


With the policy in place, build a bundle from the contents of the policies directory and place it in the bundles
directory. The bundles directory will later be mounted into the Nginx container in order to distribute policy updates
to OPA.

Next, download the latest version of the Open Policy Agent plugin for Kafka authorization
plugin from the projects release pages.

Store the plugin in the plugin directory (replace ${version} with the version number of the plugin just downloaded):

For more information on how to configure the OPA plugin for Kafka, see the plugin repository.

Next, create a docker-compose.yaml file that runs OPA, Nginx, ZooKeeper, and Kafka.


Define a policy to restrict consumer access to topics containing Personally Identifiable Information (PII).

Update the policies/tutorial.rego with the following content.

# High level policy for controlling access to Kafka.
# * Deny operations by default.
# * Allow operations if no explicit denial.
# The kafka-authorizer-opa plugin will query OPA for decisions at
# /kafka/authz/allow. If the policy decision is _true_ the request is allowed.
# If the policy decision is _false_ the request is denied.
package kafka.authz

import future.keywords.in

default allow := false

allow {
	not deny

deny {
	not consumer_is_allowlisted_for_pii

# Data structures for controlling access to topics. In real-world deployments,
# these data structures could be loaded into OPA as raw JSON data. The JSON
# data could be pulled from external sources like AD, Git, etc.

consumer_allowlist := {"pii": {"pii_consumer"}}

topic_metadata := {"credit-scores": {"tags": ["pii"]}}

# Helpers for checking topic access.

topic_contains_pii {
	"pii" in topic_metadata[topic_name].tags

consumer_is_allowlisted_for_pii {
	principal.name in consumer_allowlist.pii

# Helpers for processing Kafka operation input. This logic could be split out
# into a separate file and shared. For conciseness, we have kept it all in one
# place.

is_write_operation {
    input.action.operation == "WRITE"

is_read_operation {
	input.action.operation == "READ"

is_topic_resource {
	input.action.resourcePattern.resourceType == "TOPIC"

topic_name := input.action.resourcePattern.name {

principal := {"fqn": parsed.CN, "name": cn_parts[0]} {
	parsed := parse_user(input.requestContext.principal.name)
	cn_parts := split(parsed.CN, ".")

# If client certificates aren't used for authentication
else := {"fqn": "", "name": input.requestContext.principal.name}

parse_user(user) := {key: value |
	parts := split(user, ",")
	[key, value] := split(parts[_], "=")

The Kafka authorization plugin is configured to query for the
data.kafka.authz.allow decision. If the response is true the operation is
allowed, otherwise the operation is denied. When the integration queries OPA it
supplies a JSON representation of the operation, resource, client, and principal.

  "action": {
    "logIfAllowed": true,
    "logIfDenied": true,
    "operation": "READ",
    "resourcePattern": {
      "name": "credit-scores",
      "patternType": "LITERAL",
      "resourceType": "TOPIC",
      "unknown": false
    "resourceReferenceCount": 1
  "requestContext": {
    "clientAddress": "/",
    "clientInformation": {
      "softwareName": "apache-kafka-java",
      "softwareVersion": "2.8.1"
    "connectionId": "",
    "header": {
      "headerVersion": 2,
      "name": {
        "clientId": "consumer-console-producer-63933-1",
        "correlationId": 10,
        "requestApiKey": 1,
        "requestApiVersion": 12
    "listenerName": "SSL",
    "principal": {
      "name": "CN=pii_consumer,OU=developers",
      "principalType": "User"
    "securityProtocol": "SSL"

With the input value above, the answer is:

The ./bundles directory is mounted into the Docker container running Nginx.
When the bundle under this directory change, OPA is notified via the bundle API,
and the policies are automatically reloaded.

You can update the bundle at any time by rebuilding it.

At this point, you can exercise the policy.

Exercise the policy that restricts producer access to topics with high fanout.

First, run kafka-console-producer and simulate a service with access to the
click-stream topic.

Next, run the kafka-console-consumer to confirm that the messages were published.

Once you see the 10 messages produced by the first part of this step, exit the console consumer (^C).

Lastly, run kafka-console-producer to simulate a service that should not
have access to high fanout topics.

Because anon_producer is not authorized to write to high fanout topics, the
request will be denied and the producer will output an error message.

Not authorized to access topics: [click-stream]


The Docker Compose file defined above requires SSL client authentication
for clients that connect to the broker. Enabling SSL client authentication
allows for service identities to be provided as input to your policy. The
example below shows the input structure.


Authorization using simpleaclauthorizer:

Apache Kafka ships out with simple default authorization implementation kafka.security.auth.SimpleAclAuthorizerfor all ACL operations. SimpleAcl Authroizer comes up create, read, write, cluster_action, alter_configs, describe and delete permissions.

Command line interface

Kafka Authorization management CLI can be found under bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. Following lists all the options that the script supports. 

OptionDescriptionDefaultOption type
–addIndicates to the script that user is trying to add an acl. Action
–removeIndicates to the script that user is trying to remove an acl. Action
–listIndicates to the script that user is trying to list acls. Action

Fully qualified class name of the authorizer.


comma separated key=val pairs that will be passed to authorizer for initialization.

–clusterSpecifies cluster as resource. Resource
–topic <topic-name>Specifies the topic as resource. Resource
–consumer-group <consumer-group>Specifies the consumer-group as resource. Resource

Principal is in PrincipalType:name format.

These principals will be used to generate an ACL with Allow permission.

Multiple principals can be specified in a single command by specifying this option multiple times, i.e.

–allow-principal User:test1@EXAMPLE.COM –allow-principal User:test2@EXAMPLE.COM


Principal is in PrincipalType:name format.

These principals will be used to generate an ACL with Deny permission.

Multiple principals can be specified in the same way as described in –allow-principal option.


Comma separated list of hosts from which principals listed in –allow-principals will have access.

if –allow-principals is specified defaults to * which translates to “all hosts”Host
–deny-hostsComma separated list of hosts from which principals listed in –deny-principals will be denied access.if –deny-principals is specified defaults to * which translates to “all hosts”Host

Comma separated list of operations.

Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, All


Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE,
DESCRIBE on topic and CREATE on cluster.


Convenience option to add/remove acls for consumer role. This will generate acls that allows READ,
DESCRIBE on topic and READ on consumer-group.

Похожее:  4. UserGate Getting Started. Работа с пользователями / Хабр

Encryption using ssl:

SSL(Secure Sockets Layer) can be configured for encryption and also serves as 2-way authentication between client and server. i.e broker authenticates client using client truststore certificate and client authenticates broker using broker truststore certificate.

You could refer this Github repo for spinning up secure cluster by adding the required configs locally on your machine.Now, Let’s get onto working example:

Download Apache Kafka binary from open source Apache Kafka Downloads.

SSL Certificate and Key generation: Create Kafka broker SSL keystore and truststore certificate using confluent-platform-security-tools generate ssl script. Make a note of keystore and truststore password which you pass in while executing this script, we would need these for configuring Kafka server.properties.

keystore and truststore in config folder

After generating truststore and keystore folders through this script, we will copy them over to config folder in downloaded Kafka binary directory.

We will execute all below commands by navigating to <kafka-binary-dir>/ in terminal.

Start Zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties


This tutorial shows how to enforce fine-grained access control over Kafka
topics. In this tutorial you will use OPA to define and enforce an
authorization policy stating:

  • Consumers of topics containing Personally Identifiable Information (PII) must be on allow list.
  • Producers to topics with high fanout must be on allow list.

In addition, this tutorial shows how to break up a policy with small helper
rules to reuse logic and improve overall readability.

Java клиент на spring boot

Напишем минималистичный клиент с возможностью публикации и чтения сообщений из kafka. Для этого создадим новый проект через

или любой другой привычный вам инструмент. Будет достаточно подключить одну зависимость:

Настройку Spring Boot приложения я предпочитаю по возможности реализовывать через конфигурационные yaml файлы с минимальной конфигурацией в Java коде. За счет встроенного структурирования в Yaml-формат, эти файлы гораздо удобнее для человеческого восприятия, чем .properties файлы и позволяют группировать настройки по модулям.

List acls

We can list acls for any resource by specifying the –list option with the resource. To list all acls for Test-topic we can execute the CLI with following options:


This tutorial requires Docker Compose to run Kafka, ZooKeeper, and OPA.

Additionally, we’ll use Nginx for serving policy and data bundles to OPA. This component is however easily replaceable
by any other bundle server implementation.

Ssl для других механизмов sasl протокола

Мы рассмотрели защиту соединения с помощью SSL  для протокола SASL и его механизма аутентификации PLAIN. Из первой статьи мы знаем что у SASL сейчас как минимум четыре механизма аутентификации:

Можно рассмотреть добавление SSL к каждому механизму, а можно просто понять принцип добавления SSL к каждому механизму.

А принцип простой

  1. В соединении указываем SASL_SSL
  2. Добавляем выбор самого механизма SASL
  3. Добавляем настройки выбранного механизма аутентификации SASL
  4. Добавляем настройки SSL

Следуя этой инструкции, Вы сможете настроить любой механизм аутентификации SASL с SSL.

Wrap up

Congratulations on finishing the tutorial!

At this point you have learned how to enforce fine-grained access control
over Kafka topics. In addition, you have seen how to break down policies into
smaller rules that can be reused and improve the overall readability over the

If you want to use the Kafka Authorizer plugin that integrates Kafka with
OPA, see the build and install instructions in the

Выбор docker образа apache kafka zookeeper

Для начала придется определиться с вендором образа Apache Kafka для нашего контейнера. Требования каждого конкретного проекта разнятся по необходимому уровню безопасности и надежности решения, в некоторых случаях конечно придется собирать свой собственный образ, но для большинства проектов будет разумным выбрать один из доступных на docker hub. На 2020й год есть три основных популярных вендора образов kafka zookeper:

Добавим классы producer и consumer

Слушатель получает сообщение и логирует все доступные сведения о нем в консоль и комитит факт считывания.


В предыдущей статье мы уже написали кастомный аутентификатор, который соединяется с сервером с учетными данными и отправляет ему запрос на проверку аккаунта клиента.

В этой статье мы добавим в нашу схему авторизатор, который будет подключаться к тому же серверу и запрашивать группы пользователей, в которых состоит пользователь. А доступ к топикам будем давать группам пользователей.

Например, если мы дадим группе ALICE_TOPIC_WRITE_GROUP доступ на запись в топик topic.alice, то все пользователи, которые состоят в группе ALICE_TOPIC_WRITE_GROUP смогут писать в топик topic.alice. Т.е. задача нашего авторизатора получить список групп по пользователю у сервера с учетными данными и проверить, имеет ли право пользователь выполнять действия на основе списка ACL.

Похожее:  Нижнекамск: оплата квитанций ЖКХ. Коммунальные платежи онлайн

В итоге, что нам нужно сделать:

  1. Написать кастомный авторизатор
  2. Подсунуть его Kafka
  3. Указать в конфиге Kafka, что будем использовать свой авторизатор

Запуск контейнеров

В итоге мы должны получить 2 файла, которые находятся в одной директории:

  1. docker-compose.yml
  2. kafka_server_jaas.conf

В этой директории нужно вызвать команду:

$ docker-compose up -d



позволяет запуститься в Detached mode и закрыть консоль при необходимости без выключения образов.

Запустим добавление сообщений в очередь каждые 3 секунды

для включения шедулера надо повесить аннотацию @EnableScheduling на наше spring приложение

Voilà! и приложение готово к запуску. Можно запускать dev вариант с локальным клиентом и локальной kafka, а так же к kafka можно получить доступ из других сетей если открыть 9092 порт и указать в application.yml адрес хоста.

Настройки безопасности kafka

Прежде всего напомним понятия:

  • аутентификация — проверка что клиент является тем за кого себя выдает
  • авторизация — проверка прав клиента на доступ к ресурсу

В предыдущей статье мы разобрали три протокола аутентификации в Kafka.

  1. Без аутентификации
  2. SSL
  3. SASL

При этом канал может быть защищен SSL.

В конфигурации сервера Kafka, выбранный протокол задается с помощью опций, следующим образом:

  1. Без аутентификации — по умолчанию, можно ничего не указывать
  2. SSL — необходимо в конфиге сервера и клиента указать
  3. SASL — необходимо в конфиге сервера и клиента указать:
    если без SSL:
  4. если c SSL:

    А затем нам необходимо уточнить SASL механизм аутентификации (мы рассмотрим только два):

    1. встроенный механизм:
      На сервере
    2. На клиенте

    3. Для GSSAPI (Kerberos):
      На сервере
    4. На клиенте

Для наглядности приведем диаграмму с настройками выбора

Помните, что security.protocol=SASL_SSL — это аутентификация протоколами SASL по каналу, защищенному SSL. А security.protocol=SSL — это аутентификация с помощью SSL по каналу защищенному SSL.

Важно: если указать на сервере  security.protocol=SASL_SSL, но не указать sasl.enabled.mechanism, то мы получим ошибку при запуске сервера: 

Однако, такая же ошибка может выскочить, если мы не указали serviceName в конфигурации JAAS клиента (с JAAS начнем знакомиться в следующей статье)

Затем на сервере указываем адрес и порт и протокол аутентификации:



Где [port], [domain] — порт и домен на котором Kafka будет работать с протоколом аутентификации [security_protocol]. При этом [security_protocol] — это то что Вы писали в свойстве security.protocol на сервере.


Kafka умеет работать с несколькими протоколами аутентификации одновременно, но на разных портах и IP, для этого просто необходимо через запятую в listeners,  например:

Kafka также умеет работать с несколькими SASL механизмами, для этого их достаточно перечислить в sasl.enabled.mechanisms через запятую.

В Kafka можно выделить два типа соединений:

  1. Соединения с обычными клиентами
  2. Межброкерное взаимодейтсвие. Т.е. взаимодействие между узлами Kafka. Зачем это нужно? Kafka — это масштабируемаяотказоустойчивая система. Вкратце. Для обеспечения масштабируемости и отказоустойчивости Kafkа запускают на нескольких серверах. Грубо говоря, чтобы дублировать сообщения. Если один сервер выйдет из строя, то его заменит другой. В этом случае, каждый сервер Kafka должен уметь общаться с остальными, чтобы получать копии сообщений и держать актуальное состояние! Это и есть межброкерное или inter-broker взаимодействие.

В listeners описываются параметры всех соединений, как для обычных соединений так и для соединений межброкерного взаимодействия!

Вдаваться в подробности зачем нужен advertised.listeners мы пока не будем. Просто примем за правило что advertised.listeners дублирует listeners.

В наших статьях мы занимаемся настройкой прежде всего защиты соединения между внешними клиентами и Kafka! А межброкерное взаимодействие мы пока оставляем как есть, т.е. PLAINTEXT! Поэтому у нас всегда будет описано одно соединение с PLAINTEXT и одно дополнительное для наших клиентов, если оно отличается от  PLAINTEXT.

Если Kafka не найдет описание соединения для межброкерного взаимодействия в listeners, то может быть такая ошибка:

Тут упоминается SASL_PLAINTEXT. Дело в том что протокол межброкерного взаимодействия указывается отдельно. А если не указан, то ставится по умолчанию PLAINTEXT. Поэтому в listeners и должно быть описано одно соединение с PLAINTEXT. В данном случае пользователь про это забыл и указал только соединение SASL_PLAINTEXT.

Пока что, в наших статьях, мы будем настраивать защиту именно для внешних клиентов!

Помимо настроек выбора протокола аутентификации и механизма, нам еще потребуется указать сами настройки для каждого конкретного механизма. Их мы будем разбирать на конкретных примерах в следующих статьях.

Настройки клиента

Как мы узнали из второй статьи, выбор протокола SASL_PLAINTEXT и механизма PLAIN для клиента задается настройками:

Операции в списках acl kafka

В предыдущей статье для проверки работы авторизации AC, мы давали все (ALL) права клиенту, чтобы не разбираться. Но, как Вы понимаете, это не безопасно. Зачем вводить систему авторизации, если мы все равно даем все права? Это было сделано для простоты рассмотрения примера. Сейчас настало время разобраться с операциями и настроить списки ACL правильно.

Мы рассмотрим пока только некоторые операции необходимые для управления топиками и группами. Названия у них говорят сами за себя, поэтому пояснять детально не будем.

  • All — все права
  • Read — право на чтение. Например, из топика или их группы
  • Write — право на запись. Например, в топик или группы
  • Create — создание. Например, право на запись в топик или группу
  • Delete — удаление
  • Alter — изменение

Пишем yaml файл для compose

Общая схема нашей конфигурации для всех клиентов и контейнеров будет следующей:

К одному координатору будет подключаться один брокер с открытыми портами 9093 — для localhost подключений и 9092 для подключений с других машин.

Подготовка к тестированию

Для подготовки к тестированию нам сначала нужно поправить конфигурацию сервера Kafka, а именно:

  1. Добавить к конфигурации прошлой статьи конфигурацию ACL и указать наш авторизатор
  2. Добавить суперпользователя ACL
  3. Добавить настройку нашего авторизатора, в которой указан сервер учетных данных

Теперь конфиг будет выглядеть так:

Пример с группой

С группами не все так однозначно, как с топиками. Для того, чтобы клиент мог читать из группы, необходимо дать клиенту право на чтение из топика, из которого будем читать и право на чтение из группы, из которой будем читать. Если такой группы нет, то она создастся автоматически и право на создание группы не потребуется.

Давайте дадим Робину право на чтение из топика Алисы topic.alice и право на чтение из группы group.robin.from.alice.

Для чтения сообщение нам потребуется соответствующий клиент. В репозитории он называется SimpleConsumer. Давайте приведем код клиента:

И код, который будет его запускать:

И давайте запустим клиент. В итоге, мы получим информацию о том, что прочитано одно сообщение:

Что собой представляет операция чтения из группы? Это операция считывания сообщения и операция сохранения в группе номера следующего за считываемым сообщением, чтобы при следующем чтении из группы Kafka отдавала уже следующее сообщение. Номер следующего сообщения называется смещением, а операция установки смещения — коммит.

А теперь давайте попробуем считать Алисой сообщения из своего топика — alice.topic из группы group.robin.from.alice. Но для этого она должна иметь право на чтение из этого топика. Ранее мы давали только право на запись и создание. Давайте дадим право на чтение:

Пример с топиком

Давайте дадим Алисе право на запись в топик topic.alice, а также право на создание топика topic.alice. Право на создание дается, потому что такого топика еще нет и при первой записи в него Алисе потребуется его создать. Обратите внимание, что мы даем право на две операции одной командой:

А теперь попытаемся писать в топик от имени Алисы с помощью нашего клиента (поменяйте в клиенте учетные данные и название топика). В результате получим:

Топик создался и сообщения благополучно в него пишутся.

Установка docker-compose

Для запуска и конфигурирования образов используется

, который позволяет легко запускать мультиконтейнерные приложения. Без него не обойтись, потому что минимальная конфигурация kafka предусматривает связку из zookeeper и хотя бы одного брокера.

Extend the policy to prevent services from accidentally writing to topics with large fanout.

First, add the following content to the policy file (./policies/tutorial.rego):

Next, update the topic_metadata data structure in the same file to indicate
that the click-stream topic has a high fanout.

Last, build a bundle from the updated policy.

1 Звезда2 Звезды3 Звезды4 Звезды5 Звезд (1 оценок, среднее: 5,00 из 5)

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *