Kafka
Движок работает с Apache Kafka.
Kafka позволяет:
- Публиковать/подписываться на потоки данных.
- Организовать отказоустойчивое хранилище.
- Обрабатывать потоки по мере их появления.
Создание таблицы
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
Обязательные параметры:
kafka_broker_list
— перечень брокеров, разделенный запятыми (localhost:9092
).kafka_topic_list
— перечень необходимых топиков Kafka.kafka_group_name
— группа потребителя Kafka. Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.kafka_format
— формат сообщений. Названия форматов должны быть теми же, что можно использовать в секцииFORMAT
, например,JSONEachRow
. Подробнее читайте в разделе Форматы.
Опциональные параметры:
kafka_row_delimiter
— символ-разделитель записей (строк), которым завершается сообщение.kafka_schema
— опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap’n Proto требует путь к файлу со схемой и название корневого объектаschema.capnp:Message
.kafka_num_consumers
— количество потребителей (consumer) на таблицу. По умолчанию:1
. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.kafka_max_block_size
— максимальный размер пачек (в сообщениях) для poll (по умолчаниюmax_block_size
).kafka_skip_broken_messages
— максимальное количество некорректных сообщений в блоке. Еслиkafka_skip_broken_messages = N
, то движок отбрасываетN
сообщений Кафки, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.kafka_commit_every_batch
— включает или отключает режим записи каждой принятой и обработанной пачки по отдельности вместо единой записи целого блока (по умолчанию0
).kafka_client_id
— идентификатор клиента. Значение по умолчанию пусто – ''.kafka_poll_timeout_ms
- Таймаут для poll. По умолчанию: (../../../operations/settings/settings.md#stream_poll_timeout_ms)kafka_poll_max_batch_size
- Максимальное количество сообщений в одном poll Kafka. По умолчанию: (../../../operations/settings/settings.md#setting-max_block_size)kafka_flush_interval_ms
- Таймаут для сброса данных из Kafka. По умолчанию: (../../../operations/settings/settings.md#stream-flush-interval-ms)kafka_thread_per_consumer
— включает или отключает предоставление отдельного потока каждому потребителю (по умолчанию0
). При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки с данными от нескольких потребителей собираются в один блок.kafka_handle_error_mode
- Способ обработки ошибок для Kafka. Возможные значения: default, stream.kafka_commit_on_select
- Сообщение о commit при запросе select. По умолчанию:false
.kafka_max_rows_per_message
- Максимальное количество строк записанных в одно сообщение Kafka для формата row-based. По умолчанию:1
.
Примеры
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
Устаревший способ создания таблицы
Не используйте этот метод в новых проектах. По возможности переключите старые проекты на метод, описанный выше.
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
Описание
Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.
Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.
Чтение сообщения с помощью SELECT
не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:
- Создайте потребителя Kafka с помощью движка и рассматривайте его как поток данных.
- Создайте таблицу с необходимой структурой.
- Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.
Когда к движку присоединяется материализованное представление (MATERIALIZED VIEW
), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT
.
Материализованных представлений у одной kafka таблицы может быть сколько угодно, они не считывают данные из таблицы kafka непосредственно, а получают новые записи (блоками), таким образом можно писать в несколько таблиц с разным уровнем детализации (с группировкой - агрегацией и без).
Пример:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:
DETACH TABLE consumer;
ATTACH TABLE consumer;
Если необходимо изменить целевую таблицу с помощью ALTER
, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.
Конфигурация
Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла ClickHouse. Существует два конфигурационных ключа, которые можно использовать: глобальный (kafka
) и по топикам (kafka_topic_*
). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует).
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific for topic "logs" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
В документе librdkafka configuration reference можно увидеть список возможных опций конфигурации. Используйте подчеркивание (_
) вместо точки в конфигурации ClickHouse. Например, check.crcs=true
будет соответствовать <check_crcs>true</check_crcs>
.
Поддержка Kerberos
Чтобы начать работу с Kafka с поддержкой Kerberos, добавьте дочерний элемент security_protocol
со значением sasl_plaintext
. Этого будет достаточно, если получен тикет на получение тикета (ticket-granting ticket) Kerberos и он кэшируется средствами ОС.
ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы sasl_kerberos_service_name
, sasl_kerberos_keytab
и sasl_kerberos_principal
.
Пример:
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
Виртуальные столбцы
_topic
— топик Kafka._key
— ключ сообщения._offset
— оффсет сообщения._timestamp
— временная метка сообщения._timestamp_ms
— временная метка сообщения в миллисекундах._partition
— секция топика Kafka._headers.name
- Массив ключей заголовков сообщений._headers.value
- Массив значений заголовков сообщений.
Смотрите также