# Source code for services.webserver

import asyncio
import atexit
import logging
import os
import secrets
from dataclasses import dataclass
import hashlib
import hmac
import re
from typing import Any, Dict, Optional, Tuple

# Configure structured logging and Sentry as early as possible,
# and install sensitive data redaction on log handlers before Sentry hooks logging.
#
# Startup sequence (all of it is fail-open; nothing here may block server start):
#   1. setup_structlog_logging(...) with the level resolved by get_log_level_from_env("INFO")
#   2. install_sensitive_filter() - a failure here is reported, then startup continues
#   3. init_sentry()
try:
    from observability import setup_structlog_logging, init_sentry, get_log_level_from_env  # type: ignore
    setup_structlog_logging(get_log_level_from_env("INFO"))
    try:
        from utils import install_sensitive_filter  # type: ignore
        install_sensitive_filter()
    except Exception as e:
        # Record the exception instead of a bare `pass`; this automated startup
        # flow is reported as a handled anomaly.
        try:
            from observability import emit_event  # type: ignore
            emit_event(
                "install_sensitive_filter_failed",
                severity="anomaly",
                operation="startup",
                handled=True,
                error=str(e),
            )
        except Exception:
            # emit_event itself is unavailable or failed: fall back to stdlib logging.
            # Note that this fallback record does not carry the error text.
            logging.getLogger(__name__).warning(
                "install_sensitive_filter_failed", extra={"operation": "startup", "handled": True}
            )
    # Reached even when the sensitive filter could not be installed.
    init_sentry()
except Exception as e:
    # Fail-open: do not block startup, but log a warning instead of a bare `pass`.
    logging.getLogger(__name__).warning(
        "observability_init_failed", extra={"operation": "startup", "handled": True, "error": str(e)}
    )

from aiohttp import web
import json
import time
from services.db_health_service import (
    get_db_health_service,
    InvalidCollectionNameError,
    CollectionAccessDeniedError,
    MAX_SKIP,
    clean_db_health_filter_value,
)
from services.sentry_utils import first_int
try:
    # Correlation for web requests
    from observability import generate_request_id, bind_request_id  # type: ignore
except Exception:  # pragma: no cover
    # Fail-open fallbacks: when `observability` cannot be imported the server
    # still runs, just without request-id correlation.

    def generate_request_id():  # type: ignore
        """Fallback: return an empty request id."""
        return ""

    def bind_request_id(_rid: str) -> None:  # type: ignore
        """Fallback: accept a request id and do nothing with it."""
        return None
try: from metrics import ( metrics_endpoint_bytes, metrics_content_type, record_request_outcome, record_request_queue_delay, note_request_started, note_request_finished, note_deployment_started, note_deployment_shutdown, ) except Exception: # pragma: no cover metrics_endpoint_bytes = lambda: b"" # type: ignore metrics_content_type = lambda: "text/plain; charset=utf-8" # type: ignore def record_request_outcome(status_code: int, duration_seconds: float, **_kwargs) -> None: # type: ignore return None def record_request_queue_delay(method: str, endpoint: str | None, delay_seconds: float, **_kwargs) -> None: # type: ignore return None def note_request_started() -> None: # type: ignore return None def note_request_finished() -> None: # type: ignore return None def note_deployment_started(_summary: str = "Service starting up") -> None: # type: ignore return None def note_deployment_shutdown(_summary: str = "Service shutting down") -> None: # type: ignore return None from html import escape as html_escape # הערה: לא נייבא את code_sharing כ-reference קבוע כדי לאפשר monkeypatch דינמי בטסטים. # במקום זאת נפתור את ה-service בזמן ריצה בתוך ה-handler. # Optional structured logging/event emission and error counter (fail-open) try: # type: ignore from observability import emit_event # type: ignore except Exception: # pragma: no cover
[docs] def emit_event(event: str, severity: str = "info", **fields): # type: ignore return None
try:
    from metrics import errors_total  # type: ignore
except Exception:  # pragma: no cover
    errors_total = None  # type: ignore

# Timeout (seconds) for the AI explain route, clamped to the range [5, 20].
try:
    _AI_REQUEST_TIMEOUT = max(5.0, min(20.0, float(os.getenv("OBS_AI_EXPLAIN_TIMEOUT", "10"))))
except ValueError:
    _AI_REQUEST_TIMEOUT = 10.0
# Bearer token for the AI explain route; an empty string means no token is configured.
_AI_ROUTE_TOKEN = os.getenv("OBS_AI_EXPLAIN_TOKEN") or os.getenv("AI_EXPLAIN_TOKEN") or ""

logger = logging.getLogger(__name__)

# --- DB Health auth (Token-based) ---
DB_HEALTH_TOKEN = os.getenv("DB_HEALTH_TOKEN", "")


def _constant_time_compare(a: str, b: str) -> bool:
    """Compare two secrets in constant time to prevent timing attacks.

    Uses hmac.compare_digest, which does not short-circuit on the first
    mismatch. ``str`` arguments are encoded as UTF-8 first; any other value
    is passed through unchanged.

    Returns:
        True when both values are equal; False on mismatch or when the
        arguments cannot be compared (TypeError / AttributeError).
    """
    try:
        return hmac.compare_digest(
            a.encode("utf-8") if isinstance(a, str) else a,
            b.encode("utf-8") if isinstance(b, str) else b,
        )
    except (TypeError, AttributeError):
        return False


@web.middleware
async def db_health_auth_middleware(request: web.Request, handler):
    """Protect sensitive endpoints (DB + Jobs Monitor + debug) with a token.

    Applies to paths starting with ``/api/db/``, ``/api/jobs`` or
    ``/api/debug/``; every other path is passed straight to ``handler``.

    Responses for protected paths:
        403 ``{"error": "disabled"}``      - DB_HEALTH_TOKEN is not configured.
        401 ``{"error": "unauthorized"}``  - token missing or mismatching.
    """
    if request.path.startswith(("/api/db/", "/api/jobs", "/api/debug/")):
        if not DB_HEALTH_TOKEN:
            # No token configured: block these endpoints entirely.
            return web.json_response({"error": "disabled"}, status=403)

        auth = request.headers.get("Authorization", "") or ""
        # The header takes priority. The query-param fallback (?token=...) is
        # allowed only for the maintenance endpoint, so the security posture of
        # /api/db/* and /api/jobs/* is not weakened.
        provided_token = ""
        if auth.startswith("Bearer "):
            provided_token = auth[7:]  # strip the "Bearer " prefix
        else:
            allow_query_token = request.path == "/api/debug/maintenance_cleanup"
            if allow_query_token:
                try:
                    provided_token = str(request.query.get("token") or "")
                except Exception:
                    provided_token = ""
        if not provided_token:
            return web.json_response({"error": "unauthorized"}, status=401)

        # Constant-time comparison to prevent timing attacks
        # (secrets.compare_digest or hmac.compare_digest).
        if not _constant_time_compare(provided_token, DB_HEALTH_TOKEN):
            return web.json_response({"error": "unauthorized"}, status=401)

    return await handler(request)


# AI explain service is optional in minimal envs.
# IMPORTANT: Tests monkeypatch `services.webserver.ai_explain_service`, so keep the attribute always present.
try:  # type: ignore
    from services import ai_explain_service as ai_explain_service  # type: ignore
except Exception:  # pragma: no cover

    class _AiExplainServiceStub:
        """Stand-in used when the real AI explain service cannot be imported."""

        class AiExplainError(RuntimeError):
            pass

        async def generate_ai_explanation(self, *_a, **_k):  # type: ignore[no-untyped-def]
            raise self.AiExplainError("service_unavailable")

    ai_explain_service = _AiExplainServiceStub()  # type: ignore


