본문 바로가기
text/Python

python으로 간단한 In-Memory Key-Value DB 만들기

by hoonzii 2025. 7. 13.
반응형

계기는 아래 스레드 글이었다.

 

평소 Redis 같은 in-memory DB를 쓰면서 “와, 진짜 편하다”라고만 생각했지, 

그 안에서 무슨 일이 일어나는지 깊게 고민해 본 적은 없었다. 

그러다 어느 날 문득 “나도 비슷한 걸 한번 만들어볼까?”  

 

물론 Redis를 똑같이 구현할 자신은 없다. 검색해 보니 Redis는 C로 짜여 있고, 내부 구조도 엄청나게 복잡하더라...

Python으로 그걸 따라잡겠다는 건 터무니없는 도전이다. 그래서 목표를 간단히 잡았다.

Python으로 간단한 key-value in-memory DB를 만들어보자.  

이 문서에서는 이 DB를 설계하고 구현하는 과정을 챕터별로 나눠 정리한다. 기본 기능부터 시작해 TTL, 트랜잭션, 백업/복원, 확장 명령어, 사용자 명령 기록, 서버-클라이언트 구조까지 점진적으로 발전시키며, 각 단계에서 고민한 점과 깨달음을 공유한다.

 

tl;dr - show me the code

https://github.com/hoonzinope/py-in-mem-db

 

GitHub - hoonzinope/py-in-mem-db: A simple in-memory key-value database implementation built with Python.

A simple in-memory key-value database implementation built with Python. - hoonzinope/py-in-mem-db

github.com

 

챕터 1: 기본 Key-Value DB 구현

어떤 기능을 넣을까?
핵심은 데이터를 key: value 형태로 저장하는 거다. 

Python의 dict를 기반으로 쓰면 되니까 간단할 것 같다. 여기에 DB스러운 명령어를 몇 개 붙여서 동작하게 만들 계획이다. 

필요한 기능은 대충 이렇게 정리해 봤다.

쓰기(갱신):
- put <key> <value>: key에 value를 저장
- delete <key>: 특정 key-value 쌍 삭제
- clear: 저장된 데이터 전부 지우기

읽기(조회):
- get <key>: key에 해당하는 value 가져오기
- exists <key>: key가 있는지 확인
- keys: 모든 key 목록 보기
- values: 모든 value 목록 보기
- items: 모든 key-value 쌍 보기
- size: 저장된 데이터 개수 확인

기타:
- help: 사용 가능한 명령어 안내

결국 DB라는 게 거창해 보여도, 쓰고 읽는 게 전부다. 이 두 축만 잘 잡으면 기본은 갖춘 셈이다.
데이터를 메모리에 저장하고 관리할 inMemoryDB 클래스를 만들었다. 

Python의 dict를 그대로 써서 핵심 기능만 간단히 메서드로 감싸줬다. 코드부터 보자.

class inMemoryDB:
    def __init__(self):
        self.data = {}

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key, None)

    def delete(self, key):
        if key in self.data:
            del self.data[key]

    def clear(self):
        self.data.clear()

    def exists(self, key):
        return key in self.data
    
    def keys(self):
        return list(self.data.keys())
    
    def values(self):
        return list(self.data.values())
    
    def items(self):
        return list(self.data.items())
    
    def size(self):
        return len(self.data)
    
    def help(self):
        return (
            "Commands:\n"
            "put <key> <value> - Store a value with a key\n"
            "get <key> - Retrieve a value by key\n"
            "delete <key> - Remove a key-value pair\n"
            "clear - Clear the database\n"
            "exists <key> - Check if a key exists\n"
            "keys - List all keys\n"
            "values - List all values\n"
            "items - List all key-value pairs\n"
            "size - Get the number of items in the database\n"
            "exit - Exit the command interface"
        )



보면 알겠지만, 진짜 별거 없다. Python dict가 다 해주는 걸 내가 이름만 예쁘게 붙인 수준이다. 

그래도 이렇게 감싸니까 좀 DB처럼 보이지 않나?

 

이제 이 DB를 터미널에서 명령어로 다룰 수 있게 해 보자. 

while True로 루프를 돌면서 사용자가 입력한 명령어를 받아 적절한 메서드를 호출하도록 했다. 코드도 간단하다.

from MemDB import inMemoryDB

class command:
    def __init__(self):
        self.memdb = inMemoryDB()

    def execute(self, cmd):
        parts = cmd.split()
        if not parts:
            return "No command provided"

        action = parts[0].lower()
        if action == "put" and len(parts) == 3:
            self.memdb.put(parts[1], parts[2])
            return f"Stored {parts[1]}: {parts[2]}"
        elif action == "get" and len(parts) == 2:
            value = self.memdb.get(parts[1])
            return f"{parts[1]}: {value}" if value is not None else f"{parts[1]} not found"
        elif action == "delete" and len(parts) == 2:
            self.memdb.delete(parts[1])
            return f"Deleted {parts[1]}"
        elif action == "clear":
            self.memdb.clear()
            return "Database cleared"
        elif action == "exists" and len(parts) == 2:
            exists = self.memdb.exists(parts[1])
            return f"{parts[1]} exists: {exists}"
        elif action == "keys":
            return f"Keys: {self.memdb.keys()}"
        elif action == "values":
            return f"Values: {self.memdb.values()}"
        elif action == "items":
            return f"Items: {self.memdb.items()}"
        elif action == "size":
            return f"Size: {self.memdb.size()}"
        elif action == "help":
            return self.memdb.help()
        else:
            return "Invalid command"
        
    def run(self):
        print("Welcome to the in-memory database command interface!")
        print("Type 'help' for a list of commands.")
        while True:
            cmd = input("Enter command: ")
            if cmd.lower() == "exit":
                print("Exiting...")
                break
            response = self.execute(cmd)
            print(response)

if __name__ == "__main__":
    cmd = command()
    cmd.run()



정리
이렇게 만든 DB는 단일 사용자, 단일 프로세스에서만 동작하는 초간단 in-memory DB다. 프로그램을 끄면 데이터는 싹 날아간다. 메모리에만 저장되니까 당연한 거지만. 그래도 직접 만들어보니 DB의 본질이 뭔지 다시 생각해 보게 됐다. 결국 DB는 CRUD(생성, 조회, 갱신, 삭제)를 깔끔하게 처리하는 도구일 뿐이다. Python의 dict에 살짝 인터페이스만 얹었을 뿐인데, 제법 DB처럼 굴러가는 걸 보니 나름 뿌듯하다.


챕터 2: TTL(Time To Live) 기능 추가

이제 우리 DB에 TTL(Time To Live) 기능을 넣어보자. TTL은 데이터에 유효기간을 설정해서 일정 시간이 지나면 자동으로 사라지게 만드는 기능이다. 캐시처럼 임시 데이터를 다룰 때 특히 유용하다.
어떤 식으로 동작해야 할지 간단히 정리해 보면:

