kemono2/src/pages/api/v1/authentication.py
2024-11-26 00:11:49 +01:00

123 lines
3.4 KiB
Python

import re
from json import JSONDecodeError
from typing import TypedDict
import orjson
from flask import Blueprint, request, current_app, make_response, jsonify, g, session
from src.lib.account import attempt_login, create_account
from src.lib.api import create_client_error_response
from src.lib.security import is_password_compromised
from src.types.account import Account
from . import v1api_bp
# This blueprint lives in its own file because all account routes require
# auth; the authentication routes are registered on this blueprint instead.
authentication_bp = Blueprint("authentication", __name__)
class TDRegistrationBody(TypedDict):
    """Typed shape of the JSON body accepted by the registration route.

    NOTE(review): the registration handler reads the key ``favorites``,
    while this type declares ``favorites_json``; confirm which key the
    client actually sends.
    """

    # Not read by the registration handler in this file.
    location: str
    # Desired account name.
    username: str
    # Chosen password and its repetition; the handler requires them to match.
    password: str
    confirm_password: str
    # JSON-encoded favorites (see the note in the class docstring).
    favorites_json: str
# Accepted usernames: 3 to 15 characters drawn from lowercase ASCII letters,
# digits and the symbols "_", "@", "+", "." and "-".
USERNAME_REGEX = re.compile(r"^[a-z0-9_@+.\-]{3,15}$")
@authentication_bp.post("/authentication/register")
def post_register():
    """Create a new account from a JSON registration request.

    The JSON body is read for ``username``, ``password``,
    ``confirm_password`` and ``favorites`` (a JSON-encoded string). Each
    check below short-circuits with ``create_client_error_response``; once
    all of them pass, ``create_account`` is called.

    Returns:
        A 200 response with JSON ``true`` when ``create_account`` reports
        success, otherwise the client-error response of the first failed
        check.
    """
    body: TDRegistrationBody = request.get_json()
    # A syntactically valid JSON body can still be ``null``, a list, a
    # number, ...; calling ``.get`` on those would raise and surface as a
    # server error instead of a client error.
    if not isinstance(body, dict):
        return create_client_error_response("Invalid request body")

    raw_username = body.get("username", "")
    raw_password = body.get("password", "")
    raw_confirmation = body.get("confirm_password", "")
    # Same reasoning per field: ``{"username": 1}`` must not reach the
    # ``str`` methods used below.
    if not (
        isinstance(raw_username, str)
        and isinstance(raw_password, str)
        and isinstance(raw_confirmation, str)
    ):
        return create_client_error_response("Invalid request body")

    # NUL characters are removed from the username before validation.
    username = raw_username.replace("\x00", "").strip()
    password = raw_password.strip()
    confirm_password = raw_confirmation.strip()

    # NOTE(review): TDRegistrationBody declares ``favorites_json`` but the
    # key read here is ``favorites`` — confirm which one the client sends.
    favorites_json = body.get("favorites", "[]")
    favorites = []
    if favorites_json != "":
        try:
            favorites = orjson.loads(favorites_json)
        except JSONDecodeError:
            # Best effort: favorites that cannot be parsed are ignored
            # rather than failing the registration.
            pass

    if username == "":
        return create_client_error_response("Username cannot be empty")

    if not USERNAME_REGEX.match(username):
        return create_client_error_response("Invalid username")

    if password == "":
        return create_client_error_response("Password cannot be empty")

    if len(password) < 5:
        return create_client_error_response("Password must have at least 5 characters.")

    if password != confirm_password:
        return create_client_error_response("Passwords do not match")

    # The breach lookup only runs when the feature flag is enabled.
    if current_app.config.get("ENABLE_PASSWORD_VALIDATOR") and is_password_compromised(password):
        return create_client_error_response(
            "We've detected that password was compromised in a data breach on another site. Please choose a different password."
        )

    success = create_account(username, password, favorites)
    if not success:
        return create_client_error_response("Username already taken")

    return make_response(jsonify(True), 200)
class TDLoginBody(TypedDict):
    """Typed shape of the JSON body accepted by the login route."""

    # Account name to log in as.
    username: str
    # Password submitted for that account.
    password: str
@authentication_bp.post("/authentication/login")
def post_login():
    """Log a user in from a JSON body carrying ``username`` and ``password``.

    Returns:
        A 200 response with the account returned by ``attempt_login``
        serialised as JSON; a 409 client error when ``g`` already holds an
        account; otherwise the client-error response of the first failed
        check.
    """
    body: TDLoginBody = request.get_json()
    account: Account | None = g.get("account")

    if account:
        return create_client_error_response("Already logged in", 409)

    # A syntactically valid JSON body may still be ``null``, a list, ...;
    # reject it here instead of failing on ``.get`` with a server error.
    if not isinstance(body, dict):
        return create_client_error_response("Invalid request body")

    raw_username = body.get("username", "")
    raw_password = body.get("password", "")
    # Non-string fields would otherwise crash on ``.replace`` or be passed
    # on to ``attempt_login`` unchanged.
    if not (isinstance(raw_username, str) and isinstance(raw_password, str)):
        return create_client_error_response("Invalid request body")

    # NOTE(review): registration strips surrounding whitespace from the
    # username and password, this handler does not — confirm that
    # ``attempt_login`` copes with that difference.
    username = raw_username.replace("\x00", "")
    password = raw_password

    if not username:
        return create_client_error_response("Username is required.")

    if not password:
        return create_client_error_response("Password is required.")

    account, error_message = attempt_login(username, password)

    if error_message:
        return create_client_error_response(error_message)

    if not account:
        return create_client_error_response("Account doesn't exist")

    return make_response(jsonify(account), 200)
@authentication_bp.post("/authentication/logout")
def logout():
    """Remove ``account_id`` from the session, if present, and reply with JSON ``true``."""
    # pop() with a default leaves the session alone when the key is absent,
    # which matches a membership check followed by pop().
    session.pop("account_id", None)
    return make_response(jsonify(True), 200)
# Attach the authentication routes as a nested blueprint of v1api_bp.
v1api_bp.register_blueprint(authentication_bp)