# --- Queue delay (request queueing) instrumentation ---
_QUEUE_DELAY_HEADERS = ("X-Queue-Start", "X-Request-Start")
_QUEUE_DELAY_EVENT_NAME = "access_logs"
_QUEUE_DELAY_EPOCH_RE = re.compile(r"(-?\d+(?:\.\d+)?)")


def _queue_delay_warn_threshold_ms() -> int:
    """Return the QUEUE_DELAY_WARN_MS threshold (>= 0); 500 when unset or unparsable."""
    try:
        return max(0, int(float(os.getenv("QUEUE_DELAY_WARN_MS", "500") or 500)))
    except Exception:
        return 500


def _parse_request_start_to_epoch_seconds(raw: str | None) -> float | None:
    """Parse request start header into epoch seconds (best-effort).

    Supported shapes:
    - "t=1700000000.123" (seconds, float)
    - "1700000000" (seconds, int)
    - "1700000000123" (milliseconds)
    - "1700000000123456" (microseconds)
    - "1700000000123456789" (nanoseconds)

    Returns:
        Epoch seconds as a float, or None when the value is empty,
        non-numeric, or not strictly positive.
    """
    try:
        text = str(raw or "").strip()
    except Exception:
        return None
    if not text:
        return None
    # Common prefix: "t=..."
    if text.lower().startswith("t="):
        text = text.split("=", 1)[1].strip()
    m = _QUEUE_DELAY_EPOCH_RE.search(text)
    if not m:
        return None
    token = m.group(1)
    if not token:
        return None
    # Float token => treat as seconds (e.g., "1700000000.123")
    if "." in token:
        try:
            value = float(token)
        except Exception:
            return None
        return value if value > 0 else None
    # Integer token => infer unit from digit length
    try:
        value_int = int(token)
    except Exception:
        return None
    if value_int <= 0:
        return None
    digits = len(token.lstrip("+-"))
    # epoch seconds ~ 10 digits, ms ~ 13, us ~ 16, ns ~ 19
    if digits <= 10:
        return float(value_int)
    if digits <= 13:
        return float(value_int) / 1_000.0
    if digits <= 16:
        return float(value_int) / 1_000_000.0
    return float(value_int) / 1_000_000_000.0


def _compute_queue_delay_ms(headers: Any, *, now_epoch_seconds: float) -> Tuple[int, str | None]:
    """Return (queue_delay_ms, source_header) with fail-open behavior.

    The headers in _QUEUE_DELAY_HEADERS are tried in order; the first one that
    parses wins. Negative delays are clamped to 0. ``(0, None)`` is returned
    when no header yields a usable timestamp.
    """
    for header_name in _QUEUE_DELAY_HEADERS:
        try:
            raw = headers.get(header_name)
        except Exception:
            raw = None
        if not raw:
            continue
        ts = _parse_request_start_to_epoch_seconds(raw)
        if ts is None:
            continue
        try:
            delay_ms = int(round(max(0.0, float(now_epoch_seconds - float(ts)) * 1000.0)))
        except Exception:
            delay_ms = 0
        return delay_ms, header_name
    return 0, None


def _bind_queue_delay_context(queue_delay_ms: int, source_header: str | None) -> None:
    """Bind queue delay to structlog contextvars (best-effort)."""
    try:
        import structlog  # type: ignore

        payload: Dict[str, Any] = {"queue_delay": int(queue_delay_ms)}
        if source_header:
            payload["queue_delay_source"] = str(source_header)
        structlog.contextvars.bind_contextvars(**payload)
    except Exception:
        return


# --- Sentry webhook: in-memory de-dup to avoid bursts ---
# Maps dedup_key -> wall-clock time (time.time()) of the last emitted alert.
_SENTRY_DEDUP: dict[str, float] = {}


def _sentry_dedup_window_seconds() -> int:
    """Return SENTRY_WEBHOOK_DEDUP_WINDOW_SECONDS (>= 0); 300 when unset or unparsable."""
    try:
        return max(0, int(float(os.getenv("SENTRY_WEBHOOK_DEDUP_WINDOW_SECONDS", "300") or 300)))
    except Exception:
        return 300


def _sentry_secret() -> str:
    """Return the configured webhook secret, or "" when none is set."""
    # Prefer explicit Sentry webhook secret; fallback to generic webhook secret if set.
    return str(os.getenv("SENTRY_WEBHOOK_SECRET") or os.getenv("WEBHOOK_SECRET") or "").strip()


def _sha256_hmac_hex(secret: str, msg: bytes) -> str:
    """Return the hex HMAC-SHA256 of ``msg`` keyed by ``secret``; "" on any failure."""
    try:
        return hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest()
    except Exception:
        return ""


def _constant_time_equals(a: str, b: str) -> bool:
    """Constant-time equality for two string-like values (None is treated as "").

    Both sides are compared as UTF-8 bytes. hmac.compare_digest only accepts
    ASCII-only ``str`` arguments and raises TypeError otherwise, which would
    make a valid secret containing non-ASCII characters impossible to match.
    """
    try:
        return hmac.compare_digest(
            str(a or "").encode("utf-8"),
            str(b or "").encode("utf-8"),
        )
    except Exception:
        return False


def _verify_sentry_webhook(request: web.Request, body: bytes) -> bool:
    """Best-effort verification for Sentry webhook calls.

    Supported modes:
    - HMAC signature headers (preferred when provided by Sentry)
    - Bearer token / query param token fallback (for setups where headers are not configurable)

    Returns:
        True when no secret is configured (fail-open) or when any supported
        credential matches; False otherwise.
    """
    secret = _sentry_secret()
    if not secret:
        # Explicit opt-in: if no secret configured, allow (fail-open).
        return True

    # 1) Token fallback: Authorization: Bearer <secret> or ?token=<secret>
    try:
        auth = str(request.headers.get("Authorization") or "").strip()
        if auth.lower().startswith("bearer "):
            token = auth.split(" ", 1)[1].strip()
            if token and _constant_time_equals(token, secret):
                return True
    except Exception:
        pass
    try:
        token = str(request.query.get("token") or "").strip()
        if token and _constant_time_equals(token, secret):
            return True
    except Exception:
        pass

    # 2) HMAC signature headers (Sentry varies between integrations)
    # Common headers:
    # - X-Sentry-Hook-Signature / X-Sentry-Signature
    # - X-Sentry-Hook-Timestamp / X-Sentry-Timestamp
    try:
        sig = (
            request.headers.get("X-Sentry-Hook-Signature")
            or request.headers.get("X-Sentry-Signature")
            or request.headers.get("Sentry-Hook-Signature")
            or request.headers.get("Sentry-Signature")
            or ""
        )
        sig = str(sig or "").strip()
    except Exception:
        sig = ""
    try:
        ts = (
            request.headers.get("X-Sentry-Hook-Timestamp")
            or request.headers.get("X-Sentry-Timestamp")
            or request.headers.get("Sentry-Hook-Timestamp")
            or request.headers.get("Sentry-Timestamp")
            or ""
        )
        ts = str(ts or "").strip()
    except Exception:
        ts = ""

    if not sig:
        return False

    # Accept both common signing shapes:
    # - HMAC(secret, body)
    # - HMAC(secret, f"{timestamp}.{body}")
    try:
        candidate_a = _sha256_hmac_hex(secret, body)
        if candidate_a and _constant_time_equals(candidate_a, sig):
            return True
    except Exception:
        pass
    if ts:
        try:
            candidate_b = _sha256_hmac_hex(secret, ts.encode("utf-8") + b"." + body)
            if candidate_b and _constant_time_equals(candidate_b, sig):
                return True
        except Exception:
            pass
    return False


@dataclass
class _SentryAlert:
    """Normalized view of one Sentry webhook payload."""

    name: str
    summary: str
    severity: str
    dedup_key: str
    details: Dict[str, Any]


