一般都是推荐这个

https://github.com/snow922841/telegram_channel_downloader.git

但是这个已经失效了,所以推荐这个

https://github.com/kuaizi/telegram_channel_downloader#telegram_channel_downloader

如果文件无法下载可以直接复制源代码,在文末

使用Python3,需要安装的依赖

cryptg
pillow
aiohttp
hachoir
# !/usr/bin/env python3
import difflib
import os
import re
import time
import asyncio
import asyncio.subprocess
import logging
from telethon import TelegramClient, events, errors
from telethon.tl.types import MessageMediaWebPage

#***********************************************************************************#
api_id = 1234567   # your telegram api id
api_hash = '1234567890abcdefgh'  # your telegram api hash
bot_token = '1234567890:ABCDEFGHIJKLMNOPQRST'  # your bot_token
admin_id = 1234567890  # your chat id
save_path = '/usr/downloads'  # file save path
upload_file_set = True  # set upload file to google drive
drive_id = '5FyJClXmsqNw0-Rz19'  # google teamdrive id
drive_name = 'gc'  # rclone drive name
max_num = 5  # 同时下载数量
# filter file name/文件名过滤
filter_list = ['你好,欢迎加入 Quantumu', '\n']
# filter chat id /过滤某些频道不下载
blacklist = [1388464914,]
donwload_all_chat = False # 监控所有你加入的频道,收到的新消息如果包含媒体都会下载,默认关闭
filter_file_name = ['jpg', ]
#***********************************************************************************#

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.WARNING)
logger = logging.getLogger(__name__)
queue = asyncio.Queue()


# 文件夹/文件名称处理
def validateTitle(title):
    r_str = r"[\/\\\:\*\?\"\<\>\|\n]"  # '/ \ : * ? " < > |'
    new_title = re.sub(r_str, "_", title)  # 替换为下划线
    return new_title


# 获取相册标题
async def get_group_caption(message):
    group_caption = ""
    entity = await client.get_entity(message.to_id)
    async for msg in client.iter_messages(entity=entity, reverse=True, offset_id=message.id - 9, limit=10):
        if msg.grouped_id == message.grouped_id:
            if msg.text != "":
                group_caption = msg.text
                return group_caption
    return group_caption


# 获取本地时间
def get_local_time():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


# 判断相似率
def get_equal_rate(str1, str2):
    return difflib.SequenceMatcher(None, str1, str2).quick_ratio()


# 返回文件大小
def bytes_to_string(byte_count):
    suffix_index = 0
    while byte_count >= 1024:
        byte_count /= 1024
        suffix_index += 1

    return '{:.2f}{}'.format(
        byte_count, [' bytes', 'KB', 'MB', 'GB', 'TB'][suffix_index]
    )


async def worker(name):
    while True:
        queue_item = await queue.get()
        message = queue_item[0]
        chat_title = queue_item[1]
        entity = queue_item[2]
        file_name = queue_item[3]
        for filter_file in filter_file_name:
            if file_name.endswith(filter_file):
                return
        dirname = validateTitle(f'{chat_title}({entity.id})')
        datetime_dir_name = message.date.strftime("%Y年%m月")
        file_save_path = os.path.join(save_path, dirname, datetime_dir_name)
        if not os.path.exists(file_save_path):
            os.makedirs(file_save_path)
        # 判断文件是否在本地存在
        if file_name in os.listdir(file_save_path):
            os.remove(os.path.join(file_save_path, file_name))
        print(f"{get_local_time()} 开始下载: {chat_title} - {file_name}")
        try:
            loop = asyncio.get_event_loop()
            task = loop.create_task(client.download_media(
                message, os.path.join(file_save_path, file_name)))
            await asyncio.wait_for(task, timeout=3600)
            if upload_file_set:
                proc = await asyncio.create_subprocess_exec('fclone',
                                                            'move',
                                                            os.path.join(
                                                                file_save_path, file_name),
                                                            f"{drive_name}:{{{drive_id}}}/{dirname}/{datetime_dir_name}",
                                                            '--ignore-existing',
                                                            stdout=asyncio.subprocess.DEVNULL)
                await proc.wait()
                if proc.returncode == 0:
                    print(f"{get_local_time()} - {file_name} 下载并上传完成")
        except (errors.FileReferenceExpiredError, asyncio.TimeoutError):
            logging.warning(f'{get_local_time()} - {file_name} 出现异常,重新尝试下载!')
            async for new_message in client.iter_messages(entity=entity, offset_id=message.id - 1, reverse=True,
                                                          limit=1):
                await queue.put((new_message, chat_title, entity, file_name))
        except Exception as e:
            print(f"{get_local_time()} - {file_name} {e}")
            await bot.send_message(admin_id, f'Error!\n\n{e}\n\n{file_name}')
        finally:
            queue.task_done()
            # 无论是否上传成功都删除文件。
            if upload_file_set:
                try:
                    os.remove(os.path.join(file_save_path, file_name))
                except:
                    pass


