포스트

웹 클릭스트림 수집하기 :) 주간 회고 & 로컬 테스트

카프카 클러스터 띄우고 블로그 클릭스트림을 ingest api로 보내기까지

웹 클릭스트림 수집하기 :) 주간 회고 & 로컬 테스트

작심삼일 이랬던가

혼자서 삽질하다 보면 사기가 조금 떨어집니다. 그치만 작심을 삼일마다 반복하면서 매일 조금씩 나아가는 마음으로 노력해야겠습니다. 오늘은 금주의 진행상황 전체를 로컬에서 테스트해보려고 합니다.


금주 회고

우선 지금까지 한 내용을 정리해보자면 다음과 같습니다.

그럼 지금까지의 내용을 직접 실행시키기 위해서

  1. 카프카 클러스터를 띄우고 토픽을 만든다.
  2. 프로듀서 역할을 할 fastapi을 실행한다.
  3. 프론트를 띄워서 블로그를 접속한다.
  4. New 카프카 컨슈머가 데이터를 저장하도록 한다.

이와 같은 단계별 결과를 공유하며 한 주를 정리하려고 합니다.

전체 구조를 요약하자면 다음과 같습니다.

flowchart TB
  subgraph "Local Host"
    A["Jekyll Frontend<br>Click Event (JS)"] -->|"POST /ingest"| B["FastAPI Ingest API"]
    B -->|"produce to Kafka"| C["Kafka Broker (Docker)"]
    C --> D["Kafka Topic: clickstream"]
    D --> E["Kafka Consumer<br>consumer_store.py"]
    E --> F["output/clickstream_output.jsonl"]
  end

로컬 작업물 정리하고 버전 관리(git)

그러기에 앞서 로컬에서 작성한 코드들의 버전을 관리할 필요가 있을 것 같아 github repo를 만들고 업로드 했습니다.


실행하기

자, 지금부터는 순서대로 실행하는 코드-결과 입니다.

1. 카프카 클러스터를 띄우고 토픽을 만든다.

1
2
3
4
5
6
7
8
9
10
11
$ docker-compose up -d
[+] Running 3/3
 ⠿ Network kafka-ingest_default        Crea...                                0.1s
 ⠿ Container kafka-ingest-zookeeper-1  Started                                0.4s
 ⠿ Container kafka-ingest-kafka-1      St...                                  0.6s
$ docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                          NAMES
554e951918e2   confluentinc/cp-kafka:7.4.1       "/etc/confluent/dock…"   6 seconds ago   Up 5 seconds   0.0.0.0:9092->9092/tcp         kafka-ingest-kafka-1
b6e269a6dc23   confluentinc/cp-zookeeper:7.4.1   "/etc/confluent/dock…"   6 seconds ago   Up 5 seconds   2181/tcp, 2888/tcp, 3888/tcp   kafka-ingest-zookeeper-1
$ docker exec -it kafka-ingest-kafka-1 kafka-topics --create --topic clickstream --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Created topic clickstream.

2. 프로듀서 역할을 할 fastapi을 실행한다.

1
2
3
$ source .venv/bin/activate
$ nohup uvicorn ingest_kafka:app --reload --port 8000 > ingest_kafka.log &
[1] 81854
  • 백그라운드 실행
  • log 파일로 과정 기록

3. 프론트를 띄워서 블로그를 접속한다.

1
2
3
# annmunju.github.io 폴더로 이동
$ nohup bundle exec jekyll serve > front.log &
[1] 86312
  • 동일하게 log 파일로 과정 기록
  • 프론트에서 ingest api로 요청을 보내고 있는지 확인
프론트 테스트 결과1 프론트 테스트 결과2
  • ingest api 로그 확인
    1
    2
    3
    4
    5
    
    ...
    INFO:ingest:Produced to clickstream: {'event': 'time_on_page', 'timestamp': '2025-05-23T08:01:07.007Z', 'path': '/posts/%EB%A1%9C%EC%BB%AC-%ED%85%8C%EC%8A%A4%ED%8A%B8/', 'referrer': 'http://127.0.0.1:4000/', 'duration': 19882}
    INFO:     127.0.0.1:49252 - "POST /ingest HTTP/1.1" 200 OK
    INFO:ingest:Produced to clickstream: {'event': 'session_end', 'timestamp': '2025-05-23T08:01:07.007Z', 'path': '/posts/%EB%A1%9C%EC%BB%AC-%ED%85%8C%EC%8A%A4%ED%8A%B8/', 'referrer': 'http://127.0.0.1:4000/'}
    INFO:     127.0.0.1:49254 - "POST /ingest HTTP/1.1" 200 OK
    

4. New 카프카 컨슈머가 데이터를 저장하도록 한다.

컨슈머 코드를 미리 만들어두지 않아서 일단 로컬에 저장하는 형태로 코드를 구현해 실행해보았습니다. 커밋 내역

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# consumer_store.py
import json
import os
from pathlib import Path
from confluent_kafka import Consumer, KafkaError

# Kafka 설정
KAFKA_BOOTSTRAP = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "clickstream")

# 출력 경로 설정
OUTPUT_DIR = Path("output")
OUTPUT_DIR.mkdir(exist_ok=True)
OUTPUT_FILE = OUTPUT_DIR / "clickstream_output.jsonl"

# Kafka Consumer 설정
consumer_conf = {
    "bootstrap.servers": KAFKA_BOOTSTRAP,
    "group.id": "clickstream-consumer-group",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(consumer_conf)
consumer.subscribe([KAFKA_TOPIC])

print(f"Consuming from topic '{KAFKA_TOPIC}'...")

try:
    with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    print("Error:", msg.error())
                continue

            payload = msg.value().decode("utf-8")
            print("Received:", payload)
            f.write(payload + "\n")

except KeyboardInterrupt:
    print("Stopped by user.")

finally:
    consumer.close()
  • output/ 디렉토리에 로그를 저장
  • clickstream_output.jsonl 파일에 Kafka 메시지를 한 줄씩 저장
  • 메시지를 계속 실시간으로 소비하며 콘솔 출력과 저장 수행

저장된 로그 결과는 다음과 같았습니다.

1
2
3
4
$ tail -3 output/clickstream_output.jsonl
{"event": "scroll_event", "timestamp": "2025-05-23T08:22:31.045Z", "path": "/posts/%EC%B9%B4%ED%94%84%EC%B9%B4-%EA%B0%9C%EB%85%90/", "referrer": "http://127.0.0.1:4000/", "scroll_percentage": 0}
{"event": "scroll_event", "timestamp": "2025-05-23T08:22:32.278Z", "path": "/posts/%EC%B9%B4%ED%94%84%EC%B9%B4-%EA%B0%9C%EB%85%90/", "referrer": "http://127.0.0.1:4000/", "scroll_percentage": 4}
{"event": "scroll_event", "timestamp": "2025-05-23T08:22:33.571Z", "path": "/posts/%EC%B9%B4%ED%94%84%EC%B9%B4-%EA%B0%9C%EB%85%90/", "referrer": "http://127.0.0.1:4000/", "scroll_percentage": 5}

[번외] 리소스 정리하기

1
2
3
4
5
6
# 카프카 종료
docker-compose down
# ingest api 종료
$ kill -9 $(ps -fu ahnmunju | grep uvicorn | awk '{print $2}')
# 프론트 종료
$ pkill -f jekyll

최종 결과

최종 결과

  • 왼쪽은 컨슈머 동작시에 나타나는 화면입니다.
  • 오른쪽 블로그에 접속해서 보거나, 클릭하거나, 스크롤 하면 이에 대한 로그 이벤트가 발생합니다.
  • 로그 이벤트는 왼쪽 컨슈머에 출력됩니다.

회고

이번 주는 “내가 직접 로그 파이프라인을 만들 수 있다”는 자신감을 얻은 한 주였습니다. 혼자였지만 매일 정리하고, 실패하더라도 되짚어가며 결국 Kafka에 로그를 보내고 소비하는 전 과정을 완성했습니다. 다음 주에는 로그를 클라우드 환경에서 저장하고 쌓인 데이터를 분석 / 시각화 하는 전체 과정을 큰 그림으로 그리고 각각을 완성해나갈 계획입니다. 끝!

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.