Data & Analytics

RAG 시스템의 진짜 병목: 벡터 DB가 아니라 원본 데이터의 1:N 관계입니다

RAG 정확도 문제를 벡터 DB 튜닝으로 해결하려는 팀이 많습니다. 하지만 실제 병목은 원본 데이터의 관계형 구조를 무시한 Chunking에서 발생합니다.

RAG 시스템의 진짜 병목: 벡터 DB가 아니라 원본 데이터의 1:N 관계입니다

RAG 시스템의 정확도 문제를 벡터 DB 튜닝으로 해결하려는 팀이 많습니다. 하지만 실제 병목은 원본 데이터의 관계형 구조를 무시한 Chunking에서 발생합니다. 고객-주문-상품의 1:N:N 관계를 flat하게 임베딩하면, 아무리 좋은 벡터 DB를 써도 hallucination은 피할 수 없습니다.

이 글에서는 SQL 관계형 데이터를 RAG 시스템에 올바르게 통합하는 방법을 다룹니다.

1. 왜 벡터 DB만으로는 부족한가

현실에서 마주치는 문제

RAG 시스템을 구축하면서 이런 질문을 받아본 적 있을 것입니다:

"고객 A가 최근 3개월간 주문한 상품 중 반품률이 가장 높은 카테고리는?"

이 질문에 정확히 답하려면 다음 데이터가 동시에 필요합니다:

  • 고객 정보 (customer_id, name, segment)
  • 주문 정보 (order_id, order_date, customer_id)
  • 주문 상세 (order_id, product_id, quantity, return_status)
  • 상품 정보 (product_id, category, name)

문제는 대부분의 RAG 시스템이 이 데이터를 각각 별도의 chunk로 임베딩한다는 것입니다.

python
# 흔히 보는 잘못된 접근
chunks = [
    "고객 A는 VIP 등급이며 서울에 거주합니다.",
    "주문 #1234는 2024-01-15에 생성되었습니다.",
    "상품 X는 전자제품 카테고리입니다.",
]

# 각 chunk를 독립적으로 임베딩
for chunk in chunks:
    vector = embed(chunk)
    vector_db.insert(vector)

이렇게 하면 벡터 검색 시 "고객 A의 주문"이라는 관계를 찾을 수 없습니다.

벡터 유사도의 한계

벡터 검색은 의미적 유사성을 기반으로 합니다. 하지만 관계형 데이터에서 중요한 것은 구조적 연결입니다.

python
# 벡터 유사도 예시
query = "고객 A의 주문"

# 의미적으로 유사한 결과 (잘못된 결과)
results = [
    "고객 B의 주문 내역입니다.",  # '주문' 키워드 유사
    "A사의 대량 주문 처리 방법",   # 'A'와 '주문' 유사
    "고객 만족도 조사 결과",       # '고객' 유사
]

# 실제로 필요한 결과
expected = [
    "고객 A (ID: 123)의 주문 #456, #789",
    "주문 #456: 상품 X, Y (총 150,000원)",
]

벡터 DB는 "고객"과 "주문"이라는 단어의 의미는 이해하지만, customer_id = 123인 레코드와 연결된 order_id들이라는 관계는 이해하지 못합니다.

근본적인 원인

데이터의 두 가지 속성

1. 의미적 속성 (Semantic)

- "이 텍스트가 무슨 내용인가?"

- 벡터 임베딩으로 캡처 가능

- 유사도 검색에 적합

2. 구조적 속성 (Structural)

- "이 데이터가 어떤 데이터와 연결되는가?"

- Foreign Key, JOIN으로 표현

- 벡터로는 캡처 불가능

RAG 시스템의 정확도 문제는 구조적 속성을 무시하고 의미적 속성만 인덱싱했기 때문에 발생합니다.

2. 관계형 데이터의 본질: 1:N, N:M 이해하기

E-Commerce 도메인의 관계 구조

실제 비즈니스 데이터는 복잡한 관계를 가진다. E-Commerce를 예로 들면:

sql
-- 고객 테이블 (1의 측면)
CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(255),
    segment VARCHAR(50),  -- VIP, Regular, New
    created_at TIMESTAMP
);

-- 주문 테이블 (N의 측면, 고객과 1:N)
CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(customer_id),
    order_date TIMESTAMP,
    total_amount DECIMAL(10, 2),
    status VARCHAR(50)  -- pending, completed, cancelled
);

-- 주문 상세 테이블 (주문과 1:N, 상품과 N:1)
CREATE TABLE order_items (
    item_id SERIAL PRIMARY KEY,
    order_id INTEGER REFERENCES orders(order_id),
    product_id INTEGER REFERENCES products(product_id),
    quantity INTEGER,
    unit_price DECIMAL(10, 2),
    return_status VARCHAR(50)  -- none, requested, completed
);

-- 상품 테이블
CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    category VARCHAR(100),
    description TEXT,
    price DECIMAL(10, 2)
);

관계의 방향성과 카디널리티

관계 유형별 특성

1:N (One-to-Many)

고객 → 주문: 한 고객이 여러 주문 가능

주문 → 주문상세: 한 주문에 여러 상품 포함

Chunk 전략: Parent-Child 패턴 적용

N:M (Many-to-Many)

상품 ↔ 태그: 상품은 여러 태그, 태그는 여러 상품

고객 ↔ 상품(위시리스트): 다대다 관계

Chunk 전략: 중간 테이블 기준 그룹핑

1:1 (One-to-One)

고객 → 고객상세: 확장 정보

Chunk 전략: 단일 문서로 병합

관계를 무시하면 생기는 정보 손실

python
# 원본 데이터의 관계
customer_123 = {
    "id": 123,
    "name": "김철수",
    "orders": [
        {
            "order_id": 456,
            "items": [
                {"product": "노트북", "qty": 1, "returned": False},
                {"product": "마우스", "qty": 2, "returned": True},
            ]
        },
        {
            "order_id": 789,
            "items": [
                {"product": "키보드", "qty": 1, "returned": True},
            ]
        }
    ]
}

# 잘못된 Chunking (관계 손실)
flat_chunks = [
    "김철수 고객 정보",
    "주문 456 정보",
    "주문 789 정보",
    "노트북 상품 정보",
    "마우스 상품 정보",
    "키보드 상품 정보",
]
# 문제: "김철수가 반품한 상품은?" 질문에 답할 수 없음

# 올바른 Chunking (관계 보존)
relational_chunk = """
고객: 김철수 (ID: 123)
- 주문 #456 (2024-01-15)
  - 노트북 x1 (반품: 없음)
  - 마우스 x2 (반품: 완료)
- 주문 #789 (2024-02-20)
  - 키보드 x1 (반품: 완료)
반품 요약: 마우스, 키보드 (총 2건)
"""

3. AS-IS: 잘못된 Chunking이 만드는 Hallucination

전형적인 실수 패턴

패턴 1: 테이블별 독립 Chunking

python
# AS-IS: 테이블별로 따로 임베딩
def wrong_chunking_by_table():
    # 고객 테이블 chunk
    customers = db.query("SELECT * FROM customers")
    for customer in customers:
        chunk = f"고객 {customer.name}은 {customer.segment} 등급입니다."
        embed_and_store(chunk)

    # 주문 테이블 chunk (별도)
    orders = db.query("SELECT * FROM orders")
    for order in orders:
        chunk = f"주문 #{order.order_id}는 {order.order_date}에 생성되었습니다."
        embed_and_store(chunk)

    # 상품 테이블 chunk (별도)
    products = db.query("SELECT * FROM products")
    for product in products:
        chunk = f"{product.name}은 {product.category} 카테고리 상품입니다."
        embed_and_store(chunk)

왜 문제인가요?

python
# 사용자 질문
query = "VIP 고객이 가장 많이 구매한 카테고리는?"

# 검색 결과 (관계 없이 유사도만으로 검색)
results = [
    "고객 A는 VIP 등급입니다.",           # VIP 매칭
    "고객 B는 VIP 등급입니다.",           # VIP 매칭
    "전자제품은 인기 카테고리입니다.",      # 카테고리 매칭
    "의류 카테고리 상품 목록",             # 카테고리 매칭
]

# LLM이 받는 context에는 "누가 무엇을 샀는지" 정보가 없음
# → Hallucination 발생: "VIP 고객은 전자제품을 많이 구매합니다" (근거 없음)

패턴 2: JOIN 결과를 무분별하게 Flatten

python
# AS-IS: JOIN 결과를 그대로 chunk로
def wrong_flattening():
    query = """
    SELECT c.name, o.order_id, p.name as product_name
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    JOIN order_items oi ON o.order_id = oi.order_id
    JOIN products p ON oi.product_id = p.product_id
    """

    results = db.query(query)
    for row in results:
        # 각 row를 독립적인 chunk로 처리
        chunk = f"{row.name}이 주문 #{row.order_id}에서 {row.product_name}을 구매"
        embed_and_store(chunk)

왜 문제인가요?

python
# 원본 데이터
# 고객 A의 주문 #1: 상품 X, Y, Z (3개 아이템)
#
# Flatten 후 chunks:
# - "고객 A가 주문 #1에서 상품 X를 구매"
# - "고객 A가 주문 #1에서 상품 Y를 구매"
# - "고객 A가 주문 #1에서 상품 Z를 구매"

