올리브영 테크블로그 포스팅 재고의 변동을 시계열 데이터로?!
Redis Stream

재고의 변동을 시계열 데이터로?!

Redis Stream으로 재고 변동 이력 구현하기

2024.11.15

안녕하세요. 인벤토리 스쿼드 백엔드 개발을 담당하고 있는 한첨지입니다!
앞서, 올여우님께서 신규 재고 시스템 구축을 위한 개발 여정을 소개해 주셨는데요,

신규 구축 이후로도 인벤토리 스쿼드에서는 올리브영의 타 시스템에서 재고를 빠르고 효율적으로 가져가실 수 있도록 끊임없이 개선의 과정을 거치고 있습니다.
이번에 그중 하나인 Redis Stream으로 특정 기간 변동 재고 제공 사례를 소개해 드리고자 합니다!



배경


올리브영의 일부 시스템에서는, 특정 주기를 가지고 올리브영 모든 매장, 모든 상품의 재고 수량 정보를 가져가 각자의 시스템에 맞게 색인하여 활용합니다.
매장의 상품 단위를 SKU(Stock Keeping Unit)라고 하는데, 올리브영은 1,000만 SKU가 넘습니다.

일정 시간 단위 주기로 1,000만 건 이상의 대량 데이터를 가져가기 때문에 주기마다 Redis CPU 및 API Latency가 급격히 증가하는 현상이 발생하였는데요,

먼저 현재 인벤토리 Redis의 구조에 대해 살펴봅시다.
인벤토리 스쿼드에서는 1개 SKU 조회 기준으로 고성능 API 속도를 유지하기 위해 Redis에 Hash 타입으로 올리브영 전체 SKU의 재고 수량 정보를 저장합니다.

Key Value
field value
상품키 수량 10
상품키 ... ...
... ... ...

위 구조에서는 항상 최신의 재고 수량만 보유하기 때문에, 타 시스템에서 재고를 색인하려면 Redis의 모든 데이터를 탈탈 털어 전체 재고를 가져가야 하는 비효율이 발생하였는데요,

이런 비효율을 개선하기 위해 매장 내 POS, 물류 관리 시스템, 백오피스를 비롯해 여러 시스템에서 발생하는 상품의 판매, 입고, 반품, 폐기 등 모든 재고 변동 이벤트를 Redis Stream 데이터로 기록하기로 했습니다.
Redis Stream이 무엇이고 Redis Stream을 선택한 이유를 알아볼까요?

inventory

Redis Stream이란?

Redis Stream은 로그 데이터를 처리하기 위해 Redis 5.0부터 도입된 자료 구조입니다.
Redis Stream은 크게 두 가지 특성이 있습니다.

  • append-only의 특징을 가진 시계열 데이터 처리
  • 기록된 데이터 소비(메시징 시스템, 영속성을 가진 메시지)

이러한 특성 덕분에 Redis Stream은 실시간 데이터 처리, 로그 수집, 메시지 브로커 및 이벤트 드리븐 아키텍처 등에 널리 활용됩니다.

아래 표는 Redis Stream의 기본 명령어입니다.

종류 설명
XADD 스트림에 새 Entry 추가합니다.
XREAD 주어진 위치에서 시작하여 시간 순서대로 앞으로 이동하면서 하나 이상의 항목을 읽습니다.
XRANGE 두 개의 제공된 Entry ID 사이의 범위에 해당하는 Entry를 반환합니다.
XLEN 스트림의 Length를 반환합니다.

이 밖의 Redis Stream 명령어가 더 궁금하시다면 여기를 참고해 주세요!

추가적인 인프라 구성없이 시계열 데이터를 처리할 수 있다는 점이 Redis Stream을 선택한 이유입니다.
이제 구현 과정을 살펴볼까요~



목표 및 구현


기능을 제공하기에 앞서 변동 재고를 주기 위해 다음과 같은 목표를 세웠습니다.

  • 처리 순서 보장
  • 빠른 조회 속도 보장



처리 순서 보장


