일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- findTopBy
- ksqldb
- Linux
- 자바 ORM 표준 JPA 프로그래밍
- intellij 핵심 단축키
- java
- Stream
- CompletableFuture
- aws
- 원격 브랜 삭제
- 리팩토링 2판
- intellij favorites
- Spring Cloud Netflix
- IntelliJ
- 백명석님
- @Transactional Propagation
- HandlerMethodArgumentResolver
- JPA
- 친절한 SQL 튜닝
- multipart테스트
- 마이크로 서비스
- @TransactionalEventListener
- intellij 즐겨찾기
- #docker compose
- 자바 ORM 표준 JPA 프로그래밍 정리
- 리눅스
- vue.js
- ksql
- git
- javascript case
- Today
- Total
시그마 삽질==six 시그마
kafka schema registry 본문
카프카 스키마 레지스트리
데이터를 만들어내는 Producer와 데이터를 사용하는 Consumer 간의 계약으로 사용
• 스키마가 없으면 시간이 지남에 따라, 제어된 방식으로 데이터 구조를 발전시킬 수단이 없게 됨
• 데이터 구조는 항상 비즈니스에 따라서 진화하는데, 이것을 Schema Evolution(스키마 진화)라고 함
AVRO
Data Serialization System
• Avro는 Apache Open Source Software 프로젝트
• 데이터 Serialization 제공
• Java를 포함한 많은 프로그래밍 언어에서 지원
• 데이터구조형식제공
• Avro 데이터는 바이너리이므로 데이터를 효율적으로 저장
https://avro.apache.org/docs/1.11.1/specification/
Confluent Schema Registry 스키마 저장소
Confluent Schema Registry는 스키마의 중앙 집중식 관리를 제공
• 모든 스키마의 버전 기록을 저장
• Avro 스키마 저장 및 검색을 위한 RESTful 인터페이스 제공
• 스키마를 확인하고 데이터가 스키마와 일치하지 않으면 예외를 throw
• 호환성 설정에 따라 스키마 진화(Schema Evolution) 가능
각 메시지와 함께 Avro 스키마를 보내는 것은 비효율적
• 대신 Avro 스키마를 나타내는 Global Unique ID가 각 메시지와 함께 전송(메시지 +스키마 ID)
Schema Registry는 특별한 Kafka Topic에 스키마 정보를 저장
• “_schemas” <--브로커에 저장됨
•schema-registry.properties에서 kafkastore.topic 파라미터로 변경 가능
https://docs.confluent.io/platform/current/schema-registry/index.html
producer : Schema 가 자신 Local Cache에 없으면, Schema를 로컬캐쉬에 등록 후 Schema Registry에도 등록, 메시지에 스키마ID 담아서 브로커에 전송
consumer: 컨슈머는 스키마 ID가 있으면 자신 로컬 캐시 저장소 뒤져서 없으면 (스키마 ID로) Schema Registry로 요청 .가져와서 데이터를 파싱하는데 사용함.
버전업 되면 ID 바뀌고 프로듀서는 다시 로컬 캐싱에 넣고...
스키마 레지스트리 저장되는곳
1. 스키마 레지스트리는 기본적으로 It stores a copy of the schema from every incoming message in the local cache.( 아닌듯!!)
1분 58초
Schema Registry is a standalone server process that runs on a machine external to the Kafka brokers. Its job is to maintain a database of all of the schemas that have been written into topics in the cluster for which it is responsible. That “database” is persisted in an internal Kafka topic and cached in Schema Registry for low-latency access. Schema Registry can be run in a redundant, high-availability configuration, so it remains up if one instance fails.
https://developer.confluent.io/learn-kafka/apache-kafka/schema-registry/
https://hevodata.com/learn/kafka-schema-registry/#u
2. 브로커의 _schemas 저장됨(정답!!!)
https://blog.devgenius.io/kafka-schema-registry-abea44b88714
https://docs.confluent.io/platform/current/schema-registry/security/index.html
https://faun.pub/how-to-save-confluent-kafka-schema-registry-with-curl-30efadfbf162
https://cloud.yandex.com/en/docs/managed-kafka/tutorials/confluent-schema-registry
https://blog.voidmainvoid.net/463
Kafka is used as Schema Registry storage backend. The special Kafka topic <kafkastore.topic> (default _schemas), with a single partition, is used as a highly available write ahead log. All schemas, subject/version and ID metadata, and compatibility settings are appended as messages to this log. A Schema Registry instance therefore both produces and consumes messages under the _schemas topic. It produces messages to the log when, for example, new schemas are registered under a subject, or when updates to compatibility settings are registered. Schema Registry consumes from the _schemas log in a background thread, and updates its local caches on consumption of each new _schemas message to reflect the newly added schema or compatibility setting. Updating local state from the Kafka log in this manner ensures durability, ordering, and easy recoverability.
—>
/usr/local/confluent/etc/schema-registry/schema-registry.properties
// 스키마 레지스트리에서 사용할 TCP 포트를 8081 포트로 지정
listeners=http:/0.0.0.0:8081
//스키마의 버전 히스토리 및 관련 데이터를 저장할 카프카 주소를 입력
kafkastore.bootstrap.servers=PLAINTEXT;//peter-kafka01.foo.bar:9092,peter-kafka02.foo.bar:9092,peter-kafka03.foo.bar:9092
// 스키마의 버전 히스토리 및 관련 데이터 저장 토픽의 이름을 _schemas로 지정
kafkastore.topic=_schemas
//스키마 호환성 레벨을 full로 지정
schema.compatiblitiy.level=full
스키마 관리 목적으로 사용되는 메시지들은 순서가 중요하기 때문에 _schemas 토픽의 파티션 수는 항상 1
프로듀서에서 schema.registry.url, value.schema 설정해줘야함
컨슈머에서 schema.registry.url 설정해줘야함
# Produce a record with one field
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic test-avro \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
# let's trigger an error:
{"f2": "value4"}
# let's trigger another error:
{"f1": 1}
# Consume the records from the beginning of the topic:
kafka-avro-console-consumer --topic test-avro \
--bootstrap-server 127.0.0.1:9092 \
--property schema.registry.url=http://127.0.0.1:8081 \
--from-beginning
# Produce some errors with an incompatible schema (we changed to int) - should produce a 409
kafka-avro-console-producer \
--broker-list localhost:9092 --topic test-avro \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"int"}'
# Some schema evolution (we add a field f2 as an int with a default)
kafka-avro-console-producer \
--broker-list localhost:9092 --topic test-avro \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name": "f2", "type": "int", "default": 0}]}'
{"f1": "evolution", "f2": 1 }
# Consume the records again from the beginning of the topic:
kafka-avro-console-consumer --topic test-avro \
--bootstrap-server localhost:9092 \
--from-beginning \
--property schema.registry.url=http://127.0.0.1:8081
int, long,float,double 다있고 null, bytes 도 있음!!!
avrò schema
type
name
doc
fields:
alias
namespace
-fileds:
type
name
doc
default
Arrays use the type name “array” and support a single attribute:
- items: the schema of the array’s items.
For example, an array of strings is declared with:
{
"type": "array",
"items" : "string",
"default": []
}
Maps use the type name “map” and support one attribute:
- values: the schema of the map’s values.
Map keys are assumed to be strings.
For example, a map from string to long is declared with:
{
"type": "map",
"values" : "long",
"default": {}
}
Unions
Unions, as mentioned above, are represented using JSON arrays. For example, ["null", "string"] declares a schema which may be either a null or string.
(Note that when a default value is specified for a record field whose type is a union, the type of the default value must match the first element of the union. Thus, for unions containing “null”, the “null” is usually listed first, since the default value of such unions is typically null.)
Unions may not contain more than one schema with the same type, except for the named types record, fixed and enum. For example, unions containing two array types or two map types are not permitted, but two types with different names are permitted. (Names permit efficient resolution when reading and writing unions.)
Unions may not immediately contain other unions.
Fixed
Fixed uses the type name “fixed” and supports the following attributes:
- name: a string naming this fixed (required).
- namespace, a string that qualifies the name (optional);
- aliases: a JSON array of strings, providing alternate names for this enum (optional).
- size: an integer, specifying the number of bytes per value (required)
For example, 16-byte quantity may be declared with:
{"type": "fixed", "size": 16, "name": "md5"}
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [{
"name": "name",
"type": "string"
},
{
"name": "favorite_number",
"type": ["int", "null"]
}, {
"name": "favorite_color",
"type": ["string", "null"]
}
]
}
doc은 avro의 optional field
[장점]
• 압축, 고성능, Binary 포맷
• Java를 포함한 많은 프로그래밍 언어에서 지원
• Avro 데이터가 파일에 저장되면 해당 스키마가 함께
저장되므로 나중에 모든 프로그램에서 파일 처리 가능
• Avro 스키마는 JSON으로 정의되므로, 이미 JSON 라이브러리가 있는 언어에서 구현이 용이(Schema
Evolution 을 쉽게 지원)
• 데이터의타입을알수있음
• Confluent Schema Registry에서 사용 가능
[단점]
• Binary 형태로 Serialization되기 때문에 데이터를
쉽게 보고 해석하기 어려움(디버깅, 개발시 불편함)
Backward Compatibility(default)
컨슈머가 새로운 스키마를 사용하여 이전 데이터를 읽는 것이 가능한것을의미
프로듀서가 보낸 버전1 메시지를 버전2인 컨슈머가 읽을 수 있는것
기본값없는 필드 삭제 혹은 기본값이 있는 필드 추가인 경우
BACKWARD compatibility means that consumers using the new schema can read data produced with the last schema.
Forward Compatibility
컨슈머가 이전 스키마를 사용하여 새로운 데이터를 읽는 것이 가능한 것을 의미
프로듀서가 보낸 버전2 메시지를 버전1인 컨슈머가 읽을 수 있는것
기본값 없는 필드 추가 혹은 기본값이 있는 필드 삭제
FORWARD compatibility means that data produced with a new schema can be read by consumers using the last schema
https://docs.confluent.io/platform/current/schema-registry/avro.html
Full Compatibility
기본값이 있는 필드 추가 혹은 삭제
Schema 설계시 고려할 점
• 삭제될 가능성이 있는 필드이면 default value 를 반드시 지정
• 추가되는 필드라면 default value 를 지정
• 필드의 이름을 변경하지 않음
'프로그래밍 > Kafka' 카테고리의 다른 글
kafka ksqlDB (0) | 2022.08.26 |
---|---|
kafka stream (0) | 2022.08.26 |
kafka connect (0) | 2022.08.25 |
kafka 모니터링 (0) | 2022.08.01 |
kafka 전략 (0) | 2022.07.31 |