# 문제점:
# 1. 동일 주문이 3개의 chunk로 분리됨
# 2. "고객 A의 주문 #1에 포함된 전체 상품"을 한번에 못 가져옴
# 3. 검색 시 일부 상품만 retrieve되어 불완전한 답변

패턴 3: Context Window 최적화라는 함정

python
# AS-IS: 토큰 수 줄이려고 요약
def wrong_summarization():
    customer_data = get_full_customer_profile(customer_id=123)

    # 원본: 주문 50건, 상품 200개 상세 정보
    # "토큰 절약"을 위해 요약
    summary = f"""
    고객 123 요약:
    - 총 주문: 50건
    - 총 구매액: 5,000,000원
    - 주요 카테고리: 전자제품
    """
    embed_and_store(summary)

왜 문제인가요?

python
# 사용자 질문
query = "고객 123이 2024년 1월에 산 상품 중 아직 배송 안 된 건?"

# 요약본에는 개별 주문/배송 상태가 없음
# → "정보를 찾을 수 없습니다" 또는 hallucination

Hallucination 발생 메커니즘

python
def demonstrate_hallucination():
    """
    Hallucination이 발생하는 구체적 과정
    """

    # Step 1: 잘못된 chunking으로 저장된 데이터
    stored_chunks = [
        {"id": 1, "text": "고객 A는 VIP입니다", "vector": [0.1, 0.2, ...]},
        {"id": 2, "text": "주문 #100은 2024-01-15에 생성", "vector": [0.3, 0.1, ...]},
        {"id": 3, "text": "상품 X는 전자제품", "vector": [0.2, 0.4, ...]},
        # 관계 정보 없음: 고객 A가 주문 #100을 했는지? 주문 #100에 상품 X가 있는지?
    ]

    # Step 2: 사용자 질문
    question = "고객 A가 주문한 전자제품은?"

    # Step 3: 벡터 검색 (의미적 유사도)
    retrieved = [
        "고객 A는 VIP입니다",           # "고객 A" 매칭
        "상품 X는 전자제품",            # "전자제품" 매칭
    ]

    # Step 4: LLM에게 전달
    prompt = f"""
    Context: {retrieved}
    Question: {question}
    """

    # Step 5: LLM의 추론 (hallucination)
    llm_response = "고객 A가 주문한 전자제품은 상품 X입니다."

    # 실제로는 고객 A와 상품 X 사이에 연결 관계가 있는지 알 수 없음!
    # LLM은 context에 두 정보가 함께 있으니 연결된 것으로 "추측"

4. TO-BE: 관계를 보존하는 Chunking 전략

핵심 원칙

관계 보존 Chunking 3대 원칙

1. 관계의 "1" 측을 기준으로 그룹핑

- 고객 기준: 고객 + 해당 고객의 모든 주문

- 주문 기준: 주문 + 해당 주문의 모든 아이템

2. Chunk 내에 관계 체인 완전 포함

- 고객 → 주문 → 상품까지 한 chunk에

- FK 참조를 텍스트로 명시

3. Metadata에 관계 식별자 포함

- customer_id, order_id를 metadata로 저장

- 검색 후 SQL로 추가 정보 조회 가능

TO-BE: 관계 보존 Chunking 구현

python
from typing import List, Dict, Any
from dataclasses import dataclass
import json

@dataclass
class RelationalChunk:
    """관계 정보를 보존하는 Chunk 구조"""
    text: str                          # 임베딩할 텍스트
    metadata: Dict[str, Any]           # 관계 식별자들
    parent_id: str = None              # Parent-Child 패턴용
    chunk_type: str = "standalone"     # standalone, parent, child

def create_customer_centric_chunks(db_connection) -> List[RelationalChunk]:
    """
    TO-BE: 고객 중심으로 관계를 보존하는 Chunking

    핵심: 고객 한 명의 전체 컨텍스트를 하나의 Chunk 그룹으로
    """

    query = """
    WITH customer_orders AS (
        SELECT
            c.customer_id,
            c.name,
            c.email,
            c.segment,
            json_agg(
                json_build_object(
                    'order_id', o.order_id,
                    'order_date', o.order_date,
                    'total_amount', o.total_amount,
                    'status', o.status,
                    'items', (
                        SELECT json_agg(
                            json_build_object(
                                'product_id', p.product_id,
                                'product_name', p.name,
                                'category', p.category,
                                'quantity', oi.quantity,
                                'unit_price', oi.unit_price,
                                'return_status', oi.return_status
                            )
                        )
                        FROM order_items oi
                        JOIN products p ON oi.product_id = p.product_id
                        WHERE oi.order_id = o.order_id
                    )
                ) ORDER BY o.order_date DESC
            ) as orders
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id, c.name, c.email, c.segment
    )
    SELECT * FROM customer_orders
    """

    chunks = []
    results = db_connection.execute(query)

    for row in results:
        # 고객 전체 프로필을 하나의 chunk로
        chunk_text = format_customer_profile(row)

        chunk = RelationalChunk(
            text=chunk_text,
            metadata={
                "customer_id": row['customer_id'],
                "customer_name": row['name'],
                "segment": row['segment'],
                "order_ids": [o['order_id'] for o in row['orders'] if o],
                "categories": extract_categories(row['orders']),
                "total_orders": len([o for o in row['orders'] if o]),
                "has_returns": has_any_returns(row['orders']),
            },
            chunk_type="customer_profile"
        )
        chunks.append(chunk)

    return chunks

def format_customer_profile(row: Dict) -> str:
    """
    고객 프로필을 검색 가능한 텍스트로 포맷팅
    관계 정보를 자연어로 명시적 표현
    """

    lines = [
        f"## 고객 프로필: {row['name']}",
        f"고객 ID: {row['customer_id']}",
        f"이메일: {row['email']}",
        f"등급: {row['segment']}",
        "",
        "### 주문 이력",
    ]

    orders = row.get('orders', [])
    if not orders or orders[0] is None:
        lines.append("주문 내역 없음")
    else:
        for order in orders:
            lines.append(f"\n#### 주문 #{order['order_id']} ({order['order_date']})")
            lines.append(f"상태: {order['status']} | 총액: {order['total_amount']:,}원")

            items = order.get('items', [])
            if items:
                lines.append("포함 상품:")
                for item in items:
                    return_info = f" [반품: {item['return_status']}]" if item['return_status'] != 'none' else ""
                    lines.append(
                        f"  - {item['product_name']} ({item['category']}) "
                        f"x{item['quantity']} @ {item['unit_price']:,}원{return_info}"
                    )

    # 요약 통계 (검색 키워드로 활용)
    lines.extend([
        "",
        "### 요약",
        f"총 주문 수: {len([o for o in orders if o])}건",
        f"구매 카테고리: {', '.join(extract_categories(orders))}",
        f"반품 이력: {'있음' if has_any_returns(orders) else '없음'}",
    ])

    return "\n".join(lines)

def extract_categories(orders: List[Dict]) -> List[str]:
    """주문에서 고유 카테고리 추출"""
    categories = set()
    for order in orders:
        if order and order.get('items'):
            for item in order['items']:
                if item.get('category'):
                    categories.add(item['category'])
    return list(categories)

def has_any_returns(orders: List[Dict]) -> bool:
    """반품 이력 존재 여부"""
    for order in orders:
        if order and order.get('items'):
            for item in order['items']:
                if item.get('return_status') not in (None, 'none'):
                    return True
    return False

관계 보존 Chunk의 실제 모습

markdown
## 고객 프로필: 김철수
고객 ID: 123
이메일: kim@example.com
등급: VIP

### 주문 이력

#### 주문 #456 (2024-01-15)
상태: completed | 총액: 1,500,000원
포함 상품:
  - MacBook Pro (전자제품) x1 @ 1,200,000원
  - Magic Mouse (전자제품) x2 @ 150,000원 [반품: completed]

#### 주문 #789 (2024-02-20)
상태: completed | 총액: 89,000원
포함 상품:
  - 기계식 키보드 (전자제품) x1 @ 89,000원 [반품: completed]

### 요약
총 주문 수: 2건
구매 카테고리: 전자제품
반품 이력: 있음

이제 "김철수가 반품한 상품은?"이라는 질문에 정확히 답할 수 있습니다.

5. Parent-Child Document 패턴 심층 분석

패턴 개요

대용량 관계형 데이터에서는 모든 관계를 하나의 chunk에 넣을 수 없습니다. 이때 Parent-Child Document 패턴을 사용합니다.

Parent-Child Document 패턴

Parent Document (요약)

전체 맥락을 담은 요약 정보

모든 Child의 ID 참조

검색 시 Parent만으로도 관련성 판단 가능

Child Documents (상세)

개별 상세 정보

Parent ID 역참조

세부 검색 시 활용

검색 전략

Step 1: Child 검색으로 관련 문서 찾기

Step 2: Child의 Parent 조회

Step 3: Parent의 전체 맥락을 LLM에 전달

구현 코드