Redis Stream은 데이터 생성 시 자동으로 고유한 Entry ID를 생성합니다.

  • 저장된 데이터를 Entry라고 하는데, 각 Entry는 고유의 Entry ID를 가집니다.
  • 별도 설정이 ID를 지정해 주지 않으면 millisecondsTime-sequenceNumber 형태로 저장됩니다.
  • sequenceNumber는 동일한 밀리초에 생성된 경우 0,1,2, ... 순차적으로 증가합니다.
    • sequenceNumber는 64비트 범위이므로 사실상 동일 밀리초에 생성할 수 있는 항목 수에 제한이 없습니다.

> XADD inventory-20241030 * store-id STORE_A product-id GOODS_A stock-count 10
"1730271083140-0"

  • 각 순서는 다음 값을 의미합니다.
    • XADD [스트림 Key] [Entry ID] [필드-값]
  • *는 Entry ID 자동 생성을 트리거하는 것을 의미하며, 명시적으로 지정할 수 있습니다.(권장하진 않는다고 합니다.)

> XADD inventory-explicit-id 0-1 store-id STORE_A product-id GOODS_A stock-count 10
"0-1"

> XADD inventory-explicit-id 0-1 store-id STORE_A product-id GOODS_B stock-count 5
"(error) ERR The ID specified in XADD is equal or smaller than the target stream top item"

Redis Stream은 Entry ID를 기준으로 범위 쿼리를 지원합니다.

  • XRANGE 명령어는 Entry ID를 기준으로 범위를 조회합니다.
  • -는 최소 Entry ID, +는 최대 Entry ID를 의미합니다.
  • XREVRANGE 명령어로 역순 조회도 가능합니다.

> XRANGE inventory-20241030 - +
1) 1) "1730271083140-0"
   2) 1) "store-id"
      2) "STORE_A"
      3) "product-id"
      4) "GOODS_A
      5) "stock-count"
      6) "10"

XRANGE 명령어를 통해 반환된 데이터는 자동으로 생성된 Entry ID로 그 순서를 보장할 수 있습니다.
명령어만으로 설명하면 심심하니 Redisson 라이브러리를 적용한 예제 코드도 살펴볼까요?


// XADD
fun addStockHistory(streamKey: String, storeId: String, productId: String, stockCount: Int) {
    redissonClient.getStream<String, Any>(streamKey)
        .let {
            it.add(
                StreamAddArgs.entries(
                    mapOf(
                        "store-id" to storeId,
                        "product-id" to productId,
                        "stock-count" to stockCount
                    )
                )
            )
        }
  }

// 호출
addStockHistory("inventory-1030", "STORE_A", "GOODS_A", 10)
addStockHistory("inventory-1030", "STORE_A", "GOODS_B", 5)

여기서 잠깐!!
지금까지 예제에서 쓰인 Stream Key 뒤에 왜 날짜로 구별할까요?

변동 재고의 경우, 재고를 동기화하는 시스템에 API로 제공하는 것을 목표로 하므로 영구적으로 데이터를 보관할 필요가 없습니다.
이 때문에 당일의 변동 재고를 저장하는 Key는 "StreamKey-날짜" 형식으로 설계하였습니다.


Stream Key에 TTL을 주면, TTL이 만료된 이후에 내부적으로 데이터가 정리되어 지난 날짜들의 더미 데이터를 신경 쓰지 않아도 되는 이점이 있습니다.

add 함수 다음에 expire 함수로 TTL 기간을 지정해 주면 됩니다.

it.expire(Duration.ofDays(1L))



이어서 범위를 조회하기 위한 XRANGE 예제도 살펴봅니다~

// XRANGE
fun findStockHistoriesIn(streamKey: String, startMilli: Long, endMilli: Long): List<StockHistory> {
  return redissonClient.getStream<String, String>(streamKey)
            .range(
                StreamMessageId(startMilli, 0), // included
                StreamMessageId(endMMilli+1, 0) // excluded
            ).let {
                it.entries.map { entry ->
                    StockHistory.domainFromMap(entry.key, entry.value)
                }
            }
}

여기서 또 의문이 한 가지 생깁니다.

