CDC из DynamoDB в ClickHouse
Эта страница описывает, как настроить CDC из DynamoDB в ClickHouse с помощью ClickPipes. Это интеграция состоит из 2 компонентов:
- Начальный снимок через S3 ClickPipes
- Обновления в реальном времени через Kinesis ClickPipes
Данные будут загружены в ReplacingMergeTree
. Этот движок таблиц обычно используется для сценариев CDC, позволяя применять операции обновления. Больше о данной модели можно найти в следующих блоговых статьях:
- Сбор изменений данных (CDC) с помощью PostgreSQL и ClickHouse - Часть 1
- Сбор изменений данных (CDC) с помощью PostgreSQL и ClickHouse - Часть 2
1. Настройка потока Kinesis
Сначала вам нужно будет включить поток Kinesis на вашей таблице DynamoDB, чтобы захватывать изменения в реальном времени. Мы хотим сделать это перед созданием снимка, чтобы не пропустить никакие данные. Найдите руководство AWS, расположенное здесь.

2. Создание снимка
Далее мы создадим снимок таблицы DynamoDB. Это можно сделать через экспорт AWS в S3. Найдите руководство AWS, расположенное здесь. Вы должны сделать "Полный экспорт" в формате JSON DynamoDB.

3. Загрузка снимка в ClickHouse
Создание необходимых таблиц
Данные снимка из DynamoDB будут выглядеть примерно так:
Обратите внимание, что данные имеют вложенный формат. Нам нужно будет развернуть эти данные перед загрузкой их в ClickHouse. Это можно сделать с помощью функции JSONExtract
в ClickHouse в материализованном представлении.
Мы создадим три таблицы:
- Таблица для хранения сырых данных из DynamoDB
- Таблица для хранения конечных развернутых данных (таблица назначения)
- Материализованное представление для развертывания данных
Для примера данных DynamoDB выше таблицы ClickHouse будут выглядеть следующим образом:
Существует несколько требований к таблице назначения:
- Эта таблица должна быть таблицей
ReplacingMergeTree
- Таблица должна иметь колонку
version
- На следующих этапах мы будем сопоставлять поле
ApproximateCreationDateTime
из потока Kinesis с колонкойversion
.
- На следующих этапах мы будем сопоставлять поле
- Таблица должна использовать ключ партиционирования в качестве ключа сортировки (указанного с помощью
ORDER BY
)- Строки с одинаковым ключом сортировки будут дедуплицироваться на основе колонки
version
.
- Строки с одинаковым ключом сортировки будут дедуплицироваться на основе колонки
Создание ClickPipe снимка
Теперь вы можете создать ClickPipe для загрузки данных снимка из S3 в ClickHouse. Следуйте руководству по S3 ClickPipe здесь, но используйте следующие настройки:
- Путь загрузки: Вам нужно будет определить путь экспортированных json файлов в S3. Путь будет выглядеть примерно так:
- Формат: JSONEachRow
- Таблица: Ваша таблица снимка (например,
default.snapshot
в приведенном выше примере)
После создания данные начнут заполнять таблицы снимка и назначения. Вам не нужно ждать завершения загрузки снимка перед переходом к следующему шагу.
4. Создание Kinesis ClickPipe
Теперь мы можем настроить Kinesis ClickPipe для захвата изменений в реальном времени из потока Kinesis. Следуйте руководству по Kinesis ClickPipe здесь, но используйте следующие настройки:
- Поток: Поток Kinesis, использованный на шаге 1
- Таблица: Ваша таблица назначения (например,
default.destination
в приведенном выше примере) - Развернуть объект: true
- Сопоставления колонок:
ApproximateCreationDateTime
:version
- Сопоставьте другие поля с соответствующими колонками назначения, как показано ниже

5. Очистка (по желанию)
После завершения ClickPipe снимка вы можете удалить таблицу снимка и материализованное представление.