python
from typing import List, Tuple
from dataclasses import dataclass, field
import uuid

@dataclass
class ParentDocument:
    """부모 문서: 전체 맥락 + 자식 참조"""
    doc_id: str
    text: str
    child_ids: List[str]
    metadata: Dict[str, Any]

@dataclass
class ChildDocument:
    """자식 문서: 상세 정보 + 부모 참조"""
    doc_id: str
    parent_id: str
    text: str
    metadata: Dict[str, Any]

class ParentChildChunker:
    """
    Parent-Child 패턴으로 관계형 데이터 Chunking

    사용 시나리오:
    - 고객(Parent) - 개별 주문(Child)
    - 제품(Parent) - 개별 리뷰(Child)
    - 문서(Parent) - 섹션/단락(Child)
    """

    def __init__(self, db_connection):
        self.db = db_connection

    def create_customer_order_chunks(self) -> Tuple[List[ParentDocument], List[ChildDocument]]:
        """
        고객-주문 관계를 Parent-Child 구조로 변환

        Parent: 고객 요약 (기본 정보 + 주문 통계)
        Child: 개별 주문 상세
        """

        parents = []
        children = []

        # 고객별 처리
        customers = self.db.query("SELECT * FROM customers")

        for customer in customers:
            customer_id = customer['customer_id']
            parent_id = f"customer_{customer_id}"

            # 해당 고객의 주문들 조회
            orders = self.db.query("""
                SELECT o.*,
                       json_agg(
                           json_build_object(
                               'product_name', p.name,
                               'category', p.category,
                               'quantity', oi.quantity,
                               'return_status', oi.return_status
                           )
                       ) as items
                FROM orders o
                JOIN order_items oi ON o.order_id = oi.order_id
                JOIN products p ON oi.product_id = p.product_id
                WHERE o.customer_id = %s
                GROUP BY o.order_id
            """, [customer_id])

            child_ids = []

            # Child documents 생성 (개별 주문)
            for order in orders:
                child_id = f"order_{order['order_id']}"
                child_ids.append(child_id)

                child_text = self._format_order_detail(order, customer)

                child = ChildDocument(
                    doc_id=child_id,
                    parent_id=parent_id,
                    text=child_text,
                    metadata={
                        "order_id": order['order_id'],
                        "customer_id": customer_id,
                        "order_date": str(order['order_date']),
                        "categories": list(set(i['category'] for i in order['items'])),
                        "has_return": any(i['return_status'] != 'none' for i in order['items']),
                    }
                )
                children.append(child)

            # Parent document 생성 (고객 요약)
            parent_text = self._format_customer_summary(customer, orders)

            parent = ParentDocument(
                doc_id=parent_id,
                text=parent_text,
                child_ids=child_ids,
                metadata={
                    "customer_id": customer_id,
                    "segment": customer['segment'],
                    "total_orders": len(orders),
                    "order_ids": [o['order_id'] for o in orders],
                }
            )
            parents.append(parent)

        return parents, children

    def _format_customer_summary(self, customer: Dict, orders: List[Dict]) -> str:
        """Parent 문서용 고객 요약 포맷"""

        total_amount = sum(o['total_amount'] for o in orders)
        categories = set()
        return_count = 0

        for order in orders:
            for item in order['items']:
                categories.add(item['category'])
                if item['return_status'] != 'none':
                    return_count += 1

        return f"""
고객 요약: {customer['name']} (ID: {customer['customer_id']})
등급: {customer['segment']}
이메일: {customer['email']}

주문 통계:
- 총 주문 수: {len(orders)}건
- 총 구매액: {total_amount:,.0f}원
- 구매 카테고리: {', '.join(categories)}
- 반품 건수: {return_count}건

이 고객의 상세 주문 정보는 개별 주문 문서를 참조하세요.
주문 ID 목록: {', '.join(str(o['order_id']) for o in orders)}
""".strip()

    def _format_order_detail(self, order: Dict, customer: Dict) -> str:
        """Child 문서용 주문 상세 포맷"""

        items_text = []
        for item in order['items']:
            return_info = f" (반품: {item['return_status']})" if item['return_status'] != 'none' else ""
            items_text.append(f"  - {item['product_name']} [{item['category']}] x{item['quantity']}{return_info}")

        return f"""
주문 상세: #{order['order_id']}
고객: {customer['name']} (ID: {customer['customer_id']}, {customer['segment']})
주문일: {order['order_date']}
상태: {order['status']}
총액: {order['total_amount']:,.0f}원

주문 상품:
{chr(10).join(items_text)}
""".strip()

Parent-Child 검색 전략

python
class ParentChildRetriever:
    """
    Parent-Child 구조에서의 검색 전략

    핵심: Child로 검색하고, Parent로 맥락 확보
    """

    def __init__(self, vector_db, sql_db):
        self.vector_db = vector_db
        self.sql_db = sql_db

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        2단계 검색 전략

        1. 벡터 검색으로 관련 Child 찾기
        2. Child의 Parent 조회하여 전체 맥락 확보
        """

        # Step 1: Child documents에서 검색
        child_results = self.vector_db.search(
            query=query,
            filter={"doc_type": "child"},
            top_k=top_k
        )

        # Step 2: 관련 Parent IDs 수집
        parent_ids = set()
        for result in child_results:
            parent_ids.add(result.metadata['parent_id'])

        # Step 3: Parent documents 조회
        parents = self.vector_db.get_by_ids(list(parent_ids))

        # Step 4: 결과 조합
        enriched_results = []
        for child in child_results:
            parent = next(
                (p for p in parents if p.doc_id == child.metadata['parent_id']),
                None
            )

            enriched_results.append({
                "child": child,
                "parent": parent,
                "context": self._build_context(parent, child),
            })

        return enriched_results

    def _build_context(self, parent: ParentDocument, child: ChildDocument) -> str:
        """LLM에게 전달할 통합 컨텍스트 생성"""

        return f"""
[전체 맥락 - 고객 정보]
{parent.text}

[상세 정보 - 관련 주문]
{child.text}
"""

    def retrieve_with_sql_fallback(self, query: str, customer_id: int = None) -> Dict:
        """
        벡터 검색 + SQL 보완 전략

        벡터 검색으로 관련 문서를 찾고,
        정확한 수치가 필요하면 SQL로 보완
        """

        # 벡터 검색
        vector_results = self.retrieve(query)

        # SQL로 정확한 데이터 보완
        if customer_id and self._needs_exact_numbers(query):
            sql_data = self.sql_db.query("""
                SELECT
                    COUNT(DISTINCT o.order_id) as order_count,
                    SUM(o.total_amount) as total_spent,
                    COUNT(CASE WHEN oi.return_status != 'none' THEN 1 END) as return_count
                FROM customers c
                JOIN orders o ON c.customer_id = o.customer_id
                JOIN order_items oi ON o.order_id = oi.order_id
                WHERE c.customer_id = %s
            """, [customer_id])

            return {
                "vector_context": vector_results,
                "exact_data": sql_data,
            }

        return {"vector_context": vector_results}

    def _needs_exact_numbers(self, query: str) -> bool:
        """정확한 수치가 필요한 쿼리인지 판단"""
        exact_keywords = ['총', '합계', '몇 개', '몇 건', '얼마', '평균', '최대', '최소']
        return any(kw in query for kw in exact_keywords)

6. SQL JOIN 결과를 Embedding할 때 주의점

Blog Image

JOIN의 함정: 데이터 폭발

sql
-- 단순해 보이는 JOIN
SELECT c.name, o.order_id, p.name as product
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id;

문제점:

text
원본 레코드 수:
- customers: 1,000
- orders: 10,000
- order_items: 50,000
- products: 5,000

JOIN 결과: 50,000 rows (order_items 기준으로 폭발)

각 row를 chunk로 만들면:
- 50,000개의 chunk
- 동일 고객 정보가 수십~수백 번 중복
- 임베딩 비용 폭증
- 검색 시 중복 결과

올바른 JOIN 전략

python
class SmartJoinChunker:
    """
    JOIN 결과를 효율적으로 Chunking하는 전략
    """

    def __init__(self, db_connection):
        self.db = db_connection

    def chunk_with_aggregation(self) -> List[RelationalChunk]:
        """
        전략 1: GROUP BY + JSON 집계

        1:N 관계를 JSON 배열로 집계하여 중복 제거
        """

        query = """
        SELECT
            c.customer_id,
            c.name,
            c.segment,
            -- 주문을 JSON 배열로 집계
            json_agg(
                DISTINCT jsonb_build_object(
                    'order_id', o.order_id,
                    'order_date', o.order_date,
                    'items', (
                        SELECT json_agg(
                            jsonb_build_object(
                                'product', p.name,
                                'category', p.category,
                                'qty', oi.quantity
                            )
                        )
                        FROM order_items oi
                        JOIN products p ON oi.product_id = p.product_id
                        WHERE oi.order_id = o.order_id
                    )
                )
            ) as orders
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id, c.name, c.segment
        """

        results = self.db.query(query)

        chunks = []
        for row in results:
            chunk = RelationalChunk(
                text=self._format_aggregated_customer(row),
                metadata={
                    "customer_id": row['customer_id'],
                    "type": "customer_profile",
                }
            )
            chunks.append(chunk)

        return chunks  # customers 수만큼만 생성 (1,000개)

    def chunk_with_window_functions(self) -> List[RelationalChunk]:
        """
        전략 2: Window Function으로 컨텍스트 보존

        각 레코드에 관련 컨텍스트를 포함
        """

        query = """
        WITH order_context AS (
            SELECT
                o.order_id,
                o.customer_id,
                o.order_date,
                c.name as customer_name,
                c.segment,
                -- 동일 고객의 다른 주문 수
                COUNT(*) OVER (PARTITION BY o.customer_id) as customer_total_orders,
                -- 동일 고객의 총 구매액
                SUM(o.total_amount) OVER (PARTITION BY o.customer_id) as customer_total_spent,
                -- 주문 순서
                ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.order_date) as order_sequence
            FROM orders o
            JOIN customers c ON o.customer_id = c.customer_id
        )
        SELECT
            oc.*,
            json_agg(
                jsonb_build_object(
                    'product', p.name,
                    'category', p.category,
                    'quantity', oi.quantity
                )
            ) as items
        FROM order_context oc
        JOIN order_items oi ON oc.order_id = oi.order_id
        JOIN products p ON oi.product_id = p.product_id
        GROUP BY oc.order_id, oc.customer_id, oc.order_date,
                 oc.customer_name, oc.segment, oc.customer_total_orders,
                 oc.customer_total_spent, oc.order_sequence
        """

        results = self.db.query(query)

        chunks = []
        for row in results:
            # 각 주문이 고객 컨텍스트를 포함
            chunk = RelationalChunk(
                text=self._format_order_with_context(row),
                metadata={
                    "order_id": row['order_id'],
                    "customer_id": row['customer_id'],
                    "type": "order_with_context",
                }
            )
            chunks.append(chunk)

        return chunks  # orders 수만큼 생성 (10,000개), 하지만 각각 컨텍스트 포함

    def _format_order_with_context(self, row: Dict) -> str:
        """주문 + 고객 컨텍스트 포맷"""

        items_text = "\n".join([
            f"  - {item['product']} ({item['category']}) x{item['quantity']}"
            for item in row['items']
        ])

        return f"""