- 각 key-value 쌍에 **만료 시각(expire time)**을 같이 저장한다.
- put 명령으로 데이터를 넣을 때 TTL(초 단위)을 지정할 수 있다.
- 현재 시각이 만료 시각을 넘으면 데이터는 무효 처리되고, 조회 시 자동으로 삭제된다.
- 조회 명령(get, exists 등)에서 만료 여부를 확인해, 만료된 데이터는 삭제하고 결과도 반환하지 않는다.

 

만료 처리 방식은 두 가지:
- 지연 삭제(lazy expiration): 조회할 때 만료 여부를 확인하고 삭제.
- 능동 삭제(active expiration): 백그라운드에서 주기적으로 만료된 데이터를 청소.

TTL을 지정하지 않거나 0으로 설정하면 데이터는 영구히 저장된다.

1) 지연 삭제(lazy expiration) 구현
먼저 간단한 방식인 지연 삭제부터 해보자. 

이 방식은 데이터를 조회할 때마다 만료 여부를 확인해서, 만료된 데이터면 바로 지우는 방법이다. 구현이 간단해서 시작하기 딱 좋다.

 

put 메서드를 고쳐서 TTL을 받도록 만들었다. TTL이 없으면 기본값으로 7초를 설정해 봤다.

def put(self, key, value, ttl=None):
    if ttl is not None:
        expire_time = time.time() + ttl
    else:
        expire_time = time.time() + 7  # 기본 7초 뒤 만료

    self.data[key] = {
        "value": value,
        "expire_time": expire_time
    }



데이터를 저장할 때 value와 만료 시각을 함께 저장한다. 간단하죠?

 

get 메서드에서 만료 확인
get으로 데이터를 조회할 때, 먼저 만료 시각을 확인한다. 만료됐다면 데이터를 지우고 None을 반환한다.

def get(self, key):
    if key not in self.data:
        return None

    if self.data[key]["expire_time"] < time.time():
        del self.data[key]
        return None
    return self.data[key]["value"]


조회할 때마다 만료 여부를 체크하니까, 메모리 관리도 자연스럽게 된다.

 

지연 삭제의 한계
지연 삭제는 구현이 간단하고 추가 오버헤드가 거의 없다는 점이 매력적이다. 하지만 단점도 있다. 

조회가 없는 데이터는 만료돼도 메모리에 계속 남아있다. 데이터가 쌓이면 메모리가 낭비될 수 있다. 

(물론 우리 같은 소규모 DB에선 큰 문제는 아니지만, 그래도 알아두는 게 좋겠지!)

 

2) 능동 삭제(active expiration) 추가
메모리 낭비 문제를 해결하려면, 주기적으로 만료된 데이터를 청소하는 능동 삭제 방식을 추가해야 한다. 

별도의 스레드가 데이터를 훑어보며 만료된 걸 지워주는 방식이다.

 

동시성 문제와 Lock
여기서 조심해야 할 게 있다. 사용자가 데이터를 넣거나 조회, 삭제하는 중에 백그라운드 스레드가 데이터를 지우러 들어오면 데이터가 꼬일 수 있다. 이런 동시성 문제를 막으려면 Lock을 써서 데이터 접근을 보호해야 한다. Python의 threading.Lock을 사용하면 간단히 해결된다.

from threading import Lock

class inMemoryDB:
    def __init__(self):
        self.data = {}
        self.lock = Lock()

    def put(self, key, value, ttl=None):
        with self.lock:
            if ttl is not None:
                expire_time = time.time() + ttl
            else:
                expire_time = time.time() + 7
            self.data[key] = {
                "value": value,
                "expire_time": expire_time
            }

    def get(self, key):
        with self.lock:
            if key not in self.data:
                return None
            if self.data[key]["expire_time"] < time.time():
                del self.data[key]
                return None
            return self.data[key]["value"]

    def delete(self, key):
        with self.lock:
            if key in self.data:
                del self.data[key]



만료 데이터 청소 스레드
이제 백그라운드에서 주기적으로 만료된 데이터를 지우는 스레드를 만들어보자.

def _delete_expired(self):
    while True:
        with self.lock:
            now = time.time()
            expired_keys = [key for key, value in self.data.items()
                            if value["expire_time"] < now]
            for key in expired_keys:
                del self.data[key]
        time.sleep(1)  # 1초마다 확인


1초마다 데이터를 훑어보며 만료된 키를 지운다. 간단하지만 효과적이다.

 

스레드 실행
DB 객체를 만들 때 이 청소 스레드가 자동으로 시작되도록 설정한다.

from threading import Thread, Lock

class inMemoryDB:
    def __init__(self):
        self.data = {}
        self.lock = Lock()
        self.expiration_thread = Thread(target=self._delete_expired, daemon=True)
        self.expiration_thread.start()


daemon=True 옵션으로 스레드를 설정해서 프로그램 종료 시 같이 멈추게 했다.

 

결론: 두 방식의 조화
실제 DB(예: Redis)에서는 지연 삭제와 능동 삭제를 섞어서 쓴다고 한다. 조회할 때 만료된 데이터를 바로 지우고, 백그라운드 스레드로 주기적으로 남은 데이터를 청소하는 식이다. TTL을 구현하면서 가장 크게 깨달은 건, 데이터를 언제 지우느냐도 중요하지만, 여러 스레드가 데이터를 안전하게 다루도록 Lock으로 보호하는 게 더 중요하다는 점이다. 막상 만들기 전에는 이런 고민이 피부로 와닿지 않았는데, 직접 코드를 짜보니 확실히 이해가 됐다.


챕터 3: 트랜잭션(Transaction) 기능 추가

 

이제 우리 DB에 트랜잭션 기능을 넣어보자. 트랜잭션은 데이터를 안전하고 일관되게 다룰 수 있게 해주는 핵심 기능이다. 

이번엔 이걸 구현하면서 어떤 점을 고민해야 하는지 하나씩 풀어본다.

 

트랜잭션이 뭔데?
쉽게 말해, 트랜잭션은 모 아니면 도(all-or-nothing) 방식으로 동작하는 작업 단위다. 여러 작업을 하나로 묶어서, 전부 성공하거나 아예 아무것도 반영되지 않도록 한다. 예를 들어, 은행 송금을 생각해 보자. 내가 A은행에서 친구의 B은행 계좌로 돈을 보낼 때, 내 계좌에서 돈이 빠져나가고 친구 계좌에 돈이 들어가는 과정이 모두 성공해야 한다. 중간에 뭐 하나라도 잘못되면, 처음 상태로 돌아가야 한다. 이게 바로 트랜잭션의 핵심이다.

 

트랜잭션 구현 전략
우리 DB에 트랜잭션을 추가하려면, 작업이 진행되는 동안 실제 데이터에 영향을 주지 않도록 해야 한다. 아이디어는 간단하다:

- begin: 트랜잭션을 시작하면 데이터를 통째로 복사해서 따로 보관한다. 작업은 복사본이 아닌 원본 데이터(data)에서 진행한다.
- commit: 모든 작업이 문제없이 끝나면 복사본을 지우고 트랜잭션을 마무리.
- rollback: 문제가 생기면 복사본을 원본에 덮어씌워 처음 상태로 되돌린다.

