Коннектор HTTP Sink Confluent
Коннектор HTTP Sink не привязан к типу данных и, таким образом, не требует схемы Kafka, а также поддерживает специфические для ClickHouse типы данных, такие как Maps и Arrays. Эта дополнительная гибкость приводит к незначительному увеличению сложности конфигурации.
Ниже мы опишем простую установку, извлекая сообщения из одной темы Kafka и вставляя строки в таблицу ClickHouse.
Коннектор HTTP распространяется по Лицензии Confluent Enterprise.
Шаги быстрого старта
1. Соберите детали подключения
Чтобы подключиться к ClickHouse с помощью HTTP(S), вам необходима следующая информация:
-
ХОСТ и ПОРТ: как правило, порт 8443 при использовании TLS или 8123 при отсутствии TLS.
-
ИМЯ БАЗЫ ДАННЫХ: по умолчанию существует база данных с именем
default
, используйте имя базы данных, к которой вы хотите подключиться. -
ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя
default
. Используйте имя пользователя, подходящее для вашего случая.
Данные для вашего сервиса ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис, к которому вы хотите подключиться, и нажмите Подключиться:

Выберите HTTPS, и данные будут доступны в примере команды curl
.

Если вы используете самоуправляемый ClickHouse, детали подключения устанавливаются вашим администратором ClickHouse.
2. Запустите Kafka Connect и коннектор HTTP Sink
У вас есть два варианта:
-
Самоуправляемый: Скачайте пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, как документировано здесь. Если вы используете метод установки confluent-hub, ваши локальные конфигурационные файлы будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга Kafka. Это требует, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
В следующих примерах используется Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Прежде чем проводить тест на подключение, давайте начнем с создания тестовой таблицы в ClickHouse Cloud, эта таблица будет получать данные из Kafka:
4. Настройте HTTP Sink
Создайте тему Kafka и экземпляр коннектора HTTP Sink:

Настройте коннектор HTTP Sink:
- Укажите имя темы, которую вы создали
- Аутентификация
HTTP Url
- URL ClickHouse Cloud с указанным запросомINSERT
<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
. Примечание: запрос должен быть закодирован.Тип аутентификации для конечной точки
- BASICИмя пользователя аутентификации
- имя пользователя ClickHouseПароль аутентификации
- пароль ClickHouse
Этот HTTP Url подвержен ошибкам. Убедитесь, что экранирование точное, чтобы избежать проблем.

- Конфигурация
Формат значения записи Kafka
зависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. В следующих настройках мы предполагаемJSON
.- В разделе
расширенные настройки
:Метод HTTP-запроса
- Установите на POSTФормат тела запроса
- jsonРазмер пакета
- в соответствии с рекомендациями ClickHouse, установите это на как минимум 1000.JSON как массив
- trueПовторная попытка при HTTP-кодах
- 400-500, но подстраивайте по мере необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Максимальные повторные попытки
- по умолчанию (10) является приемлемым, но вы можете настроить для более надежных повторных попыток.

5. Тестирование соединения
Создайте сообщение в теме, настроенной вашим HTTP Sink

и проверьте, что созданное сообщение было записано в ваш экземпляр ClickHouse.
Устранение неполадок
HTTP Sink не пакетирует сообщения
Коннектор HTTP Sink не пакетирует запросы для сообщений, содержащих разные значения заголовков Kafka.
- Убедитесь, что ваши записи Kafka имеют одинаковый ключ.
- Когда вы добавляете параметры к URL HTTP API, каждая запись может привести к уникальному URL. По этой причине пакетирование отключено при использовании дополнительных параметров URL.
400 неверный запрос
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink завершает работу с следующим сообщением при вставке объекта JSON в колонку String
:
Установите настройку input_format_json_read_objects_as_strings=1
в URL как закодированную строку SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузите набор данных GitHub (по желанию)
Обратите внимание, что этот пример сохраняет поля Array из набора данных Github. Мы предполагаем, что у вас есть пустая тема github в примерах, и используем kcat для вставки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям для настройки Connect в зависимости от вашего типа установки, обратив внимание на различия между независимым и распределенным кластером. Если вы используете Confluent Cloud, соответствующая распределенная установка.
Наиболее важный параметр - это http.api.url
. HTTP интерфейс для ClickHouse требует, чтобы вы закодировали оператор INSERT как параметр в URL. Это должно включать формат (JSONEachRow
в данном случае) и целевую базу данных. Формат должен соответствовать данным Kafka, которые будут преобразованы в строку в полезной нагрузке HTTP. Эти параметры должны быть закодированы для URL. Пример этого формата для набора данных Github (при условии, что вы запускаете ClickHouse локально) представлен ниже:
Следующие дополнительные параметры имеют отношение к использованию HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method
- Установите на POSTretry.on.status.codes
- Установите на 400-500, чтобы повторно попытаться при любых кодах ошибок. Уточняйте, исходя из ожидаемых ошибок в данных.request.body.format
- В большинстве случаев это будет JSON.auth.type
- Установите на BASIC, если у вас есть безопасность с ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.ssl.enabled
- установите в true, если используете SSL.connection.user
- имя пользователя для ClickHouse.connection.password
- пароль для ClickHouse.batch.max.size
- количество строк, которые нужно отправлять в одном пакете. Убедитесь, что это значение установлено на соответствующе большое число. В соответствии с рекомендациями ClickHouse, значение 1000 следует считать минимумом.tasks.max
- Коннектор HTTP Sink поддерживает выполнение одной или нескольких задач. Это может быть использовано для увеличения производительности. Наряду с размером пакета это является основным способом повышения производительности.key.converter
- установите в соответствии с типами ваших ключей.value.converter
- установите в зависимости от типа данных в вашей теме. Эти данные не требуют схемы. Формат здесь должен соответствовать формату, указанному в параметреhttp.api.url
. Самым простым вариантом здесь является использование JSON и конвертера org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку с помощью конвертера org.apache.kafka.connect.storage.StringConverter, хотя это потребует от пользователя извлечь значение в операторе вставки с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая конфигурацию прокси, повторные попытки и расширенный SSL, можно найти здесь.
Примеры конфигурационных файлов для выборки данных GitHub можно найти здесь, при условии, что Connect выполняется в независимом режиме, а Kafka хостится в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица была создана. Пример для минимального набора данных github, использующего стандартный MergeTree, показан ниже.
3. Добавьте данные в Kafka
Вставьте сообщения в Kafka. Мы используем kcat для вставки 10 тысяч сообщений.
Простое считывание из целевой таблицы "Github" должно подтвердить вставку данных.