시간 범위를 의미하는 startMilli와 endMilli는 어떤 값을 넣어줘야 하나요?
이제부터 XRANGE 명령어의 함정에 관해서 얘기해 보겠습니다.

Redis는 inmemory를 데이터 저장소로 사용하여 고성능 데이터 처리가 가능하도록 설계되어 있는데요,
내부적으로는 싱글 스레드로 돌아가기 때문에 한 명령어의 처리 시간이 길어지면 자칫 장애로 이어질 수 있습니다.

위험한 명령어로는 KEYS, HGETALL 등이 있습니다.
이 두 명령어는 O(n)의 시간 복잡도를 가지는데요, 저장된 key 또는 field의 개수가 많아질수록 처리 시간이 오래 걸립니다.

XRANGE 의 시간복잡도는 O(log(N)+M)으로, 데이터가 많을수록 한 번에 조회하는 것은 매우 위험합니다.

  • N : 스트림에 저장된 Entry 개수
  • M : 반환될 Entry 개수

그렇다면 어떻게 빠르고 안전한 조회 속도를 보장했을까요?
이어서 가보겠습니다~!

빠른 조회 속도 보장


KEYS, HGETALL를 써야 할 때 대체할 수 있는 명령어가 있습니다.
바로 SCAN, HSCAN인데요, 간단하게 설명해 드리면 두 명령어는 스캔 범위(count)를 설정하여 해당 count만큼 끊어서 조회합니다.


XRANGE를 쓰기 위해 비슷한 방식으로 적절하게 나누어 병렬로 조회합니다.

changed stocks

예시 코드를 살펴봅시다!

조회 시에는 RedissonReactiveClient를 사용하면 리액티브 프로그래밍으로 조회 성능을 끌어올릴 수 있습니다.

/**
    timeSplitList : 조회 범위를 밀리초로 변경하여 100ms 간격으로 나눈 리스트
    ex. [1709701855083, 1709701855183, 1709701855283, 1709701855383,...]
**/
fun findListOfStockHistory(streamKey: String, timeSplitList: List<Long>): List<StockHistory> {
    return Flux.fromIterable(timeSplitList)
        .flatMap { time ->
            reactiveMemoryDBConnectionFactory.getStream<String, String>(streamKey) // Stream-Key
                .range(
                    StreamMessageId(time, 0), // included
                    StreamMessageId(time + 100, 0) // 100ms, excluded
                )
                .map {
                    it.entries.map { entry ->
                        StockHistory.domainFromMap(entry.key, entry.value)
                    }
                }
        }.flatMapIterable { it }
        .timeout(Duration.ofSeconds(10))
        .collectList()
        .block() ?: listOf()
}

각 시스템에 맞게 가장 적절한 조회 범위를 찾는 것이 가장 핵심 내용입니다.

일반적으로는, 조회 범위가 길어 반환할 Entry 수가 많아지면 시간 복잡도가 증가하여 한 명령어 당 조회 성능이 저하되지만, 조회 범위가 너무 짧으면 명령어 호출 횟수가 그만큼 증가하여 자원을 낭비할 수 있습니다.

시스템상 데이터가 시간의 흐름에 따라 어떤 분포도를 가지는지 파악하고 그에 맞는 조회 범위를 찾아야 합니다.
시간의 흐름에 따른 스트림에 적재하기 유리한 분포도를 그려보았습니다.

graph

위 분포도처럼 Redis Stream에 쌓인 Entry의 시간대별 분포를 분석하면 적절한 조회 범위를 찾을 수 있습니다.


마치며..


이번 시간에는 Redis Stream의 시계열 데이터 처리 특성을 중점적으로 개선 과정을 소개하였는데요!
향후 개선 과정에 있어서 Redis Stream을 메시징 시스템으로도 사용하게 된다면, 이어서 소개해 드리겠습니다~ :)

긴 글 읽어주셔서 감사합니다.

Redis StreamRedisson
올리브영 테크 블로그 작성 재고의 변동을 시계열 데이터로?!
🤫
한첨지 |
Back-end Engineer
어려운 문제라도 재밌게 해결해보고자 합니다!!