기본 구조 세팅
inMemoryDB 클래스에 트랜잭션을 위한 변수를 추가한다.

from threading import Lock
import copy

class inMemoryDB:
    def __init__(self):
        self.data = {}
        self.org_data = {}  # 트랜잭션용 백업 데이터
        self.lock = Lock()  # 동시성 제어
        self.in_transaction = False  # 트랜잭션 진행 여부



트랜잭션 시작: begin
트랜잭션을 시작하면 데이터를 깊은 복사(deepcopy)해서 org_data에 저장한다. 동시성 문제를 피하려고 lock도 잡는다.

def begin_transaction(self):
    self.lock.acquire()
    try:
        if self.in_transaction:
            raise RuntimeError("이미 트랜잭션이 진행 중입니다.")
        self.org_data = copy.deepcopy(self.data)
        self.in_transaction = True
    except Exception as e:
        if self.lock.locked():
            self.lock.release()
        raise e



작업 확정: commit
commit은 트랜잭션을 마무리한다. org_data를 비우고, 트랜잭션 플래그를 끄고, lock을 해제한다.

def commit_transaction(self):
    try:
        if not self.in_transaction:
            raise RuntimeError("진행 중인 트랜잭션이 없습니다.")
        self.org_data = {}
        self.in_transaction = False
    except Exception as e:
        self.rollback_transaction()
        raise e
    finally:
        if self.lock.locked():
            self.lock.release()



되돌리기: rollback
rollback은 org_data를 다시 data에 복사해서 트랜잭션 시작 전 상태로 되돌린다.

def rollback_transaction(self):
    try:
        if not self.in_transaction:
            raise RuntimeError("진행 중인 트랜잭션이 없습니다.")
        self.data = copy.deepcopy(self.org_data)
        self.org_data = {}
        self.in_transaction = False
    finally:
        if self.lock.locked():
            self.lock.release()



TTL과 트랜잭션 충돌 방지
앞서 만든 TTL의 능동 삭제(active expiration) 기능과 트랜잭션이 충돌할 수 있다. 

트랜잭션 중에 데이터가 반쯤 바뀐 상태에서 TTL 스레드가 만료된 데이터를 지우면, 트랜잭션의 원자성이 깨진다. 

이를 막으려면 트랜잭션 진행 중에는 TTL 스레드가 데이터를 건드리지 않도록 해야 한다.

def _delete_expired(self):
    while True:
        if not self.in_transaction:
            self._clean_expired()
        time.sleep(1)  # 1초마다 확인


트랜잭션 중일 때는 청소를 건너뛰고, 평소에만 만료 데이터를 지운다.

 

동시성 처리 깔끔하게: 데코레이터

코드를 좀더 발전시켜보면 모든 작업은 transaction = true / false인 상황 두 가지로 구분되고,

각 작업의 구조는 동일하게 아래와 같이 중복이 일어난다.

def 작업():
    if self.in_transaction:
        _실제_작업()
    else:
        with self.lock:
            _실제_작업()


put, get, delete 같은 메서드마다 매번 트랜잭션 여부를 확인하고 lock을 잡는 코드를 넣으면 지저분해진다. 

Python의 데코레이터를 쓰면 훨씬 깔끔하다.

def transaction_safe(func):
    def wrapper(self, *args, **kwargs):
        if self.in_transaction:
            return func(self, *args, **kwargs)
        with self.lock:
            return func(self, *args, **kwargs)
    return wrapper



이 데코레이터를 적용하면, 트랜잭션 중에는 lock 없이 실행하고, 평소에는 lock을 잡아서 thread-safe 하게 동작한다.
예시: get 메서드

@transaction_safe
def get(self, key):
    if key not in self.data:
        return None
    if self.data[key]["expire_time"] < time.time():
        del self.data[key]
        return None
    return self.data[key]["value"]


이제 put, delete 같은 메서드에도 이 데코레이터를 붙이면 깔끔하게 동시성 문제를 해결할 수 있다.

 

구현하면서 느낀 점
트랜잭션을 만들어보면서 몇 가지 깨달은 게 있다.

트랜잭션은 원자성을 보장해서 데이터를 안전하게 지키지만, 데이터를 통째로 복사하는 방식은 메모리를 많이 잡아먹는다. 실제 DB는 변경 로그 같은 방식으로 메모리 부담을 줄인다고 한다.
트랜잭션이 길어질수록 메모리 사용량이 늘어나는 문제도 있다. 이걸 직접 겪어보니 왜 대규모 DB가 복잡한 구조를 쓰는지 이해가 갔다.

우리 같은 간단한 DB에서는 깊은 복사로도 충분하지만, 실제 시스템의 복잡함을 조금이나마 맛본 느낌이다.

 


챕터 4: 백업과 복원으로 데이터 지키기


지금까지 만든 in-memory DB는 프로그램이 꺼지면 데이터가 싹 날아간다. 

실험용으로는 괜찮지만, 진짜 DB라면 데이터를 잃지 않도록 **영속성(persistence)**을 보장해야 한다. 

갑작스러운 종료(crash)에도 데이터가 무사해야 하니까. 이번엔 백업과 복원 기능을 추가해 보자.

 

백업과 복원의 기본 아이디어
실제 DB는 데이터를 보존하기 위해 몇 가지 방법을 쓴다. 대표적으로 세 가지를 정리해 보면:

- Snapshot: 데이터를 통째로 주기적으로 파일에 저장한다. 빠르게 복원할 수 있지만, snapshot 직전의 작업은 잃을 수 있다.
- AOF(Append Only File): put, delete, clear 같은 변경 명령을 파일에 차곡차곡 기록한다. 복원할 때 명령을 처음부터 다시 실행해 상태를 재구성한다. 하지만 명령이 많아지면 복원이 느려질 수 있다.
- Snapshot + AOF 조합: snapshot으로 빠르게 기본 상태를 로드한 뒤, 이후 변경된 명령(AOF)만 적용한다. 이 방식은 Redis 같은 실제 DB에서도 자주 쓰인다.

우리 DB에서도 이 조합을 구현해 볼 계획이다.

 

설계 방향
구체적으로 어떻게 할지 정리해 보면:

- Snapshot: 별도 스레드가 주기적으로 데이터를 파일에 저장한다. 빠른 로드를 위해 바이너리 포맷으로 저장하고, snapshot을 저장할 때 AOF 파일은 비운다.
- AOF: put, delete, clear 같은 변경 명령을 파일 끝에 추가한다. 트랜잭션 중에는 메모리에 모아뒀다가 commit 시 한꺼번에 기록한다.
- 복원(Load): 프로그램 시작 시 snapshot을 먼저 로드하고, 그 이후의 AOF 명령을 순서대로 실행해 최신 상태를 복구한다.

구조 개편: 명령 객체 패턴
백업과 복원 기능을 넣으면서 기존 구조를 좀 손봤다. 

