Allow disabling the duplicate check for the HTTP request
Closed · 1 comment
In https://github.com/small-thinking/taotie/blob/main/examples/summarize_to_notion/example.py#L32, we have a dedup memory that helps filter out duplicate information. We'd like to be able to bypass the dedup step when a flag is explicitly set in the HTTP request.
Specifically, for the HTTP service source, we would like to add a flag that controls whether the duplicate check is bypassed.
You may check the implementation of HttpService and its parent class to add this functionality. Some related code:
https://github.com/small-thinking/taotie/blob/main/taotie/sources/http_service.py#L41-L77
https://github.com/small-thinking/taotie/blob/main/taotie/sources/base.py#L42-L63
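For illustration, a request carrying such a flag might look like the sketch below. The endpoint path and port come from HttpService (/api/v1/url, bound to 0.0.0.0:6543); the bypass_dedup field name is only a placeholder for the proposed flag and does not exist in the current code.

# Hypothetical client call against the HttpService endpoint; "bypass_dedup" is a placeholder flag name.
import requests

resp = requests.post(
    "http://localhost:6543/api/v1/url",
    json={
        "url": "https://github.com/small-thinking/taotie",
        "content_type": "github-repo",
        "bypass_dedup": True,  # proposed: skip the dedup-memory check for this request only
    },
)
print(resp.json())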
Hey @yxjiang, I've started working on this PR! I'll be adding a flag in the HTTP request to control whether to bypass the duplicate check, and then modify the _send_data method in the BaseSource class and the _process method in the HttpService class accordingly. Give me a minute!
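A minimal sketch of what the HttpService side of that change could look like, assuming a bypass_dedup field in the request body and an ignore_dedup keyword argument on _send_data (both placeholder names, not in the current code):

# Sketch of modified methods inside HttpService; flag names are placeholders.
async def check_url(self):
    data = await request.get_json()
    if "url" not in data:
        return jsonify({"error": "Missing URL parameter"}), 400
    url = data["url"]
    content_type = data.get("content_type", "")
    # Defaults to False so existing callers keep the duplicate check.
    bypass_dedup = bool(data.get("bypass_dedup", False))
    result = await self._process(
        url=url, content_type=content_type, bypass_dedup=bypass_dedup
    )
    return jsonify({"result": result})

async def _process(
    self, url: str, content_type: str = "html", bypass_dedup: bool = False
) -> str:
    # ... same fetching and parsing logic as today ...
    if doc:
        self.logger.output(doc.encode())
        # Forward the flag so BaseSource._send_data can skip the dedup check.
        await self._send_data(doc, ignore_dedup=bypass_dedup)
    return "ok"

check_url only reads the flag and passes it along; the actual decision can stay in _send_data so every source shares the same switch.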
Some code snippets I looked at. If some file is missing from here, you can mention the path in the ticket description.
taotie/taotie/sources/github.py
Lines 63 to 94 in 89549fb
repo_readme = "" | |
try: | |
async with session.get( | |
readme_url, verify_ssl=False | |
) as readme_response: | |
if readme_response.status == 200: | |
repo_readme = await readme_response.text() | |
repo_readme = repo_readme[: self.readme_truncate_size] | |
else: | |
self.logger.warning( | |
f"Failed to fetch from {readme_url}. Status: {readme_response.status}" | |
) | |
raise Exception(readme_response.status) | |
except Exception as e: | |
self.logger.warning( | |
f"Failed to fetch from {readme_url}. Reason: {e}" | |
) | |
github_event = Information( | |
type="github-repo", | |
datetime_str=get_datetime(), | |
id=repo_name, | |
uri=repo_url, | |
content=repo_readme, | |
repo_desc=repo_desc, | |
repo_lang=repo_lang, | |
repo_star=repo_star, | |
repo_fork=repo_fork, | |
) | |
res = await self._send_data(github_event) | |
if res: | |
self.logger.debug(f"{idx}: {github_event.encode()}") |
taotie/taotie/reporter/notion_reporter.py
Lines 20 to 29 in 89549fb
def __init__(
    self,
    knowledge_source_uri: str,
    date_lookback: int,
    type_filters: List[str],
    topic_filters: List[str],
    verbose: bool = False,
    **kwargs,
):
taotie/examples/summarize_to_notion/example.py
Lines 1 to 75 in 89549fb
"""The main entry to collect the information from all the sources. | |
""" | |
import asyncio | |
import os | |
from taotie.consumer.info_summarizer import InfoSummarizer | |
from taotie.gatherer import Gatherer | |
from taotie.message_queue import RedisMessageQueue | |
from taotie.orchestrator import Orchestrator | |
from taotie.sources.arxiv import Arxiv | |
from taotie.sources.github import GithubTrends | |
from taotie.sources.http_service import HttpService | |
from taotie.sources.twitter import TwitterSubscriber | |
from taotie.storage.memory import DedupMemory | |
from taotie.storage.notion import NotionStorage | |
from taotie.utils import load_env | |
def create_notion_summarizer(): | |
load_env() # This has to be called as early as possible. | |
verbose = True | |
batch_size = 1 | |
fetch_interval = 10 | |
redis_url = "taotie-redis" | |
channel_name = "taotie" | |
mq = RedisMessageQueue(redis_url=redis_url, channel_name=channel_name, verbose=True) | |
instruction = None | |
storage = NotionStorage( | |
root_page_id=os.getenv("NOTION_ROOT_PAGE_ID"), verbose=verbose | |
) | |
dedup_memory = DedupMemory(redis_url=redis_url) | |
consumer = InfoSummarizer( | |
buffer_size=1000, | |
summarize_instruction=instruction, | |
verbose=verbose, | |
dedup=True, | |
storage=storage, | |
max_tokens=1000, | |
max_buffer_size=1000, | |
) | |
gatherer = Gatherer( | |
message_queue=mq, | |
consumer=consumer, | |
batch_size=batch_size, | |
fetch_interval=fetch_interval, | |
verbose=verbose, | |
) | |
orchestrator = Orchestrator(verbose=verbose) | |
orchestrator.set_gatherer(gatherer=gatherer) | |
# Http service source. | |
http_service_source = HttpService( | |
sink=mq, verbose=verbose, dedup_memory=dedup_memory, truncate_size=200000 | |
) | |
orchestrator.add_source(http_service_source) | |
# Twitter source. | |
# rules = ["from:RunGreatClasses", "#GPT", "#llm", "#AI", "#AGI", "foundation model"] | |
# twitter_source = TwitterSubscriber(rules=rules, sink=mq, verbose=verbose) | |
# orchestrator.add_source(twitter_source) | |
# Github source. | |
github_source = GithubTrends(sink=mq, verbose=verbose, dedup_memory=dedup_memory) | |
orchestrator.add_source(github_source) | |
# Arxiv source. | |
arxiv_source = Arxiv(sink=mq, verbose=verbose, dedup_memory=dedup_memory) | |
orchestrator.add_source(arxiv_source) | |
asyncio.run(orchestrator.run()) | |
if __name__ == "__main__": | |
create_notion_summarizer() |
taotie/taotie/sources/http_service.py
Lines 1 to 144 in 89549fb
"""A web service that accept the http request to collect the data. | |
""" | |
import asyncio | |
import traceback | |
from urllib.parse import urlparse | |
import aiohttp | |
from bs4 import BeautifulSoup | |
from hypercorn.asyncio import serve | |
from hypercorn.config import Config | |
from quart import Quart, jsonify, request | |
from unstructured.partition.html import partition_html # type: ignore | |
from taotie.entity import Information | |
from taotie.message_queue import MessageQueue, SimpleMessageQueue | |
from taotie.sources.base import BaseSource | |
from taotie.utils import get_datetime | |
class HttpService(BaseSource): | |
"""A web service that accept the http request to collect the data.""" | |
def __init__(self, sink: MessageQueue, verbose=False, **kwargs): | |
super().__init__(sink=sink, verbose=verbose, **kwargs) | |
self.app = Quart(__name__) | |
self.app.add_url_rule( | |
"/api/v1/url", "check_url", self.check_url, methods=["POST"] | |
) | |
self.truncate_size = kwargs.get("truncate_size", -1) | |
self.logger.info("HttpService initialized.") | |
async def check_url(self): | |
data = await request.get_json() | |
if "url" not in data: | |
return jsonify({"error": "Missing URL parameter"}), 400 | |
url = data["url"] | |
content_type = data.get("content_type", "") | |
result = await self._process(url=url, content_type=content_type) | |
return jsonify({"result": result}) | |
async def _process(self, url: str, content_type: str = "html") -> str: | |
self.logger.info(f"HttpService received {url} and {content_type}.") | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get( | |
url, allow_redirects=True, verify_ssl=False | |
) as response: | |
content = await response.text() | |
doc = None | |
if content_type == "github-repo": | |
doc = await self._parse_github_repo(url, content) | |
elif "arxiv.org/abs/" in url: | |
# Parse the arxiv link. Extract the title, abstract, authors, and link to the paper. | |
doc = await self._parse_arxiv(url, content) | |
elif "application/pdf" in content_type: | |
message = "pdf" | |
elif content_type in ["html", "blog"]: | |
elements = partition_html(text=content) | |
message = "\n".join([str(e) for e in elements]) | |
doc = Information( | |
type=content_type, | |
datetime_str=get_datetime(), | |
id=url, | |
uri=url, | |
content=message[: self.truncate_size], | |
) | |
else: | |
return f"unknown content type {content_type}." | |
if doc: | |
self.logger.output(doc.encode()) | |
await self._send_data(doc) | |
return "ok" | |
except Exception as e: | |
self.logger.error(f"Error: {e}") | |
traceback.print_exc() | |
return "error" | |
async def run(self): | |
config = Config() | |
config.bind = ["0.0.0.0:6543"] | |
await serve(self.app, config) | |
async def _cleanup(self): | |
pass | |
async def _parse_github_repo(self, url: str, content: str) -> Information: | |
elements = partition_html(text=content) | |
message = "\n".join([str(e) for e in elements]) | |
# Only keep the last two sections of the github repo and use it for the id. | |
parsed_url = urlparse(url) | |
last_two_segments = parsed_url.path.split("/")[-2:] | |
id = "/" + "/".join(last_two_segments) | |
return Information( | |
type="github-repo", | |
datetime_str=get_datetime(), | |
id=id, | |
uri=url, | |
content=message[: self.truncate_size], | |
) | |
async def _parse_arxiv(self, url: str, content: str) -> Information: | |
"""Parse the arxiv link. Extract the title, abstract, authors, and link to the paper.""" | |
soup = BeautifulSoup(content, "html.parser") | |
title = ( | |
soup.find("h1", class_="title mathjax") | |
.text.strip() | |
.replace("Title:", "") | |
.strip() | |
) | |
abstract = ( | |
soup.find("blockquote", class_="abstract mathjax") | |
.text.strip() | |
.replace("Abstract: ", "") | |
) | |
authors = ", ".join( | |
[ | |
author.text.strip() | |
for author in soup.find_all("div", class_="authors")[0].find_all("a") | |
] | |
) | |
pdf_link = ( | |
"https://arxiv.org" | |
+ soup.find("div", class_="full-text").find("a", class_="download-pdf")[ | |
"href" | |
] | |
) | |
doc = Information( | |
type="arxiv", | |
datetime_str=get_datetime(), | |
id=title, | |
uri=url, | |
content=f"Title: {title}\n\nAuthors: {authors}\n\nAbstract: {abstract}", | |
) | |
return doc | |
if __name__ == "__main__": | |
message_queue = SimpleMessageQueue() | |
http_service = HttpService(sink=message_queue, verbose=True) | |
asyncio.run(http_service.run()) |
taotie/taotie/sources/base.py
Lines 1 to 68 in 89549fb
import atexit
import os
from abc import ABC, abstractmethod

from taotie.entity import Information
from taotie.message_queue import MessageQueue
from taotie.storage.memory import DedupMemory
from taotie.utils import *


class BaseSource(ABC):
    """Base class for all sources.

    This class is used to provide a common interface for all sources. It
    provides a method to get the source name and a method to get the source data.
    """

    def __init__(
        self,
        sink: MessageQueue,
        verbose: bool = False,
        dedup_memory: Optional[DedupMemory] = None,
        **kwargs,
    ):
        load_dotenv()
        if not sink:
            raise ValueError("The sink cannot be None.")
        self.logger = Logger(logger_name=os.path.basename(__file__), verbose=verbose)
        self.verbose = (verbose,)
        self.sink = sink
        self.dedup_memory = dedup_memory
        atexit.register(self._cleanup)

    def __str__(self):
        return self.__class__.__name__

    @abstractmethod
    async def _cleanup(self):
        """Clean up the source."""

    async def _send_data(self, information: Information) -> bool:
        """This function is used to send the grabbed data to the message queue.

        It is supposed to be called within the callback function of the streaming
        function or in the forever loop.

        Args:
            information (Information): The data to send.

        Returns:
            bool: True if the data is sent successfully, False otherwise.
        """
        # Skip duplicate information according to the id.
        if self.dedup_memory:
            id = information.get_id()
            if await self.dedup_memory.exists(id):
                self.logger.warning(f"Duplicated information: {id}, will ignore.")
                return False
        await self.sink.put(information.encode())
        # Record the index.
        if self.dedup_memory:
            await self.dedup_memory.check_and_save(id)
        return True

    @abstractmethod
    async def run(self):
        """This method should wrap the streaming logic or a forever loop."""
        raise NotImplementedError
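On the BaseSource side, _send_data could take an optional argument that short-circuits both the existence check and the recording step. A rough sketch, where ignore_dedup is a placeholder name and the default preserves today's behavior:

# Sketch of a modified BaseSource._send_data; "ignore_dedup" is a placeholder name.
async def _send_data(self, information: Information, ignore_dedup: bool = False) -> bool:
    # Skip duplicate information according to the id, unless the caller asked to bypass the check.
    if self.dedup_memory and not ignore_dedup:
        id = information.get_id()
        if await self.dedup_memory.exists(id):
            self.logger.warning(f"Duplicated information: {id}, will ignore.")
            return False
    await self.sink.put(information.encode())
    # Record the index (also skipped when the check is bypassed).
    if self.dedup_memory and not ignore_dedup:
        await self.dedup_memory.check_and_save(information.get_id())
    return True

Whether a bypassed send should still record the id in the dedup memory is a design choice left to the PR; the sketch above skips recording as well.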
I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!