Перейти к основному содержимому
Перейти к основному содержимому

AvroConfluent

Входные данныеВыходные данныеПсевдоним

Описание

Apache Avro — это ориентированный на строки формат сериализации, который использует двоичное кодирование для эффективной обработки данных. Формат AvroConfluent поддерживает декодирование однообъектных сообщений Kafka, закодированных в формате Avro, сериализованных с использованием Confluent Schema Registry (или совместимых с API сервисов).

Каждое сообщение Avro встраивает идентификатор схемы, который ClickHouse автоматически разрешает, запрашивая настроенный реестр схем. После разрешения схемы кэшируются для оптимальной производительности.

Соответствие типов данных

Таблица ниже показывает все типы данных, поддерживаемые форматом Apache Avro, и их соответствующие типы данных ClickHouse в запросах INSERT и SELECT.

Avro тип данных INSERTClickHouse тип данныхAvro тип данных SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes или string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* bytes по умолчанию, контролируется настройкой output_format_avro_string_column_pattern

** Тип Variant неявно принимает null в качестве значения поля, поэтому, например, Avro union(T1, T2, null) будет преобразован в Variant(T1, T2). В результате, при создании Avro из ClickHouse, мы всегда должны включать тип null в набор типов Avro union, так как мы не знаем, является ли какое-либо значение фактически null во время вывода схемы.

*** Логические типы Avro

Не поддерживаемые логические типы данных Avro:

  • time-millis
  • time-micros
  • duration

Настройки формата

НастройкаОписаниеПо умолчанию
input_format_avro_allow_missing_fieldsИспользовать ли значение по умолчанию вместо генерации ошибки, когда поле не найдено в схеме.0
input_format_avro_null_as_defaultИспользовать ли значение по умолчанию вместо генерации ошибки при вставке значения null в ненулевую колонку.0
format_avro_schema_registry_urlURL реестра схем Confluent. Для базовой аутентификации учетные данные можно включить непосредственно в путь URL.

Примеры

Использование реестра схем

Чтобы прочитать топик Kafka, закодированный в формате Avro, используя движок таблиц Kafka, используйте настройку format_avro_schema_registry_url, чтобы указать URL реестра схем.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Использование базовой аутентификации

Если ваш реестр схем требует базовой аутентификации (например, если вы используете Confluent Cloud), вы можете предоставить закодированные в URL учетные данные в настройке format_avro_schema_registry_url.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

Устранение неполадок

Чтобы отслеживать процесс приема данных и отлаживать ошибки с потребителем Kafka, вы можете запрашивать system.kafka_consumers системную таблицу. Если в вашей развертке несколько реплик (например, ClickHouse Cloud), вам нужно использовать табличную функцию clusterAllReplicas.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

Если вы столкнулись с проблемами разрешения схемы, вы можете использовать kafkacat с clickhouse-local для устранения неполадок:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c