EC2에 Kafka를 설치해보려고 하는데 관리하기 쉽도록 오픈 소스인 provectuslabs/kafka-ui를 사용하려고 한다.

  1. Docker-compose로 설치하는 방법
  2. Docker 없이 수행하는 방법

두가지로 진행해보려고 한다.

왜냐하면 Docker-compose로 실행을 했더니 외부에서 해당 카프카를 접속하는데 계속 실패를 했다... (개발 환경이었기 때문에 접근이 불가능했지만 배포 후에는 잘 동작한다)

그래서 도커 없이 Kafka를 실행시키는 방식으로 구현했지만 기록으로 남겨둔다.

 

1. Docker-compose로 설치

mkdir kafka 
cd kafka 
vi docker-compose.yml
version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  kafka-1:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 1

  kafka-ui:
      image: provectuslabs/kafka-ui
      container_name: kafka-ui
      ports:
        - "8989:8080"
      restart: always
      environment:
        - KAFKA_CLUSTERS_0_NAME=local
        - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092
        - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181
sudo docker-compose up -d

 배포환경에서는 같은 네트워크를 잡아주고 해당 도커 네임(http://kafka1-1:9092)으로 레코드를 보내면 잘 전송이 되지만 개발 환경에서 해당 브로커로 데이터를 보내는데 실패했다. 일단, 개발환경이기 때문에 Docker없이 데이터를 전송하기로 결정하고 아래의 환경으로 개발했다.

 


2. Docker 없이 수행

sudo apt-get install wget

# kafka download
wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz

tar -xzf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1/

vi config/server.properties

 

config/server.properties 설정

1. 외부 IP 알아내기

curl ifconfig.me

 

2. 해당 IP로 config/server.properties 설정

#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://{외부 IP}:9092  

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

 

3. Kafka 실행

# zookeeper 먼저 실행
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

# kafka server 실행
./bin/kafka-server-start.sh -daemon ./config/server.properties

Kafka 설치 끝!!

 


이제 Kafka를 관리하기 위한 UI를 설치해보려고 한다.

 

kafka-UI 설치

docker run -p 8989:8080 \\
-e KAFKA_CLUSTERS_0_NAME=local \\
-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS={카프카가 동작하는 외부 IP}:9092 \\
-e KAFKA_CLUSTERS_0_ZOOKEEPER={주키퍼가 동작하는 외부 IP}:2183 \\
-d provectuslabs/kafka-ui:latest

Kafka UI로 접근

http://{ip or domain}:8989/

 

 

UI 수행되는지 확인 

혹시 접근이 안된다면 아래에 있는 방화벽 설정 확인!!

이렇게 토픽을 생성하거나 파티션을 관리하는 등 편리한 ui를 제공해준다.

 

 


이제 외부에서 해당 카프카로 데이터를 전송해보도록 하겠습니다.

 

방화벽 설정

일단 방화벽부터 설정해서 외부에서 해당 포트로 접근하도록 허가해줍니다.

sudo ufw enable

// kafka
sudo ufw allow 9092

// kafka ui (혹시 Kafka UI 접속이 안된다면 다음처럼 설정)
sudo ufw allow 8989

 

이제 EC2가 아닌 local 컴퓨터(내 컴퓨터)에서 카프카 프로듀서로 데이터를 전송해보도록 하겠습니다.

내 컴퓨터의 터미널에서

 

카프카 다운로드

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -xzf kafka_2.13-3.2.1.tgz

 

카프카 프로듀서(내 컴퓨터) -> 카프카 브로커(EC2)

cd kafka_2.13-3.2.1.tgz

// 토픽 리스트를 확인
bin/kafka-topics.sh --bootstrap-server {Kafka가 있는 EC2 외부 IP}:9092 --list

// 토픽에 데이터 전송하기
bin/kafka-console-producer.sh --bootstrap-server {Kafka가 있는 EC2 외부 IP}:9092 --topic {TOPIC NAME}

 

이제 UI를 통해서 데이터가 잘 받아져왔는지 확인할 수 있다!!!

 

 

 

 

 


참고

https://yooloo.tistory.com/98

https://velog.io/@jwpark06/AWS-EC2에-Kafka-설치하기#2-topic-생성

 

프로젝트를 진행하면서 HDFS에 데이터를 저장해야할 일이 생겼다.

컨슈머를 직접 개발해서 HDFS에 저장하는 방식도 있겠지만 Connector를 이용해서 저장해보기로 했다!!

 

 

[Kafka] Kafka Connector 정리

카프카 커넥트 (소스 커넥터 + 싱크 커넥터) : 컨슈머와 프로듀서를 통해서 데이터를 보내는 것이 아니라 작업의 형태를 템플릿으로 만들어 놓은 커넥터를 통해서 실행하여 반복작업을 줄인다.

gaebalrecord.tistory.com

 

confluent kafka 설치

 

1. Home(/root)에서 다운로드

curl -O <https://packages.confluent.io/archive/7.3/confluent-7.3.1.tar.gz>

 

2. 압축 해제

tar -zxvf confluent-community-7.3.1.tar.gz

 

3. zookeeper 실행

cd confluent-7.3.1 
./bin/zookeeper-server-start etc/kafka/zookeeper.properties

 

4. Kafka 실행

./bin/kafka-server-start ./etc/kafka/server.properties

 

5. 토픽 생성

./bin/kafka-topics --create --topic HADOOP --bootstrap-server localhost:9092

기본 설정 끝!! 이제 Connect를 실행시키기 위한 Connector 드라이버를 추가해야 한다

 


confluent hub 설치

 

1. Home에서 폴더 하나 생성

mkdir ConfluentHub

 

 

2. 해당 폴더에 confluent-client 다운

wget https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz

 

3. 압축 해제

tar -xvf confluent-hub-client-latest.tar.gz

그러면 bin, etc, share 폴더가 생성이 된다!

 

4. 환경변수 설정

cd /etc vi bash.bashrc 

// ---- 내용 추가 ---- 
export CONFLUENT_HOME='/home/{userName}/ConfluentHub' 
export PATH=$PATH:$CONFLUENT_HOME/bin

// ---- 저장하고 나가기 ---

source bash.bashrc

 

5. connect 폴더 생성 : 이곳에 connector 라이브러리들이 추가가 될 예정

mkdir connect
cd connect

 

6. 설치

confluent-hub install confluentinc/kafka-connect-hdfs3:1.1.25 --component-dir /home/{userName}/connect --worker-configs /home/confluent-7.0.1/etc/kafka/connect-distributed.properties

다음과 같이 나왔다면 성공!!!!!!!!

 


Kafka Connect 실행

 

1. 커넥트 실행

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

2. 실행된 커넥트 확인

1. 프로세스 확인

curl localhost:8083

2. connector 설치 확인확인했을 때 설치한 라이브러리가 목록에 있으면 성공

curl localhost:8083/connector-plugins

 

3. Avro를 이용해서 카프카에 데이터 전송

더보기
Avro (아브로)

json으로 날라올 데이터를 형식을 저장해둘 수 있다. 카프카의 토픽으로 들어온 데이터 형식을 지정해 둘 수 있는데 형식을 아브로로 지정하여 직렬화/역직렬화를 서로 약속해 둘 수 있어서 사용한다.


Avro는 데이터 직렬화 형식 중 하나로, 특히 카프카와 같은 분산 스트리밍 플랫폼에서 많이 사용됩니다. Avro를 사용하는 이유에는 여러 가지 장점이 있습니다.

  1. 스키마 관리: Avro는 데이터의 스키마를 정의할 수 있는 기능을 제공합니다. 스키마는 데이터의 구조와 타입을 정의하는데 사용되며, 스키마를 사용하면 데이터의 일관성을 보장할 수 있습니다. 스키마는 진화할 수 있어서 데이터 구조가 변경되어도 역호환성을 유지하면서 새로운 데이터를 처리할 수 있습니다.
  2. 압축: Avro는 데이터를 효율적으로 압축할 수 있는 기능을 제공합니다. 이는 데이터 전송 및 저장 공간을 절약할 수 있어 네트워크 대역폭 및 디스크 공간을 절감할 수 있습니다.
  3. 직렬화 및 역직렬화 성능: Avro는 빠른 직렬화 및 역직렬화 성능을 제공합니다. 이는 데이터의 효율적인 전송 및 처리를 가능하게 합니다.
  4. 다양한 프로그래밍 언어 지원: Avro 스키마는 여러 프로그래밍 언어로 생성하고 읽을 수 있으며, 다양한 언어에서 Avro 데이터를 처리할 수 있습니다.
  5. 스키마 호환성: Avro는 스키마 진화를 지원하며, 스키마의 업데이트가 기존 데이터와 호환성을 유지하도록 도와줍니다. 이는 시스템을 업그레이드하거나 변경할 때 데이터 호환성을 유지하는 데 도움이 됩니다.

그러나 JSON과 Avro를 사용할 때 각각의 상황에 맞게 선택해야 합니다. JSON은 인간이 읽기 쉬우며 다양한 응용 프로그램에서 사용할 수 있습니다. Avro는 대규모 데이터 스트리밍 시스템에서 더 효율적으로 동작하도록 최적화되어 있습니다. 따라서 프로젝트의 요구 사항과 성능 요구 사항을 고려하여 데이터 형식을 선택해야 합니다.

 

- Avro 스키마 형식 예시 (JSON, 확장자 .avsc)

{"namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic HADOOP \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"address","type":"string"}, {"name" : "age", "type" : "int"}, {"name" : "is_customer", "type" : "boolean"}]}'

// --- 한줄씩 붙여 넣기 
{"name":"Peter", "address":"Mountain View", "age":27, "is_customer":true}
{"name":"David", "address":"Mountain View", "age":37, "is_customer":false}
{"name":"Kat", "address":"Palo Alto", "age":30, "is_customer":true}
{"name":"David", "address":"San Francisco", "age":35, "is_customer":false}
{"name":"Leslie", "address":"San Jose", "age":26, "is_customer":true}
{"name":"Dani", "address":"Seatle", "age":32, "is_customer":false}
{"name":"Kim", "address":"San Jose", "age":30, "is_customer":true}
{"name":"Steph", "address":"Seatle", "age":31, "is_customer":false}

 

4. 커넥터 생성

curl -X POST \
	http://localhost:8083/connectors \
	-H 'Content-Type: application/json' \
 -d '{
    "name": "hdfs3-parquet-field",
    "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "HADOOP",
        "hdfs.url": "hdfs://localhost:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://localhost:8081",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",

				"format.class":"io.confluent.connect.hdfs3.parquet.ParquetFormat",
        "partitioner.class":"io.confluent.connect.storage.partitioner.FieldPartitioner",
        "partition.field.name":"is_customer"
    }
}'

 

