Интеграция Apache Beam и ClickHouse
Apache Beam — это открытый, унифицированный программный интерфейс, который позволяет разработчикам определять и исполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных, от операций ETL (Извлечение, Преобразование, Загрузка) до комплексной обработки событий и аналитики в реальном времени. Эта интеграция использует официальный JDBC-коннектор ClickHouse для основной вставки данных.
Интеграционный пакет
Интеграционный пакет, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций множества популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO
расположена в репозитории Apache Beam.
Настройка пакета Apache Beam ClickHouse
Установка пакета
Добавьте следующую зависимость в вашу систему управления пакетами:
Коннектор ClickHouseIO
рекомендуется использовать начиная с версии Apache Beam 2.59.0
. Более ранние версии могут не полностью поддерживать функциональность коннектора.
Артефакты можно найти в официальном репозитории maven.
Пример кода
Следующий пример считывает CSV-файл с именем input.csv
как PCollection
, преобразует его в объект Row (с использованием определенной схемы) и вставляет его в локальный экземпляр ClickHouse с использованием ClickHouseIO
:
Поддерживаемые типы данных
ClickHouse | Apache Beam | Поддерживается | Примечания |
---|---|---|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes — это LogicalType , представляющий фиксированный массив байтов, расположенный по адресу org.apache.beam.sdk.schemas.logicaltypes |
Schema.TypeName#DECIMAL | ❌ | ||
Schema.TypeName#MAP | ❌ |
Параметры ClickHouseIO.Write
Вы можете настроить конфигурацию ClickHouseIO.Write
с помощью следующих функций-сеттеров:
Функция-сеттер параметра | Тип аргумента | Значение по умолчанию | Описание |
---|---|---|---|
withMaxInsertBlockSize | (long maxInsertBlockSize) | 1000000 | Максимальный размер блока строк для вставки. |
withMaxRetries | (int maxRetries) | 5 | Максимальное количество повторных попыток для неудачных вставок. |
withMaxCumulativeBackoff | (Duration maxBackoff) | Duration.standardDays(1000) | Максимальная кумулятивная задержка для повтора. |
withInitialBackoff | (Duration initialBackoff) | Duration.standardSeconds(5) | Начальная задержка перед первой попыткой повторения. |
withInsertDistributedSync | (Boolean sync) | true | Если true, синхронизирует операции вставки для распределенных таблиц. |
withInsertQuorum | (Long quorum) | null | Количество реплик, необходимых для подтверждения операции вставки. |
withInsertDeduplicate | (Boolean deduplicate) | true | Если true, включена дедупликация для операций вставки. |
withTableSchema | (TableSchema schema) | null | Схема целевой таблицы ClickHouse. |
Ограничения
Пожалуйста, учитывайте следующие ограничения при использовании коннектора:
- На сегодняшний день поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
- ClickHouse выполняет дедупликацию при вставке в
ReplicatedMergeTree
илиDistributed
таблицы, построенные на основеReplicatedMergeTree
. Без репликации вставка в обычный MergeTree может привести к дубликатам, если вставка завершается неудачей и затем успешно повторяется. Однако каждый блок вставляется атомарно, и размер блока можно настроить с помощьюClickHouseIO.Write.withMaxInsertBlockSize(long)
. Дедупликация достигается с использованием контрольных сумм вставленных блоков. Для получения дополнительной информации о дедупликации, пожалуйста, посетите Deduplication и Deduplicate insertion config. - Коннектор не выполняет никаких операций DDL; поэтому целевая таблица должна существовать перед вставкой.
Связанный контент
- Документация класса
ClickHouseIO
доступна здесь. - Репозиторий
Github
с примерами clickhouse-beam-connector.