원래는 main → command_handler → memDB로 단순하게 이어졌는데, 복원을 위해 프로그램 시작 직후 memDB가 데이터를 로드해야 하고, 명령을 실행할 때마다 AOF에 기록도 해야 했다. 

이런 복잡함을 정리하려고 팩토리 + 명령 객체 패턴을 도입했다:

main → command_handler: 사용자 입력을 받아 command_parser로 Command 객체를 만든다.
registry: 명령어 이름을 Command 클래스에 매핑한다.
memDB → execute(command): Command 객체가 memDB와 persistence_manager를 사용해 데이터 처리와 AOF 기록을 한다.

이렇게 하면 나중에 새로운 명령을 추가할 때도 Command 객체만 만들면 되니까 훨씬 유연하다.
복원: load 명령
프로그램이 시작되면 command_handler가 자동으로 load 명령을 실행한다.

class Load(Command):
    def execute(self, memdb, persistence_manager):
        with memdb.lock:
            self._load_snapshot(memdb, persistence_manager)
            self._load_aof(memdb, persistence_manager)

    def _load_snapshot(self, memdb, persistence_manager):
        snapshot_data = persistence_manager.load_data()
        if snapshot_data:
            memdb.data = snapshot_data

    def _load_aof(self, memdb, persistence_manager):
        memdb.in_load = True  # 복원 중임을 표시
        aof_commands = persistence_manager.load_command()
        for command in aof_commands:
            parts = command.strip().split()
            if not parts:
                continue
            action = parts[0].lower()
            # registry에서 해당 명령을 찾아 실행
            cmd = registry.get(action)
            if cmd:
                cmd.execute(memdb, persistence_manager, parts[1:])
        memdb.in_load = False



in_load 플래그는 복원 중일 때 불필요한 락이나 AOF 기록을 막아준다.
Snapshot과 AOF 동작

명령 처리:
- put, delete 같은 변경 명령은 트랜잭션 상태가 아니면 바로 AOF에 추가된다.
- 트랜잭션 중이면 메모리에 모아뒀다가 commit 시 AOF에 한꺼번에 기록한다.

Snapshot: 별도 스레드가 10초마다 데이터를 저장한다.
- 트랜잭션 중이라 락을 못 잡으면 다음 주기로 넘어간다.
- snapshot 저장 시 AOF 파일을 비워서 중복 기록을 줄인다.

트랜잭션과 백업 연계
트랜잭션도 이제 Command 객체로 처리된다:

- begin: 락을 걸고 in_transaction = True로 설정. 변경 명령은 메모리에만 저장된다.
- commit: 메모리에 모인 명령을 AOF에 기록하고 트랜잭션을 끝낸다.
- rollback: 원본 데이터(org_data)로 되돌리고 트랜잭션을 종료.

이렇게 하면 snapshot 스레드가 트랜잭션 중에 데이터를 건드리지 않도록 락으로 보호된다.

 

만들면서 느낀 점
처음엔 간단한 put, get만 구현했는데, TTL, 트랜잭션, 그리고 백업/복원까지 추가하다 보니 구조를 완전히 새로 짜야했다. 명령 객체 패턴과 팩토리를 도입하니 새로운 기능을 추가하기 훨씬 쉬워졌다. 또, snapshot과 AOF를 조합하면서 트랜잭션 충돌이나 AOF 재실행 같은 문제를 고민하다 보니, 실제 DB가 왜 그렇게 복잡한지 피부로 느껴졌다. 작은 toy 프로젝트인데도 이렇게 고민이 많을 줄이야!

챕터 5: 새로운 명령어로 DB 확장하기 — alias, batch, find


이제 우리 DB에 새로운 기능을 얹어보자. put, get 같은 기본 명령어 이외에 alias, batch, find라는 세 가지 명령어를 추가해서 DB를 더 쓰기 편하고 강력하게 만들어본다.

 

왜 구조를 바꿨나?
처음엔 put, get 같은 명령어를 단순히 if-else로 처리했는데, 명령어가 늘어나면서 코드가 점점 엉망이 됐다. 

그래서 구조를 싹 갈아엎고 팩토리 + 명령 객체 패턴으로 재설계했다. 

덕분에 새로운 명령어를 추가할 때 Command 객체만 만들면 바로 동작하도록 깔끔해졌다.

 

🔄 alias: 나만의 명령어 만들기
왜 필요하지?
put key val 10처럼 매번 긴 명령어를 치기 귀찮다. 그래서 alias를 추가해서 사용자가 원하는 짧은 별칭을 만들 수 있게 했다. 

예를 들어: alias p put
실행 시 이제 p key val 10처럼 짧게 쓸 수 있다.

 

설계 고민: 무한 루프 막기
별칭을 자유롭게 등록하다 보면 문제가 생길 수 있다. 예를 들어:

alias put set
alias set cat
alias cat put


이렇게 하면 cat을 호출했을 때 무한 루프에 빠진다. 이를 막으려면:

별칭의 우변은 반드시 기본 명령어(put, get 등)만 허용한다.
이미 등록된 별칭을 가리키는 새로운 별칭은 만들 수 없다. 이런 제한으로 안전하게 설계했다.

# command/alias.py
@register_command("alias")
class Alias(Command):
    def __init__(self, original_command=None):
        super().__init__()
        self.original_command = original_command
        self.command_list = ["put", "get", "delete",
                             "clear", "exists", "keys",
                             "values", "items", "size",
                             "help", "begin", "commit", "rollback",
                             "alias", "load"]
        self.alias_command = {}
		# ...

    def _set_alias(self, cmd):
        parts = cmd.split()
        if len(parts) != 3:
            return "Invalid alias command format. Use: alias <alias_name> <command>"

        alias_name = parts[1]
        command = parts[2]

        if alias_name == "load" or command == "load":
            return "Cannot create alias for 'load' command."

        if alias_name in self.alias_command:
            return f"Alias '{alias_name}' already exists."

        if command not in self.command_list:
            return f"Command '{command}' is not a valid command."

        self.alias_command[alias_name] = command
        return f"Alias '{alias_name}' set for command '{command}'."

 

“load” 역시 명령어의 일종이므로 실수로 입력될 경우 등록이 방지되게끔 해준다.

만약, 등록하려는 명령어가 이미 등록되어 있는경우 등록하지 않는다. (2번 제약)

만약 대체하려는 명령어가 기존 명령어 리스트에 없는 경우 등록하지 못한다. (1번 제약)

이렇게 등록 시킬 경우, 앞서 얘기한 무한루프에서 벗어날 수 있게 된다.



alias도 저장하고 복원하기
alias는 프로그램이 꺼졌다 켜져도 유지돼야 하니까, 별도의 JSON 파일에 저장하도록 했다.

def save_alias(self, alias_dict):
    with open(self.alias_file, 'w') as file:
        json.dump(alias_dict, file)


복원할 때는 load 명령어에서 JSON 파일을 읽어 memdb.alias_command에 넣는다.

불러올 때는 alias도 불러오게끔 load 명령어에 _load_alias를 추가해 준다.  