5. 커넥터가 제대로 생성이 되었는지 확인

curl -X GET "http://localhost:8083/connectors/"
curl -X GET "http://localhost:8083/connectors/hdfs3-parquet-field/status"
curl -X GET "http://localhost:8083/connectors/hdfs3-parquet-field/topics"

 


HDFS에 저장되었는지 확인

hadoop fs -ls /topics/HADOOP/is_customer=true

 

 

 


 

회고

 

Apache Kafka를 이용하면서 etc에서 경로를 설정해서 따로 라이브러리를 받아 추가하는 방법이 있지만 경로를 잘 찾지 못해서 해결 못함 → confluent hub로 다운받을 경우 경로를 잘 찾음;;

 

 

참고

 

Kafka Connect로 데이터 파이프라인 구축

 

guide-gov.ncloud-docs.com

 

 

HDFS 3 Sink Connector for Confluent Platform | Confluent Documentation

Note The first few settings are common settings you’ll specify for all connectors. The topics parameter specifies the topics to export data from. In this case, test_hdfs. hdfs.url specifies the HDFS having data written to it. You should set this accordin

docs.confluent.io

 

카프카 커넥트 (소스 커넥터 + 싱크 커넥터)

: 컨슈머와 프로듀서를 통해서 데이터를 보내는 것이 아니라 작업의 형태를 템플릿으로 만들어 놓은 커넥터를 통해서 실행하여 반복작업을 줄인다.

  • 소스 커넥터 : Producer 역할
  • 싱크 커넥터 : Consumer 역할

