kiwi-vpn/api/kiwi_vpn_api/db_new/connection.py

30 lines
631 B
Python

from sqlmodel import Session, SQLModel, create_engine
class _ClassProperty:
    """
    Read-only property evaluated against the owning class.

    Replaces the `@classmethod` + `@property` stacking, which was deprecated
    in Python 3.11 and removed in Python 3.13.
    """

    def __init__(self, fget):
        self.fget = fget
        self.__doc__ = fget.__doc__

    def __get__(self, instance, owner=None):
        # Works for both `Connection.session` and `Connection().session`.
        if owner is None:
            owner = type(instance)
        return self.fget(owner)


class Connection:
    """
    Namespace for the database connection.

    The engine is stored on the class itself, so the class is used directly
    and does not need to be instantiated.
    """

    # Database engine shared by all sessions; `None` until `connect()` ran.
    engine = None

    @classmethod
    def connect(cls, connection_url: str) -> None:
        """
        Connect ORM to a database engine.

        Creates an engine for `connection_url`, stores it in `cls.engine`
        and runs `create_all` for the tables registered on
        `SQLModel.metadata`.
        """

        cls.engine = create_engine(connection_url)
        SQLModel.metadata.create_all(cls.engine)

    @_ClassProperty
    def session(cls) -> "Session | None":
        """
        Create an ORM session, intended for use as a context manager.

        Every access returns a new `Session` bound to `cls.engine`.
        Returns `None` if `connect()` has not been called yet.
        """

        if cls.engine is None:
            return None

        return Session(cls.engine)