@register_command("load")
class Load(Command):
    def __init__(self):
        super().__init__()

    def execute(self, memdb, persistence_manager):
        self.memdb = memdb
        self.persistence_manager = persistence_manager

        with self.memdb.lock:
            # Load snapshot data
            self._load_snapshot()

            # Load alias commands
            self._load_alias()

            # Load AOF commands
            self._load_aof()
            
     # ....
     
     def _load_alias(self):
        alias_data = self.persistence_manager.load_alias()
        if alias_data:
            self.memdb.alias_command = alias_data
        else:
            self.memdb.alias_command = {}

                                        

 

명령어 변환
사용자가 p 같은 별칭을 입력하면, 커맨드 파서에서 이를 원래 명령어로 바꾼다.

#command_handler.py
def execute(self, cmd):
    cmd = self.convert_alias(cmd) if cmd != "load" else cmd
    # ... 기존 command 처리와 동일

def convert_alias(self, cmd):
    parts = cmd.split()
    if len(parts) < 2:
        if parts[0] in self.alias_command:
            return self.alias_command[parts[0]]
        else:
            return cmd

    alias = parts[0]
    if alias in self.alias_command:
        command = self.alias_command[alias]
        return command + " " + " ".join(parts[1:])

    return cmd  # No alias found, return original command


AOF에는 항상 원래 명령어(put, delete 등)로 기록되니까, alias 파일이 깨져도 데이터 복구에 문제없다.

 

해당 부분까지의 작업내용은 아래의 링크로 확인할 수 있다.

https://github.com/hoonzinope/py-in-mem-db/tree/d22489121fd92e3cdc0188df08aac8a196d7f0b5

 

🧑‍💻 batch: 한 번에 여러 명령 처리
왜 필요하지?
여러 명령을 한꺼번에 실행하고 싶을 때가 있다. 예를 들어, 키-값 쌍을 여러 개 추가하려면 하나씩 치는 건 비효율적이다. 

batch 명령어는 이런 작업을 한 번에 처리하고, 중간에 문제가 생기면 전부 취소(rollback)한다.

batch -c "put k1 v1 10; put k2 v2 10; put k3 v3 10"



트랜잭션과 연결
batch는 항상 트랜잭션 안에서 실행된다. 내부에 begin, commit 같은 명령어가 포함돼도 무시하고, 오류가 나면 rollback, 정상적으로 끝나면 commit 한다.

#command/batch.py

# Execute the batch command in the context of a lock
    def _execute_batch_not_in_transaction(self, commands):
        parsed_commands = self._convert_batch_to_commands(commands)
        self.memdb.execute(parse_command("begin"))
        try:
            for cmd in parsed_commands:
                if cmd:
                    result = self.memdb.execute(parse_command(cmd))
                    self.results.append(result)
        except Exception as e:
            # If an error occurs, rollback the transaction
            self.memdb.execute(parse_command("rollback"))
            print("Error executing batch command:", e)
            return []

        self.memdb.execute(parse_command("commit"))
        return self.results

들어온 명령들을 parsing 해주고, begin을 명시적으로 구성해준다.

만약 처리 중 exception 발생 시 rollback을 발생시키고, 정상적으로 동작했을 경우 commit 명령어를 실행하게 된다.

(앞서 구성했던 것처럼, begin, rollback, commit 명령들은 알아서 persistence_manager를 통해 작업을 백업하게 된다.)



명령어 파싱
파이썬의 기본 명령어 파싱 모듈인 shlex를 써서 ;로 구분된 명령어를 안전하게 나눴다. 예를 들어:
batch -c "put 'key1' 'val1' 100; put key2 val2 100" 

이렇게 입력하면 명령어를 하나씩 분리해서 처리한다.

# command/batch.py
# Convert the batch command string into a list of individual commands
    # this method ignores 'begin', 'commit', and 'rollback' commands
    # input example : -c "put \"key1\" \"value1\" 10000; put key2 value2 10000; begin; put key3 value3 10000; commit;"
    # output example: ['put "key1" "value1"', 'put key2 value2', 'put key3 value3']
    def _convert_batch_to_commands(self, commands):
        if not commands:
            return []
        tokens = shlex.split(commands.strip())
        if not tokens:
            return []  # Return an empty list if tokens is empty
        input_type = tokens[0]
        if input_type == "-c" or input_type == "--command":
            if len(tokens) < 2:  # Ensure there are enough tokens
                return []  # Return an empty list if insufficient tokens
            commands = " ".join(tokens[1:])
        elif input_type == "-f" or input_type == "--file":
            if len(tokens) < 2:  # Ensure there are enough tokens
                return []  # Return an empty list if insufficient tokens
            input_file_path = tokens[1]
            with open(input_file_path, 'r') as file:
                commands = file.read()
        else: # default type is -c
            if len(tokens) < 2:  # Ensure there are enough tokens
                return []  # Return an empty list if insufficient tokens
            commands = " ".join(tokens[1:])

        command_list = []
        for cmd in commands.strip().split(';'):
            cmd = cmd.strip()
            if cmd:
                if cmd in ('begin', 'commit', 'rollback'):
                    continue
                else:
                    command_list.append(cmd)
        return command_list

 


🔍 find: 데이터 검색하기
왜 필요하지?
지금 기존의 명령어로는 찾는 게 어렵다는 생각이 들었다.

key값을 exists로 조회해서 존재하는지 확인하거나, keys, values, items 등으로 모든 값을 반환받는 것만 가능했다. 앞선 방법의 경우 존재하는지 이미 알고 있거나, 모든 정보를 전부 반환받아 값이 존재하는지 확인하는 방법 이외에 검색하는 방법은 없다.

그렇다면 아래와 같이 기능이 존재해야 할거 같았다.

  1. key 값이 정확히 일치, 일부 일치, 패턴이 일치되는 경우 key 값을 반환
  2. value 값이 정확히 일치, 일부 일치, 패턴이 일치 되는 경우 value 값을 반환

각 방법 이름으로는 exact match, like match, regex match 3가지로 나눴다.

 

사용 예시

find -k mykey: 키가 정확히 mykey인 데이터 찾기
find -k -l my*: 키가 my로 시작하는 데이터 찾기
find -k -r ^my\d+: 정규표현식으로 키 검색
find -v -l *value*: 값에 value가 포함된 데이터 찾기



플래그 파싱
python의 argparse모듈을 사용해 -k(키), -v(값), -l(like), -r(regex) 플래그를 처리한다.

self.parser.add_argument('-k', '--key', action='store_true')
self.parser.add_argument('-v', '--value', action='store_true')
self.parser.add_argument('-r', '--regex', type=str)
self.parser.add_argument('-l', '--like', type=str)

 

각 요청별로 아래와 같이 적절한 함수에 맵핑되어 찾게 된다.