내가 직접 구현하는 것이 아닌 만들어 놓은 템플릿을 활용하여 사용할 수 있다!!!

 

- 커넥트: 커넥터를 실행시키기 위한 프로세스 단위
- 커넥터: 데이터 처리를 실제로 구현한 jar파일

 

장점

  • 여러 쓰레드를 두어서 병렬처리를 할 수 있다
  • REST API를 통해서 실행시킬 수 있다.
  • JMX 데이터 수집 도구를 통해서 한번에 모니터링이 가능하다.

 

커넥터

  • 소스 커넥터
    • 프로듀서 역할(데이터를 넣음)
  • 싱크 커넥터
    • 컨슈머 역할(데이터를 사용함)
  • 커넥터 플러그인
    • 여러 플러그인을 추가할 수 있다.
    • MySQL, MongoDB 등등
    • jar 파일
  • 만약 직접 개발한다면 Connector, Task 개발 필요 (100개 넘는 오픈소스 존재)
  • 컨플루언트 허브를 통해서 오픈소스 커넥트를 가져올 수 있다. (라이선스 확인, 사용 범위가 다르다)

 

  •  

커넥트 실행 방법

  • 단일 모드
    • 고가용성 문제 → 단일 장애점(SPOF: Single Point Of Failure)
  • 분산 모드
    • 2 ~ 100대를 사용하여 클러스터로 묶임
      • 스레드 단위로 실행
      • scale-out
      • 데이터 처리량의 변화에도 유연하게 대처할 수 있다.
    • 커넥트가 실행되는 서버 개수를 늘림으로써 무중단으로 스케일 아웃하면서 처리량을 늘릴 수 있다.
    • REST API : 어떤 커넥터를 실행시켜 달라고 요청 → 신규로 커넥트를 새로운 스레드를 생성하게 된다.
