import os import time import random import logging import uuid import inspect import copy import re from urllib import parse as uparse from datetime import datetime from typing import Optional, List, Any, Tuple, Dict, Callable from contextlib import asynccontextmanager from enum import Enum import ujson from fastapi import FastAPI, HTTPException, Request, Response, File, Form, Query, UploadFile, Depends, Path from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, create_model from pydantic.dataclasses import dataclass from lxml import html from httpx import AsyncClient, TimeoutException from httpx_retries import RetryTransport, Retry from urllib.parse import urlparse import httpx import uvicorn CONFIG: Dict[str, str] = { 'http_proxy': 'http://10.0.0.3:20171/', 'dev': os.path.isfile('./dev') } logging.basicConfig(level=logging.WARNING) logger = logging.getLogger("cache") logger.setLevel(logging.DEBUG if CONFIG['dev'] else logging.WARNING) logger = logging.getLogger("network") logger.setLevel(logging.DEBUG if CONFIG['dev'] else logging.WARNING) logger = logging.getLogger("main") logger.setLevel(logging.DEBUG if CONFIG['dev'] else logging.INFO) class Exceptions: class WorkNotFound(Exception): def __init__(self, wid: int, cid: Optional[int] = None, *args: Any): super().__init__(*args) self.work_id = wid self.chapter_id = cid class NetworkTimeout(Exception): def __init__(self, endpoint, *args: Any): super().__init__(*args) self.endpoint = endpoint class Unauthorized(Exception): def __init__(self, endpoint, *args: Any): super().__init__(*args) self.endpoint = endpoint class RE: author_pseuds = re.compile(r"^/users/([^/]+)/pseuds/([^/]+)$") chapter_work = re.compile(r"/works/(\d+)/chapters/(\d+)") @dataclass class Pair: left: Any right: Any class Util: @staticmethod def cleanInt(string: str) -> int: digits_only = ''.join(filter(str.isdigit, string)) number = int(digits_only) if string and 
string[0] == '-': number = -number return number @staticmethod def html_to_text_with_newlines(stree: html.HtmlElement) -> str: tree = copy.deepcopy(stree) for br in tree.xpath('//br'): if br.tail: br.tail = '\n' + br.tail else: br.tail = '\n' for p in tree.xpath('//p'): if p.tail: p.tail = '\n\n' + p.tail else: p.tail = '\n\n' return tree.text_content() @staticmethod def split_lines(tree) -> list[str]: return filter(lambda x: bool(x), tree.split('\n')) class AsyncCache: def __init__(self, fallback: Callable[[str], float] = None, expire: Optional[int] = None): self.expire = expire self.cache: dict[Pair] = {} self.fallback = fallback self.logger = logging.getLogger("cache") async def __call__(self, key: str) -> Any: return await self.get(key) async def get(self, key: str) -> Any: if key in self.cache: item = self.cache[key] if self.expire: if self.expire < 0: self.logger.debug(f'Miss {key} skip') return await self.resolve(key) if time.time() - item.right > self.expire: self.logger.debug(f'Miss {key} expired') return await self.resolve(key) else: self.logger.debug(f'Hit {key}') return item.left else: self.logger.debug(f'Hit {key} skip') return item.left else: self.logger.debug(f'Miss {key} notfound') return await self.resolve(key) def set(self, key: str, value: Any): self.logger.debug(f'Set {key}') self.cache[key] = Pair(value, time.time()) async def resolve(self, key: str) -> Any: self.logger.debug(f'Resolve {key}') if self.fallback: st = time.time() value = self.fallback(key) if inspect.isawaitable(value): self.logger.debug(f'Resolve {key} await') value = await value et = time.time() self.set(key, value) self.logger.debug(f'Resolve {key} done in {et-st:.4f}') return value else: self.logger.warning(f'No fallback function') raise KeyError(key) class Category(Enum): FF = "ff" # GL FM = "fm" # BG MM = "mm" # BL NONE = "none" MULTI = "multi" UNKNOWN = "unknown" @classmethod def parse(cls, typ: str) -> "Category": match typ.strip(): case 'F/F': return cls.FF case 'F/M': 
return cls.FM case 'M/M': return cls.MM case 'Gen': return cls.NONE case 'Multi': return cls.MULTI case _: return cls.UNKNOWN @dataclass class AO3Time: year: int month: int date: int _mon_name = { "jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6, "jul": 7, "aug": 8, "sept": 9, "oct": 10, "nov": 11, "dec": 12, } @classmethod def parse(cls, string: str) -> "AO3Time": year, month, date = string.strip().split('-',2) return cls( year=int(year), month=int(month), date=int(date), ) @classmethod def parse1(cls, string: str) -> "AO3Time": date, month, year = string.strip().split(' ',2) return cls( year=int(year), month=cls._mon_name.get(month.lower(),-1), date=int(date), ) def __repr__(self) -> str: return f'{self.year}-{self.month}-{self.date}' @dataclass class AO3WorkStat: publishedTime: AO3Time wordCount: int hitCount: int kudoCount: Optional[int] = None commentCount: Optional[int] = None bookmarkCount: Optional[int] = None chapter: Optional[Pair] = None updatedTime: Optional[AO3Time] = None completedTime: Optional[AO3Time] = None @dataclass class WorkDataResult: @dataclass class ChapterItem: title: str chapterId: int workId: int chapterId: Optional[int] title: str text: list[str] pseud: str lang: str stats: AO3WorkStat summary: Optional[str] = None fandom: Optional[list[str]] = None category: Optional[list[Category]] = None relationship: Optional[list[str]] = None additionalTags: Optional[list[str]] = None code: int = 0 chapters: Optional[list[ChapterItem]] = None chapterIndex: Optional[int] = None @dataclass class SearchWorkItem: workId: int title: str pseud: str author: str summary: str stats: AO3WorkStat giftTo: Optional[str] = None @dataclass class SimpleSearchWorkResult: keyword: str count: int pageCount: int page: int works: list[SearchWorkItem] code: int = 0 class Network: def __init__(self, proxy: Optional[str] = None) -> None: self.logger = logging.getLogger("network") self.http_proxy: Optional[str] = proxy self.client = AsyncClient( 
proxy=self.http_proxy, transport=RetryTransport(retry=Retry( total=5, backoff_factor=0.5 )) ) self.xmpp_client = httpx.AsyncClient(timeout=10) self._get = Util.AsyncCache(self._real_get, 36000000 if CONFIG['dev'] else 1800) self.works = {} async def send_message(self, msg: str): url = "http://10.0.0.3:52222/send" headers = {"Content-Type": "application/json"} payload = { "alias": "ao3mirror", "message": msg } try: response = await self.xmpp_client.post(url, headers=headers, json=payload) response.raise_for_status() except httpx.HTTPError as e: self.logger.error(e) async def _real_get(self, uri: str) -> httpx.Response: url = urlparse(uri) try: self.logger.debug(f'HTTP Get {uri}') response = await self.client.get(uri, headers=self._build_headers(url.scheme, url.netloc)) except httpx.TimeoutException as e: raise Exceptions.NetworkTimeout(uri) from e response.raise_for_status() return response def _build_headers(self, scheme: str, host: str) -> Dict[str, str]: return { "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:137.0) Gecko/20100101 Firefox/137.0", "Referer": f"{scheme}://{host}/", "Origin": f"{scheme}://{host}" } async def _get_page_data(self, work_id: int, chapter_id: Optional[int] = None) -> WorkDataResult: url = ( f'https://archiveofourown.org/works/{work_id}/chapters/{chapter_id}?view_adult=yes' if chapter_id else f'https://archiveofourown.org/works/{work_id}?view_adult=yes' ) try: response = await self._get(url) except httpx.HTTPStatusError as e: if e.response.status_code == 404: raise Exceptions.WorkNotFound(work_id, chapter_id) elif e.response.status_code == 302: newurl = e.response.headers.get('location') if (result := RE.chapter_work.match(newurl)) and (ids := result.groups()): work_id, chapter_id = ids self.logger.info(f'Work {work_id} has first chapter {chapter_id}') try: work_id = Util.cleanInt(work_id) chapter_id = Util.cleanInt(chapter_id) except ValueError as e1: raise e1 from e return await self.get_page_data(work_id, chapter_id) elif 
newurl.endswith('/users/login?restricted=true'): raise Exceptions.Unauthorized(url) else: logger.debug(f'Unknown 302 to: {e.response.headers.get("location")}') raise else: self.logger.debug(f'Unknown code: {e.response.status_code}') raise return self.parse_page_data(response.text, work_id, chapter_id) async def get_page_data(self, work_id: int, chapter_id: Optional[int] = None) -> WorkDataResult: wid = (work_id, chapter_id) if wid in self.works: self.logger.debug(f"Work {work_id}:{chapter_id} hit cache") return self.works[wid] else: self.logger.debug(f"Work {work_id}:{chapter_id} miss cache") result = await self._get_page_data(work_id, chapter_id) self.works[wid] = result return result def parse_page_data(self, result: str, work_id: int, chapter_id: Optional[int]) -> WorkDataResult: tree = html.fromstring(result) meta_block = tree.cssselect('div.wrapper')[0].cssselect('dl.work')[0] stats_block = meta_block.cssselect('dl.stats')[0] if (tblock := stats_block.cssselect('dt.status')) and (dblock := stats_block.cssselect('dd.status')): tblock = tblock[0] dblock = dblock[0] match tblock.text_content().strip().lower(): case 'completed:': completedTime = AO3Time.parse(dblock.text_content()) updatedTime = None case 'updated:': completedTime = None updatedTime = AO3Time.parse(dblock.text_content()) case _: completedTime = None updatedTime = None else: completedTime = None updatedTime = None if block := meta_block.cssselect('dd.category'): categories = [Category.parse(item.text_content().strip()) for item in block[0].cssselect('li a')] else: categories = [] if block := meta_block.cssselect('dd.fandom'): fandoms = [item.text_content().strip() for item in block[0].cssselect('li a')] else: fandoms = [] if block := meta_block.cssselect('dd.relationship'): relationships = [item.text_content().strip() for item in block[0].cssselect('li a')] else: relationships = [] if block := meta_block.cssselect('dd.freeform.tags'): additionalTags = [item.text_content().strip() for item in 
block[0].cssselect('li a')] else: additionalTags = [] body_block = tree.xpath('//*[@id="workskin"]')[0] preface_block = body_block.cssselect('.preface')[0] if (chapter_block := meta_block.cssselect('dd.chapters')): chapter_block = chapter_block[0] left, right = chapter_block.text_content().split('/') right = right.strip() if right == '?': right = -1 else: right = Util.cleanInt(right) chapter = Pair(Util.cleanInt(left), right) else: chapter = None text = [] for p in body_block.cssselect('div.userstuff p'): text.extend(Util.split_lines(Util.html_to_text_with_newlines(p))) if chapter_id: chapters = [] chapterIndex = 0 index = 0 if chapter_block := tree.cssselect('div.work ul.work.navigation.actions li.chapter ul#chapter_index.expandable.secondary li form'): for chapter_option in chapter_block[0].cssselect('select#selected_id')[0].cssselect('option'): title = chapter_option.text_content().split('.')[1].strip() cchapter_id = int(chapter_option.attrib['value']) chapters.append(WorkDataResult.ChapterItem(title, cchapter_id)) if chapter_id == cchapter_id: chapterIndex = index index += 1 else: chapters.append(WorkDataResult.ChapterItem(tree.cssselect('div#workskin div#chapters h3.title')[0].text_content().split(':',1)[1].strip(), chapter_id)) else: chapters = chapterIndex = None summary = ( Util.html_to_text_with_newlines(summary_block[0]).strip() if (summary_block := tree.cssselect('blockquote.userstuff')) else None ) return WorkDataResult( workId=work_id, chapterId=chapter_id, chapterIndex=chapterIndex, title=preface_block.cssselect("h2.title")[0].text_content().strip(), summary=summary, text=text, stats=AO3WorkStat( publishedTime=AO3Time.parse(stats_block.cssselect('dd.published')[0].text_content()), completedTime = completedTime, updatedTime = updatedTime, wordCount=Util.cleanInt(stats_block.cssselect('dd.words')[0].text_content()), #kudoCount=Util.cleanInt(stats_block.cssselect('dd.kudos')[0].text_content()), 
hitCount=Util.cleanInt(stats_block.cssselect('dd.hits')[0].text_content()), chapter=chapter ), category=categories, fandom=fandoms, relationship=relationships, additionalTags = additionalTags, lang=meta_block.cssselect('dd.language')[0].text_content().strip(), pseud=preface_block.cssselect('.byline')[0].text_content().strip(), chapters=chapters ) async def search_works(self, keyword: str, page: int = 1) -> SimpleSearchWorkResult: page = abs(page) url = f"https://archiveofourown.org/works/search?work_search%5Bquery%5D={uparse.quote(keyword)}" if page in (0, 1) else \ f"https://archiveofourown.org/works/search?work_search%5Bquery%5D={uparse.quote(keyword)}&page={page}" response = await self._get(url) return self.parse_search_result(response.text, keyword, page) def parse_search_result(self, result: str, keyword: str, page: int = 1) -> SimpleSearchWorkResult: tree = html.fromstring(result) count_block = tree.cssselect('h3.heading')[0].text_content() if 'Found' in count_block: count = Util.cleanInt(count_block.split('Found', 1)[0]) work_blocks = tree.cssselect('ol.work.index.group > li') works = [] for block in work_blocks: ass = block.cssselect('div.header.module > h4.heading > a') match len(ass): case 1: b1 = ass[0]; b2 = None; b3 = None case 2: b1, b2 = ass; b3 = None case 3: b1, b2, b3 = ass b3 = b3.attrib['href'].split('/')[-2] case _: continue if b1 is not None: work_id = int(b1.attrib['href'].split('/')[-1]) else: continue if b2 is not None and (match := RE.author_pseuds.match(b2.attrib['href'])): username, pseud = match.groups() else: username = pseud = '' datetime_block = block.cssselect('div.header.module > p.datetime')[0] summary = Util.html_to_text_with_newlines(summary_block[0]) if (summary_block := block.cssselect('blockquote.userstuff')) else '' kudoCount = Util.cleanInt(kudo_block[0].text_content()) if (kudo_block := block.cssselect('dd.kudos')) else None stats_block = block.cssselect('dl.stats')[0] works.append(SearchWorkItem( workId=work_id, 
title=b1.text_content().strip(), author=username, pseud=pseud, giftTo=b3, summary=summary.strip(), stats=AO3WorkStat( publishedTime=AO3Time.parse1(datetime_block.text_content()), wordCount=Util.cleanInt(stats_block.cssselect('dd.words')[0].text_content()), kudoCount=kudoCount, hitCount=Util.cleanInt(stats_block.cssselect('dd.hits')[0].text_content()), ), )) page_count = Util.cleanInt(next_block[0].getprevious().text_content()) \ if (page_block := tree.cssselect('ol.pagination.actions')) and (next_block := page_block[0].cssselect('li.next')) else 1 return SimpleSearchWorkResult( keyword=keyword, count=count, pageCount=page_count, page=page, works=works ) else: return SimpleSearchWorkResult( code=1, keyword=keyword, count=-1, pageCount=-1, page=page, works=[] ) network: Optional[Network] = None async def startup() -> None: global network logger.info("Create httpx async client.") network = Network(CONFIG['http_proxy']) async def shutdown() -> None: logger.info("Shutdown httpx async client.") @asynccontextmanager async def lifespan(app: FastAPI): await startup() yield await shutdown() def require_network() -> Optional[Network]: if network is None: raise HTTPException(status_code=503, detail="Network client not initialized.") return network app = FastAPI( lifespan=lifespan, ) @app.exception_handler(Exceptions.NetworkTimeout) async def network_timeout_handler(response: Response, exc: Exceptions.NetworkTimeout): return JSONResponse(status_code=504, content={"code": 1, "endpoint": exc.endpoint}) @app.exception_handler(httpx.HTTPStatusError) async def http_status_error_handler(response: Response, exc: httpx.HTTPStatusError): return JSONResponse(status_code=502, content={"code": 2, 'endpoint': str(exc.request.url),'status': exc.response.status_code}) @app.exception_handler(Exceptions.Unauthorized) async def http_status_error_handler(response: Response, exc: Exceptions.Unauthorized): return JSONResponse(status_code=401, content={"code": 1, 'endpoint': exc.endpoint}) 
@app.get("/search/simple")
async def search_work_simple(
    network: Network = Depends(require_network),
    keyword: str = Query(...),
    page: int = Query(1)
) -> SimpleSearchWorkResult:
    """Simple keyword search proxied to AO3's work search."""
    result = await network.search_works(keyword, page)
    await network.send_message(f'[Info] 简单搜索\n关键词: {keyword}\n总数: {result.count}\n页面: {result.page}/{result.pageCount}')
    logger.info(f'Simple Search {keyword} count {result.count} page {result.page}/{result.pageCount}')
    return result