#command/find.py
def _like_pattern_execute(self, pattern: str, checklist : list) -> list:
    try:
        regex_pattern = self._wildcard_to_regex(pattern)
        matching_keys = [key for key in checklist if re.match(regex_pattern, key)]
        return matching_keys if matching_keys else []
    except re.error as e:
        print(f"Invalid like pattern: {e}")
        return []
        
 def _wildcard_to_regex(self, pattern: str) -> str:
    # 예: key* → ^key.*, *key → .*key$, *key* → .*key.*, key? → ^key.$
    regex = "^" + re.escape(pattern).replace(r"\*", ".*").replace(r"\?", ".") + "$"
    return regex
    
 def _regex_pattern_execute(self, pattern: str, checklist : list) -> list:
    try:
        regex_pattern = re.compile(pattern)
        matching_keys = [key for key in checklist if regex_pattern.match(key)]
        return matching_keys if matching_keys else []
    except re.error as e:
        print(f"Invalid regex pattern: {e}")
        return []



like 패턴을 regex로 변환
like 패턴(*,?)은 내부적으로 정규표현식으로 변환해서 처리한다.

def _like_pattern_execute(self, pattern, checklist):
    regex_pattern = "^" + re.escape(pattern).replace(r"\*", ".*").replace(r"\?", ".") + "$"
    return [item for item in checklist if re.match(regex_pattern, item)]

 

이렇게 하면 like와 regex를 통일해서 처리할 수 있다.

 

정리: 확장성의 힘
이 모든 게 가능했던 건 처음부터 Command 객체와 registry 패턴으로 구조를 잘 잡아놨기 때문이다. 새로운 명령어를 추가하려면 Command 클래스 하나만 만들어 registry에 등록하면 끝. 예전처럼 if cmd == "put" 같은 하드코딩으로 고생할 일 없다. 새로운 기능을 추가할 때마다 설계의 유연함이 빛을 발한다.

 

이번 구현은 아래의 github commit 기록에서 볼 수 있다.

https://github.com/hoonzinope/py-in-mem-db/tree/5eea7c8f8131238f67fcc9b885c7f3c0f4cbdcd5


챕터 6: 사용자 명령 기록 (Usage Log)

 

목적
상용 데이터베이스는 데이터 저장 외에도 사용자 명령 기록(usage log)을 유지하여 통계 분석이나 이상 패턴 탐지에 활용한다. 

현재 toy DB는 AOF를 통해 데이터 변경 명령(put, delete 등)을 기록하지만, 조회 명령(get, exists, keys 등)은 포함되지 않는다. 

모든 명령을 추적하기 위해 별도의 usage log를 구현했다.

 

설계 요구사항
- 모든 사용자 명령을 순서대로 기록한다(백업과 무관).
- 명령이 일정량(batch size) 이상 쌓이면 디스크에 기록한다.
- 별도 스레드가 1초마다 기록을 시도한다.
- persistence(AOF/snapshot)와 독립적으로 동작하며, 파일 경로와 저장 방식은 logger가 전담한다.

구현 구조
기존 logger.py를 확장하여 명령 기록 기능을 추가했다. logger는 모든 명령을 기록하고 관리한다.

 

Logger 클래스: 싱글턴 패턴
logger는 단일 인스턴스로 동작하도록 싱글턴 패턴을 적용했다.

# logger.py
class logger:
    __instance = None

    def __init__(self):
        self.time_format = "%Y-%m-%d %H:%M:%S"
        # for logging command usage
        self.command_log = []
        self.usage_log_file_path = "./meta-data/command_usage.log"
        # Ensure the directory exists
        os.makedirs(os.path.dirname(self.usage_log_file_path), exist_ok=True)

        self.lock = threading.Lock()
        self.batch_size = 100  # Number of commands to log before flushing
        self.running = True
        self.thread = threading.Thread(target=self._flush, daemon=True)
        self.thread.start()
        
    @staticmethod
    def get_logger():
        if logger.__instance is None:
            logger.__instance = logger()
        return logger.__instance



명령 기록: append_usage_log
모든 명령을 command_log 리스트에 저장하고, 리스트 크기가 batch_size(100)를 초과하면 즉시 파일에 기록한다.

def append_usage_log(self, command, name="logger"):
    with self.lock:
        timestamp = time.strftime(self.time_format, time.localtime())
        log_entry = f"[{timestamp}]\t[{name}]\tcommand:{command}\n"
        self.command_log.append(log_entry)
        if len(self.command_log) >= self.batch_size:
            self._flush_log()



주기적 기록: _flush 및 _flush_log
별도 스레드가 1초마다 command_log를 확인해 파일에 기록한다. append_usage_log에서 batch_size 초과 시에도 즉시 기록한다.

def _flush(self):
    while self.running:
        time.sleep(1)
        try:
            with self.lock:
                self._flush_log()
        except Exception:
            pass

def _flush_log(self):
    if self.command_log:
        with open(self.usage_log_file_path, "a") as f:
            f.writelines(self.command_log)
        self.command_log.clear()



명령 파싱 시 기록
사용자 입력 명령은 파싱 전에 logger에 기록된다.

#command/command_parser.py
class Parser:
    def __init__(self):
        self.logger = logger.get_logger()

    def parse(self, cmd):
        self.logger.append_usage_log(cmd, name=self.__class__.__name__)
        return parse_command(cmd)

 

 

명령이 put key val 10처럼 입력되면, 파싱 전에 로그에 저장되고 이후 명령 객체로 처리된다.

 

print 대신 logger 사용
기존의 print 출력을 모두 logger.log()로 대체했다. 

싱글턴 패턴을 적용한 이유는 로그 출력과 명령 기록을 단일 객체로 관리하기 위함이다.

 

향후 활용 가능성
현재는 명령 기록만 구현했지만, 기록된 데이터를 기반으로 통계(예: 인기 명령어, 키 사용 빈도)나 이상 탐지 기능을 추가할 수 있다. 이는 향후 확장 가능성을 열어둔다.

 

전체적인 코드는 아래의 기록에서 확인할 수 있다.

https://github.com/hoonzinope/py-in-mem-db/tree/84c3ed79d2d87956a52a9e97a53dfaabe850257f

챕터 7: Server-Client 구조 구현

 

목적
기존 in-memory DB는 단일 사용자가 명령줄에서만 실행할 수 있었다. 

실제 데이터베이스처럼 동작하려면 서버(데이터베이스 엔진)와 클라이언트(사용자 인터페이스)를 분리해야 한다. 

이를 위해 TCP 소켓 기반의 server-client 구조를 구현했다.

server.py: 데이터베이스 엔진을 실행.
client.py: 서버에 명령을 전송.
codec.py: 서버와 클라이언트 간 데이터 직렬화/역직렬화.

프로젝트 구조

/project
 ├ command_handler.py
 ├ memory_store.py
 ├ logger.py
 ├ main.py          # 기존 단일 사용자 진입점
 └ /protocol
      ├ server.py   # 서버: 데이터베이스 실행
      ├ client.py   # 클라이언트: 명령 전송
      └ codec.py    # 데이터 직렬화/역직렬화



통신 프로토콜: codec

