Spring Kafka Basic
1. What Kafka Is and Its Key Features
A distributed messaging system developed at LinkedIn in 2011.
Features
Loosens the coupling between sources and targets; centralizes data.
1) Specialized for high-volume, real-time log processing: partitions enable distributed processing, so you can add consumers to process in parallel.
2) Scalability: scales out horizontally by adding new brokers.
3) High availability: replicas allow recovery when a broker fails.
4) Messages are not deleted on consumption, so consumers can rewind and re-read them.
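The partition-based parallelism in 1) can be illustrated with a toy partitioner. This is only a sketch: Kafka's real DefaultPartitioner hashes keys with murmur2, not String.hashCode, but the idea is the same — the same key always maps to the same partition, preserving per-key ordering while different keys spread across partitions (and thus across consumers).

```java
// Toy illustration of keyed partitioning (Kafka's DefaultPartitioner uses murmur2 instead).
public class PartitionSketch {
    // Deterministically maps a key to one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        // abs applied after % so Integer.MIN_VALUE hash codes are handled safely
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-42", 3);
        int p2 = partitionFor("user-42", 3);
        // Same key -> same partition, so ordering per key is preserved.
        System.out.println("user-42 -> partition " + p1 + " (repeat: " + p2 + ")");
    }
}
```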
2. Installing and Running Kafka (on macOS)
# ZooKeeper is installed automatically as a dependency
brew install kafka
brew services start zookeeper
brew services start kafka
# Shows where Kafka was installed
brew info kafka
If you want to try Kafka locally first:
cd <kafka-install-dir>/bin
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2
./kafka-console-producer --broker-list localhost:9092 --topic test2
Type a few words, exit with Ctrl+C, then run
./kafka-console-consumer --bootstrap-server localhost:9092 --topic test2 --from-beginning
and the words you typed should appear. When you are finished, run the two commands below — but since we are about to test Spring Kafka, leave the services running for now.
brew services stop kafka
brew services stop zookeeper
https://kafka.apache.org/quickstart
3. Gradle Setup
Go to the address below, click Learn, open the reference doc matching your Spring Boot version, then click dependency-versions in the left sidebar:
https://spring.io/projects/spring-boot#learn
Search for spring-kafka there and add the matching version:
compile "org.springframework.kafka:spring-kafka:2.5.5.RELEASE"
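Note that the compile configuration is deprecated in recent Gradle versions in favor of implementation, and when the Spring Boot dependency-management plugin is applied you can usually omit the explicit version and let Boot resolve a compatible one — a sketch of the equivalent build.gradle line:

```groovy
// Newer Gradle syntax; version omitted when Spring Boot's dependency management is in effect.
implementation 'org.springframework.kafka:spring-kafka'
```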
4. application.properties Configuration
spring.kafka.consumer.group-id=kafka-consumer_group_start
spring.kafka.bootstrap-servers=localhost:9092
server.port=9080
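If you prefer the serializer settings to be explicit rather than implied, Spring Boot exposes them as properties too. The values below merely spell out the String (de)serializers and offset-reset behavior already visible in the startup log further down; treat this as an optional sketch:

```properties
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```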
5. Code
package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

// @EnableScheduling activates the @Scheduled producer below.
@SpringBootApplication
@EnableScheduling
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
package com.example.kafka.task;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class KafkaTasks {
    private static final Logger log = LoggerFactory.getLogger(KafkaTasks.class);
    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce(String topic, String data) {
        kafkaTemplate.send(topic, data);
        log.info("Message Sent: {} topic: {}", data, topic);
    }

    // Produces a timestamped message every 30 seconds.
    @Scheduled(fixedRate = 30000)
    public void scheduledMessage() {
        produce("this_is_topic_name", "bla_bla_bla...:" + dateFormat.format(new Date()));
    }

    // Consumes every message published to the topic.
    @KafkaListener(topics = "this_is_topic_name")
    public void receivedTopic(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received topic: {}", consumerRecord);
    }
}
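One side note on the code above: SimpleDateFormat is mutable and not thread-safe. It is fine here because @Scheduled with fixedRate runs on a single scheduler thread by default, but if you ever produce from multiple threads, java.time's immutable DateTimeFormatter is the safer choice — a sketch:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampSketch {
    // Immutable and thread-safe; same pattern as the SimpleDateFormat above.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String format(LocalDateTime t) {
        return FORMAT.format(t);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2020, 8, 25, 22, 58, 58)));
        // prints 2020-08-25 22:58:58
    }
}
```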
- The log output below has been partially modified.
2020-08-25 22:58:57.422 INFO 5924 --- [ main] com.example.kafka.KafkaApplication : Starting KafkaApplication on ISP with .....
2020-08-25 22:58:57.424 INFO 5924 --- [ main] com.example.kafka.KafkaApplication : No active profile set, falling back to default profiles: default
2020-08-25 22:58:58.126 INFO 5924 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9080 (http)
2020-08-25 22:58:58.134 INFO 5924 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-08-25 22:58:58.134 INFO 5924 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.37]
2020-08-25 22:58:58.188 INFO 5924 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-08-25 22:58:58.188 INFO 5924 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 735 ms
2020-08-25 22:58:58.377 INFO 5924 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-25 22:58:58.510 INFO 5924 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-08-25 22:58:58.566 INFO 5924 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka-consumer_group_start
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-08-25 22:58:58.613 INFO 5924 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2020-08-25 22:58:58.614 INFO 5924 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0fwc03d0f21
2020-08-25 22:58:58.614 INFO 5924 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1598363938612
2020-08-25 22:58:58.616 INFO 5924 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Subscribed to topic(s): this_is_topic_name
2020-08-25 22:58:58.617 INFO 5924 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-08-25 22:58:58.643 INFO 5924 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9080 (http) with context path ''
2020-08-25 22:58:58.652 INFO 5924 --- [ scheduling-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-08-25 22:58:58.652 INFO 5924 --- [ main] com.example.kafka.KafkaApplication : Started KafkaApplication in 1.513 seconds (JVM running for 1.98)
2020-08-25 22:58:58.666 INFO 5924 --- [ scheduling-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2020-08-25 22:58:58.666 INFO 5924 --- [ scheduling-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0fwc03d0f21
2020-08-25 22:58:58.666 INFO 5924 --- [ scheduling-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1598363938666
2020-08-25 22:58:58.858 INFO 5924 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: 7PNSDydfsQSCYFVfAsFfdsDSFs
2020-08-25 22:58:58.858 INFO 5924 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Cluster ID: 7PNSDydfsQSCYFVfAsFfdsDSFs
2020-08-25 22:58:58.859 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Discovered group coordinator 192.168.35.xxx:9092 (id: 2147483647 rack: null)
2020-08-25 22:58:58.860 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] (Re-)joining group
2020-08-25 22:58:58.871 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 22:58:58 topic: this_is_topic_name
2020-08-25 22:58:58.883 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-08-25 22:58:58.883 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] (Re-)joining group
2020-08-25 22:58:58.887 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Finished assignment for group at generation 3: {consumer-kafka-consumer_group_start-1-9e91cdd2-45d8-48c8-a83f-8001bb3787de=Assignment(partitions=[this_is_topic_name-0])}
2020-08-25 22:58:58.891 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Successfully joined group with generation 3
2020-08-25 22:58:58.894 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Adding newly assigned partitions: this_is_topic_name-0
2020-08-25 22:58:58.905 INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Setting offset for partition this_is_topic_name-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.35.xxx:9092 (id: 0 rack: null)], epoch=0}}
2020-08-25 22:58:58.906 INFO 5924 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : kafka-consumer_group_start: partitions assigned: [this_is_topic_name-0]
2020-08-25 22:58:58.944 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1598363938858, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 22:58:58)
2020-08-25 22:59:28.652 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 22:59:28 topic: this_is_topic_name
2020-08-25 22:59:28.655 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1598363968652, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 22:59:28)
2020-08-25 22:59:58.655 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 22:59:58 topic: this_is_topic_name
2020-08-25 22:59:58.657 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1598363998655, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 22:59:58)
2020-08-25 23:00:28.653 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:00:28 topic: this_is_topic_name
2020-08-25 23:00:28.656 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1598364028653, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:00:28)
2020-08-25 23:00:58.654 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:00:58 topic: this_is_topic_name
2020-08-25 23:00:58.656 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 15, CreateTime = 1598364058654, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:00:58)
2020-08-25 23:01:28.656 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:01:28 topic: this_is_topic_name
2020-08-25 23:01:28.658 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 16, CreateTime = 1598364088656, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:01:28)
2020-08-25 23:01:58.655 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:01:58 topic: this_is_topic_name
2020-08-25 23:01:58.657 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 17, CreateTime = 1598364118655, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:01:58)
2020-08-25 23:02:28.655 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:02:28 topic: this_is_topic_name
2020-08-25 23:02:28.656 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 18, CreateTime = 1598364148654, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:02:28)
2020-08-25 23:02:58.655 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:02:58 topic: this_is_topic_name
2020-08-25 23:02:58.657 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1598364178655, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:02:58)
2020-08-25 23:03:28.658 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:03:28 topic: this_is_topic_name
2020-08-25 23:03:28.660 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1598364208658, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:03:28)
2020-08-25 23:03:58.658 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:03:58 topic: this_is_topic_name
2020-08-25 23:03:58.660 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1598364238658, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:03:58)
2020-08-25 23:04:28.658 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:04:28 topic: this_is_topic_name
2020-08-25 23:04:28.661 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1598364268658, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:04:28)
2020-08-25 23:04:58.660 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:04:58 topic: this_is_topic_name
2020-08-25 23:04:58.661 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1598364298659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:04:58)
2020-08-25 23:05:28.660 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:05:28 topic: this_is_topic_name
2020-08-25 23:05:28.662 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 24, CreateTime = 1598364328660, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:05:28)
2020-08-25 23:05:58.660 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:05:58 topic: this_is_topic_name
2020-08-25 23:05:58.662 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 25, CreateTime = 1598364358660, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:05:58)
2020-08-25 23:06:28.659 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:06:28 topic: this_is_topic_name
2020-08-25 23:06:28.662 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 26, CreateTime = 1598364388659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:06:28)
2020-08-25 23:06:58.659 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:06:58 topic: this_is_topic_name
2020-08-25 23:06:58.662 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 27, CreateTime = 1598364418659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:06:58)
2020-08-25 23:07:28.659 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:07:28 topic: this_is_topic_name
2020-08-25 23:07:28.661 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 28, CreateTime = 1598364448659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:07:28)
2020-08-25 23:07:58.660 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:07:58 topic: this_is_topic_name
2020-08-25 23:07:58.661 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 29, CreateTime = 1598364478659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:07:58)
2020-08-25 23:08:28.662 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:08:28 topic: this_is_topic_name
2020-08-25 23:08:28.665 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 30, CreateTime = 1598364508662, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:08:28)
2020-08-25 23:08:58.661 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:08:58 topic: this_is_topic_name
2020-08-25 23:08:58.663 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 31, CreateTime = 1598364538661, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:08:58)
2020-08-25 23:09:28.662 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:09:28 topic: this_is_topic_name
2020-08-25 23:09:28.664 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1598364568662, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:09:28)
2020-08-25 23:09:58.661 INFO 5924 --- [ scheduling-1] com.example.kafka.task.KafkaTasks : Message Sent: bla_bla_bla...:2020-08-25 23:09:58 topic: this_is_topic_name
2020-08-25 23:09:58.664 INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 33, CreateTime = 1598364598661, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:09:58)
6. Shutting Down Kafka
brew services stop zookeeper
brew services stop kafka