@events.register(events.NewMessage(pattern='/start', from_users=admin_id))
async def handler(update):
    text = update.message.text.split(' ')
    if len(text) == 1:
        await bot.send_message(admin_id, '参数错误,请按照参考格式输入:\n\n '
                                         '<i>/start https://t.me/fkdhlg 0 </i>\n\n'
                                         'Tips:如果不输入offset_id,默认从第一条开始下载。', parse_mode='HTML')
        return
    elif len(text) == 2:
        chat_id = text[1]
        try:
            entity = await client.get_entity(chat_id)
            chat_title = entity.title
            offset_id = 0
            await update.reply(f'开始从{chat_title}的第一条消息下载。')
        except:
            await update.reply('chat输入错误,请输入频道或群组的链接')
            return
    elif len(text) == 3:
        chat_id = text[1]
        offset_id = int(text[2])
        try:
            entity = await client.get_entity(chat_id)
            chat_title = entity.title
            await update.reply(f'开始从{chat_title}的第{offset_id}条消息下载。')
        except:
            await update.reply('chat输入错误,请输入频道或群组的链接')
            return
    else:
        await bot.send_message(admin_id, '参数错误,请按照参考格式输入:\n\n '
                                         '<i>/start https://t.me/fkdhlg 0 </i>\n\n'
                                         'Tips:如果不输入offset_id,默认从第一条开始下载。', parse_mode='HTML')
        return
    if chat_title:
        print(f'{get_local_time()} - 开始下载:{chat_title}({entity.id}) - {offset_id}')
        last_msg_id = 0
        async for message in client.iter_messages(entity, offset_id=offset_id, reverse=True, limit=None):
            if message.media:
                # 如果是一组媒体
                caption = await get_group_caption(message) if (
                    message.grouped_id and message.text == "") else message.text
                # 过滤文件名称中的广告等词语
                if len(filter_list) and caption != "":
                    for filter_keyword in filter_list:
                        caption = caption.replace(filter_keyword, "")
                # 如果文件文件名不是空字符串,则进行过滤和截取,避免文件名过长导致的错误
                caption = "" if caption == "" else f'{validateTitle(caption)} - '[
                    :50]
                file_name = ''
                # 如果是文件
                if message.document:
                    if message.media.document.mime_type == "image/webp":
                        continue
                    if message.media.document.mime_type == "application/x-tgsticker":
                        continue
                    for i in message.document.attributes:
                        try:
                            file_name = i.file_name
                        except:
                            continue
                    if file_name == '':
                        file_name = f'{message.id} - {caption}.{message.document.mime_type.split("/")[-1]}'
                    else:
                        # 如果文件名中已经包含了标题,则过滤标题
                        if get_equal_rate(caption, file_name) > 0.6:
                            caption = ""
                        file_name = f'{message.id} - {caption}{file_name}'
                elif message.photo:
                    file_name = f'{message.id} - {caption}{message.photo.id}.jpg'
                else:
                    continue
                await queue.put((message, chat_title, entity, file_name))
                last_msg_id = message.id
        await bot.send_message(admin_id, f'{chat_title} all message added to task queue, last message is:{last_msg_id}')