주문 #{row['order_id']} ({row['order_date']})

고객 정보:
- 이름: {row['customer_name']} (ID: {row['customer_id']})
- 등급: {row['segment']}
- 총 주문 수: {row['customer_total_orders']}건 (이 주문은 {row['order_sequence']}번째)
- 누적 구매액: {row['customer_total_spent']:,.0f}원

주문 상품:
{items_text}
""".strip()

중복 제거와 Deduplication

python
class ChunkDeduplicator:
    """
    Embedding 전 중복 chunk 제거
    """

    def deduplicate_chunks(
        self,
        chunks: List[RelationalChunk],
        strategy: str = "exact"
    ) -> List[RelationalChunk]:
        """
        중복 제거 전략

        - exact: 정확히 같은 텍스트 제거
        - semantic: 의미적으로 유사한 것도 제거 (임베딩 비용 발생)
        - id_based: metadata의 primary key 기준
        """

        if strategy == "exact":
            seen = set()
            unique = []
            for chunk in chunks:
                text_hash = hash(chunk.text)
                if text_hash not in seen:
                    seen.add(text_hash)
                    unique.append(chunk)
            return unique

        elif strategy == "id_based":
            # Primary key 기준 중복 제거
            seen_ids = set()
            unique = []
            for chunk in chunks:
                primary_key = self._get_primary_key(chunk)
                if primary_key not in seen_ids:
                    seen_ids.add(primary_key)
                    unique.append(chunk)
            return unique

        elif strategy == "semantic":
            return self._semantic_dedup(chunks)

    def _get_primary_key(self, chunk: RelationalChunk) -> str:
        """Chunk의 primary key 추출"""
        meta = chunk.metadata
        if "customer_id" in meta and "order_id" not in meta:
            return f"customer_{meta['customer_id']}"
        elif "order_id" in meta:
            return f"order_{meta['order_id']}"
        else:
            return hash(chunk.text)

    def _semantic_dedup(
        self,
        chunks: List[RelationalChunk],
        threshold: float = 0.95
    ) -> List[RelationalChunk]:
        """
        의미적 유사도 기반 중복 제거

        주의: 임베딩 API 호출 필요
        """
        from sklearn.metrics.pairwise import cosine_similarity
        import numpy as np

        # 임베딩 생성
        embeddings = [self._embed(chunk.text) for chunk in chunks]
        embeddings = np.array(embeddings)

        # 유사도 행렬
        similarity_matrix = cosine_similarity(embeddings)

        # 중복 마킹
        unique_indices = []
        removed = set()

        for i in range(len(chunks)):
            if i in removed:
                continue
            unique_indices.append(i)

            # i와 유사한 것들 제거 대상으로
            for j in range(i + 1, len(chunks)):
                if similarity_matrix[i, j] > threshold:
                    removed.add(j)

        return [chunks[i] for i in unique_indices]

7. Metadata 필터링 vs 순수 Vector Search

두 접근법의 차이

검색 전략 비교

순수 Vector Search

장점: 의미적 유사성 탐색, 유연한 검색

단점: 정확한 필터링 불가, 관계 무시

적합: "~와 비슷한 것 찾아줘"

Metadata 필터링 + Vector Search

장점: 정확한 조건 필터링, 관계 ID로 연결

단점: 필터 조건 명시 필요

적합: "VIP 고객 중에서 ~한 것 찾아줘"

Hybrid (SQL + Vector)

장점: SQL로 정확한 필터 + Vector로 유사성

단점: 구현 복잡도 증가

적합: 복잡한 비즈니스 쿼리

Metadata 설계 원칙

python
class MetadataDesigner:
    """
    효과적인 Metadata 설계

    원칙:
    1. 관계 ID는 필수 (customer_id, order_id 등)
    2. 자주 필터링하는 속성만 포함
    3. 값의 카디널리티 고려 (너무 세분화하면 필터 효과 없음)
    """

    @staticmethod
    def design_customer_metadata(customer: Dict, orders: List[Dict]) -> Dict:
        """
        고객 chunk용 metadata 설계
        """

        # 필수: 관계 식별자
        metadata = {
            "customer_id": customer['customer_id'],
            "type": "customer_profile",
        }

        # 권장: 자주 필터링하는 속성
        metadata.update({
            "segment": customer['segment'],  # VIP, Regular 등
            "registration_year": customer['created_at'].year,
        })

        # 권장: 집계된 검색 조건
        categories = set()
        has_returns = False
        for order in orders:
            for item in order.get('items', []):
                categories.add(item['category'])
                if item.get('return_status') not in (None, 'none'):
                    has_returns = True

        metadata.update({
            "categories": list(categories),  # 리스트 필터링 지원
            "has_returns": has_returns,       # 불리언 필터
            "total_orders": len(orders),      # 범위 필터용
        })

        # 비권장: 너무 세부적인 정보
        # - 개별 주문 날짜들 (SQL로 처리)
        # - 개별 상품 ID들 (너무 많음)
        # - 정확한 금액 (범위 쿼리는 SQL이 적합)

        return metadata

Hybrid 검색 구현

python
class HybridRetriever:
    """
    SQL 필터링 + Vector Search 결합

    전략:
    1. SQL로 후보군 좁히기 (정확한 조건)
    2. Vector Search로 관련성 정렬 (의미적 유사성)
    """

    def __init__(self, sql_db, vector_db):
        self.sql_db = sql_db
        self.vector_db = vector_db

    def search(
        self,
        query: str,
        filters: Dict = None,
        top_k: int = 10
    ) -> List[Dict]:
        """
        Hybrid 검색 실행

        예시:
        query = "반품 관련 불만 사항"
        filters = {
            "segment": "VIP",
            "has_returns": True,
            "order_date_after": "2024-01-01"
        }
        """

        # Step 1: SQL로 후보 ID 추출
        candidate_ids = self._sql_filter(filters)

        if not candidate_ids:
            # 조건에 맞는 데이터 없음
            return []

        # Step 2: Vector Search with ID 필터
        vector_results = self.vector_db.search(
            query=query,
            filter={"customer_id": {"$in": candidate_ids}},
            top_k=top_k
        )

        # Step 3: SQL로 상세 정보 보강
        enriched = self._enrich_with_sql(vector_results)

        return enriched

    def _sql_filter(self, filters: Dict) -> List[int]:
        """SQL로 조건에 맞는 customer_id 추출"""

        if not filters:
            return None  # 필터 없으면 전체 대상

        conditions = ["1=1"]
        params = []

        if filters.get("segment"):
            conditions.append("c.segment = %s")
            params.append(filters["segment"])

        if filters.get("has_returns"):
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM orders o
                    JOIN order_items oi ON o.order_id = oi.order_id
                    WHERE o.customer_id = c.customer_id
                    AND oi.return_status != 'none'
                )
            """)

        if filters.get("order_date_after"):
            conditions.append("""
                EXISTS (
                    SELECT 1 FROM orders o
                    WHERE o.customer_id = c.customer_id
                    AND o.order_date >= %s
                )
            """)
            params.append(filters["order_date_after"])

        query = f"""
            SELECT DISTINCT c.customer_id
            FROM customers c
            WHERE {' AND '.join(conditions)}
        """

        results = self.sql_db.query(query, params)
        return [r['customer_id'] for r in results]

    def _enrich_with_sql(self, vector_results: List) -> List[Dict]:
        """Vector 결과에 SQL 데이터 보강"""

        customer_ids = [r.metadata['customer_id'] for r in vector_results]

        sql_data = self.sql_db.query("""
            SELECT
                c.customer_id,
                c.name,
                COUNT(o.order_id) as order_count,
                SUM(o.total_amount) as total_spent,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
            WHERE c.customer_id = ANY(%s)
            GROUP BY c.customer_id, c.name
        """, [customer_ids])

        sql_lookup = {d['customer_id']: d for d in sql_data}

        enriched = []
        for result in vector_results:
            cid = result.metadata['customer_id']
            enriched.append({
                "text": result.text,
                "score": result.score,
                "metadata": result.metadata,
                "sql_data": sql_lookup.get(cid, {}),
            })

        return enriched

