init app
This commit is contained in:
BIN
__pycache__/crud.cpython-313.pyc
Normal file
BIN
__pycache__/crud.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/database.cpython-313.pyc
Normal file
BIN
__pycache__/database.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/main.cpython-313.pyc
Normal file
BIN
__pycache__/main.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/models.cpython-313.pyc
Normal file
BIN
__pycache__/models.cpython-313.pyc
Normal file
Binary file not shown.
BIN
__pycache__/schemas.cpython-313.pyc
Normal file
BIN
__pycache__/schemas.cpython-313.pyc
Normal file
Binary file not shown.
84
crud.py
Normal file
84
crud.py
Normal file
@@ -0,0 +1,84 @@
|
||||
from sqlalchemy.orm import Session
|
||||
import models
|
||||
import schemas
|
||||
|
||||
# Tasks
|
||||
def get_task(db: Session, task_id: int, telegram_user_id: str = None):
|
||||
query = db.query(models.Task).filter(models.Task.id == task_id)
|
||||
if telegram_user_id:
|
||||
query = query.filter(models.Task.telegram_user_id == telegram_user_id)
|
||||
return query.first()
|
||||
|
||||
def get_tasks(db: Session, skip: int = 0, limit: int = 100, telegram_user_id: str = None, status: models.TaskStatus = None):
|
||||
query = db.query(models.Task)
|
||||
if telegram_user_id:
|
||||
query = query.filter(models.Task.telegram_user_id == telegram_user_id)
|
||||
if status:
|
||||
query = query.filter(models.Task.status == status)
|
||||
return query.offset(skip).limit(limit).all()
|
||||
|
||||
def create_task(db: Session, task: schemas.TaskCreate):
|
||||
db_task = models.Task(**task.model_dump())
|
||||
db.add(db_task)
|
||||
db.commit()
|
||||
db.refresh(db_task)
|
||||
return db_task
|
||||
|
||||
def update_task(db: Session, task_id: int, task_update: schemas.TaskUpdate, telegram_user_id: str = None):
|
||||
db_task = get_task(db, task_id, telegram_user_id)
|
||||
if not db_task:
|
||||
return None
|
||||
|
||||
update_data = task_update.model_dump(exclude_unset=True)
|
||||
for key, value in update_data.items():
|
||||
setattr(db_task, key, value)
|
||||
|
||||
db.commit()
|
||||
db.refresh(db_task)
|
||||
return db_task
|
||||
|
||||
def delete_task(db: Session, task_id: int, telegram_user_id: str = None):
|
||||
db_task = get_task(db, task_id, telegram_user_id)
|
||||
if db_task:
|
||||
db.delete(db_task)
|
||||
db.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
# Reminders
|
||||
def get_reminder(db: Session, reminder_id: int):
|
||||
return db.query(models.Reminder).filter(models.Reminder.id == reminder_id).first()
|
||||
|
||||
def get_reminders(db: Session, task_id: int = None, skip: int = 0, limit: int = 100):
|
||||
query = db.query(models.Reminder)
|
||||
if task_id:
|
||||
query = query.filter(models.Reminder.task_id == task_id)
|
||||
return query.offset(skip).limit(limit).all()
|
||||
|
||||
def create_reminder(db: Session, reminder: schemas.ReminderCreate):
|
||||
db_reminder = models.Reminder(**reminder.model_dump())
|
||||
db.add(db_reminder)
|
||||
db.commit()
|
||||
db.refresh(db_reminder)
|
||||
return db_reminder
|
||||
|
||||
def update_reminder(db: Session, reminder_id: int, reminder_update: schemas.ReminderUpdate):
|
||||
db_reminder = get_reminder(db, reminder_id)
|
||||
if not db_reminder:
|
||||
return None
|
||||
|
||||
update_data = reminder_update.model_dump(exclude_unset=True)
|
||||
for key, value in update_data.items():
|
||||
setattr(db_reminder, key, value)
|
||||
|
||||
db.commit()
|
||||
db.refresh(db_reminder)
|
||||
return db_reminder
|
||||
|
||||
def delete_reminder(db: Session, reminder_id: int):
|
||||
db_reminder = get_reminder(db, reminder_id)
|
||||
if db_reminder:
|
||||
db.delete(db_reminder)
|
||||
db.commit()
|
||||
return True
|
||||
return False
|
||||
19
database.py
Normal file
19
database.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker, declarative_base
|
||||
|
||||
SQLALCHEMY_DATABASE_URL = "sqlite:///./tasks.db"
|
||||
# connect_args={"check_same_thread": False} is required for SQLite in FastAPI
|
||||
engine = create_engine(
|
||||
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
|
||||
)
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
# Dependency
|
||||
def get_db():
|
||||
db = SessionLocal()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
35
int.txt
Normal file
35
int.txt
Normal file
@@ -0,0 +1,35 @@
|
||||
Task and Reminder Manager Backend - Walkthrough
|
||||
What Was Accomplished
|
||||
I successfully developed the backend infrastructure for the Task and Reminder Manager application, exposing a REST API built with FastAPI that meets the user requirements. I set up the database, models, and endpoints necessary to manage tasks and reminders.
|
||||
|
||||
Key Changes Made
|
||||
Application Structure: Created a modular FastAPI backend (
|
||||
main.py
|
||||
,
|
||||
database.py
|
||||
,
|
||||
models.py
|
||||
,
|
||||
schemas.py
|
||||
,
|
||||
crud.py
|
||||
).
|
||||
Database System: Implemented SQLite functionality via SQLAlchemy, with specific models adapted to track user execution and messaging contexts.
|
||||
Designed
|
||||
Task
|
||||
model containing the telegram_user_id and the status (pending, completed).
|
||||
Designed
|
||||
Reminder
|
||||
model tightly coupled to
|
||||
Task
|
||||
, capable of deep tracking: trigger_time, recurrence_type (daily, custom, etc.), and notification toggles (notify_push, notify_telegram).
|
||||
Data Validation: Built Pydantic schemas representing creation payloads, updates, and serialized API responses to ensure safe data boundaries.
|
||||
API Endpoints: Built isolated routers to handle operations across tasks (
|
||||
routers/tasks.py
|
||||
) and reminders (
|
||||
routers/reminders.py
|
||||
). Examples include retrieving tasks specific to a telegram_user_id, handling task completion logic (via PUT endpoint updates), and constructing sophisticated reminders.
|
||||
Validation Results
|
||||
Verified that all necessary dependencies (including updated Python 3.13-compatible versions) install inside a valid virtual environment.
|
||||
Verified successful Uvicorn server boot without errors.
|
||||
Fired test API requests to generate entries into SQLite, returning 200 success responses. The system's CRUD functionality handles database commits successfully and retrieves nested relationships gracefully.
|
||||
34
main.py
Normal file
34
main.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
import models
|
||||
from database import engine
|
||||
from routers import tasks, reminders
|
||||
|
||||
# Create database tables
|
||||
models.Base.metadata.create_all(bind=engine)
|
||||
|
||||
app = FastAPI(
|
||||
title="Task Manager API",
|
||||
description="Backend API for managing tasks and reminders for Web and Telegram.",
|
||||
version="1.0.0"
|
||||
)
|
||||
|
||||
# CORS configuration
|
||||
origins = [
|
||||
"*" # Adjust this in production
|
||||
]
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Include routers
|
||||
app.include_router(tasks.router)
|
||||
app.include_router(reminders.router)
|
||||
|
||||
@app.get("/")
|
||||
def read_root():
|
||||
return {"message": "Welcome to the Task Manager API"}
|
||||
46
models.py
Normal file
46
models.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Enum
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
import enum
|
||||
from database import Base
|
||||
|
||||
class TaskStatus(str, enum.Enum):
|
||||
pending = "pending"
|
||||
completed = "completed"
|
||||
|
||||
class RecurrenceType(str, enum.Enum):
|
||||
none = "none"
|
||||
daily = "daily"
|
||||
weekly = "weekly"
|
||||
monthly = "monthly"
|
||||
custom = "custom"
|
||||
|
||||
class Task(Base):
|
||||
__tablename__ = "tasks"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
title = Column(String, index=True, nullable=False)
|
||||
description = Column(String, nullable=True)
|
||||
telegram_user_id = Column(String, index=True, nullable=False)
|
||||
status = Column(Enum(TaskStatus), default=TaskStatus.pending, nullable=False)
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
|
||||
|
||||
reminders = relationship("Reminder", back_populates="task", cascade="all, delete-orphan")
|
||||
|
||||
class Reminder(Base):
|
||||
__tablename__ = "reminders"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
|
||||
trigger_time = Column(DateTime(timezone=True), nullable=False)
|
||||
recurrence_type = Column(Enum(RecurrenceType), default=RecurrenceType.none, nullable=False)
|
||||
recurrence_interval = Column(Integer, nullable=True) # E.g. every X minutes/hours if custom
|
||||
remind_count_limit = Column(Integer, default=0, nullable=False) # 0 for infinite
|
||||
remind_count_current = Column(Integer, default=0, nullable=False)
|
||||
notify_push = Column(Boolean, default=False, nullable=False)
|
||||
notify_telegram = Column(Boolean, default=False, nullable=False)
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
|
||||
|
||||
task = relationship("Task", back_populates="reminders")
|
||||
60
pip_install.log
Normal file
60
pip_install.log
Normal file
@@ -0,0 +1,60 @@
|
||||
Collecting fastapi==0.110.0 (from -r requirements.txt (line 1))
|
||||
Using cached fastapi-0.110.0-py3-none-any.whl.metadata (25 kB)
|
||||
Collecting uvicorn==0.27.1 (from -r requirements.txt (line 2))
|
||||
Using cached uvicorn-0.27.1-py3-none-any.whl.metadata (6.3 kB)
|
||||
Collecting sqlalchemy==2.0.28 (from -r requirements.txt (line 3))
|
||||
Using cached SQLAlchemy-2.0.28-py3-none-any.whl.metadata (9.6 kB)
|
||||
Collecting pydantic==2.6.4 (from -r requirements.txt (line 4))
|
||||
Using cached pydantic-2.6.4-py3-none-any.whl.metadata (85 kB)
|
||||
Collecting pydantic-settings==2.2.1 (from -r requirements.txt (line 5))
|
||||
Using cached pydantic_settings-2.2.1-py3-none-any.whl.metadata (3.1 kB)
|
||||
Collecting starlette<0.37.0,>=0.36.3 (from fastapi==0.110.0->-r requirements.txt (line 1))
|
||||
Using cached starlette-0.36.3-py3-none-any.whl.metadata (5.9 kB)
|
||||
Collecting typing-extensions>=4.8.0 (from fastapi==0.110.0->-r requirements.txt (line 1))
|
||||
Using cached typing_extensions-4.15.0-py3-none-any.whl.metadata (3.3 kB)
|
||||
Collecting annotated-types>=0.4.0 (from pydantic==2.6.4->-r requirements.txt (line 4))
|
||||
Using cached annotated_types-0.7.0-py3-none-any.whl.metadata (15 kB)
|
||||
Collecting pydantic-core==2.16.3 (from pydantic==2.6.4->-r requirements.txt (line 4))
|
||||
Using cached pydantic_core-2.16.3.tar.gz (368 kB)
|
||||
Installing build dependencies: started
|
||||
Installing build dependencies: finished with status 'done'
|
||||
Getting requirements to build wheel: started
|
||||
Getting requirements to build wheel: finished with status 'done'
|
||||
Installing backend dependencies: started
|
||||
Installing backend dependencies: finished with status 'done'
|
||||
Preparing metadata (pyproject.toml): started
|
||||
Preparing metadata (pyproject.toml): finished with status 'error'
|
||||
error: subprocess-exited-with-error
|
||||
|
||||
Preparing metadata (pyproject.toml) did not run successfully.
|
||||
exit code: 1
|
||||
|
||||
[14 lines of output]
|
||||
Python reports SOABI: cp313-win_amd64
|
||||
Computed rustc target triple: x86_64-pc-windows-msvc
|
||||
Installation directory: C:\Users\patro\AppData\Local\puccinialin\puccinialin\Cache
|
||||
Rustup already downloaded
|
||||
Installing rust to C:\Users\patro\AppData\Local\puccinialin\puccinialin\Cache\rustup
|
||||
warn: installing msvc toolchain without its prerequisites
|
||||
error: could not read metadata for file: 'C:\Users\patro\AppData\Local\puccinialin\puccinialin\Cache\rustup-init\rustup-init.exe': ╨б╨╕╤Б╤В╨╡╨╝╨╡ ╨╜╨╡ ╤Г╨┤╨░╨╡╤В╤Б╤П ╨╜╨░╨╣╤В╨╕ ╤Г╨║╨░╨╖╨░╨╜╨╜╤Л╨╣ ╨┐╤Г╤В╤М. (os error 3)
|
||||
|
||||
Cargo, the Rust package manager, is not installed or is not on PATH.
|
||||
This package requires Rust and Cargo to compile extensions. Install it through
|
||||
the system's package manager or via https://rustup.rs/
|
||||
|
||||
Checking for Rust toolchain....
|
||||
Rust not found, installing into a temporary directory
|
||||
[end of output]
|
||||
|
||||
note: This error originates from a subprocess, and is likely not a problem with pip.
|
||||
|
||||
[notice] A new release of pip is available: 25.3 -> 26.0.1
|
||||
[notice] To update, run: G:\CodeAssistent\task\api\task-api\.venv\Scripts\python.exe -m pip install --upgrade pip
|
||||
error: metadata-generation-failed
|
||||
|
||||
Encountered error while generating package metadata.
|
||||
|
||||
pydantic-core
|
||||
|
||||
note: This is an issue with the package mentioned above, not pip.
|
||||
hint: See above for details.
|
||||
5
requirements.txt
Normal file
5
requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
fastapi
|
||||
uvicorn
|
||||
sqlalchemy
|
||||
pydantic
|
||||
pydantic-settings
|
||||
BIN
routers/__pycache__/reminders.cpython-313.pyc
Normal file
BIN
routers/__pycache__/reminders.cpython-313.pyc
Normal file
Binary file not shown.
BIN
routers/__pycache__/tasks.cpython-313.pyc
Normal file
BIN
routers/__pycache__/tasks.cpython-313.pyc
Normal file
Binary file not shown.
53
routers/reminders.py
Normal file
53
routers/reminders.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
|
||||
import crud
|
||||
import models
|
||||
import schemas
|
||||
from database import get_db
|
||||
|
||||
router = APIRouter(
|
||||
prefix="/reminders",
|
||||
tags=["reminders"]
|
||||
)
|
||||
|
||||
@router.post("/", response_model=schemas.Reminder)
|
||||
def create_reminder(reminder: schemas.ReminderCreate, db: Session = Depends(get_db)):
|
||||
# Verify task exists
|
||||
db_task = crud.get_task(db, task_id=reminder.task_id)
|
||||
if not db_task:
|
||||
raise HTTPException(status_code=404, detail="Task not found")
|
||||
|
||||
return crud.create_reminder(db=db, reminder=reminder)
|
||||
|
||||
@router.get("/", response_model=List[schemas.Reminder])
|
||||
def read_reminders(
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
task_id: int = Query(None, description="Filter by Task ID"),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
reminders = crud.get_reminders(db, skip=skip, limit=limit, task_id=task_id)
|
||||
return reminders
|
||||
|
||||
@router.get("/{reminder_id}", response_model=schemas.Reminder)
|
||||
def read_reminder(reminder_id: int, db: Session = Depends(get_db)):
|
||||
db_reminder = crud.get_reminder(db, reminder_id=reminder_id)
|
||||
if db_reminder is None:
|
||||
raise HTTPException(status_code=404, detail="Reminder not found")
|
||||
return db_reminder
|
||||
|
||||
@router.put("/{reminder_id}", response_model=schemas.Reminder)
|
||||
def update_reminder_endpoint(reminder_id: int, reminder: schemas.ReminderUpdate, db: Session = Depends(get_db)):
|
||||
db_reminder = crud.update_reminder(db, reminder_id=reminder_id, reminder_update=reminder)
|
||||
if db_reminder is None:
|
||||
raise HTTPException(status_code=404, detail="Reminder not found")
|
||||
return db_reminder
|
||||
|
||||
@router.delete("/{reminder_id}")
|
||||
def delete_reminder_endpoint(reminder_id: int, db: Session = Depends(get_db)):
|
||||
success = crud.delete_reminder(db, reminder_id=reminder_id)
|
||||
if not success:
|
||||
raise HTTPException(status_code=404, detail="Reminder not found")
|
||||
return {"message": "Reminder successfully deleted"}
|
||||
49
routers/tasks.py
Normal file
49
routers/tasks.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from sqlalchemy.orm import Session
|
||||
from typing import List
|
||||
|
||||
import crud
|
||||
import models
|
||||
import schemas
|
||||
from database import get_db
|
||||
|
||||
router = APIRouter(
|
||||
prefix="/tasks",
|
||||
tags=["tasks"]
|
||||
)
|
||||
|
||||
@router.post("/", response_model=schemas.Task)
|
||||
def create_task(task: schemas.TaskCreate, db: Session = Depends(get_db)):
|
||||
return crud.create_task(db=db, task=task)
|
||||
|
||||
@router.get("/", response_model=List[schemas.Task])
|
||||
def read_tasks(
|
||||
skip: int = 0,
|
||||
limit: int = 100,
|
||||
telegram_user_id: str = Query(None, description="Filter by Telegram User ID"),
|
||||
status: models.TaskStatus = Query(None, description="Filter by task status"),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
tasks = crud.get_tasks(db, skip=skip, limit=limit, telegram_user_id=telegram_user_id, status=status)
|
||||
return tasks
|
||||
|
||||
@router.get("/{task_id}", response_model=schemas.Task)
|
||||
def read_task(task_id: int, telegram_user_id: str = Query(None, description="Telegram User ID for authorization"), db: Session = Depends(get_db)):
|
||||
db_task = crud.get_task(db, task_id=task_id, telegram_user_id=telegram_user_id)
|
||||
if db_task is None:
|
||||
raise HTTPException(status_code=404, detail="Task not found")
|
||||
return db_task
|
||||
|
||||
@router.put("/{task_id}", response_model=schemas.Task)
|
||||
def update_task_endpoint(task_id: int, task: schemas.TaskUpdate, telegram_user_id: str = Query(None, description="Telegram User ID for authorization"), db: Session = Depends(get_db)):
|
||||
db_task = crud.update_task(db, task_id=task_id, task_update=task, telegram_user_id=telegram_user_id)
|
||||
if db_task is None:
|
||||
raise HTTPException(status_code=404, detail="Task not found or you don't have access")
|
||||
return db_task
|
||||
|
||||
@router.delete("/{task_id}")
|
||||
def delete_task_endpoint(task_id: int, telegram_user_id: str = Query(None, description="Telegram User ID for authorization"), db: Session = Depends(get_db)):
|
||||
success = crud.delete_task(db, task_id=task_id, telegram_user_id=telegram_user_id)
|
||||
if not success:
|
||||
raise HTTPException(status_code=404, detail="Task not found or you don't have access")
|
||||
return {"message": "Task successfully deleted"}
|
||||
60
schemas.py
Normal file
60
schemas.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
from models import TaskStatus, RecurrenceType
|
||||
|
||||
# Reminder Schemas
|
||||
class ReminderBase(BaseModel):
|
||||
trigger_time: datetime
|
||||
recurrence_type: RecurrenceType = RecurrenceType.none
|
||||
recurrence_interval: Optional[int] = None
|
||||
remind_count_limit: int = 0
|
||||
notify_push: bool = False
|
||||
notify_telegram: bool = False
|
||||
|
||||
class ReminderCreate(ReminderBase):
|
||||
task_id: int
|
||||
|
||||
class ReminderUpdate(BaseModel):
|
||||
trigger_time: Optional[datetime] = None
|
||||
recurrence_type: Optional[RecurrenceType] = None
|
||||
recurrence_interval: Optional[int] = None
|
||||
remind_count_limit: Optional[int] = None
|
||||
remind_count_current: Optional[int] = None
|
||||
notify_push: Optional[bool] = None
|
||||
notify_telegram: Optional[bool] = None
|
||||
|
||||
class Reminder(ReminderBase):
|
||||
id: int
|
||||
task_id: int
|
||||
remind_count_current: int
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
# Task Schemas
|
||||
class TaskBase(BaseModel):
|
||||
title: str
|
||||
description: Optional[str] = None
|
||||
telegram_user_id: str
|
||||
|
||||
class TaskCreate(TaskBase):
|
||||
pass
|
||||
|
||||
class TaskUpdate(BaseModel):
|
||||
title: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
status: Optional[TaskStatus] = None
|
||||
|
||||
class Task(TaskBase):
|
||||
id: int
|
||||
status: TaskStatus
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
reminders: List[Reminder] = []
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
Reference in New Issue
Block a user