mirror of
https://github.com/aykhans/portfolio-blog.git
synced 2025-09-08 15:10:45 +00:00
- Added flake8
- create_user moved to commands
This commit is contained in:
@@ -10,19 +10,26 @@ from pydantic import (
|
||||
class Settings(BaseSettings):
|
||||
PROJECT_NAME: str = 'FastAPI Portfolio & Blog'
|
||||
|
||||
MAIN_PATH: Path = Path(__file__).resolve().parent.parent.parent # path to src folder
|
||||
APP_PATH: Path = MAIN_PATH / 'app' # path to app folder
|
||||
MEDIA_FOLDER: Path = Path('media') # name of media folder
|
||||
MEDIA_PATH: Path = MAIN_PATH / MEDIA_FOLDER # path to media folder
|
||||
STATIC_FOLDER_NAME: Path = Path('static') # name of static folder
|
||||
STATIC_FOLDER: Path = MAIN_PATH / STATIC_FOLDER_NAME # path to static folder
|
||||
MAIN_PATH: Path = (
|
||||
Path(__file__).
|
||||
resolve().
|
||||
parent.
|
||||
parent.
|
||||
parent
|
||||
) # path to src folder
|
||||
APP_PATH: Path = MAIN_PATH / 'app' # path to app folder
|
||||
MEDIA_FOLDER: Path = Path('media') # name of media folder
|
||||
MEDIA_PATH: Path = MAIN_PATH / MEDIA_FOLDER # path to media folder
|
||||
STATIC_FOLDER_NAME: Path = Path('static') # name of static folder
|
||||
STATIC_FOLDER: Path =\
|
||||
MAIN_PATH / STATIC_FOLDER_NAME # path to static folder
|
||||
|
||||
FILE_FOLDERS: dict[str, Path] = {
|
||||
'post_images': Path('post_images'),
|
||||
}
|
||||
|
||||
SECRET_KEY: str
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 43200 # 30 days
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 43200 # 30 days
|
||||
|
||||
POSTGRES_USER: str
|
||||
POSTGRES_PASSWORD: str
|
||||
@@ -34,7 +41,7 @@ class Settings(BaseSettings):
|
||||
def LOGIN_URL(self) -> str:
|
||||
return self.SECRET_KEY[-10:]
|
||||
|
||||
def get_postgres_dsn(self, _async: bool=False) -> PostgresDsn:
|
||||
def get_postgres_dsn(self, _async: bool = False) -> PostgresDsn:
|
||||
scheme = 'postgresql+asyncpg' if _async else 'postgresql'
|
||||
|
||||
return PostgresDsn.build(
|
||||
@@ -56,4 +63,4 @@ class Settings(BaseSettings):
|
||||
EMAIL_RECIPIENTS: list[EmailStr] = []
|
||||
|
||||
|
||||
settings = Settings()
|
||||
settings = Settings()
|
||||
|
@@ -32,7 +32,11 @@ def create_access_token(
|
||||
)
|
||||
|
||||
to_encode = {"exp": expire, "sub": str(subject)}
|
||||
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
|
||||
encoded_jwt = jwt.encode(
|
||||
to_encode,
|
||||
settings.SECRET_KEY,
|
||||
algorithm=ALGORITHM
|
||||
)
|
||||
|
||||
return encoded_jwt
|
||||
|
||||
@@ -42,4 +46,4 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
|
||||
|
||||
def get_password_hash(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
return pwd_context.hash(password)
|
||||
|
@@ -26,7 +26,7 @@ UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
|
||||
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
||||
def __init__(self, model: Type[ModelType]):
|
||||
"""
|
||||
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
|
||||
CRUD object with default methods to Create, Read, Update, Delete(CRUD).
|
||||
|
||||
**Parameters**
|
||||
|
||||
@@ -44,7 +44,12 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
||||
self, db: Session, *, skip: int = 0, limit: int = 100
|
||||
) -> List[ModelType]:
|
||||
|
||||
q = select(self.model).offset(skip).limit(limit).order_by(self.model.id.desc())
|
||||
q = (
|
||||
select(self.model).
|
||||
offset(skip).
|
||||
limit(limit).
|
||||
order_by(self.model.id.desc())
|
||||
)
|
||||
obj = await db.execute(q)
|
||||
return obj.scalars()
|
||||
|
||||
@@ -52,11 +57,18 @@ class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
|
||||
self, db: Session, *, skip: int = 0, limit: int = 100
|
||||
) -> List[ModelType]:
|
||||
|
||||
q = select(self.model).offset(skip).limit(limit).order_by(self.model.id.desc())
|
||||
q = (
|
||||
select(self.model).
|
||||
offset(skip).
|
||||
limit(limit).
|
||||
order_by(self.model.id.desc())
|
||||
)
|
||||
obj = db.execute(q)
|
||||
return obj.scalars()
|
||||
|
||||
async def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType:
|
||||
async def create(
|
||||
self, db: Session, *, obj_in: CreateSchemaType
|
||||
) -> ModelType:
|
||||
obj_in_data = jsonable_encoder(obj_in)
|
||||
db_obj = self.model(**obj_in_data) # type: ignore
|
||||
db.add(db_obj)
|
||||
|
@@ -75,4 +75,5 @@ class CRUDPost(CRUDBase[Post, PostCreate, PostUpdate]):
|
||||
|
||||
return obj
|
||||
|
||||
post = CRUDPost(Post)
|
||||
|
||||
post = CRUDPost(Post)
|
||||
|
@@ -60,7 +60,11 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
||||
return db_obj
|
||||
|
||||
async def update(
|
||||
self, db: Session, *, db_obj: User, obj_in: Union[UserUpdate, Dict[str, Any]]
|
||||
self,
|
||||
db: Session,
|
||||
*,
|
||||
db_obj: User,
|
||||
obj_in: Union[UserUpdate, Dict[str, Any]]
|
||||
) -> User:
|
||||
|
||||
if isinstance(obj_in, dict):
|
||||
@@ -76,7 +80,9 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
||||
|
||||
return await super().update(db, db_obj=db_obj, obj_in=update_data)
|
||||
|
||||
async def authenticate(self, db: Session, *, email: str, password: str) -> Optional[User]:
|
||||
async def authenticate(
|
||||
self, db: Session, *, email: str, password: str
|
||||
) -> Optional[User]:
|
||||
user = await self.get_by_email(db, email=email)
|
||||
|
||||
if user is None:
|
||||
@@ -93,4 +99,5 @@ class CRUDUser(CRUDBase[User, UserCreate, UserUpdate]):
|
||||
async def is_superuser(self, user: User) -> bool:
|
||||
return user.is_superuser
|
||||
|
||||
user = CRUDUser(User)
|
||||
|
||||
user = CRUDUser(User)
|
||||
|
@@ -1 +1 @@
|
||||
from app.db.base_class import Base
|
||||
from app.db.base_class import Base
|
||||
|
@@ -16,10 +16,14 @@ class Base:
|
||||
__name__: str
|
||||
|
||||
id: Any
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
created_at = Column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
nullable=False
|
||||
)
|
||||
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
|
||||
|
||||
# Generate __tablename__ automatically
|
||||
@declared_attr
|
||||
def __tablename__(cls) -> str:
|
||||
return cls.__name__.lower()
|
||||
return cls.__name__.lower()
|
||||
|
@@ -34,4 +34,4 @@ AsyncSessionLocal: AsyncSession = sessionmaker(
|
||||
autoflush=False,
|
||||
bind=async_engine,
|
||||
class_=AsyncSession
|
||||
)
|
||||
)
|
||||
|
@@ -1,2 +1,3 @@
|
||||
from .post import Post
|
||||
from .user import User
|
||||
from .user import User
|
||||
from . import post_events
|
@@ -1,12 +1,4 @@
|
||||
from contextlib import contextmanager
|
||||
from slugify import slugify
|
||||
|
||||
from sqlalchemy.orm.base import NO_VALUE
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.event import (
|
||||
listen,
|
||||
listens_for
|
||||
)
|
||||
from sqlalchemy import (
|
||||
Column,
|
||||
ForeignKey,
|
||||
@@ -26,44 +18,3 @@ class Post(Base):
|
||||
owner_id = Column(Integer(), ForeignKey("user.id"))
|
||||
|
||||
owner = relationship("User", back_populates="posts")
|
||||
|
||||
|
||||
from app.views.depends import get_db
|
||||
from app.utils.file_operations import remove_file
|
||||
from app.core.config import settings
|
||||
|
||||
def generate_slug(target, value, oldvalue, initiator):
|
||||
slug = slugify(value)
|
||||
|
||||
with contextmanager(get_db)() as db:
|
||||
number = 1
|
||||
temp_slug = slug
|
||||
|
||||
while db.query(Post).filter(Post.slug == temp_slug).first() is not None:
|
||||
temp_slug = f'{slug}-{number}'
|
||||
number += 1
|
||||
|
||||
target.slug = temp_slug
|
||||
|
||||
listen(Post.title, 'set', generate_slug)
|
||||
|
||||
|
||||
def remove_old_image_on_update(target, value, oldvalue, initiator):
|
||||
if oldvalue is not NO_VALUE:
|
||||
remove_file(
|
||||
settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
oldvalue
|
||||
)
|
||||
|
||||
listen(Post.image_path, 'set', remove_old_image_on_update)
|
||||
|
||||
|
||||
@listens_for(Post, 'before_delete')
|
||||
def before_delete_listener(mapper, connection, target):
|
||||
if target.image_path:
|
||||
remove_file(
|
||||
settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
target.image_path
|
||||
)
|
54
src/app/models/post_events.py
Normal file
54
src/app/models/post_events.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from contextlib import contextmanager
|
||||
from slugify import slugify
|
||||
|
||||
from sqlalchemy.orm.base import NO_VALUE
|
||||
from sqlalchemy.event import (
|
||||
listen,
|
||||
listens_for
|
||||
)
|
||||
|
||||
from app.views.depends import get_db
|
||||
from app.utils.file_operations import remove_file
|
||||
from app.core.config import settings
|
||||
from .post import Post
|
||||
|
||||
|
||||
def generate_slug(target, value, oldvalue, initiator):
|
||||
slug = slugify(value)
|
||||
|
||||
with contextmanager(get_db)() as db:
|
||||
number = 1
|
||||
temp_slug = slug
|
||||
|
||||
while db.query(Post).filter(
|
||||
Post.slug == temp_slug
|
||||
).first() is not None:
|
||||
temp_slug = f'{slug}-{number}'
|
||||
number += 1
|
||||
|
||||
target.slug = temp_slug
|
||||
|
||||
|
||||
listen(Post.title, 'set', generate_slug)
|
||||
|
||||
|
||||
def remove_old_image_on_update(target, value, oldvalue, initiator):
|
||||
if oldvalue is not NO_VALUE:
|
||||
remove_file(
|
||||
settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
oldvalue
|
||||
)
|
||||
|
||||
|
||||
listen(Post.image_path, 'set', remove_old_image_on_update)
|
||||
|
||||
|
||||
@listens_for(Post, 'before_delete')
|
||||
def before_delete_listener(mapper, connection, target):
|
||||
if target.image_path:
|
||||
remove_file(
|
||||
settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
target.image_path
|
||||
)
|
@@ -17,4 +17,4 @@ class User(Base):
|
||||
is_active = Column(Boolean(), default=True)
|
||||
is_superuser = Column(Boolean(), default=False)
|
||||
|
||||
posts = relationship("Post", back_populates="owner")
|
||||
posts = relationship("Post", back_populates="owner")
|
||||
|
@@ -21,4 +21,4 @@ class JWTToken(BaseModel):
|
||||
|
||||
|
||||
class JWTTokenPayload(BaseModel):
|
||||
sub: Optional[EmailStr] = None
|
||||
sub: Optional[EmailStr] = None
|
||||
|
@@ -11,4 +11,4 @@ class SendEmail:
|
||||
name: str = Form(..., max_length=50)
|
||||
email: EmailStr = Form(...)
|
||||
phone: Optional[str] = Form(None, max_length=20)
|
||||
message: str = Form(..., max_length=1000)
|
||||
message: str = Form(..., max_length=1000)
|
||||
|
@@ -13,7 +13,6 @@ from app.utils.custom_functions import html2text
|
||||
from app.core.config import settings
|
||||
|
||||
|
||||
|
||||
class PostBase(BaseModel):
|
||||
title: Optional[str] = Field(max_length=100)
|
||||
text: Optional[str] = None
|
||||
@@ -26,7 +25,8 @@ class PostCreate(PostBase):
|
||||
image_path: str
|
||||
|
||||
|
||||
class PostUpdate(PostBase): ...
|
||||
class PostUpdate(PostBase):
|
||||
...
|
||||
|
||||
|
||||
class PostInTemplate(BaseModel):
|
||||
@@ -85,4 +85,4 @@ class Post(PostInDBBase):
|
||||
settings.MEDIA_FOLDER /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
v
|
||||
)
|
||||
)
|
||||
|
@@ -34,4 +34,4 @@ class User(UserInDBBase):
|
||||
|
||||
|
||||
class UserInDB(UserInDBBase):
|
||||
hashed_password: str
|
||||
hashed_password: str
|
||||
|
@@ -1,119 +0,0 @@
|
||||
from sys import path
|
||||
path.append('/src')
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import (
|
||||
EmailStr,
|
||||
ValidationError,
|
||||
ConfigDict
|
||||
)
|
||||
|
||||
from app import crud
|
||||
from app.schemas import UserCreate
|
||||
from app.views.depends import get_db
|
||||
|
||||
|
||||
class UserCreateCommand(UserCreate):
|
||||
model_config = ConfigDict(validate_assignment=True)
|
||||
|
||||
email: Optional[EmailStr] = None
|
||||
password: Optional[str] = None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
user_in = UserCreateCommand()
|
||||
|
||||
while 1:
|
||||
email = input('*Email: ')
|
||||
|
||||
if not email:
|
||||
print('Email is required\n')
|
||||
continue
|
||||
|
||||
try:
|
||||
user_in.email = email
|
||||
|
||||
except ValidationError as e:
|
||||
print('\n', e, end='\n\n')
|
||||
continue
|
||||
|
||||
with contextmanager(get_db)() as db:
|
||||
user = crud.user.sync_get_by_email(
|
||||
db,
|
||||
email=user_in.email
|
||||
)
|
||||
|
||||
if user:
|
||||
print('User already exists\n')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
while 1:
|
||||
username = input('Username: ')
|
||||
|
||||
if username:
|
||||
try:
|
||||
user_in.username = username
|
||||
|
||||
except ValidationError as e:
|
||||
print('\n', e, end='\n\n')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
while 1:
|
||||
password = input('*Password: ')
|
||||
|
||||
if not password:
|
||||
print('Password is required\n')
|
||||
continue
|
||||
|
||||
try:
|
||||
user_in.password = password
|
||||
|
||||
except ValidationError as e:
|
||||
print('\n', e, end='\n\n')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
while 1:
|
||||
is_active = input('Is active? y/n (y): ') or 'y'
|
||||
|
||||
if is_active == 'y':
|
||||
user_in.is_active = True
|
||||
|
||||
elif is_active == 'n':
|
||||
user_in.is_active = False
|
||||
|
||||
else:
|
||||
print('Invalid input\n')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
while 1:
|
||||
is_superuser = input('Is superuser? y/n (n): ') or 'n'
|
||||
|
||||
if is_superuser == 'y':
|
||||
user_in.is_superuser = True
|
||||
|
||||
elif is_superuser == 'n':
|
||||
user_in.is_superuser = False
|
||||
|
||||
else:
|
||||
print('Invalid input\n')
|
||||
continue
|
||||
|
||||
break
|
||||
|
||||
with contextmanager(get_db)() as db:
|
||||
user = crud.user.sync_create(
|
||||
db,
|
||||
obj_in=user_in
|
||||
)
|
||||
|
||||
print(f'\nUser created:\n{user_in}\n')
|
@@ -8,4 +8,4 @@ def html2text(html: str) -> str:
|
||||
|
||||
|
||||
def get_remote_address(request: Request) -> str:
|
||||
return request.headers.get('host')
|
||||
return request.headers.get('host')
|
||||
|
@@ -17,21 +17,21 @@ def send_email_notification(
|
||||
|
||||
if settings.EMAIL_RECIPIENTS:
|
||||
conf = ConnectionConfig(
|
||||
MAIL_USERNAME = settings.SMTP_USER,
|
||||
MAIL_PASSWORD = settings.SMTP_PASSWORD,
|
||||
MAIL_FROM = settings.SMTP_USER,
|
||||
MAIL_PORT = settings.SMTP_PORT,
|
||||
MAIL_SERVER = settings.SMTP_HOST,
|
||||
MAIL_SSL_TLS = settings.SMTP_SSL_TLS,
|
||||
MAIL_STARTTLS = True
|
||||
MAIL_USERNAME=settings.SMTP_USER,
|
||||
MAIL_PASSWORD=settings.SMTP_PASSWORD,
|
||||
MAIL_FROM=settings.SMTP_USER,
|
||||
MAIL_PORT=settings.SMTP_PORT,
|
||||
MAIL_SERVER=settings.SMTP_HOST,
|
||||
MAIL_SSL_TLS=settings.SMTP_SSL_TLS,
|
||||
MAIL_STARTTLS=True
|
||||
)
|
||||
|
||||
message = MessageSchema(
|
||||
subject = subject,
|
||||
recipients = settings.EMAIL_RECIPIENTS,
|
||||
body = body,
|
||||
subject=subject,
|
||||
recipients=settings.EMAIL_RECIPIENTS,
|
||||
body=body,
|
||||
subtype=MessageType.plain
|
||||
)
|
||||
|
||||
fast_mail = FastMail(conf)
|
||||
return partial(fast_mail.send_message, message)
|
||||
return partial(fast_mail.send_message, message)
|
||||
|
@@ -8,5 +8,7 @@ async def mkdir_if_not_exists(path: Path) -> None:
|
||||
|
||||
|
||||
def remove_file(file_path: str) -> None:
|
||||
try: remove(file_path)
|
||||
except FileNotFoundError: ...
|
||||
try:
|
||||
remove(file_path)
|
||||
except FileNotFoundError:
|
||||
...
|
||||
|
@@ -22,4 +22,4 @@ async def generate_unique_image_name(
|
||||
|
||||
async def save_image(image: Image, image_path: Path) -> None:
|
||||
await mkdir_if_not_exists(image_path.parent)
|
||||
image.save(image_path)
|
||||
image.save(image_path)
|
||||
|
@@ -1,4 +1,4 @@
|
||||
from slowapi import Limiter
|
||||
from app.utils.custom_functions import get_remote_address
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
|
@@ -81,7 +81,7 @@ async def get_current_user_or_die(
|
||||
detail="Could not validate credentials",
|
||||
)
|
||||
|
||||
if token_data.sub is None:
|
||||
if token_data.sub is None:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
user = await crud.user.get_by_email(db, email=token_data.sub)
|
||||
@@ -98,7 +98,8 @@ async def get_current_user_or_none(
|
||||
)
|
||||
) -> UserModel | None:
|
||||
|
||||
if token is None: return None
|
||||
if token is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
@@ -109,7 +110,7 @@ async def get_current_user_or_none(
|
||||
except (jwt.JWTError, ValidationError):
|
||||
return None
|
||||
|
||||
if token_data.sub is None:
|
||||
if token_data.sub is None:
|
||||
return None
|
||||
|
||||
user = await crud.user.get_by_email(db, email=token_data.sub)
|
||||
@@ -132,7 +133,8 @@ async def get_current_active_user_or_none(
|
||||
current_user: UserModel | None = Depends(get_current_user_or_none),
|
||||
) -> UserModel | None:
|
||||
|
||||
if current_user is None: return None
|
||||
if current_user is None:
|
||||
return None
|
||||
|
||||
if current_user.is_active is False:
|
||||
return None
|
||||
@@ -154,7 +156,8 @@ async def get_current_active_superuser_or_none(
|
||||
current_user: UserModel | None = Depends(get_current_active_user_or_none),
|
||||
) -> UserModel | None:
|
||||
|
||||
if current_user is None: return None
|
||||
if current_user is None:
|
||||
return None
|
||||
|
||||
if current_user.is_superuser is False:
|
||||
return None
|
||||
@@ -182,19 +185,19 @@ async def handle_post_image_or_die(image: UploadFile) -> str:
|
||||
raise ValueError('Invalid image format')
|
||||
|
||||
unique_image_name = await generate_unique_image_name(
|
||||
path = settings.MEDIA_PATH / settings.FILE_FOLDERS['post_images'],
|
||||
image_name = image.filename,
|
||||
image_format = pil_image.format.lower()
|
||||
path=settings.MEDIA_PATH / settings.FILE_FOLDERS['post_images'],
|
||||
image_name=image.filename,
|
||||
image_format=pil_image.format.lower()
|
||||
)
|
||||
|
||||
await save_image(
|
||||
image = pil_image,
|
||||
image_path = settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
unique_image_name
|
||||
image=pil_image,
|
||||
image_path=settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
unique_image_name
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
detail='Invalid image'
|
||||
@@ -204,12 +207,14 @@ async def handle_post_image_or_die(image: UploadFile) -> str:
|
||||
try:
|
||||
pil_image.close()
|
||||
|
||||
except: ...
|
||||
except Exception:
|
||||
...
|
||||
return str(unique_image_name)
|
||||
|
||||
|
||||
async def handle_post_image_or_none(image: UploadFile = None) -> Optional[str]:
|
||||
if image is None: return None
|
||||
if image is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
pil_image = Image.open(io.BytesIO(image.file.read()))
|
||||
@@ -218,19 +223,19 @@ async def handle_post_image_or_none(image: UploadFile = None) -> Optional[str]:
|
||||
raise ValueError('Invalid image format')
|
||||
|
||||
unique_image_name = await generate_unique_image_name(
|
||||
path = settings.MEDIA_PATH / settings.FILE_FOLDERS['post_images'],
|
||||
image_name = image.filename,
|
||||
image_format = pil_image.format.lower()
|
||||
path=settings.MEDIA_PATH / settings.FILE_FOLDERS['post_images'],
|
||||
image_name=image.filename,
|
||||
image_format=pil_image.format.lower()
|
||||
)
|
||||
|
||||
await save_image(
|
||||
image = pil_image,
|
||||
image_path = settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
unique_image_name
|
||||
image=pil_image,
|
||||
image_path=settings.MEDIA_PATH /
|
||||
settings.FILE_FOLDERS['post_images'] /
|
||||
unique_image_name
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
detail='Invalid image'
|
||||
@@ -240,5 +245,6 @@ async def handle_post_image_or_none(image: UploadFile = None) -> Optional[str]:
|
||||
try:
|
||||
pil_image.close()
|
||||
|
||||
except: ...
|
||||
return str(unique_image_name)
|
||||
except Exception:
|
||||
...
|
||||
return str(unique_image_name)
|
||||
|
@@ -7,4 +7,4 @@ from app.views.routers import (
|
||||
|
||||
main_router = APIRouter()
|
||||
main_router.include_router(main.router, tags=['main'])
|
||||
main_router.include_router(user.router, tags=['user'])
|
||||
main_router.include_router(user.router, tags=['user'])
|
||||
|
@@ -56,8 +56,8 @@ async def send_email(
|
||||
f"message: {form_data.message}"
|
||||
|
||||
email_notification = send_email_notification(
|
||||
subject = f"Portfolio Blog (by {form_data.email})",
|
||||
body = body
|
||||
subject=f"Portfolio Blog (by {form_data.email})",
|
||||
body=body
|
||||
)
|
||||
|
||||
if email_notification is not None:
|
||||
@@ -124,4 +124,4 @@ async def blog_post(
|
||||
'request': request,
|
||||
'post': PostDetail.model_validate(post)
|
||||
}
|
||||
)
|
||||
)
|
||||
|
@@ -81,12 +81,17 @@ async def login(
|
||||
)
|
||||
|
||||
if user is None:
|
||||
raise HTTPException(status_code=400, detail="Incorrect email or password")
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Incorrect email or password"
|
||||
)
|
||||
|
||||
elif user.is_active is False:
|
||||
raise HTTPException(status_code=400, detail="Inactive user")
|
||||
|
||||
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
access_token_expires = timedelta(
|
||||
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
)
|
||||
|
||||
return {
|
||||
"access_token": security.create_access_token(
|
||||
@@ -130,7 +135,9 @@ async def create_post(
|
||||
image_path=image
|
||||
)
|
||||
|
||||
post = await crud.post.create_with_owner(db, obj_in=obj_in, owner_id=user.id)
|
||||
post = await crud.post.create_with_owner(
|
||||
db, obj_in=obj_in, owner_id=user.id
|
||||
)
|
||||
|
||||
return RedirectResponse(
|
||||
str(request.url_for('get_update_post', slug=post.slug)),
|
||||
|
Reference in New Issue
Block a user