Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
J
joohanhong
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
JooHan Hong
joohanhong
Commits
20c16017
Commit
20c16017
authored
Jun 09, 2022
by
JooHan Hong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
kafka consumer init
parent
6deec6af
Pipeline
#5741
passed with stages
in 1 minute and 5 seconds
Changes
1
Pipelines
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
90 additions
and
0 deletions
+90
-0
README.md
DEVOPS/KAFKA/CONSUMER/README.md
+90
-0
No files found.
DEVOPS/KAFKA/CONSUMER/README.md
0 → 100644
View file @
20c16017
[
![logo
](
https://www.hongsnet.net/images/logo.gif
)
](https://www.hongsnet.net)
# Consumer Overview
Kafka에서의
`Consumer는 메시지를 생산하는 소비`
로서, 일반적으로
**개발 언어를 이용해 구성하는 것이 일반적**
이다. 단, 기본적인 Consumer는 Kafka에서 제공은 된다.
# Telegraf Consumer 구성
1.
Telegraf Plugin 지원 확인
```
bash
# telegraf --input-list |grep kafka
kafka_consumer
kafka_consumer_legacy
```
> Producer와 다르게 `Input` Plugin에 해당 된다.
2.
Telegraf Output Plugin 설정
```
bash
[[
inputs.kafka_consumer]]
brokers
=
[
"broker01:9092"
,
"broker02:9092"
,
"broker03:9092"
]
topics
=
[
"TELEGRAF"
]
topic_tag
=
"kafka"
consumer_group
=
"telegrafgroup"
offset
=
"oldest"
max_message_len
=
1000000
data_format
=
"influx"
```
!각 파라미터 설명
*
brokers : Kafka의 전체 브로커 정보를 입력 한다.
*
topics : 소비할 주제(topic)를 설정 한다.
*
topic_tag : InfluxDB의 Tag Key를 생성 한다.
*
consumer_group : Consumer 그룹을 지정 한다.
*
offset : 소비할 Offset 기준을 지정한다. 가능 옵션 값은
`oldest`
와
**newest**
가 있다.
-
oldest : 예전의 데이터를 기준으로 한다. 즉, 브로커의 이슈 발생 시 누락된 데이터를 보증할 수 있다.
-
newest : 최신의 데이터를 기준으로 한다. 만약 브로커의 이슈 발생 시 데이터가 누락될 수 있다.
*
max_message_len : 메시지의 최대 사이즈를 지정 한다.
*
data_format : InfluxDB에 입수될 수 있는 데이터 포맷으로 지정 한다.
3.
InfluxDB 동시 입수를 위한 추가 설정
Telegraf Kafka Consumer의 경우 설정된 Output (InfluxDB 등)에 대한 입수가 동시에 가능하다. 단, 다음과 같이 설정을 진행해야 한다.
```
bash
# cat /etc/telegraf/telegraf.d/kafka_influxdb.conf
[[
outputs.influxdb]]
urls
=
[
"http://influxdb-main:8086"
]
database
=
"디비명"
skip_database_creation
=
true
retention_policy
=
""
write_consistency
=
"any"
timeout
=
"10s"
username
=
"유저명"
password
=
"패스워드"
[[
outputs.influxdb]]
urls
=
[
"http://influxdb-bak:8086"
]
database
=
"디비명"
skip_database_creation
=
true
retention_policy
=
""
write_consistency
=
"any"
timeout
=
"10s"
username
=
"유저명"
password
=
"패스워드"
```
4.
Telegraf 데몬을 재기동
```
bash
# systemctl restart telegraf
```
5.
Kafka Consuming 테스트
Kafka 브로커가 소비하는 과정은 Telegraf에서는 확인할 수 없고, 브로커에서 확인되어야 한다.
```
bash
[
root@broker01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh
--bootstrap-server
localhost:9092
--group
telegrafgroup
--describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
telegrafgroup TELEGRAF 0 714435000 714435847 847 Telegraf-0376d924-7f44-4bf1-935c-f6250571ef3a /컨슈머IP Telegraf
```
위와 같이 주제(Topic)에 대해 소비(LAG)되고 있는 것을 확인할 수 있다.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment