diff --git a/telegram/bot.py b/telegram/bot.py index ee6ba4e10d5..d72c21b0571 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -36,7 +36,6 @@ Tuple, TypeVar, Union, - cast, no_type_check, ) @@ -63,7 +62,6 @@ File, GameHighScore, InlineQueryResult, - InputFile, InputMedia, LabeledPrice, Location, @@ -94,9 +92,15 @@ ) from telegram.constants import MAX_INLINE_QUERY_RESULTS from telegram.error import InvalidToken, TelegramError -from telegram.utils.helpers import DEFAULT_NONE, DefaultValue, to_timestamp +from telegram.utils.helpers import ( + DEFAULT_NONE, + DefaultValue, + to_timestamp, + is_local_file, + parse_file_input, +) from telegram.utils.request import Request -from telegram.utils.types import FileLike, JSONDict +from telegram.utils.types import FileInput, JSONDict if TYPE_CHECKING: from telegram.ext import Defaults @@ -582,7 +586,8 @@ def send_photo( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - photo (:obj:`str` | `filelike object` | :class:`telegram.PhotoSize`): Photo to send. + photo (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.PhotoSize`): Photo to send. Pass a file_id as String to send a photo that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get a photo from the Internet, or upload a new photo using multipart/form-data. Lastly you can pass @@ -615,13 +620,7 @@ def send_photo( :class:`telegram.TelegramError` """ - if isinstance(photo, PhotoSize): - photo = photo.file_id - elif InputFile.is_file(photo): - photo = cast(IO, photo) - photo = InputFile(photo) # type: ignore[assignment] - - data: JSONDict = {'chat_id': chat_id, 'photo': photo} + data: JSONDict = {'chat_id': chat_id, 'photo': parse_file_input(photo, PhotoSize)} if caption: data['caption'] = caption @@ -645,7 +644,7 @@ def send_photo( def send_audio( self, chat_id: Union[int, str], - audio: Union[str, Audio, FileLike], + audio: Union[FileInput, Audio], duration: int = None, performer: str = None, title: str = None, @@ -655,7 +654,7 @@ def send_audio( reply_markup: ReplyMarkup = None, timeout: float = 20, parse_mode: str = None, - thumb: FileLike = None, + thumb: FileInput = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, @@ -676,7 +675,8 @@ def send_audio( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - audio (:obj:`str` | `filelike object` | :class:`telegram.Audio`): Audio file to send. + audio (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Audio`): Audio file to send. Pass a file_id as String to send an audio file that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get an audio file from the Internet, or upload a new one using multipart/form-data. Lastly you can pass @@ -701,7 +701,8 @@ def send_audio( reply_markup (:class:`telegram.ReplyMarkup`, optional): Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard, instructions to remove reply keyboard or to force a reply from the user. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file + sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -717,13 +718,7 @@ def send_audio( :class:`telegram.TelegramError` """ - if isinstance(audio, Audio): - audio = audio.file_id - elif InputFile.is_file(audio): - audio = cast(IO, audio) - audio = InputFile(audio) - - data: JSONDict = {'chat_id': chat_id, 'audio': audio} + data: JSONDict = {'chat_id': chat_id, 'audio': parse_file_input(audio, Audio)} if duration: data['duration'] = duration @@ -738,10 +733,7 @@ def send_audio( if caption_entities: data['caption_entities'] = [me.to_dict() for me in caption_entities] if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - thumb = InputFile(thumb, attach=True) - data['thumb'] = thumb + data['thumb'] = parse_file_input(thumb, attach=True) return self._message( # type: ignore[return-value] 'sendAudio', @@ -758,7 +750,7 @@ def send_audio( def send_document( self, chat_id: Union[int, str], - document: Union[str, Document, FileLike], + document: Union[FileInput, Document], filename: str = None, caption: str = None, disable_notification: bool = False, @@ -766,7 +758,7 @@ def send_document( reply_markup: ReplyMarkup = None, timeout: float = 20, parse_mode: str = None, - thumb: FileLike = None, + thumb: FileInput = None, api_kwargs: JSONDict = None, disable_content_type_detection: bool = None, allow_sending_without_reply: bool = None, @@ -785,7 +777,8 @@ def send_document( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - document (:obj:`str` | `filelike object` | :class:`telegram.Document`): File to send. + document (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Document`): File to send. Pass a file_id as String to send a file that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get a file from the Internet, or upload a new one using multipart/form-data. Lastly you can pass @@ -811,7 +804,8 @@ def send_document( reply_markup (:class:`telegram.ReplyMarkup`, optional): Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard, instructions to remove reply keyboard or to force a reply from the user. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file + sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -827,13 +821,10 @@ def send_document( :class:`telegram.TelegramError` """ - if isinstance(document, Document): - document = document.file_id - elif InputFile.is_file(document): - document = cast(IO, document) - document = InputFile(document, filename=filename) - - data: JSONDict = {'chat_id': chat_id, 'document': document} + data: JSONDict = { + 'chat_id': chat_id, + 'document': parse_file_input(document, Document, filename=filename), + } if caption: data['caption'] = caption @@ -844,10 +835,7 @@ def send_document( if disable_content_type_detection is not None: data['disable_content_type_detection'] = disable_content_type_detection if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - thumb = InputFile(thumb, attach=True) - data['thumb'] = thumb + data['thumb'] = parse_file_input(thumb, attach=True) return self._message( # type: ignore[return-value] 'sendDocument', @@ -864,7 +852,7 @@ def send_document( def send_sticker( self, chat_id: Union[int, str], - sticker: Union[str, Sticker, FileLike], + sticker: Union[FileInput, Sticker], disable_notification: bool = False, reply_to_message_id: Union[int, str] = None, reply_markup: ReplyMarkup = None, @@ -882,7 +870,8 @@ def send_sticker( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - sticker (:obj:`str` | `filelike object` :class:`telegram.Sticker`): Sticker to send. + sticker (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Sticker`): Sticker to send. Pass a file_id as String to send a file that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get a .webp file from the Internet, or upload a new one using multipart/form-data. Lastly you can pass @@ -907,13 +896,7 @@ def send_sticker( :class:`telegram.TelegramError` """ - if isinstance(sticker, Sticker): - sticker = sticker.file_id - elif InputFile.is_file(sticker): - sticker = cast(IO, sticker) - sticker = InputFile(sticker) - - data: JSONDict = {'chat_id': chat_id, 'sticker': sticker} + data: JSONDict = {'chat_id': chat_id, 'sticker': parse_file_input(sticker, Sticker)} return self._message( # type: ignore[return-value] 'sendSticker', @@ -930,7 +913,7 @@ def send_sticker( def send_video( self, chat_id: Union[int, str], - video: Union[str, Video, FileLike], + video: Union[FileInput, Video], duration: int = None, caption: str = None, disable_notification: bool = False, @@ -941,7 +924,7 @@ def send_video( height: int = None, parse_mode: str = None, supports_streaming: bool = None, - thumb: FileLike = None, + thumb: FileInput = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, @@ -963,7 +946,8 @@ def send_video( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - video (:obj:`str` | `filelike object` | :class:`telegram.Video`): Video file to send. + video (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Video`): Video file to send. Pass a file_id as String to send an video file that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get an video file from the Internet, or upload a new one using multipart/form-data. Lastly you can pass @@ -990,7 +974,8 @@ def send_video( reply_markup (:class:`telegram.ReplyMarkup`, optional): Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard, instructions to remove reply keyboard or to force a reply from the user. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file + sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -1006,13 +991,7 @@ def send_video( :class:`telegram.TelegramError` """ - if isinstance(video, Video): - video = video.file_id - elif InputFile.is_file(video): - video = cast(IO, video) - video = InputFile(video) - - data: JSONDict = {'chat_id': chat_id, 'video': video} + data: JSONDict = {'chat_id': chat_id, 'video': parse_file_input(video, Video)} if duration: data['duration'] = duration @@ -1029,10 +1008,7 @@ def send_video( if height: data['height'] = height if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - thumb = InputFile(thumb, attach=True) - data['thumb'] = thumb + data['thumb'] = parse_file_input(thumb, attach=True) return self._message( # type: ignore[return-value] 'sendVideo', @@ -1049,14 +1025,14 @@ def send_video( def send_video_note( self, chat_id: Union[int, str], - video_note: Union[str, FileLike, VideoNote], + video_note: Union[FileInput, VideoNote], duration: int = None, length: int = None, disable_notification: bool = False, reply_to_message_id: Union[int, str] = None, reply_markup: ReplyMarkup = None, timeout: float = 20, - thumb: FileLike = None, + thumb: FileInput = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, ) -> Optional[Message]: @@ -1074,7 +1050,8 @@ def send_video_note( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - video_note (:obj:`str` | `filelike object` | :class:`telegram.VideoNote`): Video note + video_note (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.VideoNote`): Video note to send. Pass a file_id as String to send a video note that exists on the Telegram servers (recommended) or upload a new video using multipart/form-data. Or you can pass an existing :class:`telegram.VideoNote` object to send. Sending video notes by @@ -1091,7 +1068,8 @@ def send_video_note( reply_markup (:class:`telegram.ReplyMarkup`, optional): Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard, instructions to remove reply keyboard or to force a reply from the user. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file + sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -1107,23 +1085,17 @@ def send_video_note( :class:`telegram.TelegramError` """ - if isinstance(video_note, VideoNote): - video_note = video_note.file_id - elif InputFile.is_file(video_note): - video_note = cast(IO, video_note) - video_note = InputFile(video_note) - - data: JSONDict = {'chat_id': chat_id, 'video_note': video_note} + data: JSONDict = { + 'chat_id': chat_id, + 'video_note': parse_file_input(video_note, VideoNote), + } if duration is not None: data['duration'] = duration if length is not None: data['length'] = length if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - thumb = InputFile(thumb, attach=True) - data['thumb'] = thumb + data['thumb'] = parse_file_input(thumb, attach=True) return self._message( # type: ignore[return-value] 'sendVideoNote', @@ -1140,11 +1112,11 @@ def send_video_note( def send_animation( self, chat_id: Union[int, str], - animation: Union[str, FileLike, Animation], + animation: Union[FileInput, Animation], duration: int = None, width: int = None, height: int = None, - thumb: FileLike = None, + thumb: FileInput = None, caption: str = None, parse_mode: str = None, disable_notification: bool = False, @@ -1168,7 +1140,8 @@ def send_animation( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - animation (:obj:`str` | `filelike object` | :class:`telegram.Animation`): Animation to + animation (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Animation`): Animation to send. Pass a file_id as String to send an animation that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get an animation from the Internet, or upload a new animation using multipart/form-data. @@ -1176,7 +1149,8 @@ def send_animation( duration (:obj:`int`, optional): Duration of sent animation in seconds. width (:obj:`int`, optional): Animation width. height (:obj:`int`, optional): Animation height. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file + sent; can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -1209,13 +1183,7 @@ def send_animation( :class:`telegram.TelegramError` """ - if isinstance(animation, Animation): - animation = animation.file_id - elif InputFile.is_file(animation): - animation = cast(IO, animation) - animation = InputFile(animation) - - data: JSONDict = {'chat_id': chat_id, 'animation': animation} + data: JSONDict = {'chat_id': chat_id, 'animation': parse_file_input(animation, Animation)} if duration: data['duration'] = duration @@ -1224,10 +1192,7 @@ def send_animation( if height: data['height'] = height if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - thumb = InputFile(thumb, attach=True) - data['thumb'] = thumb + data['thumb'] = parse_file_input(thumb, attach=True) if caption: data['caption'] = caption if parse_mode: @@ -1250,7 +1215,7 @@ def send_animation( def send_voice( self, chat_id: Union[int, str], - voice: Union[str, FileLike, Voice], + voice: Union[FileInput, Voice], duration: int = None, caption: str = None, disable_notification: bool = False, @@ -1275,7 +1240,8 @@ def send_voice( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - voice (:obj:`str` | `filelike object` | :class:`telegram.Voice`): Voice file to send. + voice (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Voice`): Voice file to send. Pass a file_id as String to send an voice file that exists on the Telegram servers (recommended), pass an HTTP URL as a String for Telegram to get an voice file from the Internet, or upload a new one using multipart/form-data. Lastly you can pass @@ -1309,13 +1275,7 @@ def send_voice( :class:`telegram.TelegramError` """ - if isinstance(voice, Voice): - voice = voice.file_id - elif InputFile.is_file(voice): - voice = cast(IO, voice) - voice = InputFile(voice) - - data: JSONDict = {'chat_id': chat_id, 'voice': voice} + data: JSONDict = {'chat_id': chat_id, 'voice': parse_file_input(voice, Voice)} if duration: data['duration'] = duration @@ -2155,9 +2115,11 @@ def get_file( result = self._post('getFile', data, timeout=timeout, api_kwargs=api_kwargs) - if result.get('file_path'): # type: ignore - result['file_path'] = '{}/{}'.format( # type: ignore - self.base_file_url, result['file_path'] # type: ignore + if result.get('file_path') and not is_local_file( # type: ignore[union-attr] + result['file_path'] # type: ignore[index] + ): + result['file_path'] = '{}/{}'.format( # type: ignore[index] + self.base_file_url, result['file_path'] # type: ignore[index] ) return File.de_json(result, self) # type: ignore @@ -2680,7 +2642,7 @@ def get_updates( def set_webhook( self, url: str = None, - certificate: FileLike = None, + certificate: FileInput = None, timeout: float = None, max_connections: int = 40, allowed_updates: List[str] = None, @@ -2754,10 +2716,7 @@ def set_webhook( if url is not None: data['url'] = url if certificate: - if InputFile.is_file(certificate): - certificate = cast(IO, certificate) - certificate = InputFile(certificate) - data['certificate'] = certificate + data['certificate'] = parse_file_input(certificate) if max_connections is not None: data['max_connections'] = max_connections if allowed_updates is not None: @@ -3671,7 +3630,7 @@ def export_chat_invite_link( def set_chat_photo( self, chat_id: Union[str, int], - photo: FileLike, + photo: FileInput, timeout: float = 20, api_kwargs: JSONDict = None, ) -> bool: @@ -3683,7 +3642,7 @@ def set_chat_photo( Args: chat_id (:obj:`int` | :obj:`str`): Unique identifier for the target chat or username of the target channel (in the format @channelusername). - photo (`filelike object`): New chat photo. + photo (`filelike object` | :class:`pathlib.Path`): New chat photo. timeout (:obj:`int` | :obj:`float`, optional): If this value is specified, use it as the read timeout from the server (instead of the one specified during creation of the connection pool). @@ -3697,11 +3656,7 @@ def set_chat_photo( :class:`telegram.TelegramError` """ - if InputFile.is_file(photo): - photo = cast(IO, photo) - photo = InputFile(photo) - - data: JSONDict = {'chat_id': chat_id, 'photo': photo} + data: JSONDict = {'chat_id': chat_id, 'photo': parse_file_input(photo)} result = self._post('setChatPhoto', data, timeout=timeout, api_kwargs=api_kwargs) @@ -3962,7 +3917,7 @@ def get_sticker_set( def upload_sticker_file( self, user_id: Union[str, int], - png_sticker: Union[str, FileLike], + png_sticker: FileInput, timeout: float = 20, api_kwargs: JSONDict = None, ) -> File: @@ -3977,7 +3932,8 @@ def upload_sticker_file( Args: user_id (:obj:`int`): User identifier of sticker file owner. - png_sticker (:obj:`str` | `filelike object`): Png image with the sticker, + png_sticker (:obj:`str` | `filelike object` | :class:`pathlib.Path`): Png image with + the sticker, must be up to 512 kilobytes in size, dimensions must not exceed 512px, and either width or height must be exactly 512px. timeout (:obj:`int` | :obj:`float`, optional): If this value is specified, use it as @@ -3993,10 +3949,7 @@ def upload_sticker_file( :class:`telegram.TelegramError` """ - if InputFile.is_file(png_sticker): - png_sticker = InputFile(png_sticker) # type: ignore[assignment,arg-type] - - data: JSONDict = {'user_id': user_id, 'png_sticker': png_sticker} + data: JSONDict = {'user_id': user_id, 'png_sticker': parse_file_input(png_sticker)} result = self._post('uploadStickerFile', data, timeout=timeout, api_kwargs=api_kwargs) @@ -4009,11 +3962,11 @@ def create_new_sticker_set( name: str, title: str, emojis: str, - png_sticker: Union[str, FileLike] = None, + png_sticker: FileInput = None, contains_masks: bool = None, mask_position: MaskPosition = None, timeout: float = 20, - tgs_sticker: Union[str, FileLike] = None, + tgs_sticker: FileInput = None, api_kwargs: JSONDict = None, ) -> bool: """ @@ -4038,13 +3991,15 @@ def create_new_sticker_set( must end in "_by_". is case insensitive. 1-64 characters. title (:obj:`str`): Sticker set title, 1-64 characters. - png_sticker (:obj:`str` | `filelike object`, optional): Png image with the sticker, + png_sticker (:obj:`str` | `filelike object` | :class:`pathlib.Path`, optional): Png + image with the sticker, must be up to 512 kilobytes in size, dimensions must not exceed 512px, and either width or height must be exactly 512px. Pass a file_id as a String to send a file that already exists on the Telegram servers, pass an HTTP URL as a String for Telegram to get a file from the Internet, or upload a new one using multipart/form-data. - tgs_sticker (:obj:`str` | `filelike object`, optional): TGS animation with the sticker, + tgs_sticker (:obj:`str` | `filelike object` | :class:`pathlib.Path`, optional): TGS + animation with the sticker, uploaded using multipart/form-data. See https://core.telegram.org/animated_stickers#technical-requirements for technical requirements. @@ -4066,18 +4021,12 @@ def create_new_sticker_set( :class:`telegram.TelegramError` """ - if InputFile.is_file(png_sticker): - png_sticker = InputFile(png_sticker) # type: ignore[assignment,arg-type] - - if InputFile.is_file(tgs_sticker): - tgs_sticker = InputFile(tgs_sticker) # type: ignore[assignment,arg-type] - data: JSONDict = {'user_id': user_id, 'name': name, 'title': title, 'emojis': emojis} if png_sticker is not None: - data['png_sticker'] = png_sticker + data['png_sticker'] = parse_file_input(png_sticker) if tgs_sticker is not None: - data['tgs_sticker'] = tgs_sticker + data['tgs_sticker'] = parse_file_input(tgs_sticker) if contains_masks is not None: data['contains_masks'] = contains_masks if mask_position is not None: @@ -4095,10 +4044,10 @@ def add_sticker_to_set( user_id: Union[str, int], name: str, emojis: str, - png_sticker: Union[str, FileLike] = None, + png_sticker: FileInput = None, mask_position: MaskPosition = None, timeout: float = 20, - tgs_sticker: Union[str, FileLike] = None, + tgs_sticker: FileInput = None, api_kwargs: JSONDict = None, ) -> bool: """ @@ -4119,13 +4068,15 @@ def add_sticker_to_set( Args: user_id (:obj:`int`): User identifier of created sticker set owner. name (:obj:`str`): Sticker set name. - png_sticker (:obj:`str` | `filelike object`, optional): PNG image with the sticker, + png_sticker (:obj:`str` | `filelike object` | :class:`pathlib.Path`, optional): PNG + image with the sticker, must be up to 512 kilobytes in size, dimensions must not exceed 512px, and either width or height must be exactly 512px. Pass a file_id as a String to send a file that already exists on the Telegram servers, pass an HTTP URL as a String for Telegram to get a file from the Internet, or upload a new one using multipart/form-data. - tgs_sticker (:obj:`str` | `filelike object`, optional): TGS animation with the sticker, + tgs_sticker (:obj:`str` | `filelike object` | :class:`pathlib.Path`, optional): TGS + animation with the sticker, uploaded using multipart/form-data. See https://core.telegram.org/animated_stickers#technical-requirements for technical requirements. @@ -4145,18 +4096,12 @@ def add_sticker_to_set( :class:`telegram.TelegramError` """ - if InputFile.is_file(png_sticker): - png_sticker = InputFile(png_sticker) # type: ignore[assignment,arg-type] - - if InputFile.is_file(tgs_sticker): - tgs_sticker = InputFile(tgs_sticker) # type: ignore[assignment,arg-type] - data: JSONDict = {'user_id': user_id, 'name': name, 'emojis': emojis} if png_sticker is not None: - data['png_sticker'] = png_sticker + data['png_sticker'] = parse_file_input(png_sticker) if tgs_sticker is not None: - data['tgs_sticker'] = tgs_sticker + data['tgs_sticker'] = parse_file_input(tgs_sticker) if mask_position is not None: # We need to_json() instead of to_dict() here, because we're sending a media # message here, which isn't json dumped by utils.request @@ -4228,7 +4173,7 @@ def set_sticker_set_thumb( self, name: str, user_id: Union[str, int], - thumb: FileLike = None, + thumb: FileInput = None, timeout: float = None, api_kwargs: JSONDict = None, ) -> bool: @@ -4241,7 +4186,8 @@ def set_sticker_set_thumb( Args: name (:obj:`str`): Sticker set name user_id (:obj:`int`): User identifier of created sticker set owner. - thumb (:obj:`str` | `filelike object`, optional): A PNG image with the thumbnail, must + thumb (:obj:`str` | `filelike object` | :class:`pathlib.Path`, optional): A PNG image + with the thumbnail, must be up to 128 kilobytes in size and have width and height exactly 100px, or a TGS animation with the thumbnail up to 32 kilobytes in size; see https://core.telegram.org/animated_stickers#technical-requirements for animated @@ -4262,12 +4208,10 @@ def set_sticker_set_thumb( :class:`telegram.TelegramError` """ + data: JSONDict = {'name': name, 'user_id': user_id} - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - thumb = InputFile(thumb) - - data: JSONDict = {'name': name, 'user_id': user_id, 'thumb': thumb} + if thumb is not None: + data['thumb'] = parse_file_input(thumb) result = self._post('setStickerSetThumb', data, timeout=timeout, api_kwargs=api_kwargs) diff --git a/telegram/files/file.py b/telegram/files/file.py index 7a6d9917f8d..b1a347501d7 100644 --- a/telegram/files/file.py +++ b/telegram/files/file.py @@ -18,6 +18,7 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. """This module contains an object that represents a Telegram File.""" import os +import shutil import urllib.parse as urllib_parse from base64 import b64decode from os.path import basename @@ -25,6 +26,7 @@ from telegram import TelegramObject from telegram.passport.credentials import decrypt +from telegram.utils.helpers import is_local_file if TYPE_CHECKING: from telegram import Bot, FileCredentials @@ -98,7 +100,10 @@ def download( the ``out.write`` method. Note: - :attr:`custom_path` and :attr:`out` are mutually exclusive. + * :attr:`custom_path` and :attr:`out` are mutually exclusive. + * If neither :attr:`custom_path` nor :attr:`out` is provided and :attr:`file_path` is + the path of a local file (which is the case when a Bot API Server is running in + local mode), this method will just return the path. Args: custom_path (:obj:`str`, optional): Custom path. @@ -110,7 +115,7 @@ def download( Returns: :obj:`str` | :obj:`io.BufferedWriter`: The same object as :attr:`out` if specified. - Otherwise, returns the filename downloaded to. + Otherwise, returns the filename downloaded to or the file path of the local file. Raises: ValueError: If both :attr:`custom_path` and :attr:`out` are passed. @@ -119,20 +124,35 @@ def download( if custom_path is not None and out is not None: raise ValueError('custom_path and out are mutually exclusive') - # Convert any UTF-8 char into a url encoded ASCII string. - url = self._get_encoded_url() + local_file = is_local_file(self.file_path) + + if local_file: + url = self.file_path + else: + # Convert any UTF-8 char into a url encoded ASCII string. + url = self._get_encoded_url() if out: - buf = self.bot.request.retrieve(url) - if self._credentials: - buf = decrypt( - b64decode(self._credentials.secret), b64decode(self._credentials.hash), buf - ) + if local_file: + with open(url, 'rb') as file: + buf = file.read() + else: + buf = self.bot.request.retrieve(url) + if self._credentials: + buf = decrypt( + b64decode(self._credentials.secret), b64decode(self._credentials.hash), buf + ) out.write(buf) return out + if custom_path and local_file: + shutil.copyfile(self.file_path, custom_path) + return custom_path + if custom_path: filename = custom_path + elif local_file: + return self.file_path elif self.file_path: filename = basename(self.file_path) else: @@ -169,8 +189,11 @@ def download_as_bytearray(self, buf: bytearray = None) -> bytes: """ if buf is None: buf = bytearray() - - buf.extend(self.bot.request.retrieve(self._get_encoded_url())) + if is_local_file(self.file_path): + with open(self.file_path, "rb") as file: + buf.extend(file.read()) + else: + buf.extend(self.bot.request.retrieve(self._get_encoded_url())) return buf def set_credentials(self, credentials: 'FileCredentials') -> None: diff --git a/telegram/files/inputmedia.py b/telegram/files/inputmedia.py index 6cc69406c99..50091b853bd 100644 --- a/telegram/files/inputmedia.py +++ b/telegram/files/inputmedia.py @@ -18,7 +18,7 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. """Base class for Telegram InputMedia Objects.""" -from typing import IO, Union, cast, List, Tuple +from typing import Union, List, Tuple from telegram import ( Animation, @@ -30,8 +30,8 @@ Video, MessageEntity, ) -from telegram.utils.helpers import DEFAULT_NONE, DefaultValue -from telegram.utils.types import FileLike, JSONDict +from telegram.utils.helpers import DEFAULT_NONE, DefaultValue, parse_file_input +from telegram.utils.types import FileInput, JSONDict class InputMedia(TelegramObject): @@ -73,11 +73,13 @@ class InputMediaAnimation(InputMedia): Args: - media (:obj:`str` | `filelike object` | :class:`telegram.Animation`): File to send. Pass a + media (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Animation`): File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet. Lastly you can pass an existing :class:`telegram.Animation` object to send. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file sent; + can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -101,8 +103,8 @@ class InputMediaAnimation(InputMedia): def __init__( self, - media: Union[str, FileLike, Animation], - thumb: FileLike = None, + media: Union[FileInput, Animation], + thumb: FileInput = None, caption: str = None, parse_mode: Union[str, DefaultValue] = DEFAULT_NONE, width: int = None, @@ -117,18 +119,11 @@ def __init__( self.width = media.width self.height = media.height self.duration = media.duration - elif InputFile.is_file(media): - media = cast(IO, media) - self.media = InputFile(media, attach=True) else: - self.media = media # type: ignore[assignment] + self.media = parse_file_input(media, attach=True) if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - self.thumb = InputFile(thumb, attach=True) - else: - self.thumb = thumb # type: ignore[assignment] + self.thumb = parse_file_input(thumb, attach=True) if caption: self.caption = caption @@ -154,7 +149,8 @@ class InputMediaPhoto(InputMedia): entities that appear in the caption. Args: - media (:obj:`str` | `filelike object` | :class:`telegram.PhotoSize`): File to send. Pass a + media (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.PhotoSize`): File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet. Lastly you can pass an existing :class:`telegram.PhotoSize` object to send. @@ -169,20 +165,13 @@ class InputMediaPhoto(InputMedia): def __init__( self, - media: Union[str, FileLike, PhotoSize], + media: Union[FileInput, PhotoSize], caption: str = None, parse_mode: Union[str, DefaultValue] = DEFAULT_NONE, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, ): self.type = 'photo' - - if isinstance(media, PhotoSize): - self.media: Union[str, InputFile] = media.file_id - elif InputFile.is_file(media): - media = cast(IO, media) - self.media = InputFile(media, attach=True) - else: - self.media = media # type: ignore[assignment] + self.media = parse_file_input(media, PhotoSize, attach=True) if caption: self.caption = caption @@ -208,7 +197,8 @@ class InputMediaVideo(InputMedia): thumb (:class:`telegram.InputFile`): Optional. Thumbnail of the file to send. Args: - media (:obj:`str` | `filelike object` | :class:`telegram.Video`): File to send. Pass a + media (:obj:`str` | `filelike object` | :class:`pathlib.Path` | :class:`telegram.Video`): + File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet. Lastly you can pass an existing :class:`telegram.Video` object to send. @@ -224,7 +214,8 @@ class InputMediaVideo(InputMedia): duration (:obj:`int`, optional): Video duration. supports_streaming (:obj:`bool`, optional): Pass :obj:`True`, if the uploaded video is suitable for streaming. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file sent; + can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -241,14 +232,14 @@ class InputMediaVideo(InputMedia): def __init__( self, - media: Union[str, FileLike, Video], + media: Union[FileInput, Video], caption: str = None, width: int = None, height: int = None, duration: int = None, supports_streaming: bool = None, parse_mode: Union[str, DefaultValue] = DEFAULT_NONE, - thumb: FileLike = None, + thumb: FileInput = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, ): self.type = 'video' @@ -258,18 +249,11 @@ def __init__( self.width = media.width self.height = media.height self.duration = media.duration - elif InputFile.is_file(media): - media = cast(IO, media) - self.media = InputFile(media, attach=True) else: - self.media = media # type: ignore[assignment] + self.media = parse_file_input(media, attach=True) if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - self.thumb = InputFile(thumb, attach=True) - else: - self.thumb = thumb # type: ignore[assignment] + self.thumb = parse_file_input(thumb, attach=True) if caption: self.caption = caption @@ -302,7 +286,8 @@ class InputMediaAudio(InputMedia): thumb (:class:`telegram.InputFile`): Optional. Thumbnail of the file to send. Args: - media (:obj:`str` | `filelike object` | :class:`telegram.Audio`): File to send. Pass a + media (:obj:`str` | `filelike object` | :class:`pathlib.Path` | :class:`telegram.Audio`): + File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet. Lastly you can pass an existing :class:`telegram.Audio` object to send. @@ -317,7 +302,8 @@ class InputMediaAudio(InputMedia): performer (:obj:`str`, optional): Performer of the audio as defined by sender or by audio tags. title (:obj:`str`, optional): Title of the audio as defined by sender or by audio tags. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file sent; + can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -331,8 +317,8 @@ class InputMediaAudio(InputMedia): def __init__( self, - media: Union[str, FileLike, Audio], - thumb: FileLike = None, + media: Union[FileInput, Audio], + thumb: FileInput = None, caption: str = None, parse_mode: Union[str, DefaultValue] = DEFAULT_NONE, duration: int = None, @@ -347,18 +333,11 @@ def __init__( self.duration = media.duration self.performer = media.performer self.title = media.title - elif InputFile.is_file(media): - media = cast(IO, media) - self.media = InputFile(media, attach=True) else: - self.media = media # type: ignore[assignment] + self.media = parse_file_input(media, attach=True) if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - self.thumb = InputFile(thumb, attach=True) - else: - self.thumb = thumb # type: ignore[assignment] + self.thumb = parse_file_input(thumb, attach=True) if caption: self.caption = caption @@ -388,7 +367,8 @@ class InputMediaDocument(InputMedia): the document is sent as part of an album. Args: - media (:obj:`str` | `filelike object` | :class:`telegram.Document`): File to send. Pass a + media (:obj:`str` | `filelike object` | :class:`pathlib.Path` | \ + :class:`telegram.Document`): File to send. Pass a file_id to send a file that exists on the Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from the Internet. Lastly you can pass an existing :class:`telegram.Document` object to send. @@ -399,7 +379,8 @@ class InputMediaDocument(InputMedia): in :class:`telegram.ParseMode` for the available modes. caption_entities (List[:class:`telegram.MessageEntity`], optional): List of special entities that appear in the caption, which can be specified instead of parse_mode. - thumb (`filelike object`, optional): Thumbnail of the file sent; can be ignored if + thumb (`filelike object` | :class:`pathlib.Path`, optional): Thumbnail of the file sent; + can be ignored if thumbnail generation for the file is supported server-side. The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. @@ -411,29 +392,18 @@ class InputMediaDocument(InputMedia): def __init__( self, - media: Union[str, FileLike, Document], - thumb: FileLike = None, + media: Union[FileInput, Document], + thumb: FileInput = None, caption: str = None, parse_mode: Union[str, DefaultValue] = DEFAULT_NONE, disable_content_type_detection: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, ): self.type = 'document' - - if isinstance(media, Document): - self.media: Union[str, InputFile] = media.file_id - elif InputFile.is_file(media): - media = cast(IO, media) - self.media = InputFile(media, attach=True) - else: - self.media = media # type: ignore[assignment] + self.media = parse_file_input(media, Document, attach=True) if thumb: - if InputFile.is_file(thumb): - thumb = cast(IO, thumb) - self.thumb = InputFile(thumb, attach=True) - else: - self.thumb = thumb # type: ignore[assignment] + self.thumb = parse_file_input(thumb, attach=True) if caption: self.caption = caption diff --git a/telegram/utils/helpers.py b/telegram/utils/helpers.py index a736a75fb1b..4aa9d67d6c9 100644 --- a/telegram/utils/helpers.py +++ b/telegram/utils/helpers.py @@ -26,15 +26,16 @@ from collections import defaultdict from html import escape from numbers import Number +from pathlib import Path -from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Optional, Tuple, Union, Type, cast, IO import pytz # pylint: disable=E0401 -from telegram.utils.types import JSONDict +from telegram.utils.types import JSONDict, FileInput if TYPE_CHECKING: - from telegram import MessageEntity + from telegram import MessageEntity, TelegramObject, InputFile try: import ujson as json @@ -55,6 +56,73 @@ def get_signal_name(signum: int) -> str: return _signames[signum] +def is_local_file(obj: Optional[Union[str, Path]]) -> bool: + """ + Checks if a given string is a file on local system. + + Args: + obj (:obj:`str`): The string to check. + """ + if obj is None: + return False + + path = Path(obj) + try: + return path.is_file() + except Exception: + return False + + +def parse_file_input( + file_input: Union[FileInput, 'TelegramObject'], + tg_type: Type['TelegramObject'] = None, + attach: bool = None, + filename: str = None, +) -> Union[str, 'InputFile', Any]: + """ + Parses input for sending files: + + * For string input, if the input is an absolute path of a local file, + adds the ``file://`` prefix. If the input is a relative path of a local file, computes the + absolute path and adds the ``file://`` prefix. Returns the input unchanged, otherwise. + * :class:`pathlib.Path` objects are treated the same way as strings. + * For IO input, returns an :class:`telegram.InputFile`. + * If :attr:`tg_type` is specified and the input is of that type, returns the ``file_id`` + attribute. + + Args: + file_input (:obj:`str` | `filelike object` | Telegram media object): The input to parse. + tg_type (:obj:`type`, optional): The Telegram media type the input can be. E.g. + :class:`telegram.Animation`. + attach (:obj:`bool`, optional): Whether this file should be send as one file or is part of + a collection of files. Only relevant in case an :class:`telegram.InputFile` is + returned. + filename (:obj:`str`, optional): The filename. Only relevant in case an + :class:`telegram.InputFile` is returned. + + Returns: + :obj:`str` | :class:`telegram.InputFile` | :obj:`object`: The parsed input or the untouched + :attr:`file_input`, in case it's no valid file input. + """ + # Importing on file-level yields cyclic Import Errors + from telegram import InputFile # pylint: disable=C0415 + + if isinstance(file_input, str) and file_input.startswith('file://'): + return file_input + if isinstance(file_input, (str, Path)): + if is_local_file(file_input): + out = f'file://{Path(file_input).absolute()}' + else: + out = file_input # type: ignore[assignment] + return out + if InputFile.is_file(file_input): + file_input = cast(IO, file_input) + return InputFile(file_input, attach=attach, filename=filename) + if tg_type and isinstance(file_input, tg_type): + return file_input.file_id # type: ignore[attr-defined] + return file_input + + def escape_markdown(text: str, version: int = 1, entity_type: str = None) -> str: """ Helper function to escape telegram markup symbols. diff --git a/telegram/utils/types.py b/telegram/utils/types.py index 1ac4f6dcd0c..6100cd243b9 100644 --- a/telegram/utils/types.py +++ b/telegram/utils/types.py @@ -17,13 +17,18 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. """This module contains custom typing aliases.""" +from pathlib import Path from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, Union if TYPE_CHECKING: from telegram import InputFile, Update FileLike = Union[IO, 'InputFile'] -"""Either an open file handler or in :class:`telegram.InputFile`.""" +"""Either an open file handler or a :class:`telegram.InputFile`.""" + +FileInput = Union[str, FileLike, Path] +"""Valid input for passing files to Telegram. Either a file id as string, a file like object or +a local file path as string or :class:`pathlib.Path`.""" JSONDict = Dict[str, Any] """Dictionary containing response from Telegram or data to send to the API.""" diff --git a/tests/data/local_file.txt b/tests/data/local_file.txt new file mode 100644 index 00000000000..85d6e1c2f61 --- /dev/null +++ b/tests/data/local_file.txt @@ -0,0 +1 @@ +Saint-Saƫns \ No newline at end of file diff --git a/tests/test_animation.py b/tests/test_animation.py index abf5dbbad35..91b86153aa2 100644 --- a/tests/test_animation.py +++ b/tests/test_animation.py @@ -18,6 +18,8 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path + import pytest from flaky import flaky @@ -178,6 +180,20 @@ def test_send_animation_default_parse_mode_3(self, default_bot, chat_id, animati assert message.caption == test_markdown_string assert message.caption_markdown == escape_markdown(test_markdown_string) + def test_send_animation_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('animation') == expected and data.get('thumb') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_animation(chat_id, file, thumb=file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) @pytest.mark.parametrize( diff --git a/tests/test_audio.py b/tests/test_audio.py index c720853da95..340dc631d9c 100644 --- a/tests/test_audio.py +++ b/tests/test_audio.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path import pytest from flaky import flaky @@ -201,6 +202,20 @@ def test_send_audio_default_parse_mode_3(self, default_bot, chat_id, audio_file, assert message.caption == test_markdown_string assert message.caption_markdown == escape_markdown(test_markdown_string) + def test_send_audio_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('audio') == expected and data.get('thumb') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_audio(chat_id, file, thumb=file) + assert test_flag + def test_de_json(self, bot, audio): json_dict = { 'file_id': self.audio_file_id, diff --git a/tests/test_bot.py b/tests/test_bot.py index 0e8149e3332..0139e358131 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -18,6 +18,7 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. import time import datetime as dtm +from pathlib import Path from platform import python_implementation import pytest @@ -769,6 +770,23 @@ def test_get_one_user_profile_photo(self, bot, chat_id): assert user_profile_photos.photos[0][0].file_size == 5403 # get_file is tested multiple times in the test_*media* modules. + # Here we only test the behaviour for bot apis in local mode + def test_get_file_local_mode(self, bot, monkeypatch): + path = str(Path.cwd() / 'tests' / 'data' / 'game.gif') + + def _post(*args, **kwargs): + return { + 'file_id': None, + 'file_unique_id': None, + 'file_size': None, + 'file_path': path, + } + + monkeypatch.setattr(bot, '_post', _post) + + resulting_path = bot.get_file('file_id').file_path + assert bot.token not in resulting_path + assert resulting_path == path # TODO: Needs improvement. No feasable way to test until bots can add members. def test_kick_chat_member(self, monkeypatch, bot): @@ -1431,6 +1449,20 @@ def func(): with open('tests/data/telegram_test_channel.jpg', 'rb') as f: expect_bad_request(func, 'Type of file mismatch', 'Telegram did not accept the file.') + def test_set_chat_photo_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('photo') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.set_chat_photo(chat_id, file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) def test_delete_chat_photo(self, bot, channel_id): diff --git a/tests/test_document.py b/tests/test_document.py index 434cf268920..875a09e1534 100644 --- a/tests/test_document.py +++ b/tests/test_document.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path import pytest from flaky import flaky @@ -236,6 +237,20 @@ def test_send_document_default_allow_sending_without_reply( chat_id, document, reply_to_message_id=reply_to_message.message_id ) + def test_send_document_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('document') == expected and data.get('thumb') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_document(chat_id, file, thumb=file) + assert test_flag + def test_de_json(self, bot, document): json_dict = { 'file_id': self.document_file_id, diff --git a/tests/test_file.py b/tests/test_file.py index a4e0556041d..35281c599f2 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -18,6 +18,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path from tempfile import TemporaryFile, mkstemp import pytest @@ -37,6 +38,17 @@ def file(bot): ) +@pytest.fixture(scope='class') +def local_file(bot): + return File( + TestFile.file_id, + TestFile.file_unique_id, + file_path=str(Path.cwd() / 'tests' / 'data' / 'local_file.txt'), + file_size=TestFile.file_size, + bot=bot, + ) + + class TestFile: file_id = 'NOTVALIDDOESNOTMATTER' file_unique_id = 'adc3145fd2e84d95b64d68eaa22aa33e' @@ -88,6 +100,9 @@ def test(*args, **kwargs): finally: os.unlink(out_file) + def test_download_local_file(self, local_file): + assert local_file.download() == local_file.file_path + def test_download_custom_path(self, monkeypatch, file): def test(*args, **kwargs): return self.file_content @@ -104,6 +119,18 @@ def test(*args, **kwargs): os.close(file_handle) os.unlink(custom_path) + def test_download_custom_path_local_file(self, local_file): + file_handle, custom_path = mkstemp() + try: + out_file = local_file.download(custom_path) + assert out_file == custom_path + + with open(out_file, 'rb') as fobj: + assert fobj.read() == self.file_content + finally: + os.close(file_handle) + os.unlink(custom_path) + def test_download_no_filename(self, monkeypatch, file): def test(*args, **kwargs): return self.file_content @@ -132,6 +159,14 @@ def test(*args, **kwargs): out_fobj.seek(0) assert out_fobj.read() == self.file_content + def test_download_file_obj_local_file(self, local_file): + with TemporaryFile() as custom_fobj: + out_fobj = local_file.download(out=custom_fobj) + assert out_fobj is custom_fobj + + out_fobj.seek(0) + assert out_fobj.read() == self.file_content + def test_download_bytearray(self, monkeypatch, file): def test(*args, **kwargs): return self.file_content @@ -149,6 +184,18 @@ def test(*args, **kwargs): assert buf2[len(buf) :] == buf assert buf2[: len(buf)] == buf + def test_download_bytearray_local_file(self, local_file): + # Check that a download to a newly allocated bytearray works. + buf = local_file.download_as_bytearray() + assert buf == bytearray(self.file_content) + + # Check that a download to a given bytearray works (extends the bytearray). + buf2 = buf[:] + buf3 = local_file.download_as_bytearray(buf=buf2) + assert buf3 is buf2 + assert buf2[len(buf) :] == buf + assert buf2[: len(buf)] == buf + def test_equality(self, bot): a = File(self.file_id, self.file_unique_id, bot) b = File('', self.file_unique_id, bot) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 3ef3270620f..ffdb928242c 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -18,10 +18,11 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. import time import datetime as dtm +from pathlib import Path import pytest -from telegram import Sticker +from telegram import Sticker, InputFile, Animation from telegram import Update from telegram import User from telegram import MessageEntity @@ -258,3 +259,68 @@ def test_mention_markdown_2(self): expected = r'[the\_name](tg://user?id=1)' assert expected == helpers.mention_markdown(1, 'the_name') + + @pytest.mark.parametrize( + 'string,expected', + [ + ('tests/data/game.gif', True), + ('tests/data', False), + (str(Path.cwd() / 'tests' / 'data' / 'game.gif'), True), + (str(Path.cwd() / 'tests' / 'data'), False), + (Path.cwd() / 'tests' / 'data' / 'game.gif', True), + (Path.cwd() / 'tests' / 'data', False), + ('https:/api.org/file/botTOKEN/document/file_3', False), + (None, False), + ], + ) + def test_is_local_file(self, string, expected): + assert helpers.is_local_file(string) == expected + + @pytest.mark.parametrize( + 'string,expected', + [ + ('tests/data/game.gif', f"file://{Path.cwd() / 'tests' / 'data' / 'game.gif'}"), + ('tests/data', 'tests/data'), + ('file://foobar', 'file://foobar'), + ( + str(Path.cwd() / 'tests' / 'data' / 'game.gif'), + f"file://{Path.cwd() / 'tests' / 'data' / 'game.gif'}", + ), + (str(Path.cwd() / 'tests' / 'data'), str(Path.cwd() / 'tests' / 'data')), + ( + Path.cwd() / 'tests' / 'data' / 'game.gif', + f"file://{Path.cwd() / 'tests' / 'data' / 'game.gif'}", + ), + (Path.cwd() / 'tests' / 'data', Path.cwd() / 'tests' / 'data'), + ( + 'https:/api.org/file/botTOKEN/document/file_3', + 'https:/api.org/file/botTOKEN/document/file_3', + ), + ], + ) + def test_parse_file_input_string(self, string, expected): + assert helpers.parse_file_input(string) == expected + + def test_parse_file_input_file_like(self): + with open('tests/data/game.gif', 'rb') as file: + parsed = helpers.parse_file_input(file) + + assert isinstance(parsed, InputFile) + assert not parsed.attach + assert parsed.filename == 'game.gif' + + with open('tests/data/game.gif', 'rb') as file: + parsed = helpers.parse_file_input(file, attach=True, filename='test_file') + + assert isinstance(parsed, InputFile) + assert parsed.attach + assert parsed.filename == 'test_file' + + def test_parse_file_input_tg_object(self): + animation = Animation('file_id', 'unique_id', 1, 1, 1) + assert helpers.parse_file_input(animation, Animation) == 'file_id' + assert helpers.parse_file_input(animation, MessageEntity) is animation + + @pytest.mark.parametrize('obj', [{1: 2}, [1, 2], (1, 2)]) + def test_parse_file_input_other(self, obj): + assert helpers.parse_file_input(obj) is obj diff --git a/tests/test_inputmedia.py b/tests/test_inputmedia.py index b13025b65cd..ba32507900b 100644 --- a/tests/test_inputmedia.py +++ b/tests/test_inputmedia.py @@ -16,6 +16,8 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +from pathlib import Path + import pytest from flaky import flaky @@ -167,6 +169,13 @@ def test_with_video_file(self, video_file): # noqa: F811 assert isinstance(input_media_video.media, InputFile) assert input_media_video.caption == "test 3" + def test_with_local_files(self): + input_media_video = InputMediaVideo( + 'tests/data/telegram.mp4', thumb='tests/data/telegram.jpg' + ) + assert input_media_video.media == f"file://{Path.cwd() / 'tests/data/telegram.mp4'}" + assert input_media_video.thumb == f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + class TestInputMediaPhoto: type_ = "photo" @@ -206,6 +215,10 @@ def test_with_photo_file(self, photo_file): # noqa: F811 assert isinstance(input_media_photo.media, InputFile) assert input_media_photo.caption == "test 2" + def test_with_local_files(self): + input_media_photo = InputMediaPhoto('tests/data/telegram.mp4') + assert input_media_photo.media == f"file://{Path.cwd() / 'tests/data/telegram.mp4'}" + class TestInputMediaAnimation: type_ = "animation" @@ -252,6 +265,17 @@ def test_with_animation_file(self, animation_file): # noqa: F811 assert isinstance(input_media_animation.media, InputFile) assert input_media_animation.caption == "test 2" + def test_with_local_files(self): + input_media_animation = InputMediaAnimation( + 'tests/data/telegram.mp4', thumb='tests/data/telegram.jpg' + ) + assert input_media_animation.media == 'file://' + str( + Path.cwd() / 'tests/data/telegram.mp4' + ) + assert input_media_animation.thumb == 'file://' + str( + Path.cwd() / 'tests/data/telegram.jpg' + ) + class TestInputMediaAudio: type_ = "audio" @@ -304,6 +328,13 @@ def test_with_audio_file(self, audio_file): # noqa: F811 assert isinstance(input_media_audio.media, InputFile) assert input_media_audio.caption == "test 3" + def test_with_local_files(self): + input_media_audio = InputMediaAudio( + 'tests/data/telegram.mp4', thumb='tests/data/telegram.jpg' + ) + assert input_media_audio.media == f"file://{Path.cwd() / 'tests/data/telegram.mp4'}" + assert input_media_audio.thumb == f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + class TestInputMediaDocument: type_ = "document" @@ -353,6 +384,17 @@ def test_with_document_file(self, document_file): # noqa: F811 assert isinstance(input_media_document.media, InputFile) assert input_media_document.caption == "test 3" + def test_with_local_files(self): + input_media_document = InputMediaDocument( + 'tests/data/telegram.mp4', thumb='tests/data/telegram.jpg' + ) + assert input_media_document.media == 'file://' + str( + Path.cwd() / 'tests/data/telegram.mp4' + ) + assert input_media_document.thumb == 'file://' + str( + Path.cwd() / 'tests/data/telegram.jpg' + ) + @pytest.fixture(scope='function') # noqa: F811 def media_group(photo, thumb): # noqa: F811 diff --git a/tests/test_photo.py b/tests/test_photo.py index def14290a47..4d8cc1798bb 100644 --- a/tests/test_photo.py +++ b/tests/test_photo.py @@ -18,6 +18,7 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. import os from io import BytesIO +from pathlib import Path import pytest from flaky import flaky @@ -218,6 +219,20 @@ def test_send_photo_default_parse_mode_3(self, default_bot, chat_id, photo_file, assert message.caption == test_markdown_string assert message.caption_markdown == escape_markdown(test_markdown_string) + def test_send_photo_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('photo') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_photo(chat_id, file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) @pytest.mark.parametrize( diff --git a/tests/test_sticker.py b/tests/test_sticker.py index 906474b24e7..e107985a1d1 100644 --- a/tests/test_sticker.py +++ b/tests/test_sticker.py @@ -18,6 +18,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path from time import sleep import pytest @@ -203,6 +204,20 @@ def test(url, data, **kwargs): message = bot.send_sticker(sticker=sticker, chat_id=chat_id) assert message + def test_send_sticker_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('sticker') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_sticker(chat_id, file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) @pytest.mark.parametrize( @@ -433,6 +448,64 @@ def test_bot_methods_4_tgs(self, bot, animated_sticker_set): file_id = animated_sticker_set.stickers[-1].file_id assert bot.delete_sticker_from_set(file_id) + def test_upload_sticker_file_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('png_sticker') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.upload_sticker_file(chat_id, file) + assert test_flag + + def test_create_new_sticker_set_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('png_sticker') == expected and data.get('tgs_sticker') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.create_new_sticker_set( + chat_id, 'name', 'title', 'emoji', png_sticker=file, tgs_sticker=file + ) + assert test_flag + + def test_add_sticker_to_set_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('png_sticker') == expected and data.get('tgs_sticker') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.add_sticker_to_set(chat_id, 'name', 'emoji', png_sticker=file, tgs_sticker=file) + assert test_flag + + def test_set_sticker_set_thumb_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('thumb') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.set_sticker_set_thumb('name', chat_id, thumb=file) + assert test_flag + def test_get_file_instance_method(self, monkeypatch, sticker): def test(*args, **kwargs): return args[1] == sticker.file_id diff --git a/tests/test_video.py b/tests/test_video.py index 7b3cf4e7158..fe9f5108b42 100644 --- a/tests/test_video.py +++ b/tests/test_video.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path import pytest from flaky import flaky @@ -217,6 +218,20 @@ def test_send_video_default_parse_mode_3(self, default_bot, chat_id, video): assert message.caption == test_markdown_string assert message.caption_markdown == escape_markdown(test_markdown_string) + def test_send_video_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('video') == expected and data.get('thumb') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_video(chat_id, file, thumb=file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) @pytest.mark.parametrize( diff --git a/tests/test_videonote.py b/tests/test_videonote.py index a33b83895d8..517a07d0b2a 100644 --- a/tests/test_videonote.py +++ b/tests/test_videonote.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path import pytest from flaky import flaky @@ -150,6 +151,20 @@ def test_to_dict(self, video_note): assert video_note_dict['duration'] == video_note.duration assert video_note_dict['file_size'] == video_note.file_size + def test_send_video_note_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('video_note') == expected and data.get('thumb') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_video_note(chat_id, file, thumb=file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) @pytest.mark.parametrize( diff --git a/tests/test_voice.py b/tests/test_voice.py index 718e251ca2b..4ebed9f3517 100644 --- a/tests/test_voice.py +++ b/tests/test_voice.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import os +from pathlib import Path import pytest from flaky import flaky @@ -179,6 +180,20 @@ def test_send_voice_default_parse_mode_3(self, default_bot, chat_id, voice): assert message.caption == test_markdown_string assert message.caption_markdown == escape_markdown(test_markdown_string) + def test_send_voice_local_files(self, monkeypatch, bot, chat_id): + # For just test that the correct paths are passed as we have no local bot API set up + test_flag = False + expected = f"file://{Path.cwd() / 'tests/data/telegram.jpg'}" + file = 'tests/data/telegram.jpg' + + def make_assertion(_, data, *args, **kwargs): + nonlocal test_flag + test_flag = data.get('voice') == expected + + monkeypatch.setattr(bot, '_post', make_assertion) + bot.send_voice(chat_id, file) + assert test_flag + @flaky(3, 1) @pytest.mark.timeout(10) @pytest.mark.parametrize(