ksqlDB 101, part 2. Kafka Streams에 대해

이 글은 ksqlDB(당시에는 KSQL이라는 명칭이었습니다)를 학습하기 위해 정리한 연재글입니다.

이 글의 순서는 아래와 같습니다.

목차

  1. part 1. 배경지식
  2. part 2. Kafka Streams에 대해
  3. part 3. 실전 예시: 오픈소스를 통해 살펴보는 실시간 보안 이벤트 탐지 룰

2. Kafka Streams details

상술하였던 아파치 카프카의 API에 대해 상세히 살펴봅시다.

Stream 이란?

아파치 카프카에서의 Stream이 무슨의미를 지니는지 살펴봅시다.

그렇다면 Kafka Stream은 무엇을 의미하나요?

Kafka Streams의 주요 용어

카프카 스트림의 주요 용어에 대해 살펴보겠습니다.

Kafka Streams의 도식. 상기 내용들이 표시되어 있습니다.

KStreamKTable이란?

KStreamKTable의 차이?

Kafka Streams의 Time의 개념

스트림 프로세싱의 핵심은 시간을 가지고 작업하는 개념입니다.

Stateful, Stateless in ksqlDB

Kafka Streams 앱 구현: Java

  1. config 변수를 만들어서 APPLICATION_ID_CONFIG, BOOTSTRAP_SERVERS_CONFIG을 설정합니다.
    • APPLICATION_ID_CONFIG: 새로 만들 필터링 앱의 이름 (unique)
    • BOOSTRAP_SERVERS_CONFIG: 데이터 스트림을 가져올 카프카의 주소
  2. new StreamsBuilder() 구문으로 빌더를 만들고(토폴로지 정의 빌더), KStream 변수타입을 사용해서 어느 토픽에 어떤 필터로 값을 가져올지 정합니다.
  3. 그 후 KafkaStreams 타입의 변수를 만들고 빌더, 설정값을 세팅합니다.
  4. kafka stream 설정을 입력합니다.
  5. KStream, KTableGlobalKTable을 정의합니다 (앞서 입력한 설정을 추가)
  6. KafkaStreams 객체를 선언하고 consume 을 통해 새로운 스트림 생성합니다.

Faust: Python의 스트림 프로세싱 라이브러리

import os

import faust


CONSUMER_NAME = "TEST_CLICK_CONSUMER_01"
KAFKA_BROKER = "임의의 브로커 주소"
SRC_TOPIC = "임의의 토픽주소"

# 앱 구동 전, 기본 설정값 세팅
app = faust.App(
    CONSUMER_NAME,
    broker=f"kafka://{KAFKA_BROKER}",
)

# 값을 가져오기위한 메인토픽
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic(
    SRC_TOPIC,
    key_type=str,
    value_type=int,
)

# 새로이 저장할 토픽
# default value for missing URL will be 0 with `default=int`
counts = app.Table(
    'click_counts',
    default=int,
)

@app.agent(src_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

도커 이미지를 아래와 같이 작성 후 구동하면 파이썬 코드로도 상기 Kafka Streams 처리가 가능합니다.

FROM python:3.9

COPY . /app
WORKDIR /app

RUN \
    pip install -r requirements.txt

RUN ["python", "stream.py"]

Faust 에 대한 상세한 설명은 여기를 살펴봐주세요.

Caveats

만일 파이썬 베이스 이미지를 통해 librdkafka 기반의 카프카 처리 라이브러리를 사용해야 한다면, 추가적인 의존성을 필요로 할 수 있습니다. 관련 내용은 이 링크를 참고해주세요.

마무리

이번 글을 통해, 아래 내용들을 살펴볼 수 있었습니다:

  1. Kafka Streams과 주요 용어 및 개념을 살펴보았습니다.
  2. Kafka Streams 앱 구현을 Java 및 Python 라이브러리를 통하여 어떻게 진행하는지 살펴보았습니다.

다음 파트에선 본격적으로 ksqlDB가 무엇인지, 그리고 이를 통해 어떤식으로 실시간 이벤트 처리를 수행할 수 있는지를 대표적인 예시로 살펴보겠습니다.

읽어주셔서 대단히 감사합니다.