<aside> 💡

불필요한 DB 커넥션 연결 방지 및 여러 리포지토리 메서드를 한 트랜잭션으로 관리하기 위함

</aside>

구현

전체 플로우 개요

Web Server          Web Framework        SQLAlchemy ORM Code
--------------      --------------       ------------------------------
startup        ->   Web framework        # Session registry is established
                    initializes          Session = scoped_session(sessionmaker())

incoming
web request    ->   web request     ->   # The registry is *optionally*
                    starts               # called upon explicitly to create
                                         # a Session local to the thread and/or request
                                         Session()

                                         # the Session registry can otherwise
                                         # be used at any time, creating the
                                         # request-local Session() if not present,
                                         # or returning the existing one
                                         Session.query(MyClass) # ...

                                         Session.add(some_object) # ...

                                         # if data was modified, commit the
                                         # transaction
                                         Session.commit()

                    web request ends  -> # the registry is instructed to
                                         # remove the Session
                                         Session.remove()

                    sends output      <-
outgoing web    <-
response

session.py

import asyncio
import functools
from contextvars import ContextVar, Token

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

from app.core.config import config
from app.core.exception.custom_exception import CustomHttpException

class SessionContext:
    """Keeps the identifier of the current session scope in a ``ContextVar``.

    All access goes through the classmethods below, so callers never touch
    the underlying variable directly.
    """

    # Created without a default: reading it before any ``set`` raises
    # ``LookupError``.
    session_context: ContextVar[str] = ContextVar("session_context")

    @classmethod
    def get_session_context(cls) -> str:
        """Return the session id stored in the current context.

        Raises:
            LookupError: If no session id has been set in this context.
        """
        current_id = cls.session_context.get()
        return current_id

    @classmethod
    def set_session_context(cls, session_id: str) -> Token:
        """Store ``session_id`` and return the token that can undo the change."""
        restore_token = cls.session_context.set(session_id)
        return restore_token

    @classmethod
    def reset_session_context(cls, context: Token) -> None:
        """Restore the value held before the ``set`` call that produced ``context``."""
        cls.session_context.reset(context)

# Engine built from the configured RDS URL with an explicit pool size and
# connection recycle interval.
engine = create_engine(
    config.rds_url,
    pool_recycle=3600,
    pool_size=50,
    # Debugging aids, disabled by default:
    # echo=True,
    # echo_pool="debug",
)

# Factory producing the actual Session objects bound to ``engine``.
_session_factory = sessionmaker(bind=engine, autoflush=True, autocommit=False)

# Registry of sessions keyed by the value returned from
# ``SessionContext.get_session_context`` (custom session scope).
ScopedSession = scoped_session(
    _session_factory,
    scopefunc=SessionContext.get_session_context,
)

# Decorator definition
def transactional(func):
    """Wrap ``func`` so that it runs as one transaction on ``ScopedSession``.

    After ``func`` returns, ``ScopedSession.commit()`` is called. If ``func``
    or the commit raises an ``Exception``, ``ScopedSession.rollback()`` is
    called and the same exception propagates to the caller. In every case
    ``ScopedSession.remove()`` runs last.

    ``func`` is invoked synchronously: for a coroutine function the call only
    creates the coroutine object, so the commit would happen before its body
    runs. Decorate plain (non-async) callables only.

    Args:
        func: The callable to execute inside the transaction.

    Returns:
        A wrapper that carries the metadata of ``func`` (name, docstring,
        ``__wrapped__``) and returns whatever ``func`` returns.
    """

    @functools.wraps(func)  # preserve __name__/__doc__/signature introspection
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            ScopedSession.commit()  # commit once the work has succeeded
            return result
        except Exception:
            # One handler replaces the two identical branches of the previous
            # version. NOTE(review): assumes CustomHttpException derives from
            # Exception -- confirm.
            ScopedSession.rollback()
            raise  # bare raise keeps the original traceback intact
        finally:
            ScopedSession.remove()  # always dispose of the session afterwards

    return wrapper

middleware.py

import uuid

from app.core.db.session import SessionContext, ScopedSession

class SqlalchemyMiddleware:
    """ASGI middleware that gives every call its own session scope.

    A fresh UUID is stored through ``SessionContext`` before the wrapped app
    runs, and the scoped session plus the context value are cleaned up once
    the app has finished, whether it returned or raised.
    """

    def __init__(self, app):
        """Remember the wrapped ASGI application.

        Args:
            app: The next ASGI callable, invoked as ``app(scope, receive, send)``.
        """
        self.app = app

    async def __call__(self, scope, receive, send):
        """Run the wrapped app inside a newly created session scope.

        Args:
            scope: ASGI connection scope, passed through unchanged.
            receive: ASGI receive callable, passed through unchanged.
            send: ASGI send callable, passed through unchanged.
        """
        session_id = str(uuid.uuid4())
        context = SessionContext.set_session_context(session_id=session_id)
        try:
            await self.app(scope, receive, send)
        finally:
            try:
                # Must run while the session id is still set.
                ScopedSession.remove()
            finally:
                # Nested so the context value is restored even if remove()
                # raises; the exception still propagates afterwards.
                SessionContext.reset_session_context(context=context)

main.py

사용 예시

user/service.py