<aside> 💡
</aside>
Web Server          Web Framework        SQLAlchemy ORM Code
--------------      --------------       ------------------------------
startup        ->   Web framework        # Session registry is established
                    initializes          Session = scoped_session(sessionmaker())

incoming
web request    ->   web request     ->   # The registry is *optionally*
                    starts               # called upon explicitly to create
                                         # a Session local to the thread and/or request
                                         Session()

                                         # the Session registry can otherwise
                                         # be used at any time, creating the
                                         # request-local Session() if not present,
                                         # or returning the existing one
                                         Session.query(MyClass) # ...

                                         Session.add(some_object) # ...

                                         # if data was modified, commit the
                                         # transaction
                                         Session.commit()

                    web request ends  -> # the registry is instructed to
                                         # remove the Session
                                         Session.remove()

                    sends output      <-
outgoing web   <-
response
session.py
scoped_session의 scopefunc 파라미터에 SessionContext의 get_session_context 메서드를 설정한다.
ContextVar vs threading.local()
threading.local(): scopefunc를 지정하지 않으면 디폴트로 사용되는 레지스트리
ContextVar: threading.local()과 유사하게 작동하며, 사용 시 개발자가 원하는 범위로 커스터마이징 가능하다
@transactional 데코레이터에서는 커밋 혹은 롤백만 명시해주면 된다
import asyncio
from contextvars import ContextVar, Token
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from app.core.config import config
from app.core.exception.custom_exception import CustomHttpException
class SessionContext:
    """Holder for the context-local identifier used to scope sessions.

    The identifier lives in a single class-level ``ContextVar``; the three
    classmethods below are thin wrappers over its ``get``/``set``/``reset``.
    """

    # No default is supplied, so reading before any ``set`` raises LookupError.
    session_context: ContextVar[str] = ContextVar("session_context")

    @classmethod
    def get_session_context(cls) -> str:
        """Return the identifier stored in the current context."""
        current_id = cls.session_context.get()
        return current_id

    @classmethod
    def set_session_context(cls, session_id: str) -> Token:
        """Store ``session_id`` and return the token needed to undo the change."""
        token = cls.session_context.set(session_id)
        return token

    @classmethod
    def reset_session_context(cls, context: Token) -> None:
        """Restore the value that was active before ``context`` was created."""
        cls.session_context.reset(context)
# Engine built from the configured database URL; the pool settings are passed
# straight through to create_engine.
engine = create_engine(
    config.rds_url,
    pool_recycle=3600,
    pool_size=50,
    # echo=True,
    # echo_pool="debug",
)

# Session registry. Instead of the registry's default scope, the current
# session is looked up by whatever SessionContext.get_session_context returns.
ScopedSession = scoped_session(
    sessionmaker(bind=engine, autoflush=True, autocommit=False),
    scopefunc=SessionContext.get_session_context,  # customised session scope
)
# Decorator definition
def transactional(func):
    """Wrap ``func`` so its work is committed or rolled back on ``ScopedSession``.

    The wrapper calls ``func`` with the arguments it was given. When ``func``
    returns normally, ``ScopedSession.commit()`` is called. When ``func`` or the
    commit raises an ``Exception`` (``CustomHttpException`` included),
    ``ScopedSession.rollback()`` is called and the same exception propagates.
    ``ScopedSession.remove()`` runs in every case.

    Args:
        func: The callable to run inside the transaction.

    Returns:
        A wrapper that carries ``func``'s metadata (name, docstring, ...) and
        returns whatever ``func`` returns.

    Raises:
        Exception: Any exception raised by ``func`` or by the commit is
            re-raised unchanged after the rollback.
    """
    # Local import keeps this block self-contained with respect to file imports.
    from functools import wraps

    # NOTE(review): the wrapper calls ``func`` synchronously. If ``func`` is an
    # ``async def``, the commit would run before the returned coroutine is
    # awaited. Confirm that only synchronous callables are decorated.
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            ScopedSession.commit()  # commit once the write work has finished
        except Exception:
            # Same handling for every error type: roll back, then re-raise
            # with the original traceback intact.
            ScopedSession.rollback()
            raise
        finally:
            ScopedSession.remove()  # always release the session afterwards
        return result

    return wrapper
middleware.py
uuid4를 사용한 session_id 설정으로 각 요청 내에서 고유한 session context를 공유한다
import uuid
from app.core.db.session import SessionContext, ScopedSession
class SqlalchemyMiddleware:
    """ASGI middleware that runs the wrapped app under a fresh session context.

    A new UUID4 string is stored in ``SessionContext`` before the wrapped app
    is awaited. Afterwards the scoped session is removed and the context value
    is restored, whether or not the app raised.
    """

    def __init__(self, app):
        """Remember the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI call with its own session context id."""
        token = SessionContext.set_session_context(session_id=str(uuid.uuid4()))
        try:
            await self.app(scope, receive, send)
        finally:
            # Drop the session first, while the context id is still set.
            ScopedSession.remove()
            SessionContext.reset_session_context(context=token)
main.py
미들웨어 등록
# Build the FastAPI application and register its middleware.
# The bare `...` entries stand for arguments elided from this snippet.
app = FastAPI(
...
middleware=[
# NOTE(review): wildcard origins/methods/headers combined with
# allow_credentials=True is very permissive; confirm this is intended.
Middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
),
Middleware(SessionMiddleware, secret_key=config.SESSION_SECRET_KEY),
# Registers the per-request SQLAlchemy session-context middleware.
Middleware(SqlalchemyMiddleware),
],
...
)
user/service.py