Загрузка...

Script
Bumper of all topics

Thread in Python created by Эксфадор Oct 21, 2025. (bumped Jan 23, 2026) 996 views

  1. Эксфадор
    Python
    import json
    import time
    import logging
    import threading
    import heapq
    import queue
    import base64
    from dataclasses import dataclass, field
    from collections import deque
    from typing import Any, Dict, List, Optional, Tuple
    from urllib import request, parse, error


    class Config:
    def __init__(self) -> None:
    self.base_url = "https://prod-api.lolz.live"
    self.access_token = "тут токен"
    self.authorization = f"Bearer {self.access_token}"
    self.request_timeout_seconds = 20
    self.bump_gap_seconds = 0.5
    self.fetch_threads_interval_seconds = 0.5
    self.success_cooldown_seconds = 0.5
    self.failure_backoff_seconds = 0.5
    self.empty_queue_sleep_seconds = 0.5
    self.rate_limit_min_interval_seconds = 0.2
    self.worker_count = 6


    class RateLimiter:
    def __init__(self, min_interval_seconds: float) -> None:
    self.min_interval_seconds = float(min_interval_seconds)
    self._lock = threading.Lock()
    self._next_allowed_ts = 0.0

    def wait(self) -> None:
    while True:
    with self._lock:
    now = time.time()
    if now >= self._next_allowed_ts:
    self._next_allowed_ts = now + self.min_interval_seconds
    return
    sleep_for = self._next_allowed_ts - now
    if sleep_for > 0:
    time.sleep(sleep_for)


    class JsonHttpClient:
    def __init__(self, base_url: str, authorization: str, timeout_seconds: int, rate_limiter: Optional[RateLimiter] = None) -> None:
    self.base_url = base_url.rstrip("/")
    self.authorization = authorization
    self.timeout_seconds = timeout_seconds
    self.rate_limiter = rate_limiter

    def request(self, method: str, path: str, query: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None) -> Tuple[int, Optional[Dict[str, Any]], Dict[str, Any]]:
    url = f"{self.base_url}{path}"
    if query:
    url = f"{url}?{parse.urlencode(query)}"
    headers = {"accept": "application/json", "authorization": self.authorization}
    body_bytes: Optional[bytes] = None
    if method.upper() == "POST":
    if data is None:
    body_bytes = b""
    else:
    headers["content-type"] = "application/json"
    body_bytes = json.dumps(data, ensure_ascii=False).encode("utf-8")
    req = request.Request(url=url, data=body_bytes, headers=headers, method=method.upper())
    try:
    if self.rate_limiter is not None:
    self.rate_limiter.wait()
    with request.urlopen(req, timeout=self.timeout_seconds) as resp:
    status = getattr(resp, "status", 200)
    raw = resp.read()
    text = raw.decode("utf-8", errors="replace") if raw else ""
    if text:
    try:
    return status, json.loads(text), dict(resp.headers)
    except Exception:
    return status, None, dict(resp.headers)
    return status, None, dict(resp.headers)
    except error.HTTPError as e:
    raw = e.read()
    text = raw.decode("utf-8", errors="replace") if raw else ""
    try:
    return e.code, json.loads(text) if text else None, dict(e.headers or {})
    except Exception:
    return e.code, None, dict(e.headers or {})
    except Exception:
    return 0, None, {}


    @dataclass
    class ThreadData:
    thread_id: int
    title: str
    bump_can: bool
    next_available_time: Optional[int]


    class ThreadFetcher:
    def __init__(self, client: JsonHttpClient, creator_user_id: int) -> None:
    self.client = client
    self.creator_user_id = creator_user_id

    def fetch_all(self) -> List[ThreadData]:
    status, payload, _ = self.client.request("GET", "/threads", {"creator_user_id": self.creator_user_id})
    if status != 200 or not isinstance(payload, dict):
    return []
    threads_raw = payload.get("threads", [])
    result: List[ThreadData] = []
    for item in threads_raw:
    try:
    permissions = item.get("permissions", {})
    bump = permissions.get("bump", {}) if isinstance(permissions, dict) else {}
    result.append(ThreadData(
    thread_id=int(item.get("thread_id")),
    title=str(item.get("thread_title", "")),
    bump_can=bool(bump.get("can", False)),
    next_available_time=int(bump.get("next_available_time")) if bump.get("next_available_time") is not None else None,
    ))
    except Exception:
    continue
    return result


    class Bumper:
    def __init__(self, client: JsonHttpClient) -> None:
    self.client = client

    def bump(self, thread_id: int) -> Tuple[bool, Optional[int], str]:
    status, payload, _ = self.client.request("POST", f"/threads/{thread_id}/bump")
    if 200 <= status < 300:
    next_available_time = None
    if isinstance(payload, dict):
    v = payload.get("next_available_time") or payload.get("bump", {}).get("next_available_time") if isinstance(payload.get("bump", {}), dict) else None
    if v is not None:
    try:
    next_available_time = int(v)
    except Exception:
    next_available_time = None
    return True, next_available_time, "ok"
    next_available_time = None
    message = ""
    if isinstance(payload, dict):
    for k in ["error", "message", "detail"]:
    if payload.get(k):
    message = str(payload.get(k))
    break
    v = payload.get("next_available_time") or payload.get("bump", {}).get("next_available_time") if isinstance(payload.get("bump", {}), dict) else None
    if v is not None:
    try:
    next_available_time = int(v)
    except Exception:
    next_available_time = None
    return False, next_available_time, message


    @dataclass(order=True)
    class BumpScheduleEntry:
    due_at: float
    order: int
    thread_id: int = field(compare=False)
    title: str = field(compare=False)


    class BumpScheduler:
    def __init__(self, success_cooldown_seconds: int, failure_backoff_seconds: int) -> None:
    self.success_cooldown_seconds = success_cooldown_seconds
    self.failure_backoff_seconds = failure_backoff_seconds
    self._heap: List[BumpScheduleEntry] = []
    self._index: Dict[int, BumpScheduleEntry] = {}
    self._counter = 0
    self._lock = threading.Lock()

    def update_from_threads(self, threads: List[ThreadData], now: float) -> None:
    with self._lock:
    for t in threads:
    if t.thread_id not in self._index:
    self._counter += 1
    entry = BumpScheduleEntry(due_at=now, order=self._counter, thread_id=t.thread_id, title=t.title)
    self._index[t.thread_id] = entry
    heapq.heappush(self._heap, entry)

    def _push_or_replace(self, thread_id: int, title: str, due_at: float) -> None:
    with self._lock:
    self._counter += 1
    entry = BumpScheduleEntry(due_at=due_at, order=self._counter, thread_id=thread_id, title=title)
    self._index[thread_id] = entry
    heapq.heappush(self._heap, entry)

    def get_next(self) -> Optional[BumpScheduleEntry]:
    with self._lock:
    while self._heap:
    top = heapq.heappop(self._heap)
    current = self._index.get(top.thread_id)
    if current and current.due_at == top.due_at and current.order == top.order:
    return top
    return None

    def get_next_if_due(self, now: float) -> Optional[BumpScheduleEntry]:
    with self._lock:
    if not self._heap:
    return None
    top = self._heap[0]
    if top.due_at <= now:
    heapq.heappop(self._heap)
    current = self._index.get(top.thread_id)
    if current and current.due_at == top.due_at and current.order == top.order:
    return top
    return None

    def reschedule_success(self, thread_id: int, title: str, now: float) -> None:
    self._push_or_replace(thread_id, title, now + self.success_cooldown_seconds)

    def reschedule_failure(self, thread_id: int, title: str, now: float, next_available_time: Optional[int]) -> None:
    self._push_or_replace(thread_id, title, now + self.failure_backoff_seconds)


    class StatsTracker:
    def __init__(self) -> None:
    self.timestamps = deque()
    self._lock = threading.Lock()

    def record_success(self, ts: float) -> None:
    with self._lock:
    self.timestamps.append(ts)
    self._prune(ts)

    def _prune(self, now: float) -> None:
    threshold = now - 30 * 24 * 3600
    while self.timestamps and self.timestamps[0] < threshold:
    self.timestamps.popleft()

    def count_last(self, seconds: int, now: float) -> int:
    with self._lock:
    self._prune(now)
    cutoff = now - seconds
    i = len(self.timestamps) - 1
    c = 0
    while i >= 0:
    if self.timestamps[i] >= cutoff:
    c += 1
    i -= 1
    else:
    break
    return c

    def snapshot(self, now: float) -> Dict[str, int]:
    return {
    "hour": self.count_last(3600, now),
    "day": self.count_last(86400, now),
    "week": self.count_last(7 * 86400, now),
    "month": self.count_last(30 * 86400, now),
    }


    class StyledLogger:
    COLORS = {
    "RESET": "\033[0m",
    "DIM": "\033[2m",
    "BOLD": "\033[1m",
    "GREEN": "\033[32m",
    "YELLOW": "\033[33m",
    "RED": "\033[31m",
    "BLUE": "\033[34m",
    "CYAN": "\033[36m",
    "MAGENTA": "\033[35m",
    }

    class Formatter(logging.Formatter):
    def __init__(self, colors: Dict[str, str]) -> None:
    super().__init__(fmt="%(message)s")
    self.colors = colors

    def format(self, record: logging.LogRecord) -> str:
    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    level = record.levelname
    color = self.colors.get("CYAN")
    if level == "INFO":
    color = self.colors.get("GREEN")
    elif level == "WARNING":
    color = self.colors.get("YELLOW")
    elif level == "ERROR":
    color = self.colors.get("RED")
    badge = f"{self.colors.get('BOLD')}{color}[{level}]#{self.colors.get('RESET')}"
    return f"{self.colors.get('DIM')}{ts}{self.colors.get('RESET')} {badge} {record.getMessage()}"

    @staticmethod
    def create(name: str = "bump") -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
    h = logging.StreamHandler()
    h.setFormatter(StyledLogger.Formatter(StyledLogger.COLORS))
    logger.addHandler(h)
    logger.propagate = False
    return logger


    class Runner:
    def __init__(self, config: Config) -> None:
    self.config = config
    self.rate_limiter = RateLimiter(config.rate_limit_min_interval_seconds)
    self.client = JsonHttpClient(config.base_url, config.authorization, config.request_timeout_seconds, self.rate_limiter)
    resolved_user_id = self._resolve_creator_user_id()
    self.fetcher = ThreadFetcher(self.client, resolved_user_id)
    self.bumper = Bumper(self.client)
    self.scheduler = BumpScheduler(config.success_cooldown_seconds, config.failure_backoff_seconds)
    self.stats = StatsTracker()
    self.logger = StyledLogger.create()
    self._stop_event = threading.Event()
    self.jobs: "queue.Queue[BumpScheduleEntry]" = queue.Queue()
    self._threads: List[threading.Thread] = []
    self.logger.info(f"creator_user_id: {resolved_user_id}")

    def _resolve_creator_user_id(self) -> int:
    token = self.config.authorization.split(" ", 1)[1] if " " in self.config.authorization else self.config.authorization
    try:
    parts = token.split(".")
    if len(parts) >= 2:
    payload_b64 = parts[1]
    padding = '=' * (-len(payload_b64) % 4)
    payload_json = base64.urlsafe_b64decode(payload_b64 + padding).decode("utf-8", errors="replace")
    data = json.loads(payload_json)
    sub = data.get("sub")
    if isinstance(sub, int):
    return sub
    if isinstance(sub, str) and sub.isdigit():
    return int(sub)
    except Exception:
    pass
    try:
    status, payload, _ = self.client.request("GET", "/users/me/")
    if status == 200 and isinstance(payload, dict):
    for key in ["user_id", "id", "uid"]:
    if key in payload:
    val = payload[key]
    if isinstance(val, int):
    return val
    if isinstance(val, str) and val.isdigit():
    return int(val)
    except Exception:
    pass
    return 0

    def stop(self) -> None:
    self._stop_event.set()
    for t in self._threads:
    try:
    t.join(timeout=2)
    except Exception:
    pass

    def run(self) -> None:
    def refresher() -> None:
    next_refresh = 0.0
    while not self._stop_event.is_set():
    try:
    now = time.time()
    if now >= next_refresh:
    threads = self.fetcher.fetch_all()
    self.scheduler.update_from_threads(threads, now)
    next_refresh = now + float(self.config.fetch_threads_interval_seconds)
    time.sleep(self.config.bump_gap_seconds)
    except Exception:
    self.logger.error("refresher", exc_info=True)
    time.sleep(self.config.bump_gap_seconds)

    def dispatcher() -> None:
    while not self._stop_event.is_set():
    try:
    now = time.time()
    entry = self.scheduler.get_next_if_due(now)
    if entry is not None:
    self.jobs.put(entry)
    else:
    time.sleep(0.05)
    except Exception:
    self.logger.error("dispatcher", exc_info=True)
    time.sleep(self.config.bump_gap_seconds)

    class BumpWorker(threading.Thread):
    def __init__(self, outer: "Runner") -> None:
    super().__init__(daemon=True)
    self.outer = outer

    def run(self) -> None:
    while not self.outer._stop_event.is_set():
    try:
    try:
    entry = self.outer.jobs.get(timeout=0.5)
    except queue.Empty:
    continue
    ok, next_available_time, message = self.outer.bumper.bump(entry.thread_id)
    if ok:
    ts = time.time()
    self.outer.stats.record_success(ts)
    snap = self.outer.stats.snapshot(ts)
    self.outer.logger.info(f"поднят {entry.thread_id} • {entry.title} | за час {snap['hour']} • сутки {snap['day']} • неделя {snap['week']} • месяц {snap['month']}")
    self.outer.scheduler.reschedule_success(entry.thread_id, entry.title, ts)
    else:
    self.outer.scheduler.reschedule_failure(entry.thread_id, entry.title, time.time(), next_available_time)
    except Exception:
    self.outer.logger.error("worker", exc_info=True)
    finally:
    try:
    self.outer.jobs.task_done()
    except Exception:
    pass

    t_ref = threading.Thread(target=refresher, daemon=True)
    t_disp = threading.Thread(target=dispatcher, daemon=True)
    self._threads = [t_ref, t_disp]
    for _ in range(self.config.worker_count):
    self._threads.append(BumpWorker(self))
    for t in self._threads:
    t.start()
    while not self._stop_event.is_set():
    time.sleep(0.5)


    def main() -> None:
    cfg = Config()
    while True:
    runner = Runner(cfg)
    try:
    runner.run()
    except KeyboardInterrupt:
    runner.stop()
    break
    except Exception:
    logging.getLogger("bump").error("фатальная ошибка, перезапуск", exc_info=True)
    time.sleep(cfg.bump_gap_seconds)


    if __name__ == "__main__":
    main()

    Автоподнятие всех тем, которые есть у Вас.
     
    1. View previous comments (1)
    2. морфий
  2. ЧернильныйБро
    ЗАЧЕМ, это ему будет столького времени же стоить... По 12(вроде) тем в 12-24 часов
     
  3. 555
    мне кажется за массовый бамп неактуальных тем в бан улетишь на раз-два :smiledog:
     
  4. MORTY
    MORTY Nov 19, 2025 i ball was rawt 25,314 Oct 13, 2018
    бампер это на машине вроде если я не ошибаюсь
     
    1. Эксфадор Topic starter
      avatarMORTY , передний бампер
Loading...