1#!/usr/bin/env python
2# pylint: disable=unused-argument
3# This program is dedicated to the public domain under the CC0 license.
4
5"""
6Simple Bot to handle '(my_)chat_member' updates.
7Greets new users & keeps track of which chats the bot is in.
8
9Usage:
10Press Ctrl-C on the command line or send a signal to the process to stop the
11bot.
12"""
13
14import logging
15
16from telegram import Chat, ChatMember, ChatMemberUpdated, Update
17from telegram.constants import ParseMode
18from telegram.ext import (
19 Application,
20 ChatMemberHandler,
21 CommandHandler,
22 ContextTypes,
23 MessageHandler,
24 filters,
25)
26
# Configure logging for the whole process

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Raise the level of the "httpx" logger so that not every GET and POST request is logged
logging.getLogger("httpx").setLevel(logging.WARNING)

# Module-level logger used by the handlers below
logger = logging.getLogger(__name__)
37
38
def extract_status_change(chat_member_update: ChatMemberUpdated) -> tuple[bool, bool] | None:
    """Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
    of the chat and whether the 'new_chat_member' is a member of the chat.

    Returns a tuple ``(was_member, is_member)``, or None if the status didn't change.
    """
    # Compute the difference once; both lookups below read from the same mapping.
    changes = chat_member_update.difference()

    status_change = changes.get("status")
    if status_change is None:
        # NOTE(review): an update in which only 'is_member' differs while the status stays the
        # same is also reported as "no change" here - confirm that this is intended.
        return None

    old_status, new_status = status_change
    old_is_member, new_is_member = changes.get("is_member", (None, None))

    # Statuses that always count as membership. A restricted user only counts as a member
    # when the corresponding 'is_member' value is explicitly True.
    member_statuses = (ChatMember.MEMBER, ChatMember.OWNER, ChatMember.ADMINISTRATOR)

    was_member = old_status in member_statuses or (
        old_status == ChatMember.RESTRICTED and old_is_member is True
    )
    is_member = new_status in member_statuses or (
        new_status == ChatMember.RESTRICTED and new_is_member is True
    )

    return was_member, is_member
64
async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Records in ``bot_data`` which private chats, groups and channels the bot is part of.

    Reacts to 'my_chat_member' updates: the chat ID is added to the matching set when the bot
    becomes a member and discarded from it when the bot stops being one.
    """
    change = extract_status_change(update.my_chat_member)
    if change is None:
        return
    was_member, is_member = change

    # The user who caused the change
    actor = update.effective_user.full_name
    chat = update.effective_chat

    joined = is_member and not was_member
    left = was_member and not is_member

    # Pick the bookkeeping set and the log messages based on the chat type
    chat_type = chat.type
    if chat_type == Chat.PRIVATE:
        # The "unblocked" case may not be really needed in practice because most clients will
        # automatically send a /start command after the user unblocks the bot, and
        # start_private_chat() will add the user to "user_ids".
        # It is included here for the sake of the example.
        store_key = "user_ids"
        on_join = "%s unblocked the bot"
        on_leave = "%s blocked the bot"
        log_args = (actor,)
    elif chat_type in (Chat.GROUP, Chat.SUPERGROUP):
        store_key = "group_ids"
        on_join = "%s added the bot to the group %s"
        on_leave = "%s removed the bot from the group %s"
        log_args = (actor, chat.title)
    else:
        # Every remaining chat type is treated as a channel
        store_key = "channel_ids"
        on_join = "%s added the bot to the channel %s"
        on_leave = "%s removed the bot from the channel %s"
        log_args = (actor, chat.title)

    if joined:
        logger.info(on_join, *log_args)
        context.bot_data.setdefault(store_key, set()).add(chat.id)
    elif left:
        logger.info(on_leave, *log_args)
        context.bot_data.setdefault(store_key, set()).discard(chat.id)
101
102
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Replies with the IDs of the users, groups and channels recorded in ``bot_data``."""

    def listed(key: str) -> str:
        # setdefault (rather than get) also stores the empty set on first use
        return ", ".join(map(str, context.bot_data.setdefault(key, set())))

    user_ids = listed("user_ids")
    group_ids = listed("group_ids")
    channel_ids = listed("channel_ids")

    text = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
        f" Moreover it is a member of the groups with IDs {group_ids} "
        f"and administrator in the channels with IDs {channel_ids}."
    )
    await update.effective_message.reply_text(text)
114
115
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Sends a welcome message when a user joins the chat and a farewell when a user leaves.

    Nothing is sent if the status did not change or the membership stayed the same.
    """
    event = update.chat_member
    change = extract_status_change(event)
    if change is None:
        return
    was_member, is_member = change

    # HTML mentions of the user who caused the change and of the affected user
    actor = event.from_user.mention_html()
    subject = event.new_chat_member.user.mention_html()

    if was_member == is_member:
        # Membership itself is unchanged, so there is nothing to announce
        return

    if is_member:
        announcement = f"{subject} was added by {actor}. Welcome!"
    else:
        announcement = f"{subject} is no longer with us. Thanks a lot, {actor} ..."

    await update.effective_chat.send_message(announcement, parse_mode=ParseMode.HTML)
136
137
async def start_private_chat(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greets the user and records that they started a chat with the bot if it's a private chat.
    Since no `my_chat_member` update is issued when a user starts a private chat with the bot
    for the first time, we have to track it explicitly here.

    Does nothing for non-private chats and for private chats that are already recorded.
    """
    chat = update.effective_chat

    # Check the chat before touching the user: this handler is registered for all messages,
    # and some of those (e.g. channel posts) have no effective user, so dereferencing
    # `update.effective_user` up front would raise an AttributeError for them.
    if chat.type != Chat.PRIVATE or chat.id in context.bot_data.get("user_ids", set()):
        return

    user_name = update.effective_user.full_name

    logger.info("%s started a private chat with the bot", user_name)
    context.bot_data.setdefault("user_ids", set()).add(chat.id)

    await update.effective_message.reply_text(
        f"Welcome {user_name}. Use /show_chats to see what chats I'm in."
    )
154
155
def main() -> None:
    """Builds the application, registers all handlers and polls for updates."""
    # Create the Application and pass it your bot's token.
    app = Application.builder().token("TOKEN").build()

    # Handlers are registered in exactly this order.
    handlers = (
        # Keep track of which chats the bot is in
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # Handle members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
        # Interpret any other command or text message as a start of a private chat.
        # This will record the user as being in a private chat with bot.
        MessageHandler(filters.ALL, start_private_chat),
    )
    for handler in handlers:
        app.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # We pass 'allowed_updates' to handle *all* updates, including `chat_member` updates.
    # To reset this, simply pass `allowed_updates=[]`.
    app.run_polling(allowed_updates=Update.ALL_TYPES)
176
177
# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()