@events.register(events.NewMessage())
async def all_chat_download(update):
    message = update.message
    if message.media:
        chat_id = update.message.to_id
        entity = await client.get_entity(chat_id)
        if entity.id in blacklist:
            return
        chat_title = entity.title
        # 如果是一组媒体
        caption = await get_group_caption(message) if (
            message.grouped_id and message.text == "") else message.text
        if caption != "":
            for fw in filter_list:
                caption = caption.replace(fw, '')
        # 如果文件文件名不是空字符串,则进行过滤和截取,避免文件名过长导致的错误
        caption = "" if caption == "" else f'{validateTitle(caption)} - '[:50]
        file_name = ''
        # 如果是文件
        if message.document:
            try:
                if type(message.media) == MessageMediaWebPage:
                    return
                if message.media.document.mime_type == "image/webp":
                    file_name = f'{message.media.document.id}.webp'
                if message.media.document.mime_type == "application/x-tgsticker":
                    file_name = f'{message.media.document.id}.tgs'
                for i in message.document.attributes:
                    try:
                        file_name = i.file_name
                    except:
                        continue
                if file_name == '':
                    file_name = f'{message.id} - {caption}.{message.document.mime_type.split("/")[-1]}'
                else:
                    # 如果文件名中已经包含了标题,则过滤标题
                    if get_equal_rate(caption, file_name) > 0.6:
                        caption = ""
                    file_name = f'{message.id} - {caption}{file_name}'
            except:
                print(message.media)
        elif message.photo:
            file_name = f'{message.id} - {caption}{message.photo.id}.jpg'
        else:
            return
        # 过滤文件名称中的广告等词语
        for filter_keyword in filter_list:
            file_name = file_name.replace(filter_keyword, "")
        print(chat_title, file_name)
        await queue.put((message, chat_title, entity, file_name))


if __name__ == '__main__':
    bot = TelegramClient('telegram_channel_downloader_bot',
                         api_id, api_hash).start(bot_token=str(bot_token))
    client = TelegramClient(
        'telegram_channel_downloader', api_id, api_hash).start()
    bot.add_event_handler(handler)
    if donwload_all_chat:
      client.add_event_handler(all_chat_download)
    tasks = []
    try:
        for i in range(max_num):
            loop = asyncio.get_event_loop()
            task = loop.create_task(worker(f'worker-{i}'))
            tasks.append(task)
        print('Successfully started (Press Ctrl+C to stop)')
        client.run_until_disconnected()
    finally:
        for task in tasks:
            task.cancel()
        client.disconnect()
        print('Stopped!')

安装依赖之后,编辑 tg_channel_downloader.py这个文件,把相应参数填入,需要申请TG的API和机器人,然后联系机器人发送消息进行下载

使用方法

  1. 按照提示输入telegram绑定的手机号获取验证码并输入
  2. 配置完成后需要给bot机器人发送 /start 频道的链接 0 才会正式开始运行脚本,否则无法启动 0代表开始下载消息的ID,可以自行修改。例如/start https://t.me/guomotaotu 3143表示从这个频道的第3143个文件开始下载。
  3. 给bot发送 /ping 如果回复ping 表示脚本正在运行,但是是否在下载还需要自己判断。
  4. 可以手动配置消息偏移 给bot 发送 ‘ /change 123 ’ 代表从第123条消息开始下载,请自行修改数字。

转自Github:https://github.com/kuaizi/telegram_channel_downloader/tree/master

Last modification:July 2, 2023
如果觉得我的文章对你有用,请随意赞赏