Source code for corvix.crypto
"""Fernet-based token encryption helpers and SQLAlchemy encrypted column type."""
from __future__ import annotations
from functools import cache
from cryptography.fernet import Fernet, InvalidToken
from sqlalchemy import Text
from sqlalchemy.types import TypeDecorator
from corvix.env import get_env_value
@cache
[docs]
def get_fernet() -> Fernet:
"""Return a Fernet instance keyed from the TOKEN_ENCRYPTION_KEY env var.
Raises:
RuntimeError: if TOKEN_ENCRYPTION_KEY (or TOKEN_ENCRYPTION_KEY_FILE) is not set.
"""
key = get_env_value("TOKEN_ENCRYPTION_KEY")
if not key:
msg = (
"TOKEN_ENCRYPTION_KEY environment variable is required for token encryption. "
"Generate a key with: "
"python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
)
raise RuntimeError(msg)
return Fernet(key.encode())
[docs]
def encrypt_token(plaintext: str) -> str:
"""Encrypt a plaintext token string with Fernet symmetric encryption.
Returns:
A URL-safe base64-encoded Fernet token (starts with ``gAAAAA``).
"""
return get_fernet().encrypt(plaintext.encode()).decode()
[docs]
def decrypt_token(ciphertext: str) -> str:
"""Decrypt a Fernet-encrypted token string back to plaintext.
Raises:
cryptography.fernet.InvalidToken: if the ciphertext is invalid or the key is wrong.
"""
return get_fernet().decrypt(ciphertext.encode()).decode()
[docs]
class EncryptedText(TypeDecorator):
"""SQLAlchemy column type that transparently Fernet-encrypts values at rest.
The database column stores a URL-safe base64 Fernet token (plain TEXT).
Python code reads and writes the original plaintext string.
Requires the TOKEN_ENCRYPTION_KEY environment variable (or TOKEN_ENCRYPTION_KEY_FILE
for Docker secret file support) to be set at application startup and during migrations.
"""
"""Marks this type as safe to use with SQLAlchemy's statement cache."""
[docs]
def process_bind_param(self, value: str | None, dialect: object) -> str | None:
"""Encrypt the Python value before writing it to the database."""
if value is None:
return None
return encrypt_token(value)
[docs]
def process_result_value(self, value: str | None, dialect: object) -> str | None:
"""Decrypt the database value when loading it into Python."""
if value is None:
return None
return decrypt_token(value)
__all__ = ["EncryptedText", "InvalidToken", "decrypt_token", "encrypt_token", "get_fernet"]