Make the example configurable
Closed this issue · 1 comment
In example.py, we hard-coded the data sources used in the function.
A better solution is to make the data sources configurable. To do this, we want to add an argument parser that allows us to pass in flags controlling which data sources to use. We want a string flag "data-sources" (or "sd" for short), with the data sources passed in as a comma-separated string. By default, we would only use the HTTP request data source.
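For example, something along these lines (a rough sketch, not final code; the default value `http_service` is a placeholder for however the HTTP request source ends up being named):

```python
import argparse


def parse_data_sources() -> list:
    """Parse the --data-sources flag into a list of source names."""
    parser = argparse.ArgumentParser(description="Summarize-to-Notion example.")
    parser.add_argument(
        "--data-sources",
        "-sd",
        type=str,
        default="http_service",  # placeholder name for the HTTP request source
        help="Comma separated list of data sources to enable.",
    )
    args = parser.parse_args()
    return [name.strip() for name in args.data_sources.split(",") if name.strip()]
```

With this, `python example.py --data-sources http_service,github,arxiv` would enable all three sources, while running with no flag keeps only the HTTP request source.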
Hey @yxjiang,
I've started working on this PR. The plan is to add an argument parser to the script, which will allow you to specify the data sources you want to use. This will be done through a "data-sources" flag where you can pass in the data sources as a comma-separated string. By default, we'll only use the HTTP request data source.
Give me a minute!
Some code snippets I looked at. If a file is missing from here, you can mention its path in the ticket description.
taotie/README.md
Lines 21 to 44 in e5a0ba0
## Example

Here's an example of how to use TaoTie to subscribe to Twitter, GitHub, and HTTP sources, summarize the information using an LLM agent, and store the summaries in Notion.

The example code can be found in [examples/summarize_to_notion/example.py](examples/summarize_to_notion/example.py).

### 1. Set up your environment

Create a .env file and add the necessary API tokens:

```bash
OPENAI_API_KEY=<your OpenAI API key>
# Please follow https://developers.notion.com/docs/create-a-notion-integration.
NOTION_TOKEN=<your Notion API token>
# The id of the page where you want to dump the summary.
NOTION_ROOT_PAGE_ID=<the ID of the page where you want to store the summaries>
# (Optional) Please follow https://developer.twitter.com/en/portal.
TWITTER_BEARER_TOKEN=<your Twitter bearer token>
# (Optional) The list of authors whose papers you care about.
ARXIV_AUTHORS=Yann LeCun,Kaiming He,Ross Girshick,Piotr Dollár,Alec Radford,Ilya Sutskever,Dario Amodei,Geoffrey E. Hinton
```
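For reference, here's a minimal sketch of how these variables might be read at startup; I'm assuming `taotie.utils.load_env` behaves like `python-dotenv`'s `load_dotenv` (an assumption on my part, not the actual implementation):

```python
import os

from dotenv import load_dotenv

# Assumption: load_env() in taotie.utils populates the process environment
# from the .env file, similar to python-dotenv's load_dotenv().
load_dotenv()
notion_token = os.getenv("NOTION_TOKEN")
root_page_id = os.getenv("NOTION_ROOT_PAGE_ID")
twitter_token = os.getenv("TWITTER_BEARER_TOKEN")  # optional
```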
taotie/taotie/reporter/notion_reporter.py
Lines 98 to 118 in e5a0ba0
""" | |
async def _connect(self): | |
self.notion = AsyncClient(auth=self.token) | |
async def _cleanup(self): | |
print("cleanup") | |
async def _distill(self): | |
"""Grab the gathered knowledge from notion database and generate the text report. | |
Returns: | |
str: The text report. | |
""" | |
doc_list = await self._retrieve_data() | |
self.logger.output(f"Number docs retrieved: {len(doc_list)}\n") | |
self.logger.output(json.dumps(doc_list, indent=2)) | |
report = await self._generate_report(doc_list) | |
self.logger.output(f"{report}\n", color=Fore.BLUE) | |
taotie/taotie/consumer/info_summarizer.py
Lines 18 to 26 in e5a0ba0
```python
    def __init__(
        self,
        summarize_instruction: str,
        verbose: bool = False,
        dedup: bool = False,
        storage: Optional[Storage] = None,
        **kwargs,
    ):
```
taotie/taotie/orchestrator.py
Lines 12 to 55 in e5a0ba0
```python
class Orchestrator:
    """The main entry to collect the information from all the sources."""

    def __init__(self, verbose: bool = False):
        self.sources: Dict[str, BaseSource] = {}
        self.logger = Logger(logger_name=os.path.basename(__file__), verbose=verbose)

    def add_source(self, source: BaseSource):
        self.sources[str(source)] = source

    def set_gatherer(self, gatherer: Gatherer):
        self.gatherer = gatherer

    async def run(self):
        if not self.sources:
            self.logger.error("No sources are added.")
            return
        if not self.gatherer:
            self.logger.error("No gatherer is set.")
            return
        tasks = {}
        for source in self.sources.values():
            tasks[source.__class__.__name__] = asyncio.create_task(source.run())
        tasks[self.gatherer.__class__.__name__] = asyncio.create_task(
            self.gatherer.run()
        )
        # Add a signal handler to stop tasks on KeyboardInterrupt
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self.stop_tasks, tasks)
        await asyncio.gather(*tasks.values())

    def stop_tasks(self, tasks):
        for name, task in tasks.items():
            self.logger.info(f"Stopping task {name}...")
            if isinstance(task, Gatherer):
                task._running = False
            task.cancel()
        asyncio.get_event_loop().stop()
```
taotie/examples/summarize_to_notion/example.py
Lines 1 to 75 in e5a0ba0
"""The main entry to collect the information from all the sources. | |
""" | |
import asyncio | |
import os | |
from taotie.consumer.info_summarizer import InfoSummarizer | |
from taotie.gatherer import Gatherer | |
from taotie.message_queue import RedisMessageQueue | |
from taotie.orchestrator import Orchestrator | |
from taotie.sources.arxiv import Arxiv | |
from taotie.sources.github import GithubTrends | |
from taotie.sources.http_service import HttpService | |
from taotie.sources.twitter import TwitterSubscriber | |
from taotie.storage.memory import DedupMemory | |
from taotie.storage.notion import NotionStorage | |
from taotie.utils import load_env | |
def create_notion_summarizer(): | |
load_env() # This has to be called as early as possible. | |
verbose = True | |
batch_size = 1 | |
fetch_interval = 10 | |
redis_url = "taotie-redis" | |
channel_name = "taotie" | |
mq = RedisMessageQueue(redis_url=redis_url, channel_name=channel_name, verbose=True) | |
instruction = None | |
storage = NotionStorage( | |
root_page_id=os.getenv("NOTION_ROOT_PAGE_ID"), verbose=verbose | |
) | |
dedup_memory = DedupMemory(redis_url=redis_url) | |
consumer = InfoSummarizer( | |
buffer_size=1000, | |
summarize_instruction=instruction, | |
verbose=verbose, | |
dedup=False, | |
storage=storage, | |
max_tokens=1000, | |
max_buffer_size=1000, | |
) | |
gatherer = Gatherer( | |
message_queue=mq, | |
consumer=consumer, | |
batch_size=batch_size, | |
fetch_interval=fetch_interval, | |
verbose=verbose, | |
) | |
orchestrator = Orchestrator(verbose=verbose) | |
orchestrator.set_gatherer(gatherer=gatherer) | |
# Http service source. | |
http_service_source = HttpService( | |
sink=mq, verbose=verbose, dedup_memory=dedup_memory, truncate_size=200000 | |
) | |
orchestrator.add_source(http_service_source) | |
# Twitter source. | |
# rules = ["from:RunGreatClasses", "#GPT", "#llm", "#AI", "#AGI", "foundation model"] | |
# twitter_source = TwitterSubscriber(rules=rules, sink=mq, verbose=verbose) | |
# orchestrator.add_source(twitter_source) | |
# Github source. | |
github_source = GithubTrends(sink=mq, verbose=verbose, dedup_memory=dedup_memory) | |
orchestrator.add_source(github_source) | |
# Arxiv source. | |
arxiv_source = Arxiv(sink=mq, verbose=verbose, dedup_memory=dedup_memory) | |
orchestrator.add_source(arxiv_source) | |
asyncio.run(orchestrator.run()) | |
if __name__ == "__main__": | |
create_notion_summarizer() |
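Putting it together, here's one possible shape for the configurable wiring in example.py (a sketch only, not final code; the `add_selected_sources` helper and the source-name keys are hypothetical, while the constructor calls mirror the hard-coded ones above):

```python
import argparse

from taotie.sources.arxiv import Arxiv
from taotie.sources.github import GithubTrends
from taotie.sources.http_service import HttpService


def parse_data_sources() -> list:
    """Parse the --data-sources (-sd) flag into a list of source names."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data-sources",
        "-sd",
        default="http_service",  # hypothetical name for the HTTP request source
        help="Comma separated data sources, e.g. http_service,github,arxiv",
    )
    args = parser.parse_args()
    return [name.strip() for name in args.data_sources.split(",") if name.strip()]


def add_selected_sources(orchestrator, mq, verbose, dedup_memory):
    """Replace the hard-coded add_source calls with a flag-driven mapping."""
    factories = {
        # Hypothetical keys; each factory mirrors a hard-coded call in example.py.
        "http_service": lambda: HttpService(
            sink=mq, verbose=verbose, dedup_memory=dedup_memory, truncate_size=200000
        ),
        "github": lambda: GithubTrends(
            sink=mq, verbose=verbose, dedup_memory=dedup_memory
        ),
        "arxiv": lambda: Arxiv(sink=mq, verbose=verbose, dedup_memory=dedup_memory),
    }
    for name in parse_data_sources():
        if name in factories:
            orchestrator.add_source(factories[name]())
```

`create_notion_summarizer` could then call `add_selected_sources(orchestrator, mq, verbose, dedup_memory)` instead of constructing every source unconditionally.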
I'm a bot that handles simple bugs and feature requests, but I might make mistakes. Please be kind!