redis는 서버와 클라이언트 간의 통신 규약(protocol) 형식을 따로 지정해 놨다.

Redis serialization protocol (RESP)라고 하는데 텍스트 기반 프로토콜로, 명령어와 데이터를 직렬화(serialize)하여 TCP 소켓을 통해 주고받는다.

아래는 주요 특징이다.

  • 예를 들어, 클라이언트가 SET 명령을 보내면, 3\\r\\n$3\\r\\nSET\\r\\n$3\\r\\nkey\\r\\n$5\\r\\nvalue\\r\\n와 같은 형식으로 데이터를 전송
  • 서버의 응답도 RESP 형식, 사람이 읽을 수 있을 정도로 단순하면서도 빠른 파싱이 가능
  • RESP는 Redis 1.2 버전에서 도입, 이후 표준 통신 방식

자세한 건 다음 링크를 확인해 보자

https://redis.io/docs/latest/develop/reference/protocol-spec/


나도 비슷하게  서버와 클라이언트 간 데이터 교환을 위해 문자열 기반 직렬화 프로토콜을 설계했다.
encode
명령어(예: get "item1")를 토큰으로 분리한 뒤 직렬화한다.
2\r\n3\r\nget\r\n5\r\nitem1\r\n

*2: 토큰 2개.
$3\r\nget\r\n: 첫 번째 토큰.
$5\r\nitem1\r\n: 두 번째 토큰.

import shlex

def encode(data):
    parts = shlex.split(data)
    if not parts:
        return "*0\r\n"
    encode_data = f"*{len(parts)}\r\n"
    for part in parts:
        encode_data += f"${len(part)}\r\n{part}\r\n"
    return encode_data



decode
수신된 데이터를 토큰 리스트로 변환한 뒤 문자열 명령으로 복원한다.

def decode(data):
    decode_data = []
    parts = data.split("\r\n")
    if len(parts) < 2 or not parts[0].startswith("*"):
        return None
    num_elements = int(parts[0][1:])
    if num_elements == 0:
        return decode_data
    for i in range(2, num_elements * 2 + 1, 2):
        length = int(parts[i-1][1:])
        if length == -1:
            decode_data.append(None)
        else:
            decode_data.append(parts[i][:length])
    return ' '.join(decode_data)



서버 구현
server.py는 소켓을 열어 클라이언트 요청을 처리한다. 

요청을 수신하면 decode, command_handler로 처리, 결과를 encode 하여 반환한다.

 

멀티스레드 처리
각 클라이언트 연결을 별도 스레드로 처리하여 다중 사용자 요청을 병렬로 처리한다.

import socket
import threading

def start(self):
    self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server_socket.bind((self.host, self.port))
    self.server_socket.listen(5)
    while True:
        client_socket, addr = self.server_socket.accept()
        threading.Thread(target=self.handle_client, args=(client_socket, addr)).start()



명령 처리

def handle_client(self, client_socket, addr):
    while True:
        request = client_socket.recv(1024).decode('utf-8')
        request = decode(request)
        response = self.process_request(request)
        response = encode(response)
        client_socket.send(response.encode('utf-8'))



클라이언트 구현
client.py는 사용자가 입력한 명령을 encode 하여 서버로 전송하고, 결과를 decode 하여 출력한다.

def send_command(self, command):
    encoded_command = encode(command)
    self.sock.send(encoded_command.encode('utf-8'))
    
def receive_response(self):
    chunks = []
    while True:
        chunk = self.sock.recv(4096)
        if not chunk:
            break
        chunks.append(chunk)
        if len(chunk) < 4096:
            break
    data = b''.join(chunks)
    return decode(data.decode('utf-8'))

 

해당 버전까지의 github은 아래의 링크와 같다.

https://github.com/hoonzinope/py-in-mem-db/tree/910fe4488a981f37b001816216744490e65d2816


응답 정규화
명령마다 반환 형식이 달라(예: get은 값, put은 OK, exists는 True/False) 일관성을 위해 Response 객체를 도입했다.

class Response:
    def __init__(self, status_code, message, data=None):
        self.status_code = status_code
        self.message = message
        self.data = data



서버 응답 처리

def process_request(self, request):
    response = self.command.execute(request.strip())
    if isinstance(response, Response):
        if response.status_code == STATUS_CODE["OK"]:
            return str(response.data) if response.data else "OK"
        else:
            return f"Error: {response.message}"
    return str(response)

 

server는 명령어 처리 후 날아온 response를 보고 정상/에러 여부를 확인할 수 있게 되고,

분기해 처리가 가능해진다.

 

해당 버전까지의 github 주소는 아래와 같다.

https://github.com/hoonzinope/py-in-mem-db/tree/c95b48146ae48373b24161a167529dde42d9ecc7



REST API 구현
앞서 만든 건 server-client의 구조이지만 여전히 client 파일을 실행해 cmd로 명령을 날리게 되는 구조다.

내가 그렸던 최종 목표는 postman 같은 rest api 툴에서 요청으로 db 엔진을 호출할 수 있어야 했기에, rest api 요청을 받을 수 있게 새로운 server를 구성해 줬다.

 

fast-api, flask 등 api 프레임워크등이 존재하지만 기본으로 포함된 httpserver를 이용해 구성해 줬다. 별 이유는 없고, 만들 때 readme.md에 pure python이라는 문구를 포함시켰어서 해당 문구를 지우고 싶지 않았다.

 

구조는 요청이 들어오면 command 쪽으로 넘기고 실행된 결과를 받아서 요청한 사용자에게 반환하는 기존의 server와 같다. 구조가 같기 때문에 추가한 파일은 protocol/httpHandler.py 하나다.

# protocol/httpHandler.py

from http.server import ThreadingHTTPServer,BaseHTTPRequestHandler, HTTPServer
import json
from command_handler import Command
from response import Response

post_path = ['get','exists','alias','find', 'clear']
get_path = ['keys','values',
                 'items','size','help',
                 'show-alias',
                 'begin','commit','rollback']
put_path = ['put', 'batch']
delete_path = ['delete', 'reset-alias']
command = Command()

class HttpHandler(BaseHTTPRequestHandler):
    def __init__(self, request, client_address, server):
        super().__init__(request, client_address, server)
        self.server = server
        self.type = None
        print(f"New request from {client_address}")

 

구현하면서 알게 된 사실은 기본적인 http.server모듈의 경우, do_<method> 명으로만 처리가 가능했다는 점이다…

그래서 get으로 들어오면 어떤 path의 get으로 들어온 요청인지 구별하기 위해 전역변수로 get_path를 선언해 줬다. 다른 method 역시 어떤 명령인지 저장해 준다. 분기는 아래와 같이 해준다.


경로 분기

# protocol/httpHandler.py

def _path_branch(self):
    # This method can be used to handle different paths if needed
    path = self.path.strip('/')
    if path in post_path:
        return 'POST'
    elif path in get_path:
        return 'GET'
    elif path in put_path:
        return 'PUT'
    elif path in delete_path:
        return 'DELETE'
    else:
        self.send_error(404, "Path not found")
        return None



