From 8820812539d1e8e406d6dbeeac7f62836fa1409a Mon Sep 17 00:00:00 2001 From: Hinrich Mahler Date: Tue, 10 Nov 2020 21:57:30 +0100 Subject: [PATCH 1/3] Build a base class for Filters.{via_bot,chat,user} and add Filters.sender_chat --- telegram/ext/filters.py | 604 +++++++++++++++++++--------------------- tests/test_filters.py | 162 +++++++++++ 2 files changed, 445 insertions(+), 321 deletions(-) diff --git a/telegram/ext/filters.py b/telegram/ext/filters.py index 7fc2467050d..d897daf6692 100644 --- a/telegram/ext/filters.py +++ b/telegram/ext/filters.py @@ -38,7 +38,7 @@ NoReturn, ) -from telegram import Chat, Message, MessageEntity, Update +from telegram import Chat, Message, MessageEntity, Update, User __all__ = [ 'Filters', @@ -69,7 +69,7 @@ class BaseFilter(ABC): Exclusive Or: - >>> (Filters.regex('To Be') ^ Filters.regex('Not To Be')) + >>> (Filters.regex('To Be') ^ Filters.regex('Not 2B')) Not: @@ -1238,7 +1238,148 @@ def filter(self, message: Message) -> bool: private: Updates sent in private chat """ - class user(MessageFilter): + class _ChatUserBaseFilter(MessageFilter): + def __init__( + self, + chat_id: SLT[int] = None, + username: SLT[str] = None, + allow_empty: bool = False, + ): + self.chat_id_name = 'chat_id' + self.username_name = 'username' + self.allow_empty = allow_empty + self.__lock = Lock() + + self._chat_ids: Set[int] = set() + self._usernames: Set[str] = set() + + self._set_chat_ids(chat_id) + self._set_usernames(username) + + @abstractmethod + def get_chat_or_user(self, message: Message) -> Union[Chat, User, None]: + pass + + @staticmethod + def _parse_chat_id(chat_id: SLT[int]) -> Set[int]: + if chat_id is None: + return set() + if isinstance(chat_id, int): + return {chat_id} + return set(chat_id) + + @staticmethod + def _parse_username(username: SLT[str]) -> Set[str]: + if username is None: + return set() + if isinstance(username, str): + return {username[1:] if username.startswith('@') else username} + return {chat[1:] if chat.startswith('@') else chat for chat in username} + + def _set_chat_ids(self, chat_id: SLT[int]) -> None: + with self.__lock: + if chat_id and self._usernames: + raise RuntimeError( + f"Can't set {self.chat_id_name} in conjunction with (already set) " + f"{self.username_name}s." + ) + self._chat_ids = self._parse_chat_id(chat_id) + + def _set_usernames(self, username: SLT[str]) -> None: + with self.__lock: + if username and self._chat_ids: + raise RuntimeError( + f"Can't set {self.username_name} in conjunction with (already set) " + f"{self.chat_id_name}s." + ) + self._usernames = self._parse_username(username) + + @property + def chat_ids(self) -> FrozenSet[int]: + with self.__lock: + return frozenset(self._chat_ids) + + @chat_ids.setter + def chat_ids(self, chat_id: SLT[int]) -> None: + self._set_chat_ids(chat_id) + + @property + def usernames(self) -> FrozenSet[str]: + with self.__lock: + return frozenset(self._usernames) + + @usernames.setter + def usernames(self, username: SLT[str]) -> None: + self._set_usernames(username) + + def add_usernames(self, username: SLT[str]) -> None: + with self.__lock: + if self._chat_ids: + raise RuntimeError( + f"Can't set {self.username_name} in conjunction with (already set) " + f"{self.chat_id_name}s." + ) + + parsed_username = self._parse_username(username) + self._usernames |= parsed_username + + def add_chat_ids(self, chat_id: SLT[int]) -> None: + with self.__lock: + if self._usernames: + raise RuntimeError( + f"Can't set {self.chat_id_name} in conjunction with (already set) " + f"{self.username_name}s." + ) + + parsed_chat_id = self._parse_chat_id(chat_id) + + self._chat_ids |= parsed_chat_id + + def remove_usernames(self, username: SLT[str]) -> None: + with self.__lock: + if self._chat_ids: + raise RuntimeError( + f"Can't set {self.username_name} in conjunction with (already set) " + f"{self.chat_id_name}s." + ) + + parsed_username = self._parse_username(username) + self._usernames -= parsed_username + + def remove_chat_ids(self, chat_id: SLT[int]) -> None: + with self.__lock: + if self._usernames: + raise RuntimeError( + f"Can't set {self.chat_id_name} in conjunction with (already set) " + f"{self.username_name}s." + ) + parsed_chat_id = self._parse_chat_id(chat_id) + self._chat_ids -= parsed_chat_id + + def filter(self, message: Message) -> bool: + """""" # remove method from docs + chat_or_user = self.get_chat_or_user(message) + if chat_or_user: + if self.chat_ids: + return chat_or_user.id in self.chat_ids + if self.usernames: + return bool(chat_or_user.username and chat_or_user.username in self.usernames) + return self.allow_empty + return False + + @property + def name(self) -> str: + return ( + f'Filters.{self.__class__.__name__}(' + f'{", ".join(str(s) for s in (self.usernames or self.chat_ids))})' + ) + + @name.setter + def name(self, name: str) -> NoReturn: + raise RuntimeError(f'Cannot set name for Filters.{self.__class__.__name__}') + + class user(_ChatUserBaseFilter): + # pylint: disable=W0235 """Filters messages to allow only those which are from specified user ID(s) or username(s). @@ -1279,64 +1420,19 @@ def __init__( username: SLT[str] = None, allow_empty: bool = False, ): - self.allow_empty = allow_empty - self.__lock = Lock() - - self._user_ids: Set[int] = set() - self._usernames: Set[str] = set() - - self._set_user_ids(user_id) - self._set_usernames(username) - - @staticmethod - def _parse_user_id(user_id: SLT[int]) -> Set[int]: - if user_id is None: - return set() - if isinstance(user_id, int): - return {user_id} - return set(user_id) - - @staticmethod - def _parse_username(username: SLT[str]) -> Set[str]: - if username is None: - return set() - if isinstance(username, str): - return {username[1:] if username.startswith('@') else username} - return {user[1:] if user.startswith('@') else user for user in username} + super().__init__(chat_id=user_id, username=username, allow_empty=allow_empty) + self.chat_id_name = 'user_id' - def _set_user_ids(self, user_id: SLT[int]) -> None: - with self.__lock: - if user_id and self._usernames: - raise RuntimeError( - "Can't set user_id in conjunction with (already set) " "usernames." - ) - self._user_ids = self._parse_user_id(user_id) - - def _set_usernames(self, username: SLT[str]) -> None: - with self.__lock: - if username and self._user_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "user_ids." - ) - self._usernames = self._parse_username(username) + def get_chat_or_user(self, message: Message) -> Optional[User]: + return message.from_user @property def user_ids(self) -> FrozenSet[int]: - with self.__lock: - return frozenset(self._user_ids) + return self.chat_ids @user_ids.setter def user_ids(self, user_id: SLT[int]) -> None: - self._set_user_ids(user_id) - - @property - def usernames(self) -> FrozenSet[str]: - with self.__lock: - return frozenset(self._usernames) - - @usernames.setter - def usernames(self, username: SLT[str]) -> None: - self._set_usernames(username) + self.chat_ids = user_id # type: ignore[assignment] def add_usernames(self, username: SLT[str]) -> None: """ @@ -1347,14 +1443,7 @@ def add_usernames(self, username: SLT[str]) -> None: Which username(s) to allow through. Leading '@'s in usernames will be discarded. """ - with self.__lock: - if self._user_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "user_ids." - ) - - parsed_username = self._parse_username(username) - self._usernames |= parsed_username + return super().add_usernames(username) def add_user_ids(self, user_id: SLT[int]) -> None: """ @@ -1364,15 +1453,7 @@ def add_user_ids(self, user_id: SLT[int]) -> None: user_id(:class:`telegram.utils.types.SLT[int]`, optional): Which user ID(s) to allow through. """ - with self.__lock: - if self._usernames: - raise RuntimeError( - "Can't set user_id in conjunction with (already set) " "usernames." - ) - - parsed_user_id = self._parse_user_id(user_id) - - self._user_ids |= parsed_user_id + return super().add_chat_ids(user_id) def remove_usernames(self, username: SLT[str]) -> None: """ @@ -1383,14 +1464,7 @@ def remove_usernames(self, username: SLT[str]) -> None: Which username(s) to disallow through. Leading '@'s in usernames will be discarded. """ - with self.__lock: - if self._user_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "user_ids." - ) - - parsed_username = self._parse_username(username) - self._usernames -= parsed_username + return super().remove_usernames(username) def remove_user_ids(self, user_id: SLT[int]) -> None: """ @@ -1400,35 +1474,10 @@ def remove_user_ids(self, user_id: SLT[int]) -> None: user_id(:class:`telegram.utils.types.SLT[int]`, optional): Which user ID(s) to disallow through. """ - with self.__lock: - if self._usernames: - raise RuntimeError( - "Can't set user_id in conjunction with (already set) " "usernames." - ) - parsed_user_id = self._parse_user_id(user_id) - self._user_ids -= parsed_user_id + return super().remove_chat_ids(user_id) - def filter(self, message: Message) -> bool: - """""" # remove method from docs - if message.from_user: - if self.user_ids: - return message.from_user.id in self.user_ids - if self.usernames: - return bool( - message.from_user.username and message.from_user.username in self.usernames - ) - return self.allow_empty - return False - - @property - def name(self) -> str: - return f'Filters.user({", ".join(str(s) for s in (self.usernames or self.user_ids))})' - - @name.setter - def name(self, name: str) -> NoReturn: - raise RuntimeError('Cannot set name for Filters.user') - - class via_bot(MessageFilter): + class via_bot(_ChatUserBaseFilter): + # pylint: disable=W0235 """Filters messages to allow only those which are from specified via_bot ID(s) or username(s). @@ -1468,64 +1517,19 @@ def __init__( username: SLT[str] = None, allow_empty: bool = False, ): - self.allow_empty = allow_empty - self.__lock = Lock() + super().__init__(chat_id=bot_id, username=username, allow_empty=allow_empty) + self.chat_id_name = 'bot_id' - self._bot_ids: Set[int] = set() - self._usernames: Set[str] = set() - - self._set_bot_ids(bot_id) - self._set_usernames(username) - - @staticmethod - def _parse_bot_id(bot_id: SLT[int]) -> Set[int]: - if bot_id is None: - return set() - if isinstance(bot_id, int): - return {bot_id} - return set(bot_id) - - @staticmethod - def _parse_username(username: SLT[str]) -> Set[str]: - if username is None: - return set() - if isinstance(username, str): - return {username[1:] if username.startswith('@') else username} - return {bot[1:] if bot.startswith('@') else bot for bot in username} - - def _set_bot_ids(self, bot_id: SLT[int]) -> None: - with self.__lock: - if bot_id and self._usernames: - raise RuntimeError( - "Can't set bot_id in conjunction with (already set) " "usernames." - ) - self._bot_ids = self._parse_bot_id(bot_id) - - def _set_usernames(self, username: SLT[str]) -> None: - with self.__lock: - if username and self._bot_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "bot_ids." - ) - self._usernames = self._parse_username(username) + def get_chat_or_user(self, message: Message) -> Optional[User]: + return message.via_bot @property def bot_ids(self) -> FrozenSet[int]: - with self.__lock: - return frozenset(self._bot_ids) + return self.chat_ids @bot_ids.setter def bot_ids(self, bot_id: SLT[int]) -> None: - self._set_bot_ids(bot_id) - - @property - def usernames(self) -> FrozenSet[str]: - with self.__lock: - return frozenset(self._usernames) - - @usernames.setter - def usernames(self, username: SLT[str]) -> None: - self._set_usernames(username) + self.chat_ids = bot_id # type: ignore[assignment] def add_usernames(self, username: SLT[str]) -> None: """ @@ -1536,14 +1540,7 @@ def add_usernames(self, username: SLT[str]) -> None: Which username(s) to allow through. Leading '@'s in usernames will be discarded. """ - with self.__lock: - if self._bot_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "bot_ids." - ) - - parsed_username = self._parse_username(username) - self._usernames |= parsed_username + return super().add_usernames(username) def add_bot_ids(self, bot_id: SLT[int]) -> None: """ @@ -1554,15 +1551,7 @@ def add_bot_ids(self, bot_id: SLT[int]) -> None: bot_id(:class:`telegram.utils.types.SLT[int]`, optional): Which bot ID(s) to allow through. """ - with self.__lock: - if self._usernames: - raise RuntimeError( - "Can't set bot_id in conjunction with (already set) " "usernames." - ) - - parsed_bot_id = self._parse_bot_id(bot_id) - - self._bot_ids |= parsed_bot_id + return super().add_chat_ids(bot_id) def remove_usernames(self, username: SLT[str]) -> None: """ @@ -1573,14 +1562,7 @@ def remove_usernames(self, username: SLT[str]) -> None: Which username(s) to disallow through. Leading '@'s in usernames will be discarded. """ - with self.__lock: - if self._bot_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "bot_ids." - ) - - parsed_username = self._parse_username(username) - self._usernames -= parsed_username + return super().remove_usernames(username) def remove_bot_ids(self, bot_id: SLT[int]) -> None: """ @@ -1590,36 +1572,10 @@ def remove_bot_ids(self, bot_id: SLT[int]) -> None: bot_id(:class:`telegram.utils.types.SLT[int]`, optional): Which bot ID(s) to disallow through. """ - with self.__lock: - if self._usernames: - raise RuntimeError( - "Can't set bot_id in conjunction with (already set) " "usernames." - ) - parsed_bot_id = self._parse_bot_id(bot_id) - self._bot_ids -= parsed_bot_id - - def filter(self, message: Message) -> bool: - """""" # remove method from docs - if message.via_bot: - if self.bot_ids: - return message.via_bot.id in self.bot_ids - if self.usernames: - return bool( - message.via_bot.username and message.via_bot.username in self.usernames - ) - return self.allow_empty - return False + return super().remove_chat_ids(bot_id) - @property - def name(self) -> str: - entries = [str(s) for s in (self.usernames or self.bot_ids)] - return f'Filters.via_bot({", ".join(entries)})' - - @name.setter - def name(self, name: str) -> NoReturn: - raise RuntimeError('Cannot set name for Filters.via_bot') - - class chat(MessageFilter): + class chat(_ChatUserBaseFilter): + # pylint: disable=W0235 """Filters messages to allow only those which are from a specified chat ID or username. Examples: @@ -1654,158 +1610,164 @@ class chat(MessageFilter): """ - def __init__( - self, - chat_id: SLT[int] = None, - username: SLT[str] = None, - allow_empty: bool = False, - ): - self.allow_empty = allow_empty - self.__lock = Lock() + def get_chat_or_user(self, message: Message) -> Optional[Chat]: + return message.chat - self._chat_ids: Set[int] = set() - self._usernames: Set[str] = set() + def add_usernames(self, username: SLT[str]) -> None: + """ + Add one or more chats to the allowed usernames. - self._set_chat_ids(chat_id) - self._set_usernames(username) + Args: + username(:class:`telegram.utils.types.SLT[str]`, optional): + Which username(s) to allow through. + Leading `'@'` s in usernames will be discarded. + """ + return super().add_usernames(username) - @staticmethod - def _parse_chat_id(chat_id: SLT[int]) -> Set[int]: - if chat_id is None: - return set() - if isinstance(chat_id, int): - return {chat_id} - return set(chat_id) + def add_chat_ids(self, chat_id: SLT[int]) -> None: + """ + Add one or more chats to the allowed chat ids. - @staticmethod - def _parse_username(username: SLT[str]) -> Set[str]: - if username is None: - return set() - if isinstance(username, str): - return {username[1:] if username.startswith('@') else username} - return {chat[1:] if chat.startswith('@') else chat for chat in username} + Args: + chat_id(:class:`telegram.utils.types.SLT[int]`, optional): + Which chat ID(s) to allow through. + """ + return super().add_chat_ids(chat_id) - def _set_chat_ids(self, chat_id: SLT[int]) -> None: - with self.__lock: - if chat_id and self._usernames: - raise RuntimeError( - "Can't set chat_id in conjunction with (already set) " "usernames." - ) - self._chat_ids = self._parse_chat_id(chat_id) + def remove_usernames(self, username: SLT[str]) -> None: + """ + Remove one or more chats from allowed usernames. - def _set_usernames(self, username: SLT[str]) -> None: - with self.__lock: - if username and self._chat_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "chat_ids." - ) - self._usernames = self._parse_username(username) + Args: + username(:class:`telegram.utils.types.SLT[str]`, optional): + Which username(s) to disallow through. + Leading '@'s in usernames will be discarded. + """ + return super().remove_usernames(username) - @property - def chat_ids(self) -> FrozenSet[int]: - with self.__lock: - return frozenset(self._chat_ids) + def remove_chat_ids(self, chat_id: SLT[int]) -> None: + """ + Remove one or more chats from allowed chat ids. - @chat_ids.setter - def chat_ids(self, chat_id: SLT[int]) -> None: - self._set_chat_ids(chat_id) + Args: + chat_id(:class:`telegram.utils.types.SLT[int]`, optional): + Which chat ID(s) to disallow through. + """ + return super().remove_chat_ids(chat_id) - @property - def usernames(self) -> FrozenSet[str]: - with self.__lock: - return frozenset(self._usernames) + class sender_chat(_ChatUserBaseFilter): + # pylint: disable=W0235 + """Filters messages to allow only those which are from a specified sender chats chat ID or + username. - @usernames.setter - def usernames(self, username: SLT[str]) -> None: - self._set_usernames(username) + Examples: + * To filter for messages forwarded from a channel with ID ``-1234``, use + ``MessageHandler(Filters.sender_chat(-1234), callback_method)``. + * To filter for messages of anonymous admins in a super group with username + ``@anonymous``, use + ``MessageHandler(Filters.sender_chat(username='anonymous'), callback_method)``. + * To filter for messages forwarded from *any* channel, use + ``MessageHandler(Filters.sender_chat.channel, callback_method)`` + * To filter for messages of anonymous admins in *any* super group, use + ``MessageHandler(Filters.sender_chat.super_group, callback_method)`` + + Warning: + :attr:`chat_ids` will give a *copy* of the saved chat ids as :class:`frozenset`. This + is to ensure thread safety. To add/remove a chat, you should use :meth:`add_usernames`, + :meth:`add_chat_ids`, :meth:`remove_usernames` and :meth:`remove_chat_ids`. Only update + the entire set by ``filter.chat_ids/usernames = new_set``, if you are entirely sure + that it is not causing race conditions, as this will complete replace the current set + of allowed chats. + + Attributes: + chat_ids(set(:obj:`int`), optional): Which sender chat chat ID(s) to allow through. + usernames(set(:obj:`str`), optional): Which sender chat username(s) (without leading + '@') to allow through. + allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no sender + chat is specified in :attr:`chat_ids` and :attr:`usernames`. + super_group: Messages whose sender chat is a super group. + + Examples: + ``Filters.sender_chat.supergroup`` + channel: Messages whose sender chat is a channel. + + Examples: + ``Filters.sender_chat.channel`` + + Args: + chat_id(:class:`telegram.utils.types.SLT[int]`, optional): + Which sender chat chat ID(s) to allow through. + username(:class:`telegram.utils.types.SLT[str]`, optional): + Which sender chat sername(s) to allow through. + Leading `'@'` s in usernames will be discarded. + allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no sender + chat is specified in :attr:`chat_ids` and :attr:`usernames`. Defaults to + :obj:`False` + + Raises: + RuntimeError: If chat_id and username are both present. + + """ + + def get_chat_or_user(self, message: Message) -> Optional[Chat]: + return message.sender_chat def add_usernames(self, username: SLT[str]) -> None: """ - Add one or more chats to the allowed usernames. + Add one or more sender chats to the allowed usernames. Args: username(:class:`telegram.utils.types.SLT[str]`, optional): - Which username(s) to allow through. + Which sender chat username(s) to allow through. Leading `'@'` s in usernames will be discarded. """ - with self.__lock: - if self._chat_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "chat_ids." - ) - - parsed_username = self._parse_username(username) - self._usernames |= parsed_username + return super().add_usernames(username) def add_chat_ids(self, chat_id: SLT[int]) -> None: """ - Add one or more chats to the allowed chat ids. + Add one or more sender chats to the allowed chat ids. Args: chat_id(:class:`telegram.utils.types.SLT[int]`, optional): - Which chat ID(s) to allow through. + Which sender chat ID(s) to allow through. """ - with self.__lock: - if self._usernames: - raise RuntimeError( - "Can't set chat_id in conjunction with (already set) " "usernames." - ) - - parsed_chat_id = self._parse_chat_id(chat_id) - - self._chat_ids |= parsed_chat_id + return super().add_chat_ids(chat_id) def remove_usernames(self, username: SLT[str]) -> None: """ - Remove one or more chats from allowed usernames. + Remove one or more sender chats from allowed usernames. Args: username(:class:`telegram.utils.types.SLT[str]`, optional): - Which username(s) to disallow through. + Which sender chat username(s) to disallow through. Leading '@'s in usernames will be discarded. """ - with self.__lock: - if self._chat_ids: - raise RuntimeError( - "Can't set username in conjunction with (already set) " "chat_ids." - ) - - parsed_username = self._parse_username(username) - self._usernames -= parsed_username + return super().remove_usernames(username) def remove_chat_ids(self, chat_id: SLT[int]) -> None: """ - Remove one or more chats from allowed chat ids. + Remove one or more sender chats from allowed chat ids. Args: chat_id(:class:`telegram.utils.types.SLT[int]`, optional): - Which chat ID(s) to disallow through. + Which sender chat ID(s) to disallow through. """ - with self.__lock: - if self._usernames: - raise RuntimeError( - "Can't set chat_id in conjunction with (already set) " "usernames." - ) - parsed_chat_id = self._parse_chat_id(chat_id) - self._chat_ids -= parsed_chat_id + return super().remove_chat_ids(chat_id) - def filter(self, message: Message) -> bool: - """""" # remove method from docs - if message.chat: - if self.chat_ids: - return message.chat.id in self.chat_ids - if self.usernames: - return bool(message.chat.username and message.chat.username in self.usernames) - return self.allow_empty - return False + class _SuperGroup(MessageFilter): + def filter(self, message: Message) -> bool: + if message.sender_chat: + return message.sender_chat.type == Chat.SUPERGROUP + return False - @property - def name(self) -> str: - return f'Filters.chat({", ".join(str(s) for s in (self.usernames or self.chat_ids))})' + class _Channel(MessageFilter): + def filter(self, message: Message) -> bool: + if message.sender_chat: + return message.sender_chat.type == Chat.CHANNEL + return False - @name.setter - def name(self, name: str) -> NoReturn: - raise RuntimeError('Cannot set name for Filters.chat') + super_group = _SuperGroup() + channel = _Channel() class _Invoice(MessageFilter): name = 'Filters.invoice' diff --git a/tests/test_filters.py b/tests/test_filters.py index e32c5dd7424..c5eab65cdc2 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -37,6 +37,7 @@ def update(): Chat(0, 'private'), from_user=User(0, 'Testuser', False), via_bot=User(0, "Testbot", True), + sender_chat=Chat(0, 'Channel'), ), ) @@ -1206,6 +1207,167 @@ def test_filters_chat_repr(self): with pytest.raises(RuntimeError, match='Cannot set name'): f.name = 'foo' + def test_filters_sender_chat_init(self): + with pytest.raises(RuntimeError, match='in conjunction with'): + Filters.sender_chat(chat_id=1, username='chat') + + def test_filters_sender_chat_allow_empty(self, update): + assert not Filters.sender_chat()(update) + assert Filters.sender_chat(allow_empty=True)(update) + + def test_filters_sender_chat_id(self, update): + assert not Filters.sender_chat(chat_id=1)(update) + update.message.sender_chat.id = 1 + assert Filters.sender_chat(chat_id=1)(update) + update.message.sender_chat.id = 2 + assert Filters.sender_chat(chat_id=[1, 2])(update) + assert not Filters.sender_chat(chat_id=[3, 4])(update) + update.message.sender_chat = None + assert not Filters.sender_chat(chat_id=[3, 4])(update) + + def test_filters_sender_chat_username(self, update): + assert not Filters.sender_chat(username='chat')(update) + assert not Filters.sender_chat(username='Testchat')(update) + update.message.sender_chat.username = 'chat@' + assert Filters.sender_chat(username='@chat@')(update) + assert Filters.sender_chat(username='chat@')(update) + assert Filters.sender_chat(username=['chat1', 'chat@', 'chat2'])(update) + assert not Filters.sender_chat(username=['@username', '@chat_2'])(update) + update.message.sender_chat = None + assert not Filters.sender_chat(username=['@username', '@chat_2'])(update) + + def test_filters_sender_chat_change_id(self, update): + f = Filters.sender_chat(chat_id=1) + update.message.sender_chat.id = 1 + assert f(update) + update.message.sender_chat.id = 2 + assert not f(update) + f.chat_ids = 2 + assert f(update) + + with pytest.raises(RuntimeError, match='username in conjunction'): + f.usernames = 'chat' + + def test_filters_sender_chat_change_username(self, update): + f = Filters.sender_chat(username='chat') + update.message.sender_chat.username = 'chat' + assert f(update) + update.message.sender_chat.username = 'User' + assert not f(update) + f.usernames = 'User' + assert f(update) + + with pytest.raises(RuntimeError, match='chat_id in conjunction'): + f.chat_ids = 1 + + def test_filters_sender_chat_add_sender_chat_by_name(self, update): + chats = ['chat_a', 'chat_b', 'chat_c'] + f = Filters.sender_chat() + + for chat in chats: + update.message.sender_chat.username = chat + assert not f(update) + + f.add_usernames('chat_a') + f.add_usernames(['chat_b', 'chat_c']) + + for chat in chats: + update.message.sender_chat.username = chat + assert f(update) + + with pytest.raises(RuntimeError, match='chat_id in conjunction'): + f.add_chat_ids(1) + + def test_filters_sender_chat_add_sender_chat_by_id(self, update): + chats = [1, 2, 3] + f = Filters.sender_chat() + + for chat in chats: + update.message.sender_chat.id = chat + assert not f(update) + + f.add_chat_ids(1) + f.add_chat_ids([2, 3]) + + for chat in chats: + update.message.sender_chat.username = chat + assert f(update) + + with pytest.raises(RuntimeError, match='username in conjunction'): + f.add_usernames('chat') + + def test_filters_sender_chat_remove_sender_chat_by_name(self, update): + chats = ['chat_a', 'chat_b', 'chat_c'] + f = Filters.sender_chat(username=chats) + + with pytest.raises(RuntimeError, match='chat_id in conjunction'): + f.remove_chat_ids(1) + + for chat in chats: + update.message.sender_chat.username = chat + assert f(update) + + f.remove_usernames('chat_a') + f.remove_usernames(['chat_b', 'chat_c']) + + for chat in chats: + update.message.sender_chat.username = chat + assert not f(update) + + def test_filters_sender_chat_remove_sender_chat_by_id(self, update): + chats = [1, 2, 3] + f = Filters.sender_chat(chat_id=chats) + + with pytest.raises(RuntimeError, match='username in conjunction'): + f.remove_usernames('chat') + + for chat in chats: + update.message.sender_chat.id = chat + assert f(update) + + f.remove_chat_ids(1) + f.remove_chat_ids([2, 3]) + + for chat in chats: + update.message.sender_chat.username = chat + assert not f(update) + + def test_filters_sender_chat_repr(self): + f = Filters.sender_chat([1, 2]) + assert str(f) == 'Filters.sender_chat(1, 2)' + f.remove_chat_ids(1) + f.remove_chat_ids(2) + assert str(f) == 'Filters.sender_chat()' + f.add_usernames('@foobar') + assert str(f) == 'Filters.sender_chat(foobar)' + f.add_usernames('@barfoo') + assert str(f).startswith('Filters.sender_chat(') + # we don't know th exact order + assert 'barfoo' in str(f) and 'foobar' in str(f) + + with pytest.raises(RuntimeError, match='Cannot set name'): + f.name = 'foo' + + def test_filters_sender_chat_super_group(self, update): + update.message.sender_chat.type = Chat.PRIVATE + assert not Filters.sender_chat.super_group(update) + update.message.sender_chat.type = Chat.CHANNEL + assert not Filters.sender_chat.super_group(update) + update.message.sender_chat.type = Chat.SUPERGROUP + assert Filters.sender_chat.super_group(update) + update.message.sender_chat = None + assert not Filters.sender_chat.super_group(update) + + def test_filters_sender_chat_channel(self, update): + update.message.sender_chat.type = Chat.PRIVATE + assert not Filters.sender_chat.channel(update) + update.message.sender_chat.type = Chat.SUPERGROUP + assert not Filters.sender_chat.channel(update) + update.message.sender_chat.type = Chat.CHANNEL + assert Filters.sender_chat.channel(update) + update.message.sender_chat = None + assert not Filters.sender_chat.channel(update) + def test_filters_invoice(self, update): assert not Filters.invoice(update) update.message.invoice = 'test' From dea95e64337008f230822637909d1ee92c7f72df Mon Sep 17 00:00:00 2001 From: Bibo-Joshi Date: Sat, 14 Nov 2020 03:05:19 +0100 Subject: [PATCH 2/3] Fine tune docs Co-authored-by: Poolitzer <25934244+Poolitzer@users.noreply.github.com> --- telegram/ext/filters.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/telegram/ext/filters.py b/telegram/ext/filters.py index d897daf6692..adacb77ccbb 100644 --- a/telegram/ext/filters.py +++ b/telegram/ext/filters.py @@ -1667,12 +1667,12 @@ class sender_chat(_ChatUserBaseFilter): ``@anonymous``, use ``MessageHandler(Filters.sender_chat(username='anonymous'), callback_method)``. * To filter for messages forwarded from *any* channel, use - ``MessageHandler(Filters.sender_chat.channel, callback_method)`` + ``MessageHandler(Filters.sender_chat.channel, callback_method)``. * To filter for messages of anonymous admins in *any* super group, use - ``MessageHandler(Filters.sender_chat.super_group, callback_method)`` + ``MessageHandler(Filters.sender_chat.super_group, callback_method)``. Warning: - :attr:`chat_ids` will give a *copy* of the saved chat ids as :class:`frozenset`. This + :attr:`chat_ids` will return a *copy* of the saved chat ids as :class:`frozenset`. This is to ensure thread safety. To add/remove a chat, you should use :meth:`add_usernames`, :meth:`add_chat_ids`, :meth:`remove_usernames` and :meth:`remove_chat_ids`. Only update the entire set by ``filter.chat_ids/usernames = new_set``, if you are entirely sure From ef37571e387851f97c9563a4642cadeb66db45e4 Mon Sep 17 00:00:00 2001 From: Hinrich Mahler Date: Wed, 18 Nov 2020 17:51:41 +0100 Subject: [PATCH 3/3] Slightly increase coverage --- tests/test_filters.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_filters.py b/tests/test_filters.py index c5eab65cdc2..8c654767efc 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -956,11 +956,13 @@ def test_filters_username(self, update): def test_filters_user_change_id(self, update): f = Filters.user(user_id=1) + assert f.user_ids == {1} update.message.from_user.id = 1 assert f(update) update.message.from_user.id = 2 assert not f(update) f.user_ids = 2 + assert f.user_ids == {2} assert f(update) with pytest.raises(RuntimeError, match='username in conjunction'): @@ -1097,11 +1099,13 @@ def test_filters_chat_username(self, update): def test_filters_chat_change_id(self, update): f = Filters.chat(chat_id=1) + assert f.chat_ids == {1} update.message.chat.id = 1 assert f(update) update.message.chat.id = 2 assert not f(update) f.chat_ids = 2 + assert f.chat_ids == {2} assert f(update) with pytest.raises(RuntimeError, match='username in conjunction'): @@ -1238,11 +1242,13 @@ def test_filters_sender_chat_username(self, update): def test_filters_sender_chat_change_id(self, update): f = Filters.sender_chat(chat_id=1) + assert f.chat_ids == {1} update.message.sender_chat.id = 1 assert f(update) update.message.sender_chat.id = 2 assert not f(update) f.chat_ids = 2 + assert f.chat_ids == {2} assert f(update) with pytest.raises(RuntimeError, match='username in conjunction'): @@ -1768,11 +1774,13 @@ def test_filters_via_bot_username(self, update): def test_filters_via_bot_change_id(self, update): f = Filters.via_bot(bot_id=3) + assert f.bot_ids == {3} update.message.via_bot.id = 3 assert f(update) update.message.via_bot.id = 2 assert not f(update) f.bot_ids = 2 + assert f.bot_ids == {2} assert f(update) with pytest.raises(RuntimeError, match='username in conjunction'):