성능 비교

Blog Image
python
def benchmark_search_strategies():
    """
    검색 전략별 성능 비교
    """

    queries = [
        {
            "question": "VIP 고객 중 반품이 많은 사람은?",
            "expected_filter": {"segment": "VIP", "has_returns": True},
        },
        {
            "question": "전자제품을 자주 구매하는 고객의 특성",
            "expected_filter": {"categories": {"$contains": "전자제품"}},
        },
        {
            "question": "최근 3개월 구매 이력이 있는 휴면 고객",
            "expected_filter": {
                "order_date_after": "2024-09-01",
                "segment": "Dormant",
            },
        },
    ]

    results = []

    for q in queries:
        # 전략 1: 순수 Vector Search
        start = time.time()
        pure_vector = vector_db.search(q["question"], top_k=10)
        pure_vector_time = time.time() - start
        pure_vector_precision = calculate_precision(pure_vector, q)

        # 전략 2: Metadata 필터 + Vector
        start = time.time()
        metadata_vector = vector_db.search(
            q["question"],
            filter=q["expected_filter"],
            top_k=10
        )
        metadata_time = time.time() - start
        metadata_precision = calculate_precision(metadata_vector, q)

        # 전략 3: SQL + Vector Hybrid
        start = time.time()
        hybrid = hybrid_retriever.search(
            q["question"],
            filters=q["expected_filter"],
            top_k=10
        )
        hybrid_time = time.time() - start
        hybrid_precision = calculate_precision(hybrid, q)

        results.append({
            "question": q["question"],
            "pure_vector": {"time": pure_vector_time, "precision": pure_vector_precision},
            "metadata": {"time": metadata_time, "precision": metadata_precision},
            "hybrid": {"time": hybrid_time, "precision": hybrid_precision},
        })

    return results

# 예상 결과:
# ┌─────────────────────────────────┬─────────────┬─────────────┬─────────────┐
# │ Query                           │ Pure Vector │ Metadata    │ Hybrid      │
# ├─────────────────────────────────┼─────────────┼─────────────┼─────────────┤
# │ VIP 고객 중 반품 많은 사람       │ P: 0.3      │ P: 0.7      │ P: 0.9      │
# │                                 │ T: 50ms     │ T: 45ms     │ T: 80ms     │
# ├─────────────────────────────────┼─────────────┼─────────────┼─────────────┤
# │ 전자제품 자주 구매 고객          │ P: 0.4      │ P: 0.8      │ P: 0.95     │
# │                                 │ T: 48ms     │ T: 52ms     │ T: 85ms     │
# └─────────────────────────────────┴─────────────┴─────────────┴─────────────┘

8. 실전 구현: 고객-주문-상품 RAG 시스템

전체 아키텍처

python
"""
E-Commerce RAG 시스템 전체 구현

구성요소:
1. DataLoader: SQL에서 관계 데이터 로드
2. ChunkBuilder: 관계 보존 Chunking
3. Indexer: 벡터 DB 인덱싱
4. Retriever: Hybrid 검색
5. Generator: LLM 응답 생성
"""

from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import json
from datetime import datetime
import psycopg2
from openai import OpenAI

# 설정
@dataclass
class RAGConfig:
    # Database
    db_host: str = "localhost"
    db_name: str = "ecommerce"
    db_user: str = "user"
    db_password: str = "password"

    # Vector DB
    vector_db_url: str = "http://localhost:6333"
    collection_name: str = "customer_profiles"

    # Embedding
    embedding_model: str = "text-embedding-3-small"
    embedding_dimension: int = 1536

    # LLM
    llm_model: str = "gpt-4-turbo-preview"
    max_tokens: int = 2000

    # Chunking
    chunk_strategy: str = "customer_centric"  # customer_centric, order_centric, parent_child