GET / 실행 중인 커넥트 정보 확인
GET /connectors 실행 중인 커넥터 이름 확인
POST /connectors 새로운 커넥터 생성 요청
GET /connectors/{커넥터 이름} 실행 중인 커넥터 정보 확인
GET /connectors/{커넥터 이름}/config 실행 중인 커넥터의 설정값 확인
PUT /connectors/{커넥터 이름}/config 실행 중인 커넥터 설정값 변경 요청
GET /connectors/{커넥터 이름}/status 실행 중인 커넥터 상태 확인
POST /connectors/{커넥터 이름}/restart 실행 중인 커넥터 재시작 요청

 

분산모드 커넥트

  • 테스크를 나누어 분산 처리한다.
  • 단일 모드와는 다르게 커넥트가 여러개이기 때문에 하나의 장애가 발생하더라도 다른 커넥트를 이용해서 데이터를 처리할 수 있다.
  • REST API를 이용하여 쉽게 터넥트를 생성 → 저장 → 삭제가 가능하기 때문에 갑자기 많아지는 데이터에 대해서도 유연하게 대처할 수 있다.
  • 보통 이런 관리를 위해 웹페이지(또는 오픈 소스)를 생성하여 관리하는 것을 추천!!

 

사용하는 이유

  1. 높은 처리량으로 실시간 처리
  2. 임의의 타이밍에 데이터 읽기
  3. 다양한 제품과 시스템에 쉽게 연동
  4. 메시지를 잃지 않음

⇒ 데이터의 이동이 서비스나 모니터링, 데이터베이스 등 다양하게 이동하게 되는데 거기에 따른 많은 파이프라인 구축이 필요하게 된다.

이것을 한곳으로 모아서 데이터가 이동하게 해서 확장성과 안정성을 높이자!! ⇒ 카프카

 

💡 브로커가 있는 이유
- 접속처를 하나로 할 수 있다.
- 증감에 대응할 수 있다.

 

Kafka Cluster

  • 카프카 브로커 여러개가 모여서 카프카 클러스터를 구성하게 된다.
  • 브로커를 여러개 두는 이유는 ISR(In Sync Replica)가 가능하기 때문
    • ISR

위와 같이 브로커를 여러개를 두게 되면 토픽에 대해서 복제가 가능하게 된다. 이럴 경우 하나의 브로커가 갑자기 일을 중단하게 되더라도 토픽 내부에 있는 레코드를 잃을 걱정을 하지 않아도 되기 때문에 클러스터 환경을 구성하게 된다.

 

 

Kafka Broker

  • 카프카 서버라고 생각하면 된다
  • Topic
    • 같은 토픽에 대해서는 같은 곳으로 모을 수 있다.
  • Partition
    • 키에 대해서 같은 파티션으로 이동하게 된다.
      • 만약 순서가 중요한 데이터일 경우에는 키를 적절히 정하여 같은 파티션으로 들어가게 하는 것이 필요함
    • 키가 없을 경우에는 해시로 적절히 배분된다.
    • 컨슈머의 개수 <= 파티션의 개수
      • 같은 것이 가장 이상적 

Kafka Producer

  • 레코드를 생성하는 어플리케이션

Kafka Consumer

  • 레코드를 받는 어플리케이션
  • 그룹(Kafka Consumer Group)으로 운영 가능
    • 하나의 그룹은 MySQL로 수신, 다른 그룹은 MongoDB로 수신하는 등 같은 레코드에 대해서도 다르게 처리할 수 있다.
    • rebalance 지원: 장애에 대응하기 편리하다

Kafka Connect

  • 작업의 형태를 하나의 템플릿으로 만들어 스레드나 프로세스로 병렬처리가 가능하도록 한다.
  • REST API로 통신하며 해당 통신에 따라 새로 생성하고 삭제하며 관리할 수 있다.
  • 오픈소스로 다양하게 제공된다(Mysql, HDFS 등)
  • 다음과 같이 동기화하는데에도 사용할 수 있다.

+ Recent posts