def _map_sentry_level_to_severity(level: str | None) -> str:
    """Map a Sentry level name to critical / error / warning / info."""
    v = str(level or "").strip().lower()
    if v in {"fatal", "critical"}:
        return "critical"
    if v in {"error", "err"}:
        return "error"
    if v in {"warning", "warn"}:
        return "warning"
    if v in {"info"}:
        return "info"
    # Unknown -> keep it visible but not noisy
    return "warning"


def _truncate(text: str, limit: int) -> str:
    """Strip ``text`` and cap it at ``limit`` characters, ending with "…" when cut.

    A falsy ``limit`` disables truncation.
    """
    try:
        s = str(text or "").strip()
    except Exception:
        s = ""
    if limit and len(s) > limit:
        return s[: max(0, limit - 1)] + "…"
    return s


def _extract_sentry_alert(payload: Any) -> _SentryAlert | None:
    """Convert a decoded Sentry webhook payload into a _SentryAlert.

    ``issue``, ``event`` and ``project`` are looked up under ``payload["data"]``
    first and then at the top level of the payload.

    Returns:
        None when ``payload`` is not a non-empty dict, otherwise the alert.
    """
    if not isinstance(payload, dict) or not payload:
        return None

    action = str(payload.get("action") or payload.get("trigger") or payload.get("status") or "").strip().lower()
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    issue = (
        data.get("issue")
        if isinstance(data.get("issue"), dict)
        else (payload.get("issue") if isinstance(payload.get("issue"), dict) else {})
    )
    event = (
        data.get("event")
        if isinstance(data.get("event"), dict)
        else (payload.get("event") if isinstance(payload.get("event"), dict) else {})
    )
    project = (
        data.get("project")
        if isinstance(data.get("project"), dict)
        else (payload.get("project") if isinstance(payload.get("project"), dict) else {})
    )

    title = (
        payload.get("title")
        or issue.get("title")
        or event.get("title")
        or event.get("message")
        or payload.get("message")
        or ""
    )
    title_s = _truncate(str(title), 220)

    issue_id = str(issue.get("id") or payload.get("issue_id") or "").strip()
    short_id = str(
        issue.get("shortId")
        or issue.get("short_id")
        or payload.get("shortId")
        or payload.get("short_id")
        or ""
    ).strip()
    event_id = str(event.get("id") or event.get("event_id") or "").strip()
    permalink = str(
        issue.get("permalink")
        or payload.get("permalink")
        or event.get("permalink")
        or payload.get("url")
        or ""
    ).strip()

    # payload["project"] may itself be a dict. Never stringify a dict into the
    # slug: its repr would leak into the dedup key and the stored details.
    payload_project = payload.get("project")
    if isinstance(payload_project, dict):
        payload_project = None
    project_slug = str(project.get("slug") or payload.get("project_slug") or payload_project or "").strip()

    level = str(event.get("level") or payload.get("level") or payload.get("level_name") or "").strip().lower() or None
    severity = _map_sentry_level_to_severity(level)

    # Drop resolved notifications by default (still record as info if they come through).
    if action in {"resolved", "resolved_issue", "issue_resolved"}:
        severity = "info"

    # Heuristic: some Sentry issues are background/maintenance noise (e.g. pymongo pool housekeeping).
    # Downgrade them to warning so they don't look like user-facing errors in Telegram/Observability.
    try:
        lowered_title = str(title_s or "").lower()
        # NOTE: do not override "resolved" notifications; they should remain informational,
        # otherwise we emit new warning-level alerts and create new dedup keys.
        if severity != "info" and (
            "_operationcancelled" in lowered_title or "operation cancelled" in lowered_title
        ):
            severity = "warning"
    except Exception:
        pass

    # Stable identifiers for dedup. `severity` is never empty, so the joined key
    # can never be empty either; when the payload carries no issue/event id the
    # title bucket stands in for it. Otherwise all id-less alerts of the same
    # project/severity/action would share one key and be suppressed together.
    primary_id = issue_id or short_id or event_id
    identity = primary_id or (f"title:{title_s[:80]}" if title_s else "")
    dedup_key = "|".join(x for x in (identity, project_slug, severity, action) if x)

    display_id = short_id or (issue_id[:8] if issue_id else "issue")
    name = _truncate(f"Sentry: {display_id}", 128)
    occurrence_count = first_int(
        issue.get("count"),
        issue.get("eventCount"),
        issue.get("occurrence_count"),
        payload.get("occurrence_count"),
        payload.get("count"),
        event.get("count"),
    )

    # Important: order matters for UI display. Put alert_type and Sentry IDs first.
    details: Dict[str, Any] = {
        "alert_type": "sentry_issue",
        "sentry_issue_id": issue_id or None,
        "sentry_short_id": short_id or None,
        "sentry_permalink": permalink or None,
        "sentry_event_id": event_id or None,
        "project": project_slug or None,
        "level": level or None,
        "action": action or None,
        "logger": str(event.get("logger") or payload.get("logger") or "").strip() or None,
        "culprit": str(issue.get("culprit") or event.get("culprit") or "").strip() or None,
        "environment": str(event.get("environment") or payload.get("environment") or "").strip() or None,
        "occurrence_count": occurrence_count,
    }
    # Remove None values to keep storage clean
    details = {k: v for k, v in details.items() if v not in (None, "")}

    summary = title_s or "Sentry alert"
    return _SentryAlert(name=name, summary=summary, severity=severity, dedup_key=dedup_key, details=details)


def _should_emit_sentry_alert(dedup_key: str) -> bool:
    """Return True when ``dedup_key`` was not emitted within the dedup window.

    Side effect: records the current time for ``dedup_key`` in _SENTRY_DEDUP
    when returning True (unless the window is disabled, i.e. <= 0), and lazily
    evicts entries older than the window.
    """
    window = _sentry_dedup_window_seconds()
    if window <= 0:
        return True
    now = time.time()
    # Lazy cleanup of old entries (best-effort)
    try:
        cutoff = now - float(window)
        for k, ts in list(_SENTRY_DEDUP.items()):
            if ts < cutoff:
                _SENTRY_DEDUP.pop(k, None)
    except Exception:
        pass
    last = _SENTRY_DEDUP.get(dedup_key)
    if last is not None and (now - float(last)) < float(window):
        return False
    _SENTRY_DEDUP[dedup_key] = now
    return True
