Files
cc/app/sbin/api
2025-10-09 13:02:27 -03:00

388 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
from fastapi import FastAPI, HTTPException, Depends, Response
from fastapi.security.api_key import APIKeyHeader
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
import psutil
import sqlite3
import subprocess
import os
import uvicorn
from typing import Dict, Any, Optional
from datetime import datetime
import json
import re
from collections import deque
# ---------------------- Constants ----------------------
# SQLite database file that holds the `containers` table.
DB_PATH = "/4server/data/contracts.db"
# Directory of the helper executables the routes invoke (startContainer, ...).
BIN_PATH = "/4server/sbin"
# Key that callers must send in the X-API-Key header.
# NOTE(review): the fallback is a well-known placeholder; make sure the
# API_KEY environment variable is set wherever this service is deployed.
API_KEY = os.getenv("API_KEY", "your-secret-api-key")
# Version string printed at startup and reported by /system/info.
VERSION = "API: 0.0.7"
# ---------------------- FastAPI App ----------------------
# Create the FastAPI application object.
app = FastAPI()
# Security scheme that reads the caller's key from the X-API-Key header.
api_key_header = APIKeyHeader(name="X-API-Key")
def verify_api_key(key: str = Depends(api_key_header)):
    """Reject the request unless the X-API-Key header equals ``API_KEY``.

    Intended to be used as a route dependency.

    Args:
        key: Value of the X-API-Key header, injected by ``api_key_header``.

    Raises:
        HTTPException: 403 when the supplied key does not match.
    """
    # Function-scope import: hmac is standard library and only needed here.
    import hmac

    # Constant-time comparison so response timing does not reveal how much of
    # a guessed key was correct. Compare bytes because compare_digest() only
    # accepts str arguments that are pure ASCII.
    if not hmac.compare_digest(key.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Unauthorized")
# ---------------------- Helpers ----------------------
def run_command(cmd: list[str]) -> str:
    """Run an external command (without a shell) and return its stdout.

    Args:
        cmd: Executable path followed by its arguments.

    Returns:
        The command's standard output with surrounding whitespace stripped.

    Raises:
        HTTPException: 500 when the command cannot be started or exits with a
            non-zero return code. The detail contains the command line and
            whatever diagnostic output is available.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # A missing or non-executable binary fails before any return code
        # exists; report it the same way as a failed command instead of
        # letting a bare OSError escape.
        error_msg = (
            f"Command failed\n"
            f"Command: {' '.join(cmd)}\n"
            f"Error: {exc}"
        )
        raise HTTPException(status_code=500, detail=error_msg) from exc

    if result.returncode != 0:
        error_msg = (
            f"Command failed\n"
            f"Command: {' '.join(cmd)}\n"
            f"Return code: {result.returncode}\n"
            f"Stdout: {result.stdout.strip()}\n"
            f"Stderr: {result.stderr.strip() or 'None'}"
        )
        raise HTTPException(status_code=500, detail=error_msg)
    return result.stdout.strip()
def init_db():
    """Create the data directory and the ``containers`` table if missing.

    The directory containing ``DB_PATH`` is created first so that SQLite can
    create the database file on first start.
    """
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS containers (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                UUID CHAR(50) UNIQUE,
                email CHAR(100),
                expires DATE,
                tags TEXT,
                env TEXT,
                affiliate CHAR(30),
                image CHAR(50),
                history TEXT,
                comment TEXT,
                domains TEXT,
                status CHAR(20),
                created DATE,
                bump DATE,
                secret TEXT
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even when the DDL fails.
        conn.close()
def execute_db(query: str, params: tuple = (), fetch: bool = False):
    """Execute one SQL statement against the database at ``DB_PATH``.

    A new connection is opened for every call, the statement is committed,
    and the connection is closed again.

    Args:
        query: SQL text using ``?`` placeholders.
        params: Values bound to the placeholders.
        fetch: When true, return the rows produced by the statement.

    Returns:
        A list of ``dict`` rows (column name -> value) when ``fetch`` is true
        and at least one row was produced. ``None`` in every other case,
        including a fetch that produced no rows, so callers must treat the
        result as possibly ``None``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query, params)
        conn.commit()
        rows = cursor.fetchall() if fetch else None
    finally:
        # Release the connection even if the statement or commit raised.
        conn.close()

    if rows:
        return [dict(row) for row in rows]
    return None
