Redis Streams는 Redis 5.0에서 추가된 로그와 유사한 자료구조로, 아파치 카프카에서 영감을 받아 설계되었다. 영속적인 메시지 저장, 컨슈머 그룹, 소비 추적 등을 지원한다.
핵심 개념
Stream 엔트리: ID + 필드-값 쌍
1704067200000-0: {user:"alice", action:"login"}
1704067260000-0: {user:"bob", action:"purchase"}
ID 형식: <밀리초 타임스탬프>-<시퀀스>
* : 자동 생성 (권장)
Consumer Group:
- 여러 컨슈머가 나누어 처리 (파티셔닝 X)
- 각 컨슈머는 다른 메시지 수신
- ACK로 처리 확인
- PEL (Pending Entry List): 미처리 메시지 추적
python
import redis
import time
r = redis.Redis(decode_responses=True)
# 메시지 발행
stream_id = r.xadd('events:user', {
'user_id': '1001',
'action': 'login',
'ip': '192.168.1.1'
})
print(f"Published: {stream_id}")
# 컨슈머 그룹 생성 (처음 실행 시)
try:
r.xgroup_create('events:user', 'analytics', '$', mkstream=True)
except redis.ResponseError:
pass # 이미 존재
# 메시지 소비
def consume(consumer_name, count=10):
messages = r.xreadgroup(
'analytics', consumer_name,
{'events:user': '>'}, # '>' = 새 메시지
count=count,
block=5000 # 5초 대기
)
for stream, msgs in messages:
for msg_id, data in msgs:
try:
process(data)
r.xack('events:user', 'analytics', msg_id)
except Exception:
pass # PEL에 유지 → 나중에 claim 가능
# 미처리 메시지 재처리 (장애 복구)
pending = r.xpending_range('events:user', 'analytics', '-', '+', 10)
for msg in pending:
if msg['time_since_delivered'] > 60000: # 1분 이상 대기
# 다른 컨슈머로 이전
r.xclaim('events:user', 'analytics', 'recovery-worker',
60000, [msg['message_id']])
Redis Streams vs Kafka 비교
| 항목 | Redis Streams | Apache Kafka |
|---|
| 파티셔닝 | 단일 스트림 | 파티션 기반 |
| 보존 | maxlen 또는 무제한 | 시간/크기 기반 |
| 운영 복잡도 | 낮음 (Redis 내장) | 높음 (ZooKeeper 등) |
| 처리량 | 수백만/초 | 수백만~수십억/초 |
| 리플레이 | ID 범위 조회 | 오프셋 기반 |
| 적합 사례 | 소규모, 낮은 운영비 | 대규모 스트리밍 |
관련 문서