HTTP 요청 처리

요청이 들어올 때 어떤 요청이 들어오더라도 아래 함수를 타게끔 구성해 준 다음 path별로 분기되게끔 구성한다.

def do_GET(self):
    self.type = self._path_branch()
    if self.type != 'GET':
        self.send_error(404)
        return
    self.send_response(200)
    self.end_headers()
    response = self._return_response()
    self.wfile.write(json.dumps(response).encode('utf-8'))

get 요청의 경우를 보자면, 요청 시 type을 보고 get 타입의 요청이 아닐 경우, 404를 띄운다.

맞을 경우 (get, keys, values, items 등) 해당 요청을 처리할 함수를 호출해 준다.

 

이쯤 되면, 궁금해질 수 있다. 단일 사용자 - 단일 명령일 때만 transaction을 구성했는데 이렇게 멀티 유저, 단일 DB 엔진일 때도 동일하게 적용이 안될 텐데…? 하고 말이다.


다중 사용자 트랜잭션
구성하는 나도 다 구성하고 느꼈다.

멀티 사용자 중 누군가 transaction 요청을 시작하면 다른 모든 사용자도 해당 transaction 내 동작하게 된다…. (실제로도 그렇게 동작했다. 당연하다.)

 

그럼 어떻게 multi-transaction을 구현할 수 있을까?

transaction의 버전을 여러 개 구현하면 된다.

사용자 요청에 대한 식별자 (like session id)를 두고, 사용자는 해당 session 버전의 transaction을 복사해서 동작하게 되는 셈이다.

 

로직은 아래와 같다.

사용자가 begin 입력했을 때 엔진은 내부적으로 단일 복사본을 생성하는 것이 아닌

tx_data[session_id] = { transaction 시작 전 데이터 (snapshot), 이후 수정가능한 복사본 (copy) }

이렇게 session_id 별 2개의 저장소가 메모리에 생성을 한다.

transaction 시작 시 버전은 각 사용자 별로 달라지기에 새로 생성되게끔 구성되고

이후 수정 시 해당 session_id로 tx_data에 접근해 두 개의 저장 데이터를 참조해 동작하게 된다.

 

가령, put의 경우를 보자면

# command/put.py


def _execute_put(self, key, value, expiration_time):
    self._check_key_value(key, value)
    expiration_time = self._convert_expiration_time_parameter(expiration_time)
    self.memdb.data[key] = {
        "value": value,
        "expiration_time": expiration_time
    }

def _execute_put_in_transaction(self, value, expiration_time, session_id):
    copy = self.memdb.tx_data[session_id]["copy"]
    expiration_time = self._convert_expiration_time_parameter(expiration_time)

    if self.key in copy:
        # Update existing key
        copy[self.key]["value"] = value
        copy[self.key]["expiration_time"] = expiration_time
    else:
        # Add new key
        copy[self.key] = {
            "value": value,
            "expiration_time": expiration_time
        }

 

transaction 내부 동작이 아니라면 기존처럼 본래의 data에 추가하게 되고, 아니라면 tx_data의 session_id 버전을 꺼내와서 해당 버전에 값을 수정해 준다.

🙋‍♂️: copy에만 저장하고 snapshot은 안 쓰는데?

commit의 경우, 본래의 데이터에 추가하는 작업도 기존과 달라지게 된다.

snapshot 데이터는 이때, 본래의 데이터를 비교해서 이상 데이터가 있는지 확인해 준다.

  1. snapshot에 있는데, 본래 데이터에 없는 key가 있다면 해당 데이터는 오염됐기 때문에 오류
  2. snapshot에 있는데, 본래 데이터와 다른 value 라면 해당 데이터는 오염됐기 때문에 오류
  3. snapshot에 있는데, 본래 데이터와 다른 expire_time 라면 해당 데이터는 오염됐기 때문에 오류

조건을 만족했다면 실제로 작업이 이루어진 copy 데이터의 값을 하나씩 본 저장소 값에 하나씩 적용해 준다.

(물론 expire_time을 넘은 데이터는 넘어가준다.)

# command/commit.py

def _commit(self, session_id):
    if session_id in self.memdb.in_tx:
        # check snapshot == self.memdb.data
        snapshot = self.memdb.tx_data[session_id]["snapshot"]
        copy = self.memdb.tx_data[session_id]["copy"]

        for key, value in snapshot.items():
            if key not in self.memdb.data or \
		            self.memdb.data[key]['value'] != value['value'] or \
                self.memdb.data[key]['expiration_time'] != value['expiration_time']:
                self._log(f"Snapshot mismatch for key {key}. Cannot commit transaction.")
                raise Exception("Snapshot mismatch. Cannot commit transaction.")
        # Commit the transaction
        with self.memdb.lock:
            for key, value in copy.items():
                if value["expiration_time"] is None or value["expiration_time"] > time.time():
                    self.memdb.data[key] = value

        # append the commands to AOF
        for command in self.memdb.tx_commands[session_id]:
            self.persistence_manager.append_aof(command)

        # Clear the transaction data
        del self.memdb.in_tx[session_id]
        del self.memdb.tx_data[session_id]
        del self.memdb.tx_commands[session_id]
        self._log(f"Transaction for session {session_id} committed successfully.")
    else:
        self._log(f"No transaction found for session {session_id} to commit.")



세션 관리
내 경우 요청 시 들어오는 port 값을 이용해 식별자로 써줬다.

내가 구성한 tcp 서버의 경우, 연결을 재이용하기 때문에 마치 사용자별로 식별자로 이용할 수 있었기 때문이다.

실제 환경에서는 서버가 고유한 session_id를 발급해야 한다.

# protocol/server.py

 def handle_client(self, client_socket, addr):
    try:
        while True:
            # ...

            response = self.process_request(request, addr)
            response = encode(response)  # Encode the response using the codec
            client_socket.send(response.encode('utf-8'))
    except Exception as e:
        # ...

def process_request(self, request, addr):
    response = ""
    if request.strip() == "":
        response = "No command received"
    else:
        response =  self.command.execute(request.strip(), session_id=addr[1])

    # ...

 

 

해당 버전까지의 github 코드는 아래에서 확인할 수 있다.

https://github.com/hoonzinope/py-in-mem-db/tree/8229da191f46c73e96c96d357bb2f74df1c4dc50


마치며
단일 사용자, 단일 세션 환경인 우리 DB에서는 이 정도로도 충분하다. 

하지만 트랜잭션, 백업/복원, 서버-클라이언트 구조를 구현해 보면서 실제 DB가 왜 그렇게 복잡한 구조를 가지게 됐는지, 그리고 그 안에서 어떤 고민들이 오가는지 생생히 느낄 수 있었다. 명령 객체 패턴과 팩토리로 구조를 유연하게 바꾸고, 락과 세션 관리로 동시성을 처리하면서, 작은 toy 프로젝트가 상용 DB의 핵심 개념을 이해하는 데 큰 도움이 됐다.

 

반응형

댓글