# ---------------------- Models ----------------------
class ContainerModel(BaseModel):
    # Request body for /container/update. Only UUID is required; every other
    # field defaults to None. Field order mirrors the column order of the
    # `containers` table (minus the auto-increment ID).

    # Key of the record.
    UUID: str

    email: Optional[str] = None
    expires: Optional[str] = None
    tags: Optional[str] = None
    # Arbitrary mapping; update_container stores it as a JSON string.
    env: Optional[Dict[str, Any]] = None
    affiliate: Optional[str] = None
    image: Optional[str] = None
    history: Optional[str] = None
    comment: Optional[str] = None
    domains: Optional[str] = None
    status: Optional[str] = None
    created: Optional[str] = None
    bump: Optional[str] = None
    # Arbitrary mapping; update_container stores it as a JSON string.
    secret: Optional[Dict[str, Any]] = None
class UUIDRequest(BaseModel):
    # Request body for the routes that act on a single container.

    # Identifier of the container to act on.
    UUID: str
# ---------------------- Routes ----------------------
@app.get("/", include_in_schema=False)
def redirect_to_odoo():
    """Redirect requests for the root path to https://ODOO4PROJECTS.com."""
    return RedirectResponse(url="https://ODOO4PROJECTS.com")
@app.post("/container/update", dependencies=[Depends(verify_api_key)])
def update_container(request: ContainerModel):
    """Create the record for ``request.UUID`` or update the fields provided.

    When no row with the UUID exists, a new row is inserted with every field
    of the request (absent fields are stored as NULL). Otherwise only the
    fields that are not ``None`` in the request are overwritten.

    Returns:
        A dict with the UUID and a ``status`` of ``"created"``, ``"updated"``
        (plus ``fields_updated``) or ``"no_change"``.
    """
    # Snapshot the request in model field order. Column names therefore come
    # from the model definition, never from request data, which is what makes
    # interpolating them into the SQL text below safe.
    # NOTE(review): `__fields__` is the pydantic v1 spelling; confirm which
    # pydantic major version this service runs on before renaming it.
    values = {name: getattr(request, name) for name in ContainerModel.__fields__}

    # The env and secret columns are TEXT; dict payloads are stored as JSON.
    for name in ("env", "secret"):
        if values[name] is not None:
            values[name] = json.dumps(values[name])

    existing = execute_db(
        "SELECT * FROM containers WHERE UUID = ?", (request.UUID,), fetch=True
    )

    if not existing:
        columns = ", ".join(values)
        placeholders = ", ".join("?" for _ in values)
        execute_db(
            f"INSERT INTO containers ({columns}) VALUES ({placeholders})",
            tuple(values.values()),
        )
        return {"UUID": request.UUID, "status": "created"}

    # Partial update: None means "not provided", so such fields are left alone.
    updates = {
        name: value
        for name, value in values.items()
        if name != "UUID" and value is not None
    }
    if not updates:
        return {"UUID": request.UUID, "status": "no_change"}

    set_clause = ", ".join(f"{name}=?" for name in updates)
    execute_db(
        f"UPDATE containers SET {set_clause} WHERE UUID=?",
        (*updates.values(), request.UUID),
    )
    return {
        "UUID": request.UUID,
        "status": "updated",
        "fields_updated": list(updates),
    }
@app.post("/container/start", dependencies=[Depends(verify_api_key)])
def start_container(request: UUIDRequest):
    # Invoke the startContainer helper with the UUID as its only argument and
    # hand its stdout back to the caller.
    helper = f"{BIN_PATH}/startContainer"
    output = run_command([helper, request.UUID])
    return {"message": output}
@app.post("/container/stop", dependencies=[Depends(verify_api_key)])
def stop_container(request: UUIDRequest):
    # Invoke the stopContainer helper with the UUID as its only argument and
    # hand its stdout back to the caller.
    helper = f"{BIN_PATH}/stopContainer"
    output = run_command([helper, request.UUID])
    return {"message": output}
@app.post("/container/nuke", dependencies=[Depends(verify_api_key)])
def nuke_container(request: UUIDRequest):
    # Guard: the nukeContainer helper only runs when the stored record exists
    # and its status column is exactly "nuke"; otherwise answer 400.
    rows = execute_db(
        "SELECT status FROM containers WHERE UUID=?",
        (request.UUID,),
        fetch=True,
    )
    marked_for_nuke = bool(rows) and rows[0]["status"] == "nuke"
    if not marked_for_nuke:
        raise HTTPException(400, "Container status is not 'nuke'")

    output = run_command([f"{BIN_PATH}/nukeContainer", request.UUID])
    return {"message": output}
