small-thinking/taotie

Allow disabling the duplicate check for the HTTP request

Closed this issue · 1 comment

In https://github.com/small-thinking/taotie/blob/main/examples/summarize_to_notion/example.py#L32, we have a dedup memory that helps filter out duplicate information. We'd like to bypass the dedup step when a flag in the HTTP request explicitly asks for it.

Concretely, for the HTTP service source, we would like to add a flag to control whether the duplicate check is bypassed.
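
For illustration, a request to the existing /api/v1/url endpoint could carry such a flag. This is only a sketch of the desired request shape; the field name bypass_dedup is a suggestion and is not part of the current API:

import asyncio

import aiohttp


async def submit_url() -> None:
    payload = {
        "url": "https://github.com/small-thinking/taotie",
        "content_type": "github-repo",
        "bypass_dedup": True,  # Hypothetical flag: skip the dedup check for this request.
    }
    async with aiohttp.ClientSession() as session:
        # HttpService in this repo listens on port 6543 and exposes POST /api/v1/url.
        async with session.post("http://localhost:6543/api/v1/url", json=payload) as resp:
            print(await resp.json())


asyncio.run(submit_url())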

You may check the implementation of HttpService and its parent class to add this functionality. Some related code:
https://github.com/small-thinking/taotie/blob/main/taotie/sources/http_service.py#L41-L77
https://github.com/small-thinking/taotie/blob/main/taotie/sources/base.py#L42-L63

Hey @yxjiang, I've started working on this PR! I'll add a flag in the HTTP request to control whether to bypass the duplicate check, and then modify the _send_data method in the BaseSource class and the _process method in the HttpService class accordingly. Give me a minute!
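
As a rough sketch of what that could look like (names and defaults are illustrative, not final): the flag would be read in check_url, forwarded through _process, and honored in BaseSource._send_data, roughly like this:

# taotie/sources/base.py (sketch): let callers opt out of the dedup check.
async def _send_data(self, information: Information, bypass_dedup: bool = False) -> bool:
    if self.dedup_memory and not bypass_dedup:
        id = information.get_id()
        if await self.dedup_memory.exists(id):
            self.logger.warning(f"Duplicated information: {id}, will ignore.")
            return False
    await self.sink.put(information.encode())
    if self.dedup_memory and not bypass_dedup:
        # Record the id so later submissions are treated as duplicates.
        await self.dedup_memory.check_and_save(id)
    return True


# taotie/sources/http_service.py (sketch): read the flag from the request body.
# _process would then forward it as self._send_data(doc, bypass_dedup=bypass_dedup).
async def check_url(self):
    data = await request.get_json()
    if "url" not in data:
        return jsonify({"error": "Missing URL parameter"}), 400
    url = data["url"]
    content_type = data.get("content_type", "")
    bypass_dedup = bool(data.get("bypass_dedup", False))
    result = await self._process(
        url=url, content_type=content_type, bypass_dedup=bypass_dedup
    )
    return jsonify({"result": result})

Existing callers such as the Github and Arxiv sources would keep the default bypass_dedup=False, so their behavior would not change.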

Some code snippets I looked at. If a relevant file is missing from here, you can mention its path in the ticket description.

repo_readme = ""
try:
async with session.get(
readme_url, verify_ssl=False
) as readme_response:
if readme_response.status == 200:
repo_readme = await readme_response.text()
repo_readme = repo_readme[: self.readme_truncate_size]
else:
self.logger.warning(
f"Failed to fetch from {readme_url}. Status: {readme_response.status}"
)
raise Exception(readme_response.status)
except Exception as e:
self.logger.warning(
f"Failed to fetch from {readme_url}. Reason: {e}"
)
github_event = Information(
type="github-repo",
datetime_str=get_datetime(),
id=repo_name,
uri=repo_url,
content=repo_readme,
repo_desc=repo_desc,
repo_lang=repo_lang,
repo_star=repo_star,
repo_fork=repo_fork,
)
res = await self._send_data(github_event)
if res:
self.logger.debug(f"{idx}: {github_event.encode()}")

def __init__(
    self,
    knowledge_source_uri: str,
    date_lookback: int,
    type_filters: List[str],
    topic_filters: List[str],
    verbose: bool = False,
    **kwargs,
):

"""The main entry to collect the information from all the sources.
"""
import asyncio
import os
from taotie.consumer.info_summarizer import InfoSummarizer
from taotie.gatherer import Gatherer
from taotie.message_queue import RedisMessageQueue
from taotie.orchestrator import Orchestrator
from taotie.sources.arxiv import Arxiv
from taotie.sources.github import GithubTrends
from taotie.sources.http_service import HttpService
from taotie.sources.twitter import TwitterSubscriber
from taotie.storage.memory import DedupMemory
from taotie.storage.notion import NotionStorage
from taotie.utils import load_env
def create_notion_summarizer():
load_env() # This has to be called as early as possible.
verbose = True
batch_size = 1
fetch_interval = 10
redis_url = "taotie-redis"
channel_name = "taotie"
mq = RedisMessageQueue(redis_url=redis_url, channel_name=channel_name, verbose=True)
instruction = None
storage = NotionStorage(
root_page_id=os.getenv("NOTION_ROOT_PAGE_ID"), verbose=verbose
)
dedup_memory = DedupMemory(redis_url=redis_url)
consumer = InfoSummarizer(
buffer_size=1000,
summarize_instruction=instruction,
verbose=verbose,
dedup=True,
storage=storage,
max_tokens=1000,
max_buffer_size=1000,
)
gatherer = Gatherer(
message_queue=mq,
consumer=consumer,
batch_size=batch_size,
fetch_interval=fetch_interval,
verbose=verbose,
)
orchestrator = Orchestrator(verbose=verbose)
orchestrator.set_gatherer(gatherer=gatherer)
# Http service source.
http_service_source = HttpService(
sink=mq, verbose=verbose, dedup_memory=dedup_memory, truncate_size=200000
)
orchestrator.add_source(http_service_source)
# Twitter source.
# rules = ["from:RunGreatClasses", "#GPT", "#llm", "#AI", "#AGI", "foundation model"]
# twitter_source = TwitterSubscriber(rules=rules, sink=mq, verbose=verbose)
# orchestrator.add_source(twitter_source)
# Github source.
github_source = GithubTrends(sink=mq, verbose=verbose, dedup_memory=dedup_memory)
orchestrator.add_source(github_source)
# Arxiv source.
arxiv_source = Arxiv(sink=mq, verbose=verbose, dedup_memory=dedup_memory)
orchestrator.add_source(arxiv_source)
asyncio.run(orchestrator.run())
if __name__ == "__main__":
create_notion_summarizer()

"""A web service that accept the http request to collect the data.
"""
import asyncio
import traceback
from urllib.parse import urlparse
import aiohttp
from bs4 import BeautifulSoup
from hypercorn.asyncio import serve
from hypercorn.config import Config
from quart import Quart, jsonify, request
from unstructured.partition.html import partition_html # type: ignore
from taotie.entity import Information
from taotie.message_queue import MessageQueue, SimpleMessageQueue
from taotie.sources.base import BaseSource
from taotie.utils import get_datetime
class HttpService(BaseSource):
"""A web service that accept the http request to collect the data."""
def __init__(self, sink: MessageQueue, verbose=False, **kwargs):
super().__init__(sink=sink, verbose=verbose, **kwargs)
self.app = Quart(__name__)
self.app.add_url_rule(
"/api/v1/url", "check_url", self.check_url, methods=["POST"]
)
self.truncate_size = kwargs.get("truncate_size", -1)
self.logger.info("HttpService initialized.")
async def check_url(self):
data = await request.get_json()
if "url" not in data:
return jsonify({"error": "Missing URL parameter"}), 400
url = data["url"]
content_type = data.get("content_type", "")
result = await self._process(url=url, content_type=content_type)
return jsonify({"result": result})
async def _process(self, url: str, content_type: str = "html") -> str:
self.logger.info(f"HttpService received {url} and {content_type}.")
try:
async with aiohttp.ClientSession() as session:
async with session.get(
url, allow_redirects=True, verify_ssl=False
) as response:
content = await response.text()
doc = None
if content_type == "github-repo":
doc = await self._parse_github_repo(url, content)
elif "arxiv.org/abs/" in url:
# Parse the arxiv link. Extract the title, abstract, authors, and link to the paper.
doc = await self._parse_arxiv(url, content)
elif "application/pdf" in content_type:
message = "pdf"
elif content_type in ["html", "blog"]:
elements = partition_html(text=content)
message = "\n".join([str(e) for e in elements])
doc = Information(
type=content_type,
datetime_str=get_datetime(),
id=url,
uri=url,
content=message[: self.truncate_size],
)
else:
return f"unknown content type {content_type}."
if doc:
self.logger.output(doc.encode())
await self._send_data(doc)
return "ok"
except Exception as e:
self.logger.error(f"Error: {e}")
traceback.print_exc()
return "error"
async def run(self):
config = Config()
config.bind = ["0.0.0.0:6543"]
await serve(self.app, config)
async def _cleanup(self):
pass
async def _parse_github_repo(self, url: str, content: str) -> Information:
elements = partition_html(text=content)
message = "\n".join([str(e) for e in elements])
# Only keep the last two sections of the github repo and use it for the id.
parsed_url = urlparse(url)
last_two_segments = parsed_url.path.split("/")[-2:]
id = "/" + "/".join(last_two_segments)
return Information(
type="github-repo",
datetime_str=get_datetime(),
id=id,
uri=url,
content=message[: self.truncate_size],
)
async def _parse_arxiv(self, url: str, content: str) -> Information:
"""Parse the arxiv link. Extract the title, abstract, authors, and link to the paper."""
soup = BeautifulSoup(content, "html.parser")
title = (
soup.find("h1", class_="title mathjax")
.text.strip()
.replace("Title:", "")
.strip()
)
abstract = (
soup.find("blockquote", class_="abstract mathjax")
.text.strip()
.replace("Abstract: ", "")
)
authors = ", ".join(
[
author.text.strip()
for author in soup.find_all("div", class_="authors")[0].find_all("a")
]
)
pdf_link = (
"https://arxiv.org"
+ soup.find("div", class_="full-text").find("a", class_="download-pdf")[
"href"
]
)
doc = Information(
type="arxiv",
datetime_str=get_datetime(),
id=title,
uri=url,
content=f"Title: {title}\n\nAuthors: {authors}\n\nAbstract: {abstract}",
)
return doc
if __name__ == "__main__":
message_queue = SimpleMessageQueue()
http_service = HttpService(sink=message_queue, verbose=True)
asyncio.run(http_service.run())

import atexit
import os
from abc import ABC, abstractmethod
from typing import Optional

from taotie.entity import Information
from taotie.message_queue import MessageQueue
from taotie.storage.memory import DedupMemory
from taotie.utils import *


class BaseSource(ABC):
    """Base class for all sources.

    This class is used to provide a common interface for all sources. It
    provides a method to get the source name and a method to get the source data.
    """

    def __init__(
        self,
        sink: MessageQueue,
        verbose: bool = False,
        dedup_memory: Optional[DedupMemory] = None,
        **kwargs,
    ):
        load_dotenv()
        if not sink:
            raise ValueError("The sink cannot be None.")
        self.logger = Logger(logger_name=os.path.basename(__file__), verbose=verbose)
        self.verbose = verbose
        self.sink = sink
        self.dedup_memory = dedup_memory
        atexit.register(self._cleanup)

    def __str__(self):
        return self.__class__.__name__

    @abstractmethod
    async def _cleanup(self):
        """Clean up the source."""

    async def _send_data(self, information: Information) -> bool:
        """This function is used to send the grabbed data to the message queue.

        It is supposed to be called within the callback function of the streaming
        function or in the forever loop.

        Args:
            information (Information): The data to send.

        Returns:
            bool: True if the data is sent successfully, False otherwise.
        """
        # Skip duplicate information according to the id.
        if self.dedup_memory:
            id = information.get_id()
            if await self.dedup_memory.exists(id):
                self.logger.warning(f"Duplicated information: {id}, will ignore.")
                return False
        await self.sink.put(information.encode())
        # Record the index.
        if self.dedup_memory:
            await self.dedup_memory.check_and_save(id)
        return True

    @abstractmethod
    async def run(self):
        """This method should wrap the streaming logic or a forever loop."""
        raise NotImplementedError


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!