Перейти к основному содержимому
Перейти к основному содержимому

CDC из DynamoDB в ClickHouse

Experimental feature. Learn more.

Эта страница описывает, как настроить CDC из DynamoDB в ClickHouse с помощью ClickPipes. Это интеграция состоит из 2 компонентов:

  1. Начальный снимок через S3 ClickPipes
  2. Обновления в реальном времени через Kinesis ClickPipes

Данные будут загружены в ReplacingMergeTree. Этот движок таблиц обычно используется для сценариев CDC, позволяя применять операции обновления. Больше о данной модели можно найти в следующих блоговых статьях:

1. Настройка потока Kinesis

Сначала вам нужно будет включить поток Kinesis на вашей таблице DynamoDB, чтобы захватывать изменения в реальном времени. Мы хотим сделать это перед созданием снимка, чтобы не пропустить никакие данные. Найдите руководство AWS, расположенное здесь.

Поток Kinesis DynamoDB

2. Создание снимка

Далее мы создадим снимок таблицы DynamoDB. Это можно сделать через экспорт AWS в S3. Найдите руководство AWS, расположенное здесь. Вы должны сделать "Полный экспорт" в формате JSON DynamoDB.

Экспорт S3 DynamoDB

3. Загрузка снимка в ClickHouse

Создание необходимых таблиц

Данные снимка из DynamoDB будут выглядеть примерно так:

{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}

Обратите внимание, что данные имеют вложенный формат. Нам нужно будет развернуть эти данные перед загрузкой их в ClickHouse. Это можно сделать с помощью функции JSONExtract в ClickHouse в материализованном представлении.

Мы создадим три таблицы:

  1. Таблица для хранения сырых данных из DynamoDB
  2. Таблица для хранения конечных развернутых данных (таблица назначения)
  3. Материализованное представление для развертывания данных

Для примера данных DynamoDB выше таблицы ClickHouse будут выглядеть следующим образом:

/* Snapshot table */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Table for final flattened data */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Table for final flattened data */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

Существует несколько требований к таблице назначения:

  • Эта таблица должна быть таблицей ReplacingMergeTree
  • Таблица должна иметь колонку version
    • На следующих этапах мы будем сопоставлять поле ApproximateCreationDateTime из потока Kinesis с колонкой version.
  • Таблица должна использовать ключ партиционирования в качестве ключа сортировки (указанного с помощью ORDER BY)
    • Строки с одинаковым ключом сортировки будут дедуплицироваться на основе колонки version.

Создание ClickPipe снимка

Теперь вы можете создать ClickPipe для загрузки данных снимка из S3 в ClickHouse. Следуйте руководству по S3 ClickPipe здесь, но используйте следующие настройки:

  • Путь загрузки: Вам нужно будет определить путь экспортированных json файлов в S3. Путь будет выглядеть примерно так:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Формат: JSONEachRow
  • Таблица: Ваша таблица снимка (например, default.snapshot в приведенном выше примере)

После создания данные начнут заполнять таблицы снимка и назначения. Вам не нужно ждать завершения загрузки снимка перед переходом к следующему шагу.

4. Создание Kinesis ClickPipe

Теперь мы можем настроить Kinesis ClickPipe для захвата изменений в реальном времени из потока Kinesis. Следуйте руководству по Kinesis ClickPipe здесь, но используйте следующие настройки:

  • Поток: Поток Kinesis, использованный на шаге 1
  • Таблица: Ваша таблица назначения (например, default.destination в приведенном выше примере)
  • Развернуть объект: true
  • Сопоставления колонок:
    • ApproximateCreationDateTime: version
    • Сопоставьте другие поля с соответствующими колонками назначения, как показано ниже
Сопоставление колонок DynamoDB

5. Очистка (по желанию)

После завершения ClickPipe снимка вы можете удалить таблицу снимка и материализованное представление.

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";