@app.post("/container/info", dependencies=[Depends(verify_api_key)])
def info_container(request: Optional[UUIDRequest] = None):
    """Return stored container records wrapped as n8n items.

    With a request body, only the record with that UUID is returned; without
    one, every record is returned. The ``bump`` and ``secret`` columns are not
    part of the selection.

    Returns:
        A list of ``{"json": <record>}`` dicts; empty when nothing matches.
    """
    fields = [
        "ID", "UUID", "email", "expires", "tags", "env", "affiliate",
        "image", "history", "comment", "domains", "status", "created"
    ]
    field_str = ", ".join(fields)

    if request:
        rows = execute_db(
            f"SELECT {field_str} FROM containers WHERE UUID=?",
            (str(request.UUID),),
            fetch=True,
        )
    else:
        rows = execute_db(
            f"SELECT {field_str} FROM containers",
            fetch=True,
        )

    # execute_db yields None when no row matched. Normalise to an empty list
    # so an unknown UUID or an empty table gives [] instead of a TypeError.
    rows = rows or []

    # Rows normally arrive as dicts; plain sequences are mapped onto the
    # selected column names as a fallback.
    return [
        {"json": row if isinstance(row, dict) else dict(zip(fields, row))}
        for row in rows
    ]
@app.post("/container/bump", dependencies=[Depends(verify_api_key)])
def bump_container(request: UUIDRequest):
    """Record today's UTC date as the container's bump date and run the helper.

    Returns:
        A dict with the bumpContainer helper's stdout (``message``) and the
        stored date (``bump_date``, formatted ``YYYY-MM-DD``).
    """
    # Function-scope import: only this route needs timezone.
    from datetime import timezone

    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted date string is the same.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # NOTE(review): the date is stored before the helper runs, so a failing
    # helper still leaves the new bump date in the database. Confirm that this
    # ordering is intended.
    execute_db("UPDATE containers SET bump=? WHERE UUID=?", (today, request.UUID))
    msg = run_command([f"{BIN_PATH}/bumpContainer", request.UUID])
    return {"message": msg, "bump_date": today}
@app.post("/container/quota", dependencies=[Depends(verify_api_key)])
def container_quota(request: UUIDRequest):
    """Report one ``docker stats`` sample for the container.

    Returns:
        A dict with the ``MemUsage`` value as ``memory_usage`` and the
        ``BlockIO`` value as ``disk_io``, both as printed by docker.

    Raises:
        HTTPException: 500 when docker fails or prints something other than
            the two comma-separated values requested.
    """
    output = run_command([
        "docker", "stats", request.UUID, "--no-stream",
        "--format", "{{.MemUsage}},{{.BlockIO}}"
    ])

    # Exactly two comma-separated values are expected. Anything else (for
    # example empty output) is reported explicitly instead of surfacing as an
    # unhandled ValueError from tuple unpacking.
    parts = output.split(",")
    if len(parts) != 2:
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected docker stats output: {output!r}",
        )

    mem_usage, disk_usage = parts
    return {"memory_usage": mem_usage, "disk_io": disk_usage}
# ---------------------- SYSTEM ----------------------
@app.get("/system/containers", dependencies=[Depends(verify_api_key)])
def get_containers():
    # The getContainers helper's stdout is passed through unchanged and
    # labelled as JSON; it is neither parsed nor validated here.
    payload = run_command([f"{BIN_PATH}/getContainers"])
    return Response(content=payload, media_type="application/json")
@app.get("/system/images", dependencies=[Depends(verify_api_key)])
def list_images():
    """List local docker images as ``repository:tag`` strings.

    Returns:
        ``{"images": [...]}``; the list is empty when docker reports no images.
    """
    listing = run_command(["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"])
    # splitlines() returns [] for empty output, whereas split("\n") would
    # report a single empty image name.
    return {"images": listing.splitlines()}