@app.get("/work/{work_id}")
async def get_work(
    network: Network = Depends(require_network),
    work_id: int = Path(..., description="Work ID"),
) -> WorkDataResult:
    """Fetch a single work's parsed data; 404 JSON when the work is absent."""
    try:
        result = await network.get_page_data(work_id)
        await network.send_message(f'[Info] 作品\nID: {result.workId}\n作者: {result.pseud}\n标题: {result.title}\n发布于: {result.stats.publishedTime}')
        logger.info(f'Work {work_id} title: {result.title}')
    except Exceptions.WorkNotFound as e:
        logger.warning(f"Work not found: {e.work_id}")
        # BUG FIX: was `raise JSONResponse(...)` — JSONResponse is not an
        # exception, so a missing work crashed with TypeError instead of
        # replying 404. `return` matches the chapter route below.
        return JSONResponse(status_code=404, content={"code": 1, "work_id": e.work_id})
    else:
        return result


@app.get("/work/{work_id}/{chapter_id}")
async def get_work_chapter(
    network: Network = Depends(require_network),
    work_id: int = Path(..., description="Work ID"),
    chapter_id: int = Path(..., description="Chapter ID"),
) -> WorkDataResult:
    """Fetch one chapter of a work; 404 JSON when work/chapter is absent."""
    try:
        result = await network.get_page_data(work_id, chapter_id)
        await network.send_message(f'[Info] 作品\nID: {result.workId}:{result.chapterId}\n作者: {result.pseud}\n标题: {result.title}\n发布于: {result.stats.publishedTime}')
        logger.info(f'Work {work_id}:{chapter_id} title: {result.title}')
    except Exceptions.WorkNotFound as e:
        logger.warning(f"Work not found: {e.work_id} {e.chapter_id}")
        return JSONResponse(status_code=404, content={"code": 1, "work_id": e.work_id, 'chapter_id': chapter_id})
    else:
        return result


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=28001, log_level="debug" if CONFIG['dev'] else 'info', reload=CONFIG['dev'])