kiwi-vpn/api/kiwi_vpn_api/db/connection.py

34 lines
671 B
Python

"""
Database connection management.
"""
from sqlmodel import Session, SQLModel, create_engine
class Connection:
"""
Namespace for the database connection
"""
engine = None
@classmethod
def connect(cls, connection_url: str) -> None:
"""
Connect ORM to a database engine.
"""
cls.engine = create_engine(connection_url)
SQLModel.metadata.create_all(cls.engine)
@classmethod
@property
def session(cls) -> Session | None:
"""
Create an ORM session using a context manager.
"""
if cls.engine is None:
return None
return Session(cls.engine)