@app.get("/system/info", dependencies=[Depends(verify_api_key)])
def get_system_info():
    # Collects the newest bump date from the database, two optional marker
    # files, the API version and psutil resource figures. Any failure along
    # the way is reported as a 500 carrying the exception text.

    def read_if_exists(path):
        # Stripped file contents, or None when the path does not exist.
        if not os.path.exists(path):
            return None
        with open(path) as handle:
            return handle.read().strip()

    try:
        bump_rows = execute_db(
            "SELECT MAX(bump) AS latest_bump FROM containers", fetch=True
        )
        latest_bump = bump_rows[0]["latest_bump"]

        last_update = read_if_exists("/4server/data/update")
        alpine_version = read_if_exists("/etc/alpine-release")

        memory = psutil.virtual_memory()
        root_disk = psutil.disk_usage("/")
        logical_cpus = psutil.cpu_count(logical=True)

        resources = {
            "memory": {
                "total": memory.total,
                "available": memory.available,
                "used": memory.used,
            },
            "disk": {
                "total": root_disk.total,
                "used": root_disk.used,
                "free": root_disk.free,
            },
            "cpu_count": logical_cpus,
        }
        return {
            "alpine_version": alpine_version,
            "last_update": last_update,
            "latest_bump": latest_bump,
            "version": VERSION,
            "resources": resources,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/system/pull", dependencies=[Depends(verify_api_key)])
def pull_all_images():
    # Run the pullAllContainers helper (no arguments) and return its stdout.
    helper = f"{BIN_PATH}/pullAllContainers"
    output = run_command([helper])
    return {"message": output}
@app.post("/client/revert/{uuid}", dependencies=[Depends(verify_api_key)])
def revert_changes(uuid: str):
    # Run the gitRevert helper with the path parameter as its only argument
    # and return its stdout.
    # NOTE(review): unlike /client/logs/{uuid}, the uuid is not format-checked
    # here before being passed to the helper; confirm that is acceptable.
    helper = f"{BIN_PATH}/gitRevert"
    output = run_command([helper, uuid])
    return {"message": output}
# Odoo log lines matching any of these patterns are kept in the summary.
# Compiled once at import time instead of on every request.
_IMPORTANT_LOG_PATTERNS = (
    re.compile(r"odoo\.addons\."),        # Any Odoo addon log
    re.compile(r"Job '.*' starting"),     # Cron job start
    re.compile(r"Job '.*' fully done"),   # Cron job end
    re.compile(r"ERROR"),                 # Errors
    re.compile(r"WARNING"),               # Warnings
    re.compile(r"Traceback"),             # Tracebacks
)


def _is_important_line(line: str) -> bool:
    """Return True when ``line`` matches one of the important-log patterns."""
    return any(pattern.search(line) for pattern in _IMPORTANT_LOG_PATTERNS)


def _read_last_lines(file_path: str, max_lines: int) -> list[str]:
    """Return the last ``max_lines`` lines of a file, each stripped.

    The file is streamed line by line and only the tail is retained, so memory
    use is bounded by ``max_lines``. A missing file yields an empty list;
    undecodable bytes are ignored.
    """
    if not os.path.isfile(file_path):
        return []
    tail = deque(maxlen=max_lines)
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            tail.append(line.strip())
    return list(tail)


@app.get("/client/logs/{uuid}", dependencies=[Depends(verify_api_key)])
async def get_odoo_log_summary(uuid: str):
    """Summarise the recent Odoo and git logs of one project.

    Args:
        uuid: Project identifier; only hex digits and ``-`` are accepted,
            which also keeps it from escaping the log base directory.

    Returns:
        A dict with the uuid, the important lines among the last 500 lines of
        ``logs/odoo.log`` and the last 50 lines of ``logs/git.log`` (empty
        when that file is missing).

    Raises:
        HTTPException: 400 for a malformed uuid, 404 when the Odoo log file
            does not exist, 500 when reading the logs fails.
    """
    # NOTE(review): this coroutine performs blocking file I/O; confirm whether
    # it should be a plain ``def`` so it does not run on the event loop.
    if not re.fullmatch(r"[0-9a-fA-F\-]+", uuid):
        raise HTTPException(status_code=400, detail="Invalid UUID format. Only numbers, letters a-f, and '-' are allowed.")

    BASE_LOG_DIR = "/4server/data"
    project_dir = os.path.join(BASE_LOG_DIR, uuid)
    odoo_log_file = os.path.join(project_dir, "logs", "odoo.log")
    git_log_file = os.path.join(project_dir, "logs", "git.log")

    if not os.path.isfile(odoo_log_file):
        raise HTTPException(status_code=404, detail="Odoo log file not found")

    try:
        # Last 500 lines from the Odoo log, filtered down to the important ones.
        last_500_lines = _read_last_lines(odoo_log_file, 500)
        important_odoo_lines = [
            line for line in last_500_lines if _is_important_line(line)
        ]

        # Last 50 lines from git.log, unfiltered.
        last_50_git_lines = _read_last_lines(git_log_file, 50)

        return {
            "uuid": str(uuid),
            "important_odoo_log_lines": important_odoo_lines,
            "last_git_log_lines": last_50_git_lines,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading log files: {e}") from e
# ---------------------- Entry Point ----------------------
def main():
    # Print the version, make sure the database schema exists, then start
    # uvicorn serving the app on 10.5.0.1:8888.
    print(VERSION)
    init_db()
    uvicorn.run(app, host="10.5.0.1", port=8888)


if __name__ == "__main__":
    main()