Player + team works

This commit is contained in:
Simon 2022-09-07 09:29:57 +02:00
parent f613bff18a
commit bf3bd68e52
46 changed files with 2869 additions and 34 deletions

3
.flake8 Normal file
View File

@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
exclude = .git,__pycache__,__init__.py,.mypy_cache,.pytest_cache

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
__version__ = "0.1.0"

0
app/api/__init__.py Normal file
View File

View File

12
app/api/api_v1/api.py Normal file
View File

@ -0,0 +1,12 @@
from fastapi import APIRouter
from app.api.api_v1.endpoints import users
from app.api.api_v1.endpoints import login
from app.api.api_v1.endpoints import player
from app.api.api_v1.endpoints import team
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(player.router, prefix="/player", tags=["player"])
api_router.include_router(team.router, prefix="/team", tags=["team"])

View File

View File

@ -0,0 +1,103 @@
from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
from app.core.security import get_password_hash
from app.utils import (
generate_password_reset_token,
send_reset_password_email,
verify_password_reset_token,
)
router = APIRouter()
@router.post("/login/access-token", response_model=schemas.Token)
def login_access_token(
db: Session = Depends(deps.get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = crud.user.authenticate(
db, email=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=400, detail="Incorrect email or password"
)
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/login/test-token", response_model=schemas.User)
def test_token(
current_user: models.User = Depends(deps.get_current_user),
) -> Any:
"""
Test access token
"""
return current_user
@router.post("/password-recovery/{email}", response_model=schemas.Msg)
def recover_password(email: str, db: Session = Depends(deps.get_db)) -> Any:
"""
Password Recovery
"""
user = crud.user.get_by_email(db, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
password_reset_token = generate_password_reset_token(email=email)
send_reset_password_email(
email_to=user.email, email=email, token=password_reset_token
)
return {"msg": "Password recovery email sent"}
@router.post("/reset-password/", response_model=schemas.Msg)
def reset_password(
token: str = Body(...),
new_password: str = Body(...),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Reset password
"""
email = verify_password_reset_token(token)
if not email:
raise HTTPException(status_code=400, detail="Invalid token")
user = crud.user.get_by_email(db, email=email)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system.",
)
elif not crud.user.is_active(user):
raise HTTPException(status_code=400, detail="Inactive user")
hashed_password = get_password_hash(new_password)
user.hashed_password = hashed_password
db.add(user)
db.commit()
return {"msg": "Password updated successfully"}

View File

@ -0,0 +1,91 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic.networks import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core.config import settings
from app.schemas.player import PlayerUpdate
router = APIRouter()
@router.get("/{id}", response_model=schemas.Player)
def read_player(
*,
db: Session = Depends(deps.get_db),
firstname: str,
lastname: str,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get player by firstname and lastname.
"""
player = crud.player.get_player_by_name(
db=db, firstname=firstname, lastname=lastname
)
if not player:
raise HTTPException(status_code=404, detail="player not found")
if not crud.user.is_superuser(current_user):
raise HTTPException(status_code=400, detail="Not enough permissions")
return player
@router.post("/", response_model=schemas.Player)
def create_player(
*,
db: Session = Depends(deps.get_db),
player_in: schemas.PlayerCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
player = crud.player.get_player_by_name(
db, firstname=player_in.firstname, lastname=player_in.lastname
)
if player:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
player = crud.player.create(db, obj_in=player_in)
return player
@router.get("/", response_model=List[schemas.Player])
def get_players(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve all players.
"""
player = crud.player.get_players(db, skip=skip, limit=limit)
return player
@router.post("/{id}", response_model=schemas.Player)
def update_player(
*,
db: Session = Depends(deps.get_db),
id: int,
player_in: PlayerUpdate,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update player.
"""
player = crud.player.get(db=db, id=id)
if not player:
raise HTTPException(status_code=404, detail="Player not found")
player = crud.player.update(db=db, db_obj=player, obj_in=player_in)
return player

View File

@ -0,0 +1,84 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
router = APIRouter()
@router.get("/{id}", response_model=schemas.Team)
def get_team(
*,
db: Session = Depends(deps.get_db),
id: int,
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get team by id.
"""
team = crud.team.get_team(db=db, team_id=id)
if not team:
raise HTTPException(status_code=404, detail="player not found")
if not crud.user.is_superuser(current_user):
raise HTTPException(status_code=400, detail="Not enough permissions")
return team
@router.get("/", response_model=List[schemas.Team])
def get_teams(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve teams.
"""
team = crud.team.get_teams(db, skip=skip, limit=limit)
return team
@router.post("/", response_model=schemas.Team)
def create_team(
*,
db: Session = Depends(deps.get_db),
team_in: schemas.TeamCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create team.
"""
team = crud.team.create(db, obj_in=team_in)
return team
@router.put("/players/{team_id}", response_model=schemas.Team)
def add_player_team(
*,
db: Session = Depends(deps.get_db),
player_id: int,
team_id: int,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Add player to team.
"""
if crud.player.get_player(db, player_id=player_id) is None:
raise HTTPException(status_code=404, detail="Player not found")
if crud.team.get_team(db=db, team_id=team_id) is None:
raise HTTPException(status_code=404, detail="Team not found")
if crud.team.is_player_in_team(
db=db, player_id=player_id, team_id=team_id
):
raise HTTPException(
status_code=404, detail="Player is already in team"
)
team = crud.team.add_player_in_team(db, team_id, player_id)
return team

View File

@ -0,0 +1,152 @@
from typing import Any, List
from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic.networks import EmailStr
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.api import deps
from app.core.config import settings
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(
db: Session = Depends(deps.get_db),
skip: int = 0,
limit: int = 100,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = crud.user.get_multi(db, skip=skip, limit=limit)
return users
@router.post("/", response_model=schemas.User)
def create_user(
*,
db: Session = Depends(deps.get_db),
user_in: schemas.UserCreate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.put("/me", response_model=schemas.User)
def update_user_me(
*,
db: Session = Depends(deps.get_db),
password: str = Body(None),
full_name: str = Body(None),
email: EmailStr = Body(None),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Update own user.
"""
current_user_data = jsonable_encoder(current_user)
user_in = schemas.UserUpdate(**current_user_data)
if password is not None:
user_in.password = password
if full_name is not None:
user_in.full_name = full_name
if email is not None:
user_in.email = email
user = crud.user.update(db, db_obj=current_user, obj_in=user_in)
return user
@router.get("/me", response_model=schemas.User)
def read_user_me(
db: Session = Depends(deps.get_db),
current_user: models.User = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.post("/open", response_model=schemas.User)
def create_user_open(
*,
db: Session = Depends(deps.get_db),
password: str = Body(...),
email: EmailStr = Body(...),
full_name: str = Body(None),
) -> Any:
"""
Create new user without the need to be logged in.
"""
if not settings.USERS_OPEN_REGISTRATION:
raise HTTPException(
status_code=403,
detail="Open user registration is forbidden on this server",
)
user = crud.user.get_by_email(db, email=email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system",
)
user_in = schemas.UserCreate(
password=password, email=email, full_name=full_name
)
user = crud.user.create(db, obj_in=user_in)
return user
@router.get("/{user_id}", response_model=schemas.User)
def read_user_by_id(
user_id: int,
current_user: models.User = Depends(deps.get_current_active_user),
db: Session = Depends(deps.get_db),
) -> Any:
"""
Get a specific user by id.
"""
user = crud.user.get(db, id=user_id)
if user == current_user:
return user
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=schemas.User)
def update_user(
*,
db: Session = Depends(deps.get_db),
user_id: int,
user_in: schemas.UserUpdate,
current_user: models.User = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = crud.user.get(db, id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = crud.user.update(db, db_obj=user, obj_in=user_in)
return user

61
app/api/deps.py Normal file
View File

@ -0,0 +1,61 @@
from typing import Generator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
from app import crud, models, schemas
from app.core import security
from app.core.config import settings
from app.db.session import SessionLocal
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> models.User:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_current_active_user(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: models.User = Depends(get_current_user),
) -> models.User:
if not crud.user.is_superuser(current_user):
raise HTTPException(
status_code=400, detail="The user doesn't have enough privileges"
)
return current_user

0
app/core/__init__.py Normal file
View File

49
app/core/config.py Normal file
View File

@ -0,0 +1,49 @@
import secrets
from typing import Any, Dict, List, Optional, Union
from pydantic import AnyHttpUrl, BaseSettings, validator, EmailStr, HttpUrl
class Settings(BaseSettings):
API_V1_STR: str = "/api/v1"
SQLALCHEMY_DATABASE_URI: str = "sqlite:///./floorball.db"
SECRET_KEY: str = secrets.token_urlsafe(32)
# 60 minutes * 24 hours * 8 days = 8 days
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
SERVER_NAME: str = "localhost"
SERVER_HOST: AnyHttpUrl = "http://localhost"
# BACKEND_CORS_ORIGINS is a JSON-formatted list of origins
# e.g: '["http://localhost", "http://localhost:4200", "http://localhost:3000", \
# "http://localhost:8080", "http://local.dockertoolbox.tiangolo.com"]'
BACKEND_CORS_ORIGINS: List[AnyHttpUrl] = []
@validator("BACKEND_CORS_ORIGINS", pre=True)
def assemble_cors_origins(
cls, v: Union[str, List[str]]
) -> Union[List[str], str]:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
elif isinstance(v, (list, str)):
return v
raise ValueError(v)
PROJECT_NAME: str = "Floorball stats"
SENTRY_DSN: Optional[HttpUrl] = ""
@validator("SENTRY_DSN", pre=True)
def sentry_dsn_can_be_blank(cls, v: str) -> Optional[str]:
if len(v) == 0:
return None
return v
EMAIL_TEST_USER: EmailStr = "test@example.com" # type: ignore
FIRST_SUPERUSER: EmailStr = "test@example.com"
FIRST_SUPERUSER_PASSWORD: str = "hej"
USERS_OPEN_REGISTRATION: bool = False
class Config:
case_sensitive = True
settings = Settings()

36
app/core/security.py Normal file
View File

@ -0,0 +1,36 @@
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(
subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(
to_encode, settings.SECRET_KEY, algorithm=ALGORITHM
)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

3
app/crud/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .crud_user import user
from .crud_player import player
from .crud_team import team

64
app/crud/base.py Normal file
View File

@ -0,0 +1,64 @@
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.db.base_class import Base
ModelType = TypeVar("ModelType", bound=Base)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
def get(self, db: Session, id: Any) -> Optional[ModelType]:
return db.query(self.model).filter(self.model.id == id).first()
def get_multi(
self, db: Session, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
return db.query(self.model).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
obj_in_data = jsonable_encoder(obj_in)
db_obj = self.model(**obj_in_data) # type: ignore
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: ModelType,
obj_in: Union[UpdateSchemaType, Dict[str, Any]],
) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def remove(self, db: Session, *, id: int) -> ModelType:
obj = db.query(self.model).get(id)
db.delete(obj)
db.commit()
return obj

41
app/crud/crud_player.py Normal file
View File

@ -0,0 +1,41 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.player import Player
from app.schemas.player import PlayerCreate, PlayerUpdate, PlayerUpdateTeam
class CRUDPlayer(CRUDBase[Player, PlayerCreate, PlayerUpdate]):
def get_player_by_name(
self,
db: Session,
*,
firstname: str,
lastname: str,
) -> Optional[Player]:
return (
db.query(Player)
.filter(Player.lastname == lastname, Player.firstname == firstname)
.first()
)
def get_player(self, db: Session, *, player_id: int) -> Optional[Player]:
return db.query(Player).filter(Player.id == player_id).first()
def get_players(self, db: Session, skip: int = 0, limit: int = 100):
return db.query(Player).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: PlayerCreate) -> Player:
db_obj = Player(
firstname=obj_in.firstname,
lastname=obj_in.lastname,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
player = CRUDPlayer(Player)

47
app/crud/crud_team.py Normal file
View File

@ -0,0 +1,47 @@
from typing import Any, Dict, Optional, Union, List
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.crud import player as crud_player
from app.models.team import Team
from app.schemas.team import TeamCreate, TeamUpdate
class CRUDTeam(CRUDBase[Team, TeamCreate, TeamUpdate]):
def get_team(self, db: Session, *, team_id: int) -> Optional[Team]:
return db.query(Team).filter(Team.team_id == team_id).first()
def get_teams(
self, db: Session, skip: int = 0, limit: int = 100
) -> List[Team]:
return db.query(Team).offset(skip).limit(limit).all()
def create(self, db: Session, *, obj_in: TeamCreate) -> Team:
db_obj = Team(teamname=obj_in.teamname)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def add_player_in_team(
self, db: Session, team_id: int, player_id: int
) -> Team:
team = self.get_team(db=db, team_id=team_id)
db_player = crud_player.get_player(db=db, player_id=player_id)
team.players.append(db_player)
db.commit()
return team
def is_player_in_team(
self, db: Session, *, player_id: int, team_id: int
) -> bool:
team = self.get_team(db=db, team_id=team_id)
if team is None:
return False
db_player = crud_player.get_player(db=db, player_id=player_id)
return db_player in team.players
team = CRUDTeam(Team)

61
app/crud/crud_user.py Normal file
View File

@ -0,0 +1,61 @@
from typing import Any, Dict, Optional, Union
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.crud.base import CRUDBase
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
def get_by_email(self, db: Session, *, email: str) -> Optional[User]:
return db.query(User).filter(User.email == email).first()
def create(self, db: Session, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
hashed_password=get_password_hash(obj_in.password),
full_name=obj_in.full_name,
is_superuser=obj_in.is_superuser,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update(
self,
db: Session,
*,
db_obj: User,
obj_in: Union[UserUpdate, Dict[str, Any]],
) -> User:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
if update_data["password"]:
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
return super().update(db, db_obj=db_obj, obj_in=update_data)
def authenticate(
self, db: Session, *, email: str, password: str
) -> Optional[User]:
user = self.get_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(self, user: User) -> bool:
return user.is_active
def is_superuser(self, user: User) -> bool:
return user.is_superuser
user = CRUDUser(User)

0
app/db/__init__.py Normal file
View File

4
app/db/base.py Normal file
View File

@ -0,0 +1,4 @@
from app.db.base_class import Base # noqa
# from app.models.item import Item # noqa
from app.models.user import User # noqa

13
app/db/base_class.py Normal file
View File

@ -0,0 +1,13 @@
from typing import Any
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: Any
__name__: str
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()

27
app/db/init_db.py Normal file
View File

@ -0,0 +1,27 @@
from sqlalchemy.orm import Session
from app import crud, schemas
from app.core.config import settings
from app.db import base # noqa: F401
# make sure all SQL Alchemy models are imported (app.db.base) before initializing DB
# otherwise, SQL Alchemy might fail to initialize relationships properly
# for more details: https://github.com/tiangolo/full-stack-fastapi-postgresql/issues/28
from app.db.base_class import Base
from app.db.session import engine
def init_db(db: Session) -> None:
# Tables should be created with Alembic migrations
# But if you don't want to use migrations, create
# the tables un-commenting the next line
Base.metadata.create_all(bind=engine)
user = crud.user.get_by_email(db, email=settings.FIRST_SUPERUSER)
if not user:
user_in = schemas.UserCreate(
email=settings.FIRST_SUPERUSER,
password=settings.FIRST_SUPERUSER_PASSWORD,
is_superuser=True,
)
user = crud.user.create(db, obj_in=user_in) # noqa: F841

7
app/db/session.py Normal file
View File

@ -0,0 +1,7 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

30
app/main.py Normal file
View File

@ -0,0 +1,30 @@
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from app.api.api_v1.api import api_router
from app.core.config import settings
from app.db.init_db import init_db
from app.db.session import SessionLocal
db = SessionLocal()
init_db(db)
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
)
# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[
str(origin) for origin in settings.BACKEND_CORS_ORIGINS
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)

5
app/models/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# from .match import Match
# from .matchday import Matchday
from .player import Player
from .team import Team
from .user import User

20
app/models/match.py Normal file
View File

@ -0,0 +1,20 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.database import Base
class Match(Base):
match_id = Column(Integer, primary_key=True)
team_1 = Column(ForeignKey("team.id"), nullable=False)
team_2 = Column(ForeignKey("team.id"), nullable=False)
day = Column(ForeignKey("matchday.id"), nullable=False)
winner = Column(ForeignKey("team.team_id"), nullable=True)
team_1_result = Column(Integer, nullable=True)
team_2_result = Column(Integer, nullable=True)
matchday = relationship("Matchday")
team = relationship("Team", primaryjoin="Match.team_1 == Team.team_id")
team1 = relationship("Team", primaryjoin="Match.team_2 == Team.team_id")
team2 = relationship("Team", primaryjoin="Match.winner == Team.team_id")

17
app/models/matchday.py Normal file
View File

@ -0,0 +1,17 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class MatchdayPlayer(Base):
player_id = Column("players_id", ForeignKey("players.id"), primary_key=True)
matchday_id = Column("teams_id", ForeignKey("teams.id"), primary_key=True)
class Matchday(Base):
matchday_id = Column(Integer, primary_key=True)
day = Column(DateTime, nullable=False)
players = relationship("Player", secondary="matchdayplayer")

16
app/models/player.py Normal file
View File

@ -0,0 +1,16 @@
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Player(Base):
id = Column(Integer, primary_key=True, index=True)
firstname = Column(String, nullable=True)
lastname = Column(String, nullable=True)
team_id = Column(Integer, ForeignKey("team.team_id"))
team = relationship("Team", back_populates="players")
team_id = Column(Integer, ForeignKey("team.team_id"))
team = relationship("Team", back_populates="players")

11
app/models/team.py Normal file
View File

@ -0,0 +1,11 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Team(Base):
team_id = Column(Integer, primary_key=True, index=True)
teamname = Column(String, nullable=False)
players = relationship("Player", back_populates="team")

12
app/models/user.py Normal file
View File

@ -0,0 +1,12 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Boolean
from app.db.base_class import Base
class User(Base):
id = Column(Integer, primary_key=True, index=True)
full_name = Column(String, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean(), default=True)
is_superuser = Column(Boolean(), default=False)

5
app/schemas/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .user import User, UserCreate, UserInDB, UserUpdate
from .token import Token, TokenPayload
from .msg import Msg
from .player import Player, PlayerCreate, PlayerInDB
from .team import Team, TeamCreate, TeamInDBBase

39
app/schemas/match.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class MatchBase(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
# Properties to receive on match creation
class MatchCreate(MatchBase):
title: str
# Properties to receive on match update
class MatchUpdate(MatchBase):
pass
# Properties shared by models stored in DB
class MatchInDBBase(MatchBase):
id: int
title: str
owner_id: int
class Config:
orm_mode = True
# Properties to return to client
class Match(MatchInDBBase):
pass
# Properties properties stored in DB
class MatchInDB(MatchInDBBase):
pass

42
app/schemas/matchday.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
from pydantic.datetime_parse import datetime
class MatchBase(BaseModel):
day: Optional[datetime] = None
description: Optional[str] = None
# Properties to receive on match creation
class MatchCreate(MatchBase):
title: str
# Properties to receive on match update
class MatchUpdate(MatchBase):
pass
# Properties shared by models stored in DB
class MatchInDBBase(MatchBase):
id: int
title: str
owner_id: int
class Config:
orm_mode = True
# Properties to return to client
class Match(MatchInDBBase):
pass
# Properties properties stored in DB
class MatchInDB(MatchInDBBase):
pass

5
app/schemas/msg.py Normal file
View File

@ -0,0 +1,5 @@
from pydantic import BaseModel
class Msg(BaseModel):
msg: str

41
app/schemas/player.py Normal file
View File

@ -0,0 +1,41 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class PlayerBase(BaseModel):
firstname: Optional[str] = None
lastname: Optional[str] = None
# Properties to receive via API on creation
class PlayerCreate(PlayerBase):
firstname: str
lastname: str
class PlayerInDBBase(PlayerBase):
id: Optional[int] = None
class Config:
orm_mode = True
class PlayerUpdate(PlayerBase):
firstname: str
lastname: str
class PlayerUpdateTeam(PlayerBase):
team_id: Optional[int]
# Additional properties to return via API
class Player(PlayerInDBBase):
pass
# Additional properties stored in DB
class PlayerInDB(PlayerInDBBase):
pass

39
app/schemas/team.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
from app.schemas import Player
class TeamBase(BaseModel):
teamname: Optional[str] = None
# Properties to receive via API on creation
class TeamCreate(TeamBase):
teamname: Optional[str]
class TeamInDBBase(TeamBase):
players: list[Player] = []
class Config:
orm_mode = True
class TeamUpdate(TeamBase):
pass
# Additional properties to return via API
class Team(TeamInDBBase):
team_id: Optional[int] = None
players: list[Player] = []
# Additional properties stored in DB
class TeamInDB(TeamInDBBase):
pass

12
app/schemas/token.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

39
app/schemas/user.py Normal file
View File

@ -0,0 +1,39 @@
from typing import Optional
from pydantic import BaseModel, EmailStr
# Shared properties
class UserBase(BaseModel):
email: Optional[EmailStr] = None
is_active: Optional[bool] = True
is_superuser: bool = False
full_name: Optional[str] = None
# Properties to receive via API on creation
class UserCreate(UserBase):
email: EmailStr
password: str
# Properties to receive via API on update
class UserUpdate(UserBase):
password: Optional[str] = None
class UserInDBBase(UserBase):
id: Optional[int] = None
class Config:
orm_mode = True
# Additional properties to return via API
class User(UserInDBBase):
pass
# Additional properties stored in DB
class UserInDB(UserInDBBase):
hashed_password: str

114
app/utils.py Normal file
View File

@ -0,0 +1,114 @@
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
import emails
from emails.template import JinjaTemplate
from jose import jwt
from app.core.config import settings
def send_email(
email_to: str,
subject_template: str = "",
html_template: str = "",
environment: Dict[str, Any] = {},
) -> None:
assert (
settings.EMAILS_ENABLED
), "no provided configuration for email variables"
message = emails.Message(
subject=JinjaTemplate(subject_template),
html=JinjaTemplate(html_template),
mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
)
smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
if settings.SMTP_TLS:
smtp_options["tls"] = True
if settings.SMTP_USER:
smtp_options["user"] = settings.SMTP_USER
if settings.SMTP_PASSWORD:
smtp_options["password"] = settings.SMTP_PASSWORD
response = message.send(to=email_to, render=environment, smtp=smtp_options)
logging.info(f"send email result: {response}")
def send_test_email(email_to: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Test email"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f:
template_str = f.read()
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={"project_name": settings.PROJECT_NAME, "email": email_to},
)
def send_reset_password_email(email_to: str, email: str, token: str) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - Password recovery for user {email}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f:
template_str = f.read()
server_host = settings.SERVER_HOST
link = f"{server_host}/reset-password?token={token}"
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
"project_name": settings.PROJECT_NAME,
"username": email,
"email": email_to,
"valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
"link": link,
},
)
def send_new_account_email(
email_to: str, username: str, password: str
) -> None:
project_name = settings.PROJECT_NAME
subject = f"{project_name} - New account for user {username}"
with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f:
template_str = f.read()
link = settings.SERVER_HOST
send_email(
email_to=email_to,
subject_template=subject,
html_template=template_str,
environment={
"project_name": settings.PROJECT_NAME,
"username": username,
"password": password,
"email": email_to,
"link": link,
},
)
def generate_password_reset_token(email: str) -> str:
delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
now = datetime.utcnow()
expires = now + delta
exp = expires.timestamp()
encoded_jwt = jwt.encode(
{"exp": exp, "nbf": now, "sub": email},
settings.SECRET_KEY,
algorithm="HS256",
)
return encoded_jwt
def verify_password_reset_token(token: str) -> Optional[str]:
try:
decoded_token = jwt.decode(
token, settings.SECRET_KEY, algorithms=["HS256"]
)
return decoded_token["email"]
except jwt.JWTError:
return None

BIN
floorball.db Normal file

Binary file not shown.

View File

@ -1 +0,0 @@
__version__ = '0.1.0'

1544
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,13 +2,34 @@
name = "floorball_stats"
version = "0.1.0"
description = ""
authors = ["Simon Milvert <simon.milvert@scania.com>"]
authors = ["Simon Milvert <simon@milvert.com>"]
[tool.poetry.dependencies]
python = "^3.9"
fastapi = "^0.81.0"
SQLAlchemy = "^1.4.40"
uvicorn = "^0.18.2"
passlib = {extras = ["bcrypt"], version = "^1.7.4"}
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
pydantic = {extras = ["email"], version = "^1.10.0"}
python-multipart = "^0.0.5"
emails = "^0.6"
[tool.poetry.dev-dependencies]
pytest = "^5.2"
pytest = "^7.1.2"
black = "^22.6.0"
coverage = {extras = ["toml"], version = "^6.4.4"}
pytest-cov = "^3.0.0"
mypy = "^0.971"
flake8 = "^5.0.4"
pylint = "^2.15.0"
tox = "^3.25.1"
isort = "^5.10.1"
[tool.black]
line-length = 79
target-version = [ "py39",]
[build-system]
requires = ["poetry-core>=1.0.0"]

View File

@ -1,5 +1,5 @@
from floorball_stats import __version__
from app import __version__
def test_version():
assert __version__ == '0.1.0'
assert __version__ == "0.1.0"

23
tox.ini Normal file
View File

@ -0,0 +1,23 @@
[tox]
envlist =
py39
format
isolated_build = true
[testenv:format]
description = Linting and checking syntax
require_locked_deps = true
install_dev_deps = true
commands =
poetry run isort app\
poetry run black app\
poetry run flake8 app\
poetry run pylint app
[testenv:test]
description = Linting and checking syntax
require_locked_deps = true
install_dev_deps = true
commands =
poetry run pytest -v