class EcommerceRAG:
    """
    E-Commerce RAG 시스템 메인 클래스
    """

    def __init__(self, config: RAGConfig):
        self.config = config
        self.sql_db = self._connect_sql()
        self.vector_db = self._connect_vector_db()
        self.openai = OpenAI()

    def _connect_sql(self):
        return psycopg2.connect(
            host=self.config.db_host,
            database=self.config.db_name,
            user=self.config.db_user,
            password=self.config.db_password
        )

    def _connect_vector_db(self):
        from qdrant_client import QdrantClient
        return QdrantClient(url=self.config.vector_db_url)

    # =========================================
    # 1. 데이터 로드 및 Chunking
    # =========================================

    def build_index(self):
        """
        SQL 데이터를 벡터 DB에 인덱싱
        """
        print("Starting index build...")

        # Step 1: 관계 데이터 로드
        customers = self._load_customer_data()
        print(f"Loaded {len(customers)} customers with orders")

        # Step 2: Chunk 생성
        chunks = self._create_chunks(customers)
        print(f"Created {len(chunks)} chunks")

        # Step 3: 임베딩 및 인덱싱
        self._index_chunks(chunks)
        print("Indexing complete!")

        return len(chunks)

    def _load_customer_data(self) -> List[Dict]:
        """
        고객-주문-상품 관계 데이터 로드

        핵심: JSON 집계로 1:N 관계를 하나의 레코드로
        """

        query = """
        WITH order_details AS (
            SELECT
                o.order_id,
                o.customer_id,
                o.order_date,
                o.total_amount,
                o.status,
                json_agg(
                    json_build_object(
                        'product_id', p.product_id,
                        'product_name', p.name,
                        'category', p.category,
                        'quantity', oi.quantity,
                        'unit_price', oi.unit_price,
                        'return_status', oi.return_status
                    )
                ) as items
            FROM orders o
            JOIN order_items oi ON o.order_id = oi.order_id
            JOIN products p ON oi.product_id = p.product_id
            GROUP BY o.order_id
        )
        SELECT
            c.customer_id,
            c.name,
            c.email,
            c.segment,
            c.created_at,
            COALESCE(
                json_agg(
                    json_build_object(
                        'order_id', od.order_id,
                        'order_date', od.order_date,
                        'total_amount', od.total_amount,
                        'status', od.status,
                        'items', od.items
                    ) ORDER BY od.order_date DESC
                ) FILTER (WHERE od.order_id IS NOT NULL),
                '[]'::json
            ) as orders
        FROM customers c
        LEFT JOIN order_details od ON c.customer_id = od.customer_id
        GROUP BY c.customer_id, c.name, c.email, c.segment, c.created_at
        """

        with self.sql_db.cursor() as cur:
            cur.execute(query)
            columns = [desc[0] for desc in cur.description]
            results = [dict(zip(columns, row)) for row in cur.fetchall()]

        return results

    def _create_chunks(self, customers: List[Dict]) -> List[Dict]:
        """
        관계 보존 Chunk 생성
        """

        chunks = []

        for customer in customers:
            chunk_text = self._format_customer_chunk(customer)

            # Metadata 설계
            orders = customer['orders'] if customer['orders'] else []
            categories = self._extract_categories(orders)

            metadata = {
                "customer_id": customer['customer_id'],
                "customer_name": customer['name'],
                "segment": customer['segment'],
                "categories": categories,
                "has_returns": self._has_returns(orders),
                "total_orders": len(orders),
                "total_spent": sum(o['total_amount'] for o in orders if o.get('total_amount')),
                "last_order_date": orders[0]['order_date'] if orders else None,
            }

            chunks.append({
                "text": chunk_text,
                "metadata": metadata,
            })

        return chunks

    def _format_customer_chunk(self, customer: Dict) -> str:
        """
        고객 데이터를 검색 가능한 텍스트로 변환

        핵심: 관계 정보를 명시적으로 포함
        """

        lines = [
            f"# 고객 프로필: {customer['name']}",
            f"",
            f"## 기본 정보",
            f"- 고객 ID: {customer['customer_id']}",
            f"- 이메일: {customer['email']}",
            f"- 등급: {customer['segment']}",
            f"- 가입일: {customer['created_at']}",
            f"",
        ]

        orders = customer['orders'] if customer['orders'] else []

        if orders:
            # 요약 통계
            total_spent = sum(o['total_amount'] for o in orders if o.get('total_amount'))
            categories = self._extract_categories(orders)
            return_count = sum(
                1 for o in orders
                for item in (o.get('items') or [])
                if item.get('return_status') not in (None, 'none')
            )

            lines.extend([
                f"## 구매 요약",
                f"- 총 주문 수: {len(orders)}건",
                f"- 총 구매액: {total_spent:,.0f}원",
                f"- 구매 카테고리: {', '.join(categories)}",
                f"- 반품 건수: {return_count}건",
                f"",
                f"## 주문 상세",
            ])

            # 최근 10건만 상세 포함 (너무 길면 truncate)
            for order in orders[:10]:
                lines.append(f"")
                lines.append(f"### 주문 #{order['order_id']} ({order['order_date']})")
                lines.append(f"상태: {order['status']} | 금액: {order['total_amount']:,.0f}원")

                items = order.get('items') or []
                if items:
                    lines.append("상품:")
                    for item in items:
                        return_info = ""
                        if item.get('return_status') not in (None, 'none'):
                            return_info = f" [반품: {item['return_status']}]"
                        lines.append(
                            f"  - {item['product_name']} ({item['category']}) "
                            f"x{item['quantity']} @ {item['unit_price']:,.0f}원{return_info}"
                        )

            if len(orders) > 10:
                lines.append(f"")
                lines.append(f"... 외 {len(orders) - 10}건의 주문 이력")
        else:
            lines.extend([
                f"## 구매 이력",
                f"주문 내역 없음",
            ])

        return "\n".join(lines)

    def _extract_categories(self, orders: List[Dict]) -> List[str]:
        categories = set()
        for order in orders:
            for item in (order.get('items') or []):
                if item.get('category'):
                    categories.add(item['category'])
        return list(categories)

    def _has_returns(self, orders: List[Dict]) -> bool:
        for order in orders:
            for item in (order.get('items') or []):
                if item.get('return_status') not in (None, 'none'):
                    return True
        return False

    # =========================================
    # 2. 인덱싱
    # =========================================

    def _index_chunks(self, chunks: List[Dict]):
        """
        Chunk들을 벡터 DB에 인덱싱
        """
        from qdrant_client.models import Distance, VectorParams, PointStruct

        # 컬렉션 생성
        self.vector_db.recreate_collection(
            collection_name=self.config.collection_name,
            vectors_config=VectorParams(
                size=self.config.embedding_dimension,
                distance=Distance.COSINE
            )
        )

        # 배치 임베딩 및 인덱싱
        batch_size = 100
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]

            # 임베딩 생성
            texts = [c['text'] for c in batch]
            embeddings = self._embed_texts(texts)

            # 포인트 생성
            points = [
                PointStruct(
                    id=i + j,
                    vector=embeddings[j],
                    payload={
                        "text": batch[j]['text'],
                        **batch[j]['metadata']
                    }
                )
                for j in range(len(batch))
            ]

            # 업로드
            self.vector_db.upsert(
                collection_name=self.config.collection_name,
                points=points
            )

            print(f"Indexed {min(i + batch_size, len(chunks))}/{len(chunks)} chunks")

    def _embed_texts(self, texts: List[str]) -> List[List[float]]:
        """텍스트 임베딩 생성"""
        response = self.openai.embeddings.create(
            model=self.config.embedding_model,
            input=texts
        )
        return [item.embedding for item in response.data]

    # =========================================
    # 3. 검색 및 응답 생성
    # =========================================

    def query(
        self,
        question: str,
        filters: Dict = None,
        top_k: int = 5
    ) -> Dict:
        """
        RAG 질의 실행

        Args:
            question: 사용자 질문
            filters: 메타데이터 필터 (optional)
            top_k: 검색 결과 수

        Returns:
            {
                "answer": str,
                "sources": List[Dict],
                "sql_supplement": Dict (optional)
            }
        """

        # Step 1: 질문 분석 및 필터 추출
        parsed = self._parse_question(question)

        # Step 2: 벡터 검색
        search_results = self._search(
            question,
            filters=filters or parsed.get('filters'),
            top_k=top_k
        )

        # Step 3: SQL 보완 (정확한 수치 필요시)
        sql_data = None
        if parsed.get('needs_exact_data'):
            sql_data = self._get_exact_data(parsed)

        # Step 4: LLM 응답 생성
        answer = self._generate_answer(question, search_results, sql_data)

        return {
            "answer": answer,
            "sources": [
                {
                    "customer_id": r.payload['customer_id'],
                    "customer_name": r.payload['customer_name'],
                    "score": r.score,
                }
                for r in search_results
            ],
            "sql_supplement": sql_data,
        }

    def _parse_question(self, question: str) -> Dict:
        """
        질문에서 필터 조건과 데이터 요구사항 추출
        """

        # 간단한 규칙 기반 파싱 (프로덕션에서는 LLM 활용 권장)
        parsed = {
            "filters": {},
            "needs_exact_data": False,
        }

        # 세그먼트 필터
        if "VIP" in question.upper():
            parsed["filters"]["segment"] = "VIP"
        elif "휴면" in question or "dormant" in question.lower():
            parsed["filters"]["segment"] = "Dormant"

        # 반품 필터
        if "반품" in question or "return" in question.lower():
            parsed["filters"]["has_returns"] = True

        # 카테고리 필터
        categories = ["전자제품", "의류", "식품", "가구"]
        for cat in categories:
            if cat in question:
                parsed["filters"]["categories"] = {"$contains": cat}
                break

        # 정확한 수치 필요 여부
        exact_keywords = ['총', '합계', '몇 개', '몇 건', '얼마', '평균', '최대', '최소', '비율']
        parsed["needs_exact_data"] = any(kw in question for kw in exact_keywords)

        return parsed

    def _search(
        self,
        query: str,
        filters: Dict = None,
        top_k: int = 5
    ) -> List:
        """벡터 검색 실행"""
        from qdrant_client.models import Filter, FieldCondition, MatchValue

        # 쿼리 임베딩
        query_vector = self._embed_texts([query])[0]

        # 필터 구성
        qdrant_filter = None
        if filters:
            conditions = []
            for key, value in filters.items():
                if isinstance(value, dict) and "$contains" in value:
                    # 리스트 포함 조건
                    conditions.append(
                        FieldCondition(
                            key=key,
                            match=MatchValue(value=value["$contains"])
                        )
                    )
                else:
                    conditions.append(
                        FieldCondition(
                            key=key,
                            match=MatchValue(value=value)
                        )
                    )

            if conditions:
                qdrant_filter = Filter(must=conditions)

        # 검색 실행
        results = self.vector_db.search(
            collection_name=self.config.collection_name,
            query_vector=query_vector,
            query_filter=qdrant_filter,
            limit=top_k
        )

        return results

    def _get_exact_data(self, parsed: Dict) -> Dict:
        """SQL로 정확한 통계 데이터 조회"""

        # 필터 조건 구성
        where_clauses = ["1=1"]
        params = []

        if parsed["filters"].get("segment"):
            where_clauses.append("c.segment = %s")
            params.append(parsed["filters"]["segment"])

        if parsed["filters"].get("has_returns"):
            where_clauses.append("""
                EXISTS (
                    SELECT 1 FROM order_items oi2
                    JOIN orders o2 ON oi2.order_id = o2.order_id
                    WHERE o2.customer_id = c.customer_id
                    AND oi2.return_status != 'none'
                )
            """)

        query = f"""
        SELECT
            COUNT(DISTINCT c.customer_id) as customer_count,
            COUNT(DISTINCT o.order_id) as order_count,
            COALESCE(SUM(o.total_amount), 0) as total_revenue,
            COALESCE(AVG(o.total_amount), 0) as avg_order_value,
            COUNT(CASE WHEN oi.return_status != 'none' THEN 1 END) as return_count
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        LEFT JOIN order_items oi ON o.order_id = oi.order_id
        WHERE {' AND '.join(where_clauses)}
        """

        with self.sql_db.cursor() as cur:
            cur.execute(query, params)
            columns = [desc[0] for desc in cur.description]
            row = cur.fetchone()
            return dict(zip(columns, row)) if row else {}

    def _generate_answer(
        self,
        question: str,
        search_results: List,
        sql_data: Dict = None
    ) -> str:
        """LLM으로 최종 응답 생성"""

        # 컨텍스트 구성
        context_parts = []

        for i, result in enumerate(search_results, 1):
            context_parts.append(f"[검색 결과 {i}] (관련도: {result.score:.2f})")
            context_parts.append(result.payload['text'])
            context_parts.append("")

        context = "\n".join(context_parts)

        # SQL 보완 데이터
        sql_info = ""
        if sql_data:
            sql_info = f"""

정확한 통계 데이터:
- 해당 고객 수: {sql_data.get('customer_count', 'N/A')}명
- 총 주문 수: {sql_data.get('order_count', 'N/A')}건
- 총 매출액: {sql_data.get('total_revenue', 0):,.0f}원
- 평균 주문 금액: {sql_data.get('avg_order_value', 0):,.0f}원
- 반품 건수: {sql_data.get('return_count', 'N/A')}건
"""

        prompt = f"""다음 고객 데이터를 기반으로 질문에 답변해주세요.

## 검색된 고객 정보
{context}
{sql_info}

## 질문
{question}

## 지시사항
1. 검색 결과에 있는 정보만 사용하여 답변하세요.
2. 정확한 수치가 제공된 경우 해당 수치를 인용하세요.
3. 확실하지 않은 정보는 "확인된 데이터에 따르면"으로 시작하세요.
4. 답을 찾을 수 없으면 솔직히 "제공된 데이터에서 해당 정보를 찾을 수 없습니다"라고 하세요.
"""

        response = self.openai.chat.completions.create(
            model=self.config.llm_model,
            messages=[
                {"role": "system", "content": "당신은 E-Commerce 데이터 분석 전문가입니다."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=self.config.max_tokens,
            temperature=0.3  # 낮은 temperature로 정확성 우선
        )

        return response.choices[0].message.content

사용 예시

python
# 시스템 초기화
config = RAGConfig(
    db_host="localhost",
    db_name="ecommerce",
)
rag = EcommerceRAG(config)

# 인덱스 빌드 (최초 1회)
rag.build_index()

# 질의 실행
result = rag.query("VIP 고객 중 반품이 많은 사람은 누구인가요?")
print(result['answer'])
# 출력:
# 확인된 데이터에 따르면, VIP 고객 중 반품 이력이 있는 고객은 다음과 같습니다:
# 1. 김철수 (ID: 123) - 반품 2건 (Magic Mouse, 기계식 키보드)
# 2. 이영희 (ID: 456) - 반품 1건 (무선 이어폰)
# ...

# 정확한 수치가 필요한 질의
result = rag.query("VIP 고객의 총 매출액과 평균 주문 금액은?")
print(result['answer'])
# 출력:
# VIP 고객 통계:
# - 총 고객 수: 150명
# - 총 매출액: 2,500,000,000원
# - 평균 주문 금액: 850,000원

9. 성능 벤치마크: 관계 보존 vs 미보존

실험 설계

python
"""
벤치마크 실험: 관계 보존 Chunking의 효과 측정

비교 대상:
1. Baseline: 테이블별 독립 Chunking
2. Flat JOIN: JOIN 결과를 row별로 Chunking
3. Proposed: 관계 보존 Chunking (Customer-centric)
"""

from dataclasses import dataclass
from typing import List, Dict, Tuple
import time
import numpy as np

@dataclass
class BenchmarkResult:
    strategy: str
    precision_at_5: float
    recall_at_5: float
    f1_at_5: float
    latency_ms: float
    hallucination_rate: float
    chunk_count: int
    index_time_sec: float

class RAGBenchmark:
    """RAG 시스템 벤치마크"""

    def __init__(self, test_queries: List[Dict]):
        """
        test_queries 형식:
        [
            {
                "question": "VIP 고객 중 반품이 많은 사람은?",
                "ground_truth": {
                    "customer_ids": [123, 456],
                    "expected_facts": ["김철수 반품 2건", "이영희 반품 1건"]
                }
            },
            ...
        ]
        """
        self.test_queries = test_queries
        self.results = []

    def run_benchmark(self) -> List[BenchmarkResult]:
        """전체 벤치마크 실행"""

        strategies = [
            ("baseline", self._build_baseline_index),
            ("flat_join", self._build_flat_join_index),
            ("proposed", self._build_proposed_index),
        ]

        for name, build_fn in strategies:
            print(f"\n{'='*50}")
            print(f"Testing strategy: {name}")
            print('='*50)

            # 인덱스 빌드
            start = time.time()
            rag_system, chunk_count = build_fn()
            index_time = time.time() - start

            # 쿼리 실행 및 평가
            metrics = self._evaluate_strategy(rag_system)

            result = BenchmarkResult(
                strategy=name,
                precision_at_5=metrics['precision'],
                recall_at_5=metrics['recall'],
                f1_at_5=metrics['f1'],
                latency_ms=metrics['latency'],
                hallucination_rate=metrics['hallucination_rate'],
                chunk_count=chunk_count,
                index_time_sec=index_time
            )
            self.results.append(result)

        return self.results

    def _evaluate_strategy(self, rag_system) -> Dict:
        """전략 평가"""

        precisions = []
        recalls = []
        latencies = []
        hallucinations = []

        for query in self.test_queries:
            # 검색 실행
            start = time.time()
            result = rag_system.query(query['question'])
            latency = (time.time() - start) * 1000
            latencies.append(latency)

            # Precision/Recall 계산
            retrieved_ids = set(s['customer_id'] for s in result['sources'])
            relevant_ids = set(query['ground_truth']['customer_ids'])

            if retrieved_ids:
                precision = len(retrieved_ids & relevant_ids) / len(retrieved_ids)
            else:
                precision = 0

            if relevant_ids:
                recall = len(retrieved_ids & relevant_ids) / len(relevant_ids)
            else:
                recall = 1

            precisions.append(precision)
            recalls.append(recall)

            # Hallucination 체크
            hallucination = self._check_hallucination(
                result['answer'],
                query['ground_truth']['expected_facts']
            )
            hallucinations.append(hallucination)

        avg_precision = np.mean(precisions)
        avg_recall = np.mean(recalls)

        return {
            'precision': avg_precision,
            'recall': avg_recall,
            'f1': 2 * avg_precision * avg_recall / (avg_precision + avg_recall + 1e-10),
            'latency': np.mean(latencies),
            'hallucination_rate': np.mean(hallucinations),
        }

    def _check_hallucination(self, answer: str, expected_facts: List[str]) -> float:
        """
        Hallucination 비율 측정

        반환: 0.0 (hallucination 없음) ~ 1.0 (완전 hallucination)
        """

        # 간단한 방법: 예상 사실이 답변에 포함되어 있는지 확인
        # 프로덕션에서는 NLI 모델 활용 권장

        if not expected_facts:
            return 0.0

        found_facts = sum(1 for fact in expected_facts if fact in answer)
        fact_coverage = found_facts / len(expected_facts)

        # fact가 없는데 자신있게 답변하면 hallucination
        # fact가 있으면 hallucination 아님
        return 1.0 - fact_coverage

    def print_results(self):
        """결과 출력"""

        print("\n" + "="*80)
        print("BENCHMARK RESULTS")
        print("="*80)

        headers = ["Strategy", "P@5", "R@5", "F1@5", "Latency", "Halluc.", "Chunks", "Index Time"]

        print(f"\n{'Strategy':<15} {'P@5':>8} {'R@5':>8} {'F1@5':>8} {'Latency':>10} {'Halluc.':>8} {'Chunks':>8} {'Index':>10}")
        print("-" * 80)

        for r in self.results:
            print(
                f"{r.strategy:<15} "
                f"{r.precision_at_5:>8.2%} "
                f"{r.recall_at_5:>8.2%} "
                f"{r.f1_at_5:>8.2%} "
                f"{r.latency_ms:>8.1f}ms "
                f"{r.hallucination_rate:>8.2%} "
                f"{r.chunk_count:>8,} "
                f"{r.index_time_sec:>8.1f}s"
            )

예상 벤치마크 결과

text
================================================================================
BENCHMARK RESULTS
================================================================================

Strategy        P@5      R@5     F1@5    Latency  Halluc.   Chunks      Index
--------------------------------------------------------------------------------
baseline        31.2%    28.5%   29.8%     45.2ms   68.5%    65,000       12.3s
flat_join       42.8%    35.2%   38.6%     52.1ms   45.2%   150,000       28.7s
proposed        78.5%    72.3%   75.3%     48.5ms   12.8%    10,000        8.2s

================================================================================
ANALYSIS
================================================================================

1. Precision/Recall 향상
   - Proposed vs Baseline: +47.3%p (Precision), +43.8%p (Recall)
   - 관계 보존으로 정확한 고객을 검색

2. Hallucination 감소
   - Proposed: 12.8% vs Baseline: 68.5%
   - 5배 이상 hallucination 감소
   - 관계 정보가 명시되어 LLM이 추측하지 않음

3. 효율성
   - Chunk 수: Proposed 10,000 vs Flat JOIN 150,000 (93% 감소)
   - Index 시간: 8.2s vs 28.7s (71% 감소)
   - 임베딩 API 비용 대폭 절감

4. Latency
   - 비슷한 수준 (45~52ms)
   - Chunk 수가 적어도 검색 품질 향상

상세 분석: 왜 이런 차이가 나나요?

python
def analyze_failure_cases():
    """
    Baseline 전략의 실패 사례 분석
    """

    # 질문: "김철수가 반품한 상품은?"

    # Baseline 검색 결과 (테이블별 독립 chunk)
    baseline_retrieved = [
        "김철수 고객은 VIP 등급입니다.",           # 고객 정보만
        "Magic Mouse는 전자제품입니다.",          # 상품 정보만
        "주문 #456은 2024-01-15에 생성되었습니다.", # 주문 정보만
    ]

    # 문제: 세 정보 간의 연결 관계가 없음
    # LLM은 "김철수"와 "Magic Mouse"가 관련있다고 추측 (hallucination)
    # 실제로 김철수가 Magic Mouse를 샀는지 알 수 없음

    # Proposed 검색 결과 (관계 보존 chunk)
    proposed_retrieved = [
        """
        고객 프로필: 김철수
        - 주문 #456 (2024-01-15)
          - Magic Mouse x2 [반품: completed]
          - MacBook Pro x1
        - 주문 #789 (2024-02-20)
          - 기계식 키보드 x1 [반품: completed]
        반품 이력: Magic Mouse, 기계식 키보드
        """
    ]

    # 차이: 관계가 명시되어 있어 LLM이 정확히 답변 가능
    # "김철수가 반품한 상품은 Magic Mouse와 기계식 키보드입니다."

10. 프로덕션 체크리스트

데이터 파이프라인 체크리스트

markdown
## 관계형 데이터 → RAG 파이프라인 체크리스트

### 1. 데이터 분석 단계
- [ ] ERD 문서화 완료
- [ ] 주요 1:N, N:M 관계 식별
- [ ] 각 테이블의 카디널리티 확인
- [ ] 자주 조회되는 쿼리 패턴 분석
- [ ] 데이터 갱신 주기 파악

### 2. Chunking 전략 설계
- [ ] 관계의 "1" 측 식별 (고객? 주문? 상품?)
- [ ] Parent-Child 패턴 필요 여부 결정
- [ ] Chunk 크기 결정 (토큰 제한 고려)
- [ ] Metadata 스키마 설계
- [ ] 중복 제거 전략 수립

### 3. SQL 쿼리 최적화
- [ ] JSON 집계 쿼리 성능 테스트
- [ ] 필요한 인덱스 생성
- [ ] 배치 처리 크기 결정
- [ ] 증분 업데이트 쿼리 작성

### 4. 임베딩 및 인덱싱
- [ ] 임베딩 모델 선택 및 테스트
- [ ] 벡터 DB 선택 (Qdrant, Pinecone, Weaviate 등)
- [ ] 인덱스 설정 (HNSW 파라미터 등)
- [ ] Metadata 필터링 인덱스 설정

### 5. 검색 전략
- [ ] Hybrid 검색 필요 여부 결정
- [ ] 필터 조건 추출 로직 구현
- [ ] Top-K 값 결정
- [ ] Reranking 필요 여부 검토

### 6. 품질 관리
- [ ] 테스트 쿼리 세트 구축 (최소 50개)
- [ ] Ground Truth 라벨링
- [ ] Hallucination 탐지 로직 구현
- [ ] 정기 품질 모니터링 대시보드

### 7. 운영 준비
- [ ] 데이터 동기화 스케줄러
- [ ] 에러 핸들링 및 재시도 로직
- [ ] 로깅 및 모니터링
- [ ] 비용 모니터링 (임베딩 API)

흔한 실수와 해결책

python
"""
RAG + 관계형 데이터에서 흔한 실수들
"""

# 실수 1: 관계 없이 테이블별로 따로 임베딩
# ❌ Bad
for customer in customers:
    embed(f"고객 {customer.name}")
for order in orders:
    embed(f"주문 #{order.id}")

# ✅ Good
for customer in customers_with_orders:
    embed(format_customer_with_orders(customer))


# 실수 2: JOIN 결과를 그대로 row별 chunk
# ❌ Bad
for row in db.query("SELECT * FROM customers JOIN orders JOIN items"):
    embed(str(row))  # 동일 고객이 수십 번 중복

# ✅ Good
for customer in db.query(aggregated_query_with_json):
    embed(format_aggregated(customer))  # 고객당 1개 chunk


# 실수 3: Metadata 없이 텍스트만 저장
# ❌ Bad
vector_db.insert({"text": chunk_text, "vector": embedding})

# ✅ Good
vector_db.insert({
    "text": chunk_text,
    "vector": embedding,
    "customer_id": 123,  # 관계 ID
    "segment": "VIP",    # 필터링용
    "categories": ["전자제품", "의류"],  # 다중 값
})


# 실수 4: 정확한 수치를 벡터 검색에만 의존
# ❌ Bad
question = "VIP 고객 총 매출액은?"
answer = rag.query(question)  # 벡터 검색 결과에서 추정

# ✅ Good
question = "VIP 고객 총 매출액은?"
vector_context = rag.search(question)
exact_data = sql.query("SELECT SUM(amount) FROM orders WHERE segment='VIP'")
answer = llm.generate(context=vector_context, exact_data=exact_data)


# 실수 5: Chunk 크기 무시
# ❌ Bad
# 고객 한 명의 주문이 1000건이면 chunk가 너무 커짐
chunk = format_all_orders(customer)  # 100,000 토큰

# ✅ Good
# Parent-Child 패턴 사용
parent = format_customer_summary(customer)  # 500 토큰
children = [format_order(o) for o in customer.orders]  # 각 200 토큰


# 실수 6: 동기화 누락
# ❌ Bad
# 최초 인덱싱 후 업데이트 없음

# ✅ Good
# 증분 동기화 구현
def sync_updates():
    last_sync = get_last_sync_time()
    updated = db.query(f"SELECT * FROM customers WHERE updated_at > {last_sync}")
    for customer in updated:
        vector_db.upsert(customer_id, new_embedding)

성능 최적화 팁

python
"""
프로덕션 성능 최적화
"""

# 1. 배치 임베딩
# ❌ Bad: 하나씩 API 호출
for chunk in chunks:
    embedding = openai.embed(chunk.text)  # 1000번 API 호출

# ✅ Good: 배치 처리
batch_size = 100
for i in range(0, len(chunks), batch_size):
    batch = chunks[i:i+batch_size]
    embeddings = openai.embed([c.text for c in batch])  # 10번 API 호출


# 2. 캐싱
from functools import lru_cache

@lru_cache(maxsize=1000)
def embed_query(query: str) -> List[float]:
    return openai.embed(query)

# 동일 쿼리 반복 시 API 호출 절약


# 3. 비동기 처리
import asyncio

async def search_with_sql(query: str):
    # 벡터 검색과 SQL 쿼리 병렬 실행
    vector_task = asyncio.create_task(vector_db.search_async(query))
    sql_task = asyncio.create_task(sql_db.query_async(stats_query))

    vector_results, sql_data = await asyncio.gather(vector_task, sql_task)
    return combine_results(vector_results, sql_data)


# 4. 인덱스 최적화 (Qdrant 예시)
from qdrant_client.models import OptimizersConfig

client.update_collection(
    collection_name="customers",
    optimizer_config=OptimizersConfig(
        indexing_threshold=10000,  # 10K 이상일 때만 인덱싱
        memmap_threshold=50000,    # 50K 이상이면 디스크 사용
    )
)


# 5. Metadata 인덱스 (필터링 성능)
client.create_payload_index(
    collection_name="customers",
    field_name="segment",
    field_schema="keyword"  # 정확한 매칭용
)

client.create_payload_index(
    collection_name="customers",
    field_name="total_orders",
    field_schema="integer"  # 범위 쿼리용
)

마무리: 핵심 요약

RAG + SQL 핵심 원칙

1. 관계를 텍스트로 명시하라

- FK 관계를 자연어로 풀어서 chunk에 포함

- "주문 #456은 고객 김철수(ID: 123)의 주문입니다"

2. "1"의 측을 기준으로 그룹핑하라

- 고객 중심: 고객 + 해당 고객의 모든 주문

- JOIN 폭발을 JSON 집계로 해결

3. Metadata에 관계 ID를 저장하라

- customer_id, order_id로 필터링

- 검색 후 SQL로 추가 조회 가능

4. 정확한 수치는 SQL로 보완하라

- 벡터 검색: 관련 문서 찾기

- SQL: 정확한 집계/통계

5. Parent-Child로 스케일하라

- 대용량 관계는 요약(Parent) + 상세(Child)

- Child로 검색, Parent로 맥락 확보

벡터 DB는 의미를 이해하지만, 관계는 이해하지 못합니다.

RAG 시스템의 정확도를 높이려면 벡터 DB 튜닝 이전에, 원본 데이터의 관계형 구조가 chunk에 어떻게 반영되는지부터 점검해야 합니다. 1:N 관계를 무시한 chunking은 아무리 좋은 임베딩 모델을 써도 hallucination을 만들 수밖에 없습니다.

SQL에서 관계를 정리하고, 그 관계를 보존하는 chunking 전략을 세우는 것 - 이것이 RAG 시스템 정확도 향상의 첫 번째 단계다.

참고 자료

  • [LangChain: Parent Document Retriever](https://python.langchain.com/docs/modules/data_connection/retrievers/parent_document_retriever)
  • [Qdrant: Hybrid Search](https://qdrant.tech/documentation/concepts/hybrid-queries/)
  • [PostgreSQL: JSON Functions](https://www.postgresql.org/docs/current/functions-json.html)
  • [OpenAI: Embeddings Best Practices](https://platform.openai.com/docs/guides/embeddings)