[docs] def create_app() -> web.Application: # הוסף middleware שמייצר ומקשר request_id לכל בקשה נכנסת @web.middleware async def _request_id_mw(request: web.Request, handler): req_id = generate_request_id() or "" start = time.perf_counter() wall_now = time.time() handler_name = getattr(handler, "__name__", None) or handler.__class__.__name__ queue_delay_ms, queue_delay_source = _compute_queue_delay_ms( request.headers, now_epoch_seconds=float(wall_now) ) _bind_queue_delay_context(queue_delay_ms, queue_delay_source) try: note_request_started() except Exception: pass try: bind_request_id(req_id) except Exception as e: try: emit_event( "bind_request_id_failed", severity="anomaly", operation="request_id_middleware", handled=True, request_id=req_id, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "bind_request_id_failed", extra={"operation": "request_id_middleware", "handled": True} ) # המשך עיבוד try: response = await handler(request) finally: try: note_request_finished() except Exception: pass try: if hasattr(response, "headers") and req_id: response.headers["X-Request-ID"] = req_id except Exception as e: try: emit_event( "set_request_id_header_failed", severity="anomaly", operation="request_id_middleware", handled=True, request_id=req_id, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "set_request_id_header_failed", extra={"operation": "request_id_middleware", "handled": True} ) # Update unified request metrics (best-effort) try: duration = max(0.0, float(time.perf_counter() - start)) status = int(getattr(response, "status", 0) or 0) route_name = None try: route = getattr(request.match_info, "route", None) route_name = getattr(route, "name", None) except Exception: route_name = None path_label = getattr(request, "path", "") method_label = getattr(request, "method", "GET") handler_label = route_name or handler_name or path_label record_request_outcome( status, duration, source="aiohttp", handler=handler_label, 
path=path_label, method=method_label, cache_hit=None, ) try: record_request_queue_delay( method_label, route_name or handler_name or path_label, float(queue_delay_ms) / 1000.0, ) except Exception: pass # Structured access log (best-effort) try: access_fields: Dict[str, Any] = { "request_id": req_id, "method": method_label, "path": path_label, "handler": handler_label, "status_code": status, "duration_ms": int(duration * 1000), "queue_delay": int(queue_delay_ms), } if queue_delay_source: access_fields["queue_delay_source"] = str(queue_delay_source) # Silence noisy monitoring endpoints when request is "ok". # - For health/metrics: skip only successes (<400) but keep 4xx/5xx. # - For favicon: also skip 404/4xx noise, keep 5xx. # - For root availability probes: skip HEAD / when "ok" (<400), but keep 4xx/5xx. silent_paths = {"/metrics", "/health", "/healthz", "/favicon.ico"} is_silent_path = path_label in silent_paths is_root_check = (path_label == "/" and str(method_label).upper() == "HEAD") should_silence = False if is_silent_path or is_root_check: ok_threshold = 500 if path_label == "/favicon.ico" else 400 should_silence = int(status) < int(ok_threshold) if not should_silence: emit_event(_QUEUE_DELAY_EVENT_NAME, severity="info", **access_fields) except Exception: pass # Warning when queue delay is suspiciously high try: threshold = _queue_delay_warn_threshold_ms() if threshold > 0 and int(queue_delay_ms) >= int(threshold): warn_fields: Dict[str, Any] = { "request_id": req_id, "queue_delay": int(queue_delay_ms), "threshold_ms": int(threshold), "method": method_label, "path": path_label, "handler": handler_label, } if queue_delay_source: warn_fields["queue_delay_source"] = str(queue_delay_source) emit_event("queue_delay_high", severity="warning", **warn_fields) except Exception: pass except Exception as e: try: emit_event( "record_request_outcome_failed", severity="anomaly", operation="request_metrics", handled=True, request_id=req_id, error=str(e), ) except Exception: 
logging.getLogger(__name__).warning( "record_request_outcome_failed", extra={"operation": "request_metrics", "handled": True} ) return response app = web.Application(middlewares=[_request_id_mw, db_health_auth_middleware]) # --- Query Performance Profiler routes (best-effort) --- # NOTE: המנגנון יכול להיות מנוטרל "קשיח" ברמת הקוד בתוך DatabaseManager.ENABLE_PROFILING. try: profiler_enabled_env = str(os.getenv("PROFILER_ENABLED", "true") or "").strip().lower() in {"1", "true", "yes", "y", "on"} except Exception: profiler_enabled_env = True if profiler_enabled_env: try: from database import db_manager # type: ignore # Hard-disable: אל תרשום routes/שירות אם DatabaseManager נועל את הפיצ'ר if not bool(getattr(db_manager, "ENABLE_PROFILING", True)): try: emit_event("profiler_disabled_by_code_flag", severity="info") except Exception: pass else: from services.query_profiler_service import PersistentQueryProfilerService # type: ignore from handlers.profiler_handler import setup_profiler_routes # type: ignore try: # שיכוך כאבים: ברירת מחדל גבוהה יותר כדי לא לתעד כל latency "רגיל" threshold_ms = int(float(os.getenv("PROFILER_SLOW_THRESHOLD_MS", "1000") or 1000)) except Exception: threshold_ms = 1000 profiler_service = PersistentQueryProfilerService(db_manager, slow_threshold_ms=threshold_ms) app["profiler_service"] = profiler_service setup_profiler_routes(app, profiler_service) except Exception as e: try: emit_event( "profiler_routes_init_failed", severity="warn", handled=True, error=str(e), ) except Exception: pass async def on_startup(app: web.Application): """אתחול שירותים בעליית השרת.""" try: # אתחול מוקדם של DB Health Service svc = await get_db_health_service() app["db_health_service"] = svc logger.info("DB Health Service initialized") except Exception as e: logger.warning(f"DB Health Service init failed: {e}") async def on_cleanup(app: web.Application): """ניקוי משאבים בכיבוי השרת.""" svc = app.get("db_health_service") if svc and hasattr(svc, "close"): await svc.close() 
app.on_startup.append(on_startup) app.on_cleanup.append(on_cleanup) async def health(request: web.Request) -> web.Response: return web.json_response({"status": "ok"}) async def metrics_view(request: web.Request) -> web.Response: try: payload = metrics_endpoint_bytes() return web.Response(body=payload, headers={"Content-Type": metrics_content_type()}) except Exception as e: logger.error(f"metrics_view error: {e}") # העדף emit_event שהוחלף במודול זה (monkeypatch), ואם לא – פתור דינמית את observability.emit_event try: chosen_emit = emit_event # type: ignore try: import sys as _sys obs = _sys.modules.get("observability") obs_emit = getattr(obs, "emit_event", None) if obs is not None else None except Exception: obs_emit = None # אם emit_event במודול הזה אינו מגיע מ-observability (סימן ל-monkeypatch) – העדף אותו # אחרת, אפשר להשתמש ב-observability.emit_event בזמן ריצה כדי לכבד monkeypatch שבוצע לאחר import if not callable(chosen_emit) or getattr(chosen_emit, "__module__", "") == "observability": if callable(obs_emit): chosen_emit = obs_emit # type: ignore chosen_emit( "metrics_view_error", severity="error", error_code="E_METRICS_VIEW", error=str(e), ) # type: ignore except Exception: pass try: if errors_total is not None: errors_total.labels(code="E_METRICS_VIEW").inc() except Exception: pass return web.Response(status=500, text="metrics error") async def alerts_view(request: web.Request) -> web.Response: """Alertmanager webhook endpoint: forwards alerts and logs them. Expected payload schema: {"alerts": [...]} or a single alert object. 
""" try: raw = await request.text() data = json.loads(raw) if raw else {} except Exception as e: try: # פנה ל-emit_event שהוחדר למודול זה (מאפשר monkeypatch בטסטים) emit_event("alerts_parse_error", severity="warn", error_code="E_ALERTS_PARSE", error=str(e)) # type: ignore except Exception: pass try: if errors_total is not None: errors_total.labels(code="E_ALERTS_PARSE").inc() except Exception: pass return web.Response(status=400, text="invalid json") # Normalize to list of alerts alerts = [] if isinstance(data, dict) and "alerts" in data and isinstance(data["alerts"], list): alerts = data["alerts"] elif isinstance(data, dict) and data: alerts = [data] # Forward via helper (Slack/Telegram) and emit events try: # Log only a lightweight counter at info level here; per-alert severities # are handled inside alert_forwarder.forward_alerts. emit_event("alert_received", severity="info", count=int(len(alerts))) except Exception: pass try: from alert_forwarder import forward_alerts # type: ignore forward_alerts(alerts) except Exception as e: # Soft-fail; דווח כאנומליה מטופלת try: emit_event( "alerts_forward_failed", severity="anomaly", operation="alerts_forward", handled=True, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "alerts_forward_failed", extra={"operation": "alerts_forward", "handled": True} ) return web.json_response({"status": "ok", "forwarded": len(alerts)}) async def alerts_get_view(request: web.Request) -> web.Response: """Return recent internal alerts as JSON for ChatOps and dashboards. 
Query params: - limit: int (default 20) """ try: limit = int(request.query.get("limit", "20")) except Exception: limit = 20 try: from internal_alerts import get_recent_alerts # type: ignore items = get_recent_alerts(limit=max(1, min(200, limit))) or [] except Exception: items = [] return web.json_response({"alerts": items}) async def sentry_webhook_view(request: web.Request) -> web.Response: """Sentry webhook endpoint: converts Sentry alerts into internal alerts. - Emits internal_alerts.emit_internal_alert(...) so Telegram forwarding works. - Persists to Mongo via monitoring.alerts_storage through internal_alerts (best-effort). """ try: body = await request.read() except Exception: body = b"" if not _verify_sentry_webhook(request, body): try: emit_event("sentry_webhook_unauthorized", severity="warn", handled=True) # type: ignore except Exception: pass return web.json_response({"ok": False, "error": "unauthorized"}, status=401) try: raw_text = body.decode("utf-8", errors="replace") if body else "" payload = json.loads(raw_text) if raw_text else {} except Exception as e: try: emit_event( "sentry_webhook_parse_error", severity="warn", handled=True, error=str(e), ) # type: ignore except Exception: pass return web.json_response({"ok": False, "error": "invalid_json"}, status=400) alert = _extract_sentry_alert(payload) if alert is None: try: emit_event("sentry_webhook_ignored", severity="info", handled=True, reason="empty_payload") # type: ignore except Exception: pass return web.json_response({"ok": True, "ignored": True}) should_emit = _should_emit_sentry_alert(alert.dedup_key) try: emit_event( "sentry_webhook_received", severity="info", handled=True, dedup=not should_emit, sentry_short_id=str(alert.details.get("sentry_short_id") or ""), project=str(alert.details.get("project") or ""), level=str(alert.details.get("level") or ""), action=str(alert.details.get("action") or ""), ) # type: ignore except Exception: pass if should_emit: try: from internal_alerts import 
emit_internal_alert # type: ignore emit_internal_alert( name=alert.name, severity=str(alert.severity), summary=str(alert.summary), **(alert.details or {}), ) except Exception as e: try: emit_event( "sentry_webhook_emit_failed", severity="anomaly", handled=True, error=str(e), ) # type: ignore except Exception: pass return web.json_response({"ok": False, "error": "emit_failed"}, status=500) return web.json_response({"ok": True, "deduped": (not should_emit)}) async def incidents_get_view(request: web.Request) -> web.Response: """Return incident history as JSON. Query params: - limit: int (default 20) """ try: limit = int(request.query.get("limit", "20")) except Exception: limit = 20 try: from remediation_manager import get_incidents # type: ignore items = get_incidents(limit=max(1, min(200, limit))) or [] except Exception: items = [] return web.json_response({"incidents": items}) async def ai_explain_view(request: web.Request) -> web.Response: start = time.perf_counter() req_id = request.headers.get("X-Request-ID") or "" if _AI_ROUTE_TOKEN: auth_header = request.headers.get("Authorization", "").strip() expected_header = f"Bearer {_AI_ROUTE_TOKEN}" try: valid_token = secrets.compare_digest(auth_header, expected_header) except Exception: valid_token = False if not valid_token: return web.json_response( { "error": "unauthorized", "message": "missing or invalid bearer token", }, status=401, ) try: payload = await request.json() except json.JSONDecodeError: return web.json_response({"error": "bad_request", "message": "invalid json"}, status=400) except Exception: return web.json_response({"error": "bad_request", "message": "invalid body"}, status=400) context = payload.get("context") expected_sections = payload.get("expected_sections") if not isinstance(context, dict): return web.json_response( {"error": "invalid_context", "message": "context must be an object"}, status=400, ) if expected_sections is not None and not isinstance(expected_sections, list): expected_sections = 
None alert_uid = str(context.get("alert_uid") or "") try: explanation = await asyncio.wait_for( ai_explain_service.generate_ai_explanation( context, expected_sections=expected_sections, request_id=req_id, ), timeout=_AI_REQUEST_TIMEOUT, ) except asyncio.TimeoutError: duration = time.perf_counter() - start try: emit_event( "ai_explain_request_failure", severity="error", alert_uid=alert_uid, duration_ms=int(duration * 1000), error_code="handler_timeout", handled=True, ) except Exception: pass return web.json_response( { "error": "timeout", "message": "פניית ה-AI חרגה מחלון הזמן", }, status=504, ) except ai_explain_service.AiExplainError as exc: duration = time.perf_counter() - start error_code = str(exc) or "provider_error" if error_code == "invalid_context": status = 400 message = "מבנה ההקשר אינו תקין" elif error_code == "anthropic_api_key_missing": status = 503 message = "השירות לא הוגדר (חסר מפתח Anthropic)" elif error_code in {"service_unavailable", "ai_explain_service_unavailable"}: status = 503 message = "שירות ההסבר אינו זמין" else: status = 502 message = "ספק ה-AI לא הצליח להחזיר תשובה" try: emit_event( "ai_explain_request_failure", severity="error", alert_uid=alert_uid, duration_ms=int(duration * 1000), error_code=error_code, handled=status < 500, ) except Exception: pass return web.json_response({"error": error_code, "message": message}, status=status) except Exception as exc: duration = time.perf_counter() - start try: emit_event( "ai_explain_request_failure", severity="error", alert_uid=alert_uid, duration_ms=int(duration * 1000), error=str(exc), handled=False, ) except Exception: pass logger.exception("ai_explain_handler_failed") return web.json_response( {"error": "internal_error", "message": "שגיאה בשירות ההסבר"}, status=500, ) duration = time.perf_counter() - start try: emit_event( "ai_explain_request_success", severity="info", alert_uid=alert_uid, duration_ms=int(duration * 1000), provider=explanation.get("provider"), ) except Exception: pass return 
web.json_response(explanation) async def share_view(request: web.Request) -> web.Response: share_id = request.match_info.get("share_id", "") try: # 해결 תלויות בזמן ריצה כדי לאפשר monkeypatch ב-tests: try: import importlib integ = importlib.import_module("integrations") _code_sharing = getattr(integ, "code_sharing") except Exception: from integrations import code_sharing as _code_sharing # type: ignore data = _code_sharing.get_internal_share(share_id) except Exception as e: logger.error(f"share_view error: {e}") try: # דווח אירוע מובנה על שגיאה בהצגת שיתוף # הערה: משתמש ב-emit_event של המודול כדי לאפשר monkeypatch בטסטים emit_event("share_view_error", severity="error", error_code="E_SHARE_VIEW", share_id=str(share_id), error=str(e)) except Exception: pass try: if errors_total is not None: errors_total.labels(code="E_SHARE_VIEW").inc() except Exception: pass data = None if not data: # החזר 404 וגם דווח אירוע מובנה לצורכי ניטור try: # הערה: משתמש ב-emit_event של המודול כדי לאפשר monkeypatch בטסטים emit_event("share_view_not_found", severity="warn", share_id=str(share_id)) except Exception: pass return web.Response(status=404, text="Share not found or expired") # החזר HTML פשוט לצפייה נוחה code = data.get("code", "") file_name = data.get("file_name", "snippet.txt") language = data.get("language", "text") try: emit_event("share_view_success", severity="info", share_id=str(share_id), file_name=str(file_name), language=str(language)) except Exception: pass html = f""" <!DOCTYPE html> <html lang="he"> <head> <meta charset="utf-8" /> <meta name="viewport" content="width=device-width, initial-scale=1" /> <title>Share: {file_name}</title> <style> body {{ font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; margin: 24px; }} pre {{ white-space: pre-wrap; word-wrap: break-word; background: #0d1117; color: #c9d1d9; padding: 16px; border-radius: 8px; overflow: auto; }} h1 {{ font-size: 18px; }} .meta {{ color: #57606a; 
margin-bottom: 8px; }} a {{ color: #58a6ff; }} </style> </head> <body> <h1>📄 {file_name}</h1> <div class="meta">שפה: {language}</div> <pre>{html_escape(code)}</pre> </body> </html> """ return web.Response(text=html, content_type="text/html") async def db_health_pool_view(request: web.Request) -> web.Response: """GET /api/db/pool - מצב Connection Pool.""" try: # await לקבלת ה-service (יכול להיות async init) svc = await get_db_health_service() # await לקריאה ל-MongoDB (Motor או thread pool) pool = await svc.get_pool_status() return web.json_response(pool.to_dict()) except Exception as e: logger.error(f"db_health_pool error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def db_health_ops_view(request: web.Request) -> web.Response: """GET /api/db/ops - פעולות איטיות פעילות.""" try: threshold = int(request.query.get("threshold_ms", "1000")) include_system = request.query.get("include_system", "").lower() == "true" svc = await get_db_health_service() # await חובה! - הקריאה ל-MongoDB היא אסינכרונית ops = await svc.get_current_operations( threshold_ms=threshold, include_system=include_system, ) return web.json_response( { "count": len(ops), "threshold_ms": threshold, "operations": [op.to_dict() for op in ops], } ) except Exception as e: logger.error(f"db_health_ops error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def db_health_collections_view(request: web.Request) -> web.Response: """GET /api/db/collections - סטטיסטיקות collections.""" try: collection = request.query.get("collection") svc = await get_db_health_service() # await חובה! 
- collStats יכול לקחת זמן stats = await svc.get_collection_stats(collection_name=collection) return web.json_response( { "count": len(stats), "collections": [s.to_dict() for s in stats], } ) except Exception as e: logger.error(f"db_health_collections error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def db_collection_documents_view(request: web.Request) -> web.Response: """GET /api/db/{collection}/documents - שליפת מסמכים מ-collection. Query Parameters: skip: מספר מסמכים לדלג (ברירת מחדל: 0) limit: מספר מסמכים להחזיר (ברירת מחדל: 20, מקסימום: 100) Returns: JSON עם documents, total, skip, limit, has_more Status Codes: 200: הצלחה 400: פרמטרים לא תקינים / שם collection לא תקין 403: גישה ל-collection חסומה 500: שגיאת שרת """ try: collection_name = request.match_info.get("collection", "") # פרסור פרמטרים עם ברירות מחדל try: skip = int(request.query.get("skip", "0")) limit = int(request.query.get("limit", "20")) except ValueError: return web.json_response( {"error": "invalid_params", "message": "skip and limit must be integers"}, status=400, ) # וולידציה בסיסית if skip < 0 or limit < 1: return web.json_response( {"error": "invalid_params", "message": "skip >= 0, limit >= 1"}, status=400, ) if skip > MAX_SKIP: return web.json_response( {"error": "invalid_params", "message": f"skip cannot exceed {MAX_SKIP}"}, status=400, ) filters: Dict[str, Any] = {} user_id_raw = clean_db_health_filter_value(request.query.get("userId") or request.query.get("user_id"), 40) status_raw = clean_db_health_filter_value(request.query.get("status"), 40) file_id_raw = clean_db_health_filter_value(request.query.get("fileId") or request.query.get("file_id"), 120) sort_raw = clean_db_health_filter_value(request.query.get("sort"), 20) sort_value = sort_raw or None if user_id_raw: try: filters["user_id"] = int(user_id_raw) except Exception: filters["user_id"] = user_id_raw if status_raw: filters["status"] = status_raw if file_id_raw: 
filters["file_id"] = file_id_raw svc = await get_db_health_service() result = await svc.get_documents( collection_name=collection_name, skip=skip, limit=limit, filters=filters or None, sort=sort_value, ) return web.json_response(result) except InvalidCollectionNameError as e: # שם collection לא תקין → 400 Bad Request return web.json_response( {"error": "invalid_collection_name", "message": str(e)}, status=400, ) except CollectionAccessDeniedError as e: # גישה חסומה → 403 Forbidden return web.json_response( {"error": "access_denied", "message": str(e)}, status=403, ) except Exception as e: logger.error(f"db_collection_documents error: {e}") return web.json_response( {"error": "internal_error", "message": "An unexpected error occurred"}, status=500, ) async def db_health_summary_view(request: web.Request) -> web.Response: """GET /api/db/health - סיכום בריאות כללי.""" try: svc = await get_db_health_service() # await חובה! summary = await svc.get_health_summary() return web.json_response(summary) except Exception as e: logger.error(f"db_health_summary error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def maintenance_cleanup_view(request: web.Request) -> web.Response: """GET /api/debug/maintenance_cleanup Endpoint תחזוקה חד-פעמי (DB): - איפוס קולקציות לוגים: slow_queries_log, service_metrics - יצירת TTL לקולקציות לוגים (מחיקה אוטומטית בעתיד) - ניקוי אינדקסים ב-code_snippets: השארה של אינדקסים קריטיים בלבד. ⚠️ מוגן ע"י db_health_auth_middleware (Bearer token). 
""" from services.db_provider import get_db def _run_cleanup() -> dict: preview = str(request.query.get("preview", "") or "").lower() in {"1", "true", "yes", "on"} db = get_db() # --- collections to purge --- try: slow_queries_coll = db.slow_queries_log except Exception: slow_queries_coll = None try: service_metrics_coll = db.service_metrics except Exception: service_metrics_coll = None try: code_snippets_coll = db.code_snippets except Exception: code_snippets_coll = None # Detect fail-open/noop db if not getattr(slow_queries_coll, "delete_many", None) or not getattr(service_metrics_coll, "delete_many", None): raise RuntimeError("db_unavailable_or_no_delete_many") if not getattr(code_snippets_coll, "index_information", None) or not getattr(code_snippets_coll, "drop_index", None): raise RuntimeError("db_unavailable_or_no_index_management") deleted_slow = 0 deleted_metrics = 0 if not preview: slow_res = slow_queries_coll.delete_many({}) metrics_res = service_metrics_coll.delete_many({}) deleted_slow = int(getattr(slow_res, "deleted_count", 0) or 0) deleted_metrics = int(getattr(metrics_res, "deleted_count", 0) or 0) # --- TTL indexes (permanent cleanup) --- def _ensure_ttl_index(coll: Any, *, field: str, expire_seconds: int, index_name: str) -> dict: """Ensure TTL index exists with requested expireAfterSeconds (best-effort).""" info_before = {} try: info_before = coll.index_information() or {} except Exception: info_before = {} existing_meta = info_before.get(index_name) if isinstance(info_before, dict) else None if isinstance(existing_meta, dict): try: existing_expire = existing_meta.get("expireAfterSeconds") existing_key = existing_meta.get("key") if ( existing_expire == int(expire_seconds) and existing_key == [(field, 1)] ): return { "name": index_name, "field": field, "expireAfterSeconds": int(expire_seconds), "status": "exists", } except Exception: pass # Try drop conflicting TTL index with the same name if not preview: try: coll.drop_index(index_name) except 
Exception: pass created_name = None if not preview: try: created_name = coll.create_index( [(field, 1)], name=index_name, expireAfterSeconds=int(expire_seconds), background=True, ) except Exception as e: # Best-effort: report error and continue return { "name": index_name, "field": field, "expireAfterSeconds": int(expire_seconds), "status": "error", "error": str(e), } return { "name": str(created_name or index_name), "field": field, "expireAfterSeconds": int(expire_seconds), "status": "planned" if preview else "created", } # Explicitly drop legacy TTL index that may conflict (IndexOptionsConflict) service_metrics_pre_drop: dict[str, Any] if preview: service_metrics_pre_drop = {"planned_drop": ["metrics_ttl"]} else: dropped_pre: list[str] = [] try: service_metrics_coll.drop_index("metrics_ttl") dropped_pre.append("metrics_ttl") except Exception: pass service_metrics_pre_drop = {"dropped": dropped_pre} ttl_results: dict[str, Any] = { "slow_queries_log": _ensure_ttl_index( slow_queries_coll, field="timestamp", expire_seconds=604800, # 7 days index_name="ttl_cleanup", ), # service_metrics uses "ts" in code, but we'll also create a "timestamp" TTL for safety/backward-compat "service_metrics_ts": _ensure_ttl_index( service_metrics_coll, field="ts", expire_seconds=86400, # 24 hours index_name="ttl_cleanup_ts", ), "service_metrics_timestamp": _ensure_ttl_index( service_metrics_coll, field="timestamp", expire_seconds=86400, # 24 hours index_name="ttl_cleanup", ), } ttl_results["service_metrics_pre_drop"] = service_metrics_pre_drop # --- index cleanup (code_snippets) --- try: idx_info = code_snippets_coll.index_information() or {} except Exception: idx_info = {} indexes_before = sorted([str(k) for k in idx_info.keys()]) indexes_before_details: dict[str, dict] = {} for k, meta in (idx_info or {}).items(): if not isinstance(meta, dict): continue name = str(k) # keep response compact but useful for review (especially text indexes) indexes_before_details[name] = { "key": 
meta.get("key"), "unique": bool(meta.get("unique")) if "unique" in meta else False, "expireAfterSeconds": meta.get("expireAfterSeconds"), "weights": meta.get("weights"), "default_language": meta.get("default_language"), } dropped: list[str] = [] kept: list[str] = [] drop_errors: dict[str, str] = {} def _should_keep_code_snippets_index(index_name: str, meta: Any) -> bool: # Keep by explicit name if index_name in {"_id_", "search_text_idx", "unique_file_name", "user_id", "user_updated_at"}: return True if not isinstance(meta, dict): return False key = meta.get("key") # Keep single-field user_id index (name may be user_id_1 / user_id_idx etc.) if key in ([("user_id", 1)], [("user_id", -1)]): return True # Keep default UI sort index (user_id + updated_at desc) if key in ( [("user_id", 1), ("updated_at", -1)], [("updated_at", -1), ("user_id", 1)], [("user_id", -1), ("updated_at", -1)], [("updated_at", -1), ("user_id", -1)], ): return True # Keep a unique index enforcing unique file name per user (compound user_id + file_name) try: if bool(meta.get("unique")) and key in ( [("user_id", 1), ("file_name", 1)], [("file_name", 1), ("user_id", 1)], [("user_id", 1), ("file_name", -1)], [("file_name", -1), ("user_id", 1)], [("user_id", -1), ("file_name", 1)], [("file_name", 1), ("user_id", -1)], [("user_id", -1), ("file_name", -1)], [("file_name", -1), ("user_id", -1)], ): return True except Exception: pass return False for name in sorted(idx_info.keys()): idx_name = str(name) meta = idx_info.get(name) if _should_keep_code_snippets_index(idx_name, meta): kept.append(idx_name) continue try: if preview: dropped.append(idx_name) # planned else: code_snippets_coll.drop_index(idx_name) dropped.append(idx_name) except Exception as e: # Best-effort: אם אינדקס לא קיים/לא ניתן למחיקה, נשמור שגיאה ונתקדם. 
drop_errors[idx_name] = str(e) # Ensure critical UI sort index exists (user_id + updated_at desc) ensured: dict[str, Any] = {"name": "user_updated_at", "key": [("user_id", 1), ("updated_at", -1)]} if preview: ensured["status"] = "planned" else: try: code_snippets_coll.create_index( [("user_id", 1), ("updated_at", -1)], name="user_updated_at", background=True, ) ensured["status"] = "created_or_exists" except Exception as e: ensured["status"] = "error" ensured["error"] = str(e) try: idx_info_after = code_snippets_coll.index_information() or {} except Exception: idx_info_after = {} indexes_after = sorted([str(k) for k in idx_info_after.keys()]) return { "ok": True, "preview": preview, "deleted_documents": { "slow_queries_log": deleted_slow, "service_metrics": deleted_metrics, "total": deleted_slow + deleted_metrics, }, "ttl": ttl_results, "indexes": { "collection": "code_snippets", "before": indexes_before, "before_details": indexes_before_details, "after": indexes_after, "dropped": dropped, "kept": kept, "drop_errors": drop_errors, "ensured": ensured, }, } try: result = await asyncio.to_thread(_run_cleanup) try: emit_event( "maintenance_cleanup_done", severity="info", deleted_total=int((result.get("deleted_documents") or {}).get("total") or 0), dropped_indexes_count=int(len(((result.get("indexes") or {}).get("dropped")) or [])), ) except Exception: pass return web.json_response(result) except Exception as e: logger.exception("maintenance_cleanup_failed") try: emit_event("maintenance_cleanup_failed", severity="error", handled=True, error=str(e)) except Exception: pass return web.json_response({"ok": False, "error": "failed", "message": "internal_error"}, status=500) app.router.add_get("/health", health) # Always expose /healthz alias for platform probes try: app.router.add_get("/healthz", health) except Exception as e: # Ignore if already registered – אך תעד כאנומליה מטופלת try: emit_event( "healthz_route_register_failed", severity="anomaly", operation="startup", 
handled=True, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "healthz_route_register_failed", extra={"operation": "startup", "handled": True} ) app.router.add_get("/metrics", metrics_view) app.router.add_post("/alerts", alerts_view) app.router.add_get("/alerts", alerts_get_view) app.router.add_post("/webhooks/sentry", sentry_webhook_view) app.router.add_get("/incidents", incidents_get_view) app.router.add_post("/api/ai/explain", ai_explain_view) app.router.add_get("/share/{share_id}", share_view) app.router.add_get("/api/debug/maintenance_cleanup", maintenance_cleanup_view) app.router.add_get("/api/db/pool", db_health_pool_view) app.router.add_get("/api/db/ops", db_health_ops_view) app.router.add_get("/api/db/collections", db_health_collections_view) app.router.add_get("/api/db/{collection}/documents", db_collection_documents_view) app.router.add_get("/api/db/health", db_health_summary_view) # Jobs Monitor routes try: register_jobs_routes(app) except Exception: pass return app
async def get_jobs_list(request: web.Request) -> web.Response:
    """GET /api/jobs - list all jobs held by the job registry.

    Responds with ``{"jobs": [...]}`` where each entry contains the job's
    metadata, its current enabled flag, and a ``can_trigger`` flag.
    """
    from services.job_registry import JobRegistry

    registry = JobRegistry()

    def _describe(job) -> dict:
        # Resolve the enabled flag first, then read the remaining fields.
        enabled = registry.is_enabled(job.job_id)
        return {
            "job_id": job.job_id,
            "name": job.name,
            "description": job.description,
            "category": job.category.value,
            "type": job.job_type.value,
            "interval_seconds": job.interval_seconds,
            "enabled": enabled,
            "env_toggle": job.env_toggle,
            # can_trigger: manual triggering is offered when a callback is configured
            "can_trigger": bool(job.callback_name),
        }

    listing = [_describe(job) for job in registry.list_all()]
    return web.json_response({"jobs": listing})
async def get_job_detail(request: web.Request) -> web.Response:
    """GET /api/jobs/{job_id} - details of one specific job.

    Responds with 404 and ``{"error": "Job not found"}`` when the registry
    does not return a job for the requested id. Otherwise the response holds
    the job metadata, the active runs belonging to this job, and the run
    history requested from the tracker with ``limit=20``.
    """
    from services.job_registry import JobRegistry
    from services.job_tracker import get_job_tracker

    requested_id = request.match_info.get("job_id")
    registry = JobRegistry()
    tracker = get_job_tracker()

    job = registry.get(requested_id)
    if not job:
        return web.json_response({"error": "Job not found"}, status=404)

    past_runs = tracker.get_job_history(requested_id, limit=20)

    # Keep only the active runs that belong to the requested job.
    running_now = []
    for candidate in tracker.get_active_runs():
        if candidate.job_id == requested_id:
            running_now.append(candidate)

    job_payload = {
        "job_id": job.job_id,
        "name": job.name,
        "description": job.description,
        "category": job.category.value,
        "type": job.job_type.value,
        "interval_seconds": job.interval_seconds,
        "enabled": registry.is_enabled(job.job_id),
        "source_file": job.source_file,
    }
    body = {
        "job": job_payload,
        "active_runs": [_run_to_dict(entry) for entry in running_now],
        "history": [_run_to_dict(entry) for entry in past_runs],
    }
    return web.json_response(body)
async def get_run_detail(request: web.Request) -> web.Response:
    """GET /api/jobs/runs/{run_id} - details of a single run, including logs.

    Responds with 404 and ``{"error": "Run not found"}`` when the tracker
    returns nothing for the requested run id.
    """
    from services.job_tracker import get_job_tracker

    requested_run = request.match_info.get("run_id")
    found = get_job_tracker().get_run(requested_run)
    if found:
        return web.json_response({"run": _run_to_dict(found, include_logs=True)})
    return web.json_response({"error": "Run not found"}, status=404)
async def get_active_runs(request: web.Request) -> web.Response:
    """GET /api/jobs/active - runs the job tracker reports as active."""
    from services.job_tracker import get_job_tracker

    serialized = []
    for active_run in get_job_tracker().get_active_runs():
        serialized.append(_run_to_dict(active_run))
    return web.json_response({"active_runs": serialized})
async def trigger_job(request: web.Request) -> web.Response:
    """POST /api/jobs/{job_id}/trigger - manually trigger a job.

    The job must exist in the job registry and must currently be scheduled
    (by name) on the Telegram application's job queue stored under
    ``request.app["telegram_application"]``. The first scheduled job with
    that name supplies the callback, which is re-scheduled as a one-off run
    with ``when=0``.

    Status codes:
        200: the one-off run was scheduled
        404: job unknown to the registry, or not scheduled on the job queue
        500: the scheduled job has no callable callback, or scheduling failed
        503: Telegram application / job queue is unavailable
    """
    from services.job_registry import JobRegistry

    job_id = request.match_info.get("job_id")
    registry = JobRegistry()
    if not registry.get(job_id):
        return web.json_response({"error": "Job not found"}, status=404)

    # Trigger via Telegram JobQueue (same process as bot)
    try:
        tg_app = request.app.get("telegram_application")
    except Exception:
        tg_app = None
    jq = getattr(tg_app, "job_queue", None) if tg_app is not None else None
    if jq is None or not hasattr(jq, "get_jobs_by_name"):
        return web.json_response({"error": "job_queue_unavailable"}, status=503)

    try:
        jobs = jq.get_jobs_by_name(job_id)
    except Exception:
        # Still answered as "not scheduled", but leave a trace: a failing
        # lookup is not the same thing as a job that was never scheduled.
        logger.warning("jobs_trigger_lookup_failed job_id=%s", job_id, exc_info=True)
        jobs = []
    if not jobs:
        return web.json_response({"error": "job_not_scheduled"}, status=404)

    job_obj = jobs[0]
    callback = getattr(job_obj, "callback", None)
    if not callable(callback):
        return web.json_response({"error": "job_callback_unavailable"}, status=500)

    # Schedule immediate one-off run
    try:
        kwargs = {"when": 0, "name": f"{job_id}_manual_{int(time.time())}"}
        # Carry over the scheduled job's context when it has any.
        for attr in ("data", "chat_id", "user_id"):
            value = getattr(job_obj, attr, None)
            if value is not None:
                kwargs[attr] = value
        jq.run_once(callback, **kwargs)
    except Exception:
        # Record why the full call failed before retrying with fewer arguments;
        # otherwise the retry hides the original error completely.
        logger.warning(
            "jobs_trigger_full_call_failed job_id=%s; retrying with minimal arguments",
            job_id,
            exc_info=True,
        )
        try:
            # Fallback for older signatures
            jq.run_once(callback, when=0)
        except Exception:
            logger.exception("jobs_trigger_failed job_id=%s", job_id)
            return web.json_response(
                {"error": "trigger_failed", "message": "Failed to trigger job"},
                status=500,
            )

    return web.json_response({"message": f"Job {job_id} triggered", "job_id": job_id})
def _run_to_dict(run, include_logs: bool = False) -> dict:
    """Convert a job-run object into a plain dict.

    Timestamps are rendered with ``isoformat()`` (``None`` when unset) and
    ``duration_seconds`` is filled in only when both the start and the end
    time are set. When ``include_logs`` is true, a ``"logs"`` list with one
    dict per log entry (timestamp, level, message) is added.
    """

    def _iso(moment):
        # Unset timestamps are reported as None instead of a string.
        return moment.isoformat() if moment else None

    started = run.started_at
    ended = run.ended_at
    if ended and started:
        elapsed = (ended - started).total_seconds()
    else:
        elapsed = None

    summary = {
        "run_id": run.run_id,
        "job_id": run.job_id,
        "started_at": _iso(started),
        "ended_at": _iso(ended),
        "status": run.status.value,
        "progress": run.progress,
        "total_items": run.total_items,
        "processed_items": run.processed_items,
        "error_message": run.error_message,
        "trigger": run.trigger,
        "user_id": run.user_id,
        "duration_seconds": elapsed,
    }
    if not include_logs:
        return summary

    entries = []
    for record in run.logs:
        entries.append(
            {
                "timestamp": record.timestamp.isoformat(),
                "level": record.level,
                "message": record.message,
            }
        )
    summary["logs"] = entries
    return summary
def register_jobs_routes(app: web.Application):
    """Register the Jobs API routes on the given aiohttp application."""
    router = app.router
    # Routes are added in this exact order; the fixed "/api/jobs/active" path
    # is registered before the "/api/jobs/{job_id}" pattern.
    get_routes = (
        ("/api/jobs", get_jobs_list),
        ("/api/jobs/active", get_active_runs),
        ("/api/jobs/{job_id}", get_job_detail),
        ("/api/jobs/runs/{run_id}", get_run_detail),
    )
    for path, handler in get_routes:
        router.add_get(path, handler)
    router.add_post("/api/jobs/{job_id}/trigger", trigger_job)
def run(host: str = "0.0.0.0", port: int = 10000) -> None:
    """Build the aiohttp application and serve it on ``host``:``port``.

    Telemetry setup and the deployment-started note are best-effort: any
    exception they raise is ignored so that startup is never blocked.
    """
    # OpenTelemetry (best-effort, fail-open)
    try:
        from observability_otel import setup_telemetry as _setup_otel  # type: ignore

        otel_service = str(os.getenv("OTEL_SERVICE_NAME") or "codebot-aiohttp")
        otel_version = os.getenv("SERVICE_VERSION") or os.getenv("RENDER_GIT_COMMIT") or None
        otel_environment = os.getenv("ENVIRONMENT") or os.getenv("ENV") or None
        _setup_otel(
            service_name=otel_service,
            service_version=otel_version,
            environment=otel_environment,
            flask_app=None,
        )
    except Exception:
        pass

    try:
        note_deployment_started("aiohttp service starting up")
    except Exception:
        pass

    web.run_app(create_app(), host=host, port=port)
if __name__ == "__main__":  # pragma: no cover - used by Render/CLI entrypoint
    # Bind address: the first non-empty value of WEB_HOST, HOST, else all interfaces.
    host = os.getenv("WEB_HOST") or os.getenv("HOST") or "0.0.0.0"

    # Port: the first non-empty value of PORT, WEB_PORT; start from the default
    # and override it only when the configured value parses as an integer.
    port_env = os.getenv("PORT") or os.getenv("WEB_PORT") or "10000"
    port = 10000
    try:
        port = int(port_env)
    except (TypeError, ValueError):
        pass

    run(host=host, port=port)