Spring Kafka Basic

Ethan Matthew Hunt 2020. 8. 25. 23:00

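1. Kafka definition and features

A distributed messaging system developed at LinkedIn and open-sourced in 2011.

Features

Loosens the coupling between source and target systems and centralizes data flow.

1) Specialized for high-volume, real-time log processing: topics are split into partitions for distributed processing, so adding consumers enables parallel consumption

2) Scalability: scale horizontally by adding new brokers

3) High availability: data can be recovered from replicas

4) Consumed messages are not deleted (they stay until the retention period expires), so a consumer can rewind and re-read them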
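
To make point 1) concrete, here is a minimal sketch of why adding consumers helps only up to the number of partitions. It is a simplified round-robin model written for illustration, not Kafka's actual assignment logic, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of spreading a topic's partitions over the consumers of one group.
// Hypothetical illustration only; real Kafka uses pluggable assignors such as RangeAssignor.
final class PartitionAssignmentSketch {

    // Assigns partition i to consumer (i % consumerCount), so each partition has exactly one owner.
    // Assumes the consumer list is not empty.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 consumers: the work is shared between both.
        System.out.println(assign(3, List.of("c1", "c2")));
        // 1 partition, 2 consumers: the second consumer gets nothing to read.
        System.out.println(assign(1, List.of("c1", "c2")));
    }
}
```

In this model a topic created with a single partition, like the test topic in the next section, keeps only one consumer per group busy.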

 

2. Installing and running Kafka (on macOS)

# ZooKeeper is installed automatically as a dependency
brew install kafka

brew services start zookeeper

brew services start kafka

# Shows where Kafka is installed
brew info kafka 

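To test Kafka locally first:
cd <kafka-install-dir>/bin

# Note: --zookeeper was removed from kafka-topics in Kafka 3.0; on newer installs pass --bootstrap-server localhost:9092 instead.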

./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2

./kafka-console-producer --broker-list localhost:9092 --topic test2

Type a few words, then exit with Ctrl+C.

./kafka-console-consumer --bootstrap-server localhost:9092 --topic test2 --from-beginning
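This prints the words entered above.

When you are done, run the two commands below to stop the services. Since we are about to test Spring Kafka, leave them running for now.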
brew services stop kafka
brew services stop zookeeper 


https://kafka.apache.org/quickstart
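
The --from-beginning run above can replay earlier input because reading a message does not remove it from the log; a consumer only advances its own offset. A minimal in-memory sketch of that idea, assuming a single partition and a hypothetical class name (this is not the Kafka client API):

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of one partition: an append-only list plus one consumer's read position.
// Hypothetical illustration only; it does not use or mirror the Kafka client API.
final class RewindSketch {

    private final List<String> log = new ArrayList<>(); // messages are appended, never removed on read
    private int offset = 0;                              // next position this consumer will read

    void append(String message) {
        log.add(message);
    }

    // Returns every message from the current offset to the end and advances the offset.
    List<String> poll() {
        List<String> batch = new ArrayList<>(log.subList(offset, log.size()));
        offset = log.size();
        return batch;
    }

    // Moves the read position back to the start of the log.
    void seekToBeginning() {
        offset = 0;
    }

    public static void main(String[] args) {
        RewindSketch partition = new RewindSketch();
        partition.append("hello");
        partition.append("kafka");
        System.out.println(partition.poll()); // first read returns both messages
        System.out.println(partition.poll()); // nothing new since the last read
        partition.seekToBeginning();
        System.out.println(partition.poll()); // the same messages are read again
    }
}
```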

3. Gradle configuration

 

Go to the address below, open Learn, click the reference doc that matches your Spring Boot version, then click dependency-versions in the left menu:

https://spring.io/projects/spring-boot#learn

 

Search for spring-kafka on that page and use the version listed there.

 

implementation "org.springframework.kafka:spring-kafka:2.5.5.RELEASE"

 

 

4. application.properties configuration


spring.kafka.consumer.group-id=kafka-consumer_group_start
spring.kafka.bootstrap-servers=localhost:9092
server.port=9080
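
Spring Boot hands the two spring.kafka.* values to the Kafka client under the client's own keys, which is why the startup log in section 5 shows bootstrap.servers and group.id. A minimal sketch of that mapping using only java.util.Properties; the class and method names are hypothetical, and in the real application Spring Boot's auto-configuration does this for you:

```java
import java.util.Properties;

// Shows which Kafka client keys the two spring.kafka.* settings end up as.
// Hypothetical helper for illustration; it is not part of Spring Boot or spring-kafka.
final class ConsumerPropsSketch {

    // Assumes both spring.kafka.* keys are present in the input.
    static Properties toKafkaClientProps(Properties springProps) {
        Properties client = new Properties();
        // spring.kafka.bootstrap-servers -> bootstrap.servers
        client.setProperty("bootstrap.servers",
                springProps.getProperty("spring.kafka.bootstrap-servers"));
        // spring.kafka.consumer.group-id -> group.id
        client.setProperty("group.id",
                springProps.getProperty("spring.kafka.consumer.group-id"));
        return client;
    }

    public static void main(String[] args) {
        Properties spring = new Properties();
        spring.setProperty("spring.kafka.bootstrap-servers", "localhost:9092");
        spring.setProperty("spring.kafka.consumer.group-id", "kafka-consumer_group_start");
        System.out.println(toKafkaClientProps(spring));
    }
}
```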

 

5. Code

 

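package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

// @EnableScheduling is required for the @Scheduled producer method in KafkaTasks to run.
@SpringBootApplication
@EnableScheduling
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

}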

 

 

 

package com.example.kafka.task;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

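@Component
public class KafkaTasks {

    private static final Logger log = LoggerFactory.getLogger(KafkaTasks.class);

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Auto-configured by Spring Boot; keys and values are Strings by default.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // send() is asynchronous, so the log line means the record was handed to the producer,
    // not that the broker has acknowledged it.
    public void produce(String topic, String data) {
        kafkaTemplate.send(topic, data);
        log.info("Message Sent: " + data + " topic: " + topic);
    }

    // Publishes one message every 30 seconds.
    @Scheduled(fixedRate = 30000)
    public void scheduledMessage() {
        produce("this_is_topic_name", "bla_bla_bla...:" + dateFormat.format(new Date()));
    }

    // Consumes from the same topic; the group id comes from spring.kafka.consumer.group-id.
    @KafkaListener(topics = "this_is_topic_name")
    public void receivedTopic(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received topic: " + consumerRecord.toString());
    }
}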
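
One caveat about the class above: SimpleDateFormat is not thread-safe, so the shared static instance is safe only while a single scheduler thread uses it. A possible alternative, sketched under the assumption that Java 8 or later is available, is the immutable java.time.format.DateTimeFormatter. The class and method names below are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper showing a thread-safe way to build the scheduled message payload.
final class MessageTimestampSketch {

    // DateTimeFormatter is immutable and thread-safe, so a shared static instance is fine.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Builds the same payload shape as scheduledMessage(): a fixed prefix plus the formatted time.
    static String payload(LocalDateTime time) {
        return "bla_bla_bla...:" + FORMAT.format(time);
    }

    public static void main(String[] args) {
        System.out.println(payload(LocalDateTime.now()));
    }
}
```

The payload is 34 characters long, which matches the "serialized value size = 34" reported for each record in the log.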

 

 

- The log below has been partially edited.

2020-08-25 22:58:57.422  INFO 5924 --- [           main] com.example.kafka.KafkaApplication       : Starting KafkaApplication on ISP with .....
2020-08-25 22:58:57.424  INFO 5924 --- [           main] com.example.kafka.KafkaApplication       : No active profile set, falling back to default profiles: default
2020-08-25 22:58:58.126  INFO 5924 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9080 (http)
2020-08-25 22:58:58.134  INFO 5924 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2020-08-25 22:58:58.134  INFO 5924 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.37]
2020-08-25 22:58:58.188  INFO 5924 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-08-25 22:58:58.188  INFO 5924 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 735 ms
2020-08-25 22:58:58.377  INFO 5924 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-25 22:58:58.510  INFO 5924 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-08-25 22:58:58.566  INFO 5924 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = kafka-consumer_group_start
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2020-08-25 22:58:58.613  INFO 5924 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2020-08-25 22:58:58.614  INFO 5924 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0fwc03d0f21
2020-08-25 22:58:58.614  INFO 5924 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1598363938612
2020-08-25 22:58:58.616  INFO 5924 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Subscribed to topic(s): this_is_topic_name
2020-08-25 22:58:58.617  INFO 5924 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-08-25 22:58:58.643  INFO 5924 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9080 (http) with context path ''
2020-08-25 22:58:58.652  INFO 5924 --- [   scheduling-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-08-25 22:58:58.652  INFO 5924 --- [           main] com.example.kafka.KafkaApplication       : Started KafkaApplication in 1.513 seconds (JVM running for 1.98)
2020-08-25 22:58:58.666  INFO 5924 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2020-08-25 22:58:58.666  INFO 5924 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0fwc03d0f21
2020-08-25 22:58:58.666  INFO 5924 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1598363938666
2020-08-25 22:58:58.858  INFO 5924 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 7PNSDydfsQSCYFVfAsFfdsDSFs
2020-08-25 22:58:58.858  INFO 5924 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Cluster ID: 7PNSDydfsQSCYFVfAsFfdsDSFs
2020-08-25 22:58:58.859  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Discovered group coordinator 192.168.35.xxx:9092 (id: 2147483647 rack: null)
2020-08-25 22:58:58.860  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] (Re-)joining group
2020-08-25 22:58:58.871  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 22:58:58 topic: this_is_topic_name
2020-08-25 22:58:58.883  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-08-25 22:58:58.883  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] (Re-)joining group
2020-08-25 22:58:58.887  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Finished assignment for group at generation 3: {consumer-kafka-consumer_group_start-1-9e91cdd2-45d8-48c8-a83f-8001bb3787de=Assignment(partitions=[this_is_topic_name-0])}
2020-08-25 22:58:58.891  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Successfully joined group with generation 3
2020-08-25 22:58:58.894  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Adding newly assigned partitions: this_is_topic_name-0
2020-08-25 22:58:58.905  INFO 5924 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-consumer_group_start-1, groupId=kafka-consumer_group_start] Setting offset for partition this_is_topic_name-0 to the committed offset FetchPosition{offset=11, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.35.xxx:9092 (id: 0 rack: null)], epoch=0}}
2020-08-25 22:58:58.906  INFO 5924 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : kafka-consumer_group_start: partitions assigned: [this_is_topic_name-0]
2020-08-25 22:58:58.944  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 11, CreateTime = 1598363938858, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 22:58:58)
2020-08-25 22:59:28.652  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 22:59:28 topic: this_is_topic_name
2020-08-25 22:59:28.655  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 12, CreateTime = 1598363968652, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 22:59:28)
2020-08-25 22:59:58.655  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 22:59:58 topic: this_is_topic_name
2020-08-25 22:59:58.657  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 13, CreateTime = 1598363998655, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 22:59:58)
2020-08-25 23:00:28.653  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:00:28 topic: this_is_topic_name
2020-08-25 23:00:28.656  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 14, CreateTime = 1598364028653, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:00:28)
2020-08-25 23:00:58.654  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:00:58 topic: this_is_topic_name
2020-08-25 23:00:58.656  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 15, CreateTime = 1598364058654, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:00:58)
2020-08-25 23:01:28.656  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:01:28 topic: this_is_topic_name
2020-08-25 23:01:28.658  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 16, CreateTime = 1598364088656, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:01:28)
2020-08-25 23:01:58.655  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:01:58 topic: this_is_topic_name
2020-08-25 23:01:58.657  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 17, CreateTime = 1598364118655, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:01:58)
2020-08-25 23:02:28.655  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:02:28 topic: this_is_topic_name
2020-08-25 23:02:28.656  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 18, CreateTime = 1598364148654, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:02:28)
2020-08-25 23:02:58.655  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:02:58 topic: this_is_topic_name
2020-08-25 23:02:58.657  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1598364178655, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:02:58)
2020-08-25 23:03:28.658  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:03:28 topic: this_is_topic_name
2020-08-25 23:03:28.660  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1598364208658, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:03:28)
2020-08-25 23:03:58.658  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:03:58 topic: this_is_topic_name
2020-08-25 23:03:58.660  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1598364238658, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:03:58)
2020-08-25 23:04:28.658  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:04:28 topic: this_is_topic_name
2020-08-25 23:04:28.661  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1598364268658, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:04:28)
2020-08-25 23:04:58.660  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:04:58 topic: this_is_topic_name
2020-08-25 23:04:58.661  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 23, CreateTime = 1598364298659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:04:58)
2020-08-25 23:05:28.660  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:05:28 topic: this_is_topic_name
2020-08-25 23:05:28.662  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 24, CreateTime = 1598364328660, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:05:28)
2020-08-25 23:05:58.660  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:05:58 topic: this_is_topic_name
2020-08-25 23:05:58.662  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 25, CreateTime = 1598364358660, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:05:58)
2020-08-25 23:06:28.659  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:06:28 topic: this_is_topic_name
2020-08-25 23:06:28.662  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 26, CreateTime = 1598364388659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:06:28)
2020-08-25 23:06:58.659  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:06:58 topic: this_is_topic_name
2020-08-25 23:06:58.662  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 27, CreateTime = 1598364418659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:06:58)
2020-08-25 23:07:28.659  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:07:28 topic: this_is_topic_name
2020-08-25 23:07:28.661  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 28, CreateTime = 1598364448659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:07:28)
2020-08-25 23:07:58.660  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:07:58 topic: this_is_topic_name
2020-08-25 23:07:58.661  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 29, CreateTime = 1598364478659, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:07:58)
2020-08-25 23:08:28.662  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:08:28 topic: this_is_topic_name
2020-08-25 23:08:28.665  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 30, CreateTime = 1598364508662, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:08:28)
2020-08-25 23:08:58.661  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:08:58 topic: this_is_topic_name
2020-08-25 23:08:58.663  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 31, CreateTime = 1598364538661, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:08:58)
2020-08-25 23:09:28.662  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:09:28 topic: this_is_topic_name
2020-08-25 23:09:28.664  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1598364568662, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:09:28)
2020-08-25 23:09:58.661  INFO 5924 --- [   scheduling-1] com.example.kafka.task.KafkaTasks        : Message Sent: bla_bla_bla...:2020-08-25 23:09:58 topic: this_is_topic_name
2020-08-25 23:09:58.664  INFO 5924 --- [ntainer#0-0-C-1] com.example.kafka.task.KafkaTasks        : Received topic: ConsumerRecord(topic = this_is_topic_name, partition = 0, leaderEpoch = 0, offset = 33, CreateTime = 1598364598661, serialized key size = -1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = bla_bla_bla...:2020-08-25 23:09:58)
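
The CreateTime field in each ConsumerRecord line is the record timestamp in epoch milliseconds. A short sketch that converts the first one back to a readable time; the Asia/Seoul zone is an assumption about the machine that produced the log, and the class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for reading the CreateTime values in the log.
final class CreateTimeSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Converts an epoch-millisecond timestamp to local time in the given zone.
    static String format(long epochMillis, String zoneId) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zoneId)));
    }

    public static void main(String[] args) {
        // CreateTime of the record at offset 11 in the log above.
        System.out.println(format(1598363938858L, "Asia/Seoul"));
    }
}
```

With that zone the result is 2020-08-25 22:58:58.858, which lines up with the "Message Sent" line logged at 22:58:58.871.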

 

Stopping Kafka

Stop the broker before ZooKeeper so that Kafka can shut down cleanly.

brew services stop kafka
brew services stop zookeeper
