sockysec/Telerecon

Scrape channel by custom date range

Opened this issue · 1 comment

In option 5 you state that it is possible to scrape a channel by a custom date range. How do I go about doing this? I can't see any instructions. Thanks.

Hey, thanks for the issue report. Apologies, the feature you are referencing has been temporarily removed from the current release because it was causing several issues related to time zones, etc. (which I never got around to formally solving).

I've included a temp custom working version below that should meet your requirements.

Instructions

  1. Copy the code below
  2. Paste it into Telerecon/channelscraper.py (overwrite/replace the existing code)
  3. Launch the script
  4. This will give you the desired ability to select a custom date range
import os
import asyncio
import pandas as pd
from datetime import datetime, timezone
from colorama import Fore, Style
from telethon.sync import TelegramClient, types
import details as ds

# Telegram credentials read from the local `details` module.
# `phone` is handed to TelegramClient as its first positional argument.
api_id, api_hash = ds.apiID, ds.apiHash
phone = ds.number


def _parse_day_bound(value, end_of_day=False):
    """Turn a 'YYYY-MM-DD' string into a timezone-aware UTC datetime.

    Args:
        value: Date string, or a falsy / whitespace-only value for "no bound".
        end_of_day: When true, return the last instant of that day
            (23:59:59.999999) instead of midnight, so the bound is inclusive
            of the whole calendar day.

    Returns:
        An aware ``datetime`` in UTC, or ``None`` when no bound was given.

    Raises:
        ValueError: If the string does not match ``%Y-%m-%d``.
    """
    if not value:
        return None
    value = value.strip()
    if not value:
        return None
    bound = datetime.strptime(value, '%Y-%m-%d').replace(tzinfo=timezone.utc)
    if end_of_day:
        bound = bound.replace(hour=23, minute=59, second=59, microsecond=999999)
    return bound


async def scrape_channel_content(channel_name, start_date=None, end_date=None):
    """Collect posts from a Telegram channel, optionally limited to a date range.

    Args:
        channel_name: Channel identifier passed to ``client.get_entity`` and
            used to build each message URL.
        start_date: Optional 'YYYY-MM-DD' string; posts before 00:00 UTC on
            this day are excluded.
        end_date: Optional 'YYYY-MM-DD' string; posts after the end of this
            day (UTC) are excluded. The end day itself is included.

    Returns:
        A list of ``(text, username, first_name, last_name, user_id, views,
        message_url)`` tuples, ordered as yielded by ``iter_messages``.
        Returns an empty list if a date is malformed or any error occurs
        while scraping (the error is printed, not raised).
    """
    # Validate the dates before opening a Telegram connection so that a typo
    # fails fast; keep the original "print and return []" contract.
    try:
        start_bound = _parse_day_bound(start_date)
        end_bound = _parse_day_bound(end_date, end_of_day=True)
    except Exception as e:
        print(f"An error occurred: {Fore.RED}{e}{Style.RESET_ALL}")
        return []

    async with TelegramClient(phone, api_id, api_hash) as client:
        try:
            entity = await client.get_entity(channel_name)
            content = []
            post_count = 0

            async for post in client.iter_messages(entity):
                # Newer than the requested window: keep walking backwards.
                if end_bound and post.date > end_bound:
                    continue
                # iter_messages yields newest-first by default, so the first
                # post older than the start bound means everything that
                # follows is older too; stop instead of scanning the whole
                # channel history.
                if start_bound and post.date < start_bound:
                    break

                post_count += 1
                text = post.text or ""
                sender = post.sender
                username = getattr(sender, 'username', 'N/A') or 'N/A'
                first_name = getattr(sender, 'first_name', 'N/A') or 'N/A'
                last_name = getattr(sender, 'last_name', 'N/A') or 'N/A'
                user_id = getattr(sender, 'id', 'N/A')

                # Only a missing count becomes "N/A"; a real count of 0 is kept.
                views = post.views if post.views is not None else "N/A"
                message_url = f"https://t.me/{channel_name}/{post.id}"

                content.append((text, username, first_name, last_name, user_id, views, message_url))

                if post_count % 10 == 0:
                    print(
                        f"{Fore.WHITE}{post_count} Posts scraped in {Fore.LIGHTYELLOW_EX}{channel_name}{Style.RESET_ALL}")

            return content

        except Exception as e:
            print(f"An error occurred: {Fore.RED}{e}{Style.RESET_ALL}")
            return []


async def main():
    """Prompt for a channel and optional date range, scrape it, and save a CSV.

    Reads the channel name and the start/end dates from standard input,
    calls ``scrape_channel_content``, and writes the result to
    ``Collection/<channel>/<channel>_messages.csv``. Any exception is
    printed rather than raised.
    """
    try:
        # Strip stray whitespace so it does not leak into the directory name,
        # the message URLs, or the date parser.
        channel_name = input(f"{Fore.CYAN}Please enter a target Telegram channel:\n").strip()
        if not channel_name:
            print(f'{Fore.RED}No channel name entered.{Style.RESET_ALL}')
            return
        print(f'You entered "{Fore.LIGHTYELLOW_EX}{channel_name}{Style.RESET_ALL}"')
        # A blank (or whitespace-only) answer means "no bound".
        start_date = input("Enter start date (YYYY-MM-DD) or leave blank for no start date: ").strip() or None
        end_date = input("Enter end date (YYYY-MM-DD) or leave blank for no end date: ").strip() or None

        output_directory = f"Collection/{channel_name}"
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs(output_directory, exist_ok=True)

        csv_filename = f'{output_directory}/{channel_name}_messages.csv'
        print(f'Scraping content from {Fore.LIGHTYELLOW_EX}{channel_name}{Style.RESET_ALL}...')

        content = await scrape_channel_content(channel_name, start_date, end_date)

        if content:
            df = pd.DataFrame(content, columns=['Text', 'Username', 'First Name', 'Last Name', 'User ID', 'Views', 'Message URL'])
            df.to_csv(csv_filename, index=False)
            print(f'Successfully scraped and saved content to {Fore.LIGHTYELLOW_EX}{csv_filename}{Style.RESET_ALL}.')
        else:
            print(f'{Fore.RED}No content scraped.{Style.RESET_ALL}')

    except Exception as e:
        print(f"An error occurred: {Fore.RED}{e}{Style.RESET_ALL}")

# Run the interactive scraper only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())