From 6546279e1e412f1d9185a054f0a4e87cd8fc5495 Mon Sep 17 00:00:00 2001 From: maskbnmask Date: Wed, 25 Jan 2023 09:13:59 +0330 Subject: [PATCH 1/5] adding static method get_post_text to class TelegramChannelScraper for having line breaks in result --- snscrape/modules/telegram.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 11cc19bb..2c9d6dfc 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -79,6 +79,7 @@ def _initial_page(self): return self._initialPage, self._initialPageSoup def _soup_to_items(self, soup, pageUrl, onlyUsername = False): + # soup.get_text(separator="\n") posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True}) for post in reversed(posts): if onlyUsername: @@ -91,7 +92,7 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername = False): url = rawUrl.replace('//t.me/', '//t.me/s/') date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') if (message := post.find('div', class_ = 'tgme_widget_message_text')): - content = message.text + content = self.get_post_text(message) outlinks = [] for link in post.find_all('a'): if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): @@ -194,10 +195,18 @@ def parse_num(s): return Channel(**kwargs) + @staticmethod + def get_post_text(post) -> str: + result = [] + for s in post.stripped_strings: + result.append(s) + return '\n'.join(result) + + @classmethod - def _cli_setup_parser(cls, subparser): + def cli_setup_parser(cls, subparser): subparser.add_argument('channel', type = snscrape.base.nonempty_string('channel'), help = 'A channel name') @classmethod - def _cli_from_args(cls, args): - return cls._cli_construct(args, args.channel) + def cli_from_args(cls, args): + return cls.cli_construct(args, args.channel) From 117cab7aa83153e7e668ec942cb24babd09efcd3 Mon Sep 17 00:00:00 2001 From: maskbnmask Date: Wed, 25 Jan 2023 09:16:11 +0330 Subject: [PATCH 2/5] remove extra line soup.get_text(separator="\n") from my TelegramChannelScraper._soup_to_items method --- snscrape/modules/telegram.py | 1 - 1 file changed, 1 deletion(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 2c9d6dfc..2bfb0772 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -79,7 +79,6 @@ def _initial_page(self): return self._initialPage, self._initialPageSoup def _soup_to_items(self, soup, pageUrl, onlyUsername = False): - # soup.get_text(separator="\n") posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True}) for post in reversed(posts): if onlyUsername: From 70c1a1fe5ef18cabcdc26a831b08876d1fa5a502 Mon Sep 17 00:00:00 2001 From: maskbnmask Date: Wed, 25 Jan 2023 09:24:09 +0330 Subject: [PATCH 3/5] reformat telegram.py file: remove redundant parentheses in conditions - convert long lines to paragraphs --- snscrape/modules/telegram.py | 372 ++++++++++++++++++----------------- 1 file changed, 191 insertions(+), 181 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 2bfb0772..50cfd98d 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -1,6 +1,5 @@ __all__ = ['LinkPreview', 'TelegramPost', 'Channel', 'TelegramChannelScraper'] - import bs4 import dataclasses import datetime @@ -10,202 +9,213 @@ import typing import urllib.parse - _logger = logging.getLogger(__name__) _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') @dataclasses.dataclass class LinkPreview: - href: str - siteName: typing.Optional[str] = None - title: typing.Optional[str] = None - description: typing.Optional[str] = None - image: typing.Optional[str] = None + href: str + siteName: typing.Optional[str] = None + title: typing.Optional[str] = None + description: typing.Optional[str] = None + image: typing.Optional[str] = None @dataclasses.dataclass class TelegramPost(snscrape.base.Item): - url: str - date: datetime.datetime - content: str - outlinks: list - linkPreview: typing.Optional[LinkPreview] = None + url: str + date: datetime.datetime + content: str + outlinks: list + linkPreview: typing.Optional[LinkPreview] = None - outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') + outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') - def __str__(self): - return self.url + def __str__(self): + return self.url @dataclasses.dataclass class Channel(snscrape.base.Entity): - username: str - title: str - verified: bool - photo: str - description: typing.Optional[str] = None - members: typing.Optional[int] = None - photos: typing.Optional[snscrape.base.IntWithGranularity] = None - videos: typing.Optional[snscrape.base.IntWithGranularity] = None - links: typing.Optional[snscrape.base.IntWithGranularity] = None - files: typing.Optional[snscrape.base.IntWithGranularity] = None - - photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, 'photos.granularity') - videosGranularity = snscrape.base._DeprecatedProperty('videosGranularity', lambda self: self.videos.granularity, 'videos.granularity') - linksGranularity = snscrape.base._DeprecatedProperty('linksGranularity', lambda self: self.links.granularity, 'links.granularity') - filesGranularity = snscrape.base._DeprecatedProperty('filesGranularity', lambda self: self.files.granularity, 'files.granularity') - - def __str__(self): - return f'https://t.me/s/{self.username}' + username: str + title: str + verified: bool + photo: str + description: typing.Optional[str] = None + members: typing.Optional[int] = None + photos: typing.Optional[snscrape.base.IntWithGranularity] = None + videos: typing.Optional[snscrape.base.IntWithGranularity] = None + links: typing.Optional[snscrape.base.IntWithGranularity] = None + files: typing.Optional[snscrape.base.IntWithGranularity] = None + + photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, + 'photos.granularity') + videosGranularity = snscrape.base._DeprecatedProperty('videosGranularity', lambda self: self.videos.granularity, + 'videos.granularity') + linksGranularity = snscrape.base._DeprecatedProperty('linksGranularity', lambda self: self.links.granularity, + 'links.granularity') + filesGranularity = snscrape.base._DeprecatedProperty('filesGranularity', lambda self: self.files.granularity, + 'files.granularity') + + def __str__(self): + return f'https://t.me/s/{self.username}' class TelegramChannelScraper(snscrape.base.Scraper): - name = 'telegram-channel' - - def __init__(self, name, **kwargs): - super().__init__(**kwargs) - self._name = name - self._headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'} - self._initialPage = None - self._initialPageSoup = None - - def _initial_page(self): - if self._initialPage is None: - r = self._get(f'https://t.me/s/{self._name}', headers = self._headers) - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - self._initialPage, self._initialPageSoup = r, bs4.BeautifulSoup(r.text, 'lxml') - return self._initialPage, self._initialPageSoup - - def _soup_to_items(self, soup, pageUrl, onlyUsername = False): - posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True}) - for post in reversed(posts): - if onlyUsername: - yield post['data-post'].split('/')[0] - return - dateDiv = post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date') - rawUrl = dateDiv['href'] - if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[1].strip('0123456789') != '': - _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') - url = rawUrl.replace('//t.me/', '//t.me/s/') - date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') - if (message := post.find('div', class_ = 'tgme_widget_message_text')): - content = self.get_post_text(message) - outlinks = [] - for link in post.find_all('a'): - if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): - # Author links at the top (avatar and name) - continue - if link['href'] == rawUrl or link['href'] == url: - # Generic filter of links to the post itself, catches videos, photos, and the date link - continue - if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): - # Individual photo or video link - continue - href = urllib.parse.urljoin(pageUrl, link['href']) - if href not in outlinks: - outlinks.append(href) - else: - content = None - outlinks = [] - linkPreview = None - if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): - kwargs = {} - kwargs['href'] = urllib.parse.urljoin(pageUrl, linkPreviewA['href']) - if (siteNameDiv := linkPreviewA.find('div', class_ = 'link_preview_site_name')): - kwargs['siteName'] = siteNameDiv.text - if (titleDiv := linkPreviewA.find('div', class_ = 'link_preview_title')): - kwargs['title'] = titleDiv.text - if (descriptionDiv := linkPreviewA.find('div', class_ = 'link_preview_description')): - kwargs['description'] = descriptionDiv.text - if (imageI := linkPreviewA.find('i', class_ = 'link_preview_image')): - if imageI['style'].startswith("background-image:url('"): - kwargs['image'] = imageI['style'][22 : imageI['style'].index("'", 22)] - else: - _logger.warning(f'Could not process link preview image on {url}') - linkPreview = LinkPreview(**kwargs) - yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview) - - def get_items(self): - r, soup = self._initial_page() - if '/s/' not in r.url: - _logger.warning('No public post list for this user') - return - while True: - yield from self._soup_to_items(soup, r.url) - pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) - if not pageLink: - break - nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) - r = self._get(nextPageUrl, headers = self._headers) - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - soup = bs4.BeautifulSoup(r.text, 'lxml') - - def _get_entity(self): - kwargs = {} - # /channel has a more accurate member count and bigger profile picture - r = self._get(f'https://t.me/{self._name}', headers = self._headers) - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - soup = bs4.BeautifulSoup(r.text, 'lxml') - membersDiv = soup.find('div', class_ = 'tgme_page_extra') - if membersDiv.text.endswith(' members'): - kwargs['members'] = int(membersDiv.text[:-8].replace(' ', '')) - kwargs['photo'] = soup.find('img', class_ = 'tgme_page_photo_image').attrs['src'] - - r, soup = self._initial_page() - if '/s/' not in r.url: # Redirect on channels without public posts - return - channelInfoDiv = soup.find('div', class_ = 'tgme_channel_info') - assert channelInfoDiv, 'channel info div not found' - titleDiv = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_title') - kwargs['title'] = titleDiv.find('span').text - kwargs['verified'] = bool(titleDiv.find('i', class_ = 'verified-icon')) - # The username in the channel info is not canonicalised, nor is the one on the /channel page anywhere. - # However, the post URLs are, so extract the first post and use that. - try: - kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername = True)) - except StopIteration: - # If there are no posts, fall back to the channel info div, although that should never happen due to the 'Channel created' entry. - _logger.warning('Could not find a post; extracting username from channel info div, which may not be capitalised correctly') - kwargs['username'] = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_username').text[1:] # Remove @ - if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): - kwargs['description'] = descriptionDiv.text - - def parse_num(s): - s = s.replace(' ', '') - if s.endswith('M'): - return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) - elif s.endswith('K'): - return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) - else: - return int(s), 1 - - for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): - value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) - type_ = div.find('span', class_ = 'counter_type').text - if type_ == 'members': - # Already extracted more accurately from /channel, skip - continue - elif type_ in ('photos', 'videos', 'links', 'files'): - kwargs[type_] = snscrape.base.IntWithGranularity(value, granularity) - - return Channel(**kwargs) - - @staticmethod - def get_post_text(post) -> str: - result = [] - for s in post.stripped_strings: - result.append(s) - return '\n'.join(result) - - - @classmethod - def cli_setup_parser(cls, subparser): - subparser.add_argument('channel', type = snscrape.base.nonempty_string('channel'), help = 'A channel name') - - @classmethod - def cli_from_args(cls, args): - return cls.cli_construct(args, args.channel) + name = 'telegram-channel' + + def __init__(self, name, **kwargs): + super().__init__(**kwargs) + self._name = name + self._headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/81.0.4044.138 Safari/537.36'} + self._initialPage = None + self._initialPageSoup = None + + def _initial_page(self): + if self._initialPage is None: + r = self._get(f'https://t.me/s/{self._name}', headers=self._headers) + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + self._initialPage, self._initialPageSoup = r, bs4.BeautifulSoup(r.text, 'lxml') + return self._initialPage, self._initialPageSoup + + def _soup_to_items(self, soup, pageUrl, onlyUsername=False): + posts = soup.find_all('div', attrs={'class': 'tgme_widget_message', 'data-post': True}) + for post in reversed(posts): + if onlyUsername: + yield post['data-post'].split('/')[0] + return + dateDiv = post.find('div', class_='tgme_widget_message_footer').find('a', class_='tgme_widget_message_date') + rawUrl = dateDiv['href'] + if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[ + 1].strip('0123456789') != '': + _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') + url = rawUrl.replace('//t.me/', '//t.me/s/') + date = datetime.datetime.strptime( + dateDiv.find('time', datetime=True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') + if message := post.find('div', class_='tgme_widget_message_text'): + content = self.get_post_text(message) + outlinks = [] + for link in post.find_all('a'): + if any(x in link.parent.attrs.get('class', []) for x in + ('tgme_widget_message_user', 'tgme_widget_message_author')): + # Author links at the top (avatar and name) + continue + if link['href'] == rawUrl or link['href'] == url: + # Generic filter of links to the post itself, catches videos, photos, and the date link + continue + if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): + # Individual photo or video link + continue + href = urllib.parse.urljoin(pageUrl, link['href']) + if href not in outlinks: + outlinks.append(href) + else: + content = None + outlinks = [] + linkPreview = None + if linkPreviewA := post.find('a', class_='time_widget_message_link_preview'): + kwargs = {} + kwargs['href'] = urllib.parse.urljoin(pageUrl, linkPreviewA['href']) + if siteNameDiv := linkPreviewA.find('div', class_='link_preview_site_name'): + kwargs['siteName'] = siteNameDiv.text + if titleDiv := linkPreviewA.find('div', class_='link_preview_title'): + kwargs['title'] = titleDiv.text + if descriptionDiv := linkPreviewA.find('div', class_='link_preview_description'): + kwargs['description'] = descriptionDiv.text + if imageI := linkPreviewA.find('i', class_='link_preview_image'): + if imageI['style'].startswith("background-image:url('"): + kwargs['image'] = imageI['style'][22: imageI['style'].index("'", 22)] + else: + _logger.warning(f'Could not process link preview image on {url}') + linkPreview = LinkPreview(**kwargs) + yield TelegramPost(url=url, date=date, content=content, outlinks=outlinks, linkPreview=linkPreview) + + def get_items(self): + r, soup = self._initial_page() + if '/s/' not in r.url: + _logger.warning('No public post list for this user') + return + while True: + yield from self._soup_to_items(soup, r.url) + pageLink = soup.find('a', attrs={'class': 'tme_messages_more', 'data-before': True}) + if not pageLink: + break + nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) + r = self._get(nextPageUrl, headers=self._headers) + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + soup = bs4.BeautifulSoup(r.text, 'lxml') + + def _get_entity(self): + kwargs = {} + # /channel has a more accurate member count and bigger profile picture + r = self._get(f'https://t.me/{self._name}', headers=self._headers) + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + soup = bs4.BeautifulSoup(r.text, 'lxml') + membersDiv = soup.find('div', class_='tgme_page_extra') + if membersDiv.text.endswith(' members'): + kwargs['members'] = int(membersDiv.text[:-8].replace(' ', '')) + kwargs['photo'] = soup.find('img', class_='tgme_page_photo_image').attrs['src'] + + r, soup = self._initial_page() + if '/s/' not in r.url: # Redirect on channels without public posts + return + channelInfoDiv = soup.find('div', class_='tgme_channel_info') + assert channelInfoDiv, 'channel info div not found' + titleDiv = channelInfoDiv.find('div', class_='tgme_channel_info_header_title') + kwargs['title'] = titleDiv.find('span').text + kwargs['verified'] = bool(titleDiv.find('i', class_='verified-icon')) + # The username in the channel info is not canonicalised, nor is the one on the /channel page anywhere. + # However, the post URLs are, so extract the first post and use that. + try: + kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername=True)) + except StopIteration: + # If there are no posts, fall back to the channel info div, although that should never happen due to the + # 'Channel created' entry. + _logger.warning( + 'Could not find a post; extracting username from channel info div, which may not be capitalised ' + 'correctly') + kwargs['username'] = channelInfoDiv.find('div', class_='tgme_channel_info_header_username').text[ + 1:] # Remove @ + if descriptionDiv := channelInfoDiv.find('div', class_='tgme_channel_info_description'): + kwargs['description'] = descriptionDiv.text + + def parse_num(s): + s = s.replace(' ', '') + if s.endswith('M'): + return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) + elif s.endswith('K'): + return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) + else: + return int(s), 1 + + for div in channelInfoDiv.find_all('div', class_='tgme_channel_info_counter'): + value, granularity = parse_num(div.find('span', class_='counter_value').text) + type_ = div.find('span', class_='counter_type').text + if type_ == 'members': + # Already extracted more accurately from /channel, skip + continue + elif type_ in ('photos', 'videos', 'links', 'files'): + kwargs[type_] = snscrape.base.IntWithGranularity(value, granularity) + + return Channel(**kwargs) + + @staticmethod + def get_post_text(post) -> str: + result = [] + for s in post.stripped_strings: + result.append(s) + return '\n'.join(result) + + @classmethod + def _cli_setup_parser(cls, subparser): + subparser.add_argument('channel', type=snscrape.base.nonempty_string('channel'), help='A channel name') + + @classmethod + def cli_from_args(cls, args): + return cls._cli_construct(args, args.channel) From ed3d520e1aa5cec055119c5f3b09bb5f816f177e Mon Sep 17 00:00:00 2001 From: maskbnmask Date: Thu, 2 Feb 2023 20:02:30 +0330 Subject: [PATCH 4/5] revert identations because of the request of the repository owner --- snscrape/modules/telegram.py | 107 ++++++++++++++++------------------- 1 file changed, 48 insertions(+), 59 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 50cfd98d..13d51678 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -1,5 +1,6 @@ __all__ = ['LinkPreview', 'TelegramPost', 'Channel', 'TelegramChannelScraper'] + import bs4 import dataclasses import datetime @@ -9,6 +10,7 @@ import typing import urllib.parse + _logger = logging.getLogger(__name__) _SINGLE_MEDIA_LINK_PATTERN = re.compile(r'^https://t\.me/[^/]+/\d+\?single$') @@ -49,14 +51,10 @@ class Channel(snscrape.base.Entity): links: typing.Optional[snscrape.base.IntWithGranularity] = None files: typing.Optional[snscrape.base.IntWithGranularity] = None - photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, - 'photos.granularity') - videosGranularity = snscrape.base._DeprecatedProperty('videosGranularity', lambda self: self.videos.granularity, - 'videos.granularity') - linksGranularity = snscrape.base._DeprecatedProperty('linksGranularity', lambda self: self.links.granularity, - 'links.granularity') - filesGranularity = snscrape.base._DeprecatedProperty('filesGranularity', lambda self: self.files.granularity, - 'files.granularity') + photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, 'photos.granularity') + videosGranularity = snscrape.base._DeprecatedProperty('videosGranularity', lambda self: self.videos.granularity, 'videos.granularity') + linksGranularity = snscrape.base._DeprecatedProperty('linksGranularity', lambda self: self.links.granularity, 'links.granularity') + filesGranularity = snscrape.base._DeprecatedProperty('filesGranularity', lambda self: self.files.granularity, 'files.granularity') def __str__(self): return f'https://t.me/s/{self.username}' @@ -68,40 +66,35 @@ class TelegramChannelScraper(snscrape.base.Scraper): def __init__(self, name, **kwargs): super().__init__(**kwargs) self._name = name - self._headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/81.0.4044.138 Safari/537.36'} + self._headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'} self._initialPage = None self._initialPageSoup = None def _initial_page(self): if self._initialPage is None: - r = self._get(f'https://t.me/s/{self._name}', headers=self._headers) + r = self._get(f'https://t.me/s/{self._name}', headers = self._headers) if r.status_code != 200: raise snscrape.base.ScraperException(f'Got status code {r.status_code}') self._initialPage, self._initialPageSoup = r, bs4.BeautifulSoup(r.text, 'lxml') return self._initialPage, self._initialPageSoup - def _soup_to_items(self, soup, pageUrl, onlyUsername=False): - posts = soup.find_all('div', attrs={'class': 'tgme_widget_message', 'data-post': True}) + def _soup_to_items(self, soup, pageUrl, onlyUsername = False): + posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True}) for post in reversed(posts): if onlyUsername: yield post['data-post'].split('/')[0] return - dateDiv = post.find('div', class_='tgme_widget_message_footer').find('a', class_='tgme_widget_message_date') + dateDiv = post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date') rawUrl = dateDiv['href'] - if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[ - 1].strip('0123456789') != '': + if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[1].strip('0123456789') != '': _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') url = rawUrl.replace('//t.me/', '//t.me/s/') - date = datetime.datetime.strptime( - dateDiv.find('time', datetime=True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') - if message := post.find('div', class_='tgme_widget_message_text'): + date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') + if (message := post.find('div', class_ = 'tgme_widget_message_text')): content = self.get_post_text(message) outlinks = [] for link in post.find_all('a'): - if any(x in link.parent.attrs.get('class', []) for x in - ('tgme_widget_message_user', 'tgme_widget_message_author')): + if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): # Author links at the top (avatar and name) continue if link['href'] == rawUrl or link['href'] == url: @@ -117,22 +110,22 @@ def _soup_to_items(self, soup, pageUrl, onlyUsername=False): content = None outlinks = [] linkPreview = None - if linkPreviewA := post.find('a', class_='time_widget_message_link_preview'): + if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): kwargs = {} kwargs['href'] = urllib.parse.urljoin(pageUrl, linkPreviewA['href']) - if siteNameDiv := linkPreviewA.find('div', class_='link_preview_site_name'): + if (siteNameDiv := linkPreviewA.find('div', class_ = 'link_preview_site_name')): kwargs['siteName'] = siteNameDiv.text - if titleDiv := linkPreviewA.find('div', class_='link_preview_title'): + if (titleDiv := linkPreviewA.find('div', class_ = 'link_preview_title')): kwargs['title'] = titleDiv.text - if descriptionDiv := linkPreviewA.find('div', class_='link_preview_description'): + if (descriptionDiv := linkPreviewA.find('div', class_ = 'link_preview_description')): kwargs['description'] = descriptionDiv.text - if imageI := linkPreviewA.find('i', class_='link_preview_image'): + if (imageI := linkPreviewA.find('i', class_ = 'link_preview_image')): if imageI['style'].startswith("background-image:url('"): - kwargs['image'] = imageI['style'][22: imageI['style'].index("'", 22)] + kwargs['image'] = imageI['style'][22 : imageI['style'].index("'", 22)] else: _logger.warning(f'Could not process link preview image on {url}') linkPreview = LinkPreview(**kwargs) - yield TelegramPost(url=url, date=date, content=content, outlinks=outlinks, linkPreview=linkPreview) + yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview) def get_items(self): r, soup = self._initial_page() @@ -141,48 +134,51 @@ def get_items(self): return while True: yield from self._soup_to_items(soup, r.url) - pageLink = soup.find('a', attrs={'class': 'tme_messages_more', 'data-before': True}) + pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) if not pageLink: break nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) - r = self._get(nextPageUrl, headers=self._headers) + r = self._get(nextPageUrl, headers = self._headers) if r.status_code != 200: raise snscrape.base.ScraperException(f'Got status code {r.status_code}') soup = bs4.BeautifulSoup(r.text, 'lxml') + @staticmethod + def get_post_text(post) -> str: + result = [] + for s in post.stripped_strings: + result.append(s) + return '\n'.join(result) + def _get_entity(self): kwargs = {} # /channel has a more accurate member count and bigger profile picture - r = self._get(f'https://t.me/{self._name}', headers=self._headers) + r = self._get(f'https://t.me/{self._name}', headers = self._headers) if r.status_code != 200: raise snscrape.base.ScraperException(f'Got status code {r.status_code}') soup = bs4.BeautifulSoup(r.text, 'lxml') - membersDiv = soup.find('div', class_='tgme_page_extra') + membersDiv = soup.find('div', class_ = 'tgme_page_extra') if membersDiv.text.endswith(' members'): kwargs['members'] = int(membersDiv.text[:-8].replace(' ', '')) - kwargs['photo'] = soup.find('img', class_='tgme_page_photo_image').attrs['src'] + kwargs['photo'] = soup.find('img', class_ = 'tgme_page_photo_image').attrs['src'] r, soup = self._initial_page() - if '/s/' not in r.url: # Redirect on channels without public posts + if '/s/' not in r.url: # Redirect on channels without public posts return - channelInfoDiv = soup.find('div', class_='tgme_channel_info') + channelInfoDiv = soup.find('div', class_ = 'tgme_channel_info') assert channelInfoDiv, 'channel info div not found' - titleDiv = channelInfoDiv.find('div', class_='tgme_channel_info_header_title') + titleDiv = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_title') kwargs['title'] = titleDiv.find('span').text - kwargs['verified'] = bool(titleDiv.find('i', class_='verified-icon')) + kwargs['verified'] = bool(titleDiv.find('i', class_ = 'verified-icon')) # The username in the channel info is not canonicalised, nor is the one on the /channel page anywhere. # However, the post URLs are, so extract the first post and use that. try: - kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername=True)) + kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername = True)) except StopIteration: - # If there are no posts, fall back to the channel info div, although that should never happen due to the - # 'Channel created' entry. - _logger.warning( - 'Could not find a post; extracting username from channel info div, which may not be capitalised ' - 'correctly') - kwargs['username'] = channelInfoDiv.find('div', class_='tgme_channel_info_header_username').text[ - 1:] # Remove @ - if descriptionDiv := channelInfoDiv.find('div', class_='tgme_channel_info_description'): + # If there are no posts, fall back to the channel info div, although that should never happen due to the 'Channel created' entry. + _logger.warning('Could not find a post; extracting username from channel info div, which may not be capitalised correctly') + kwargs['username'] = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_username').text[1:] # Remove @ + if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): kwargs['description'] = descriptionDiv.text def parse_num(s): @@ -194,9 +190,9 @@ def parse_num(s): else: return int(s), 1 - for div in channelInfoDiv.find_all('div', class_='tgme_channel_info_counter'): - value, granularity = parse_num(div.find('span', class_='counter_value').text) - type_ = div.find('span', class_='counter_type').text + for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): + value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) + type_ = div.find('span', class_ = 'counter_type').text if type_ == 'members': # Already extracted more accurately from /channel, skip continue @@ -205,17 +201,10 @@ def parse_num(s): return Channel(**kwargs) - @staticmethod - def get_post_text(post) -> str: - result = [] - for s in post.stripped_strings: - result.append(s) - return '\n'.join(result) - @classmethod def _cli_setup_parser(cls, subparser): - subparser.add_argument('channel', type=snscrape.base.nonempty_string('channel'), help='A channel name') + subparser.add_argument('channel', type = snscrape.base.nonempty_string('channel'), help = 'A channel name') @classmethod - def cli_from_args(cls, args): + def _cli_from_args(cls, args): return cls._cli_construct(args, args.channel) From 41aeff1f7e4df3ddc90cb313275ccf2e117fefff Mon Sep 17 00:00:00 2001 From: maskbnmask Date: Sun, 5 Feb 2023 11:44:33 +0330 Subject: [PATCH 5/5] Just copy the source code and add my function to it without any other change --- snscrape/modules/telegram.py | 343 ++++++++++++++++++----------------- 1 file changed, 172 insertions(+), 171 deletions(-) diff --git a/snscrape/modules/telegram.py b/snscrape/modules/telegram.py index 13d51678..ea7b6556 100644 --- a/snscrape/modules/telegram.py +++ b/snscrape/modules/telegram.py @@ -17,194 +17,195 @@ @dataclasses.dataclass class LinkPreview: - href: str - siteName: typing.Optional[str] = None - title: typing.Optional[str] = None - description: typing.Optional[str] = None - image: typing.Optional[str] = None + href: str + siteName: typing.Optional[str] = None + title: typing.Optional[str] = None + description: typing.Optional[str] = None + image: typing.Optional[str] = None @dataclasses.dataclass class TelegramPost(snscrape.base.Item): - url: str - date: datetime.datetime - content: str - outlinks: list - linkPreview: typing.Optional[LinkPreview] = None + url: str + date: datetime.datetime + content: str + outlinks: list + linkPreview: typing.Optional[LinkPreview] = None - outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') + outlinksss = snscrape.base._DeprecatedProperty('outlinksss', lambda self: ' '.join(self.outlinks), 'outlinks') - def __str__(self): - return self.url + def __str__(self): + return self.url @dataclasses.dataclass class Channel(snscrape.base.Entity): - username: str - title: str - verified: bool - photo: str - description: typing.Optional[str] = None - members: typing.Optional[int] = None - photos: typing.Optional[snscrape.base.IntWithGranularity] = None - videos: typing.Optional[snscrape.base.IntWithGranularity] = None - links: typing.Optional[snscrape.base.IntWithGranularity] = None - files: typing.Optional[snscrape.base.IntWithGranularity] = None - - photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, 'photos.granularity') - videosGranularity = snscrape.base._DeprecatedProperty('videosGranularity', lambda self: self.videos.granularity, 'videos.granularity') - linksGranularity = snscrape.base._DeprecatedProperty('linksGranularity', lambda self: self.links.granularity, 'links.granularity') - filesGranularity = snscrape.base._DeprecatedProperty('filesGranularity', lambda self: self.files.granularity, 'files.granularity') - - def __str__(self): - return f'https://t.me/s/{self.username}' + username: str + title: str + verified: bool + photo: str + description: typing.Optional[str] = None + members: typing.Optional[int] = None + photos: typing.Optional[snscrape.base.IntWithGranularity] = None + videos: typing.Optional[snscrape.base.IntWithGranularity] = None + links: typing.Optional[snscrape.base.IntWithGranularity] = None + files: typing.Optional[snscrape.base.IntWithGranularity] = None + + photosGranularity = snscrape.base._DeprecatedProperty('photosGranularity', lambda self: self.photos.granularity, 'photos.granularity') + videosGranularity = snscrape.base._DeprecatedProperty('videosGranularity', lambda self: self.videos.granularity, 'videos.granularity') + linksGranularity = snscrape.base._DeprecatedProperty('linksGranularity', lambda self: self.links.granularity, 'links.granularity') + filesGranularity = snscrape.base._DeprecatedProperty('filesGranularity', lambda self: self.files.granularity, 'files.granularity') + + def __str__(self): + return f'https://t.me/s/{self.username}' class TelegramChannelScraper(snscrape.base.Scraper): - name = 'telegram-channel' - - def __init__(self, name, **kwargs): - super().__init__(**kwargs) - self._name = name - self._headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'} - self._initialPage = None - self._initialPageSoup = None - - def _initial_page(self): - if self._initialPage is None: - r = self._get(f'https://t.me/s/{self._name}', headers = self._headers) - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - self._initialPage, self._initialPageSoup = r, bs4.BeautifulSoup(r.text, 'lxml') - return self._initialPage, self._initialPageSoup - - def _soup_to_items(self, soup, pageUrl, onlyUsername = False): - posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True}) - for post in reversed(posts): - if onlyUsername: - yield post['data-post'].split('/')[0] - return - dateDiv = post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date') - rawUrl = dateDiv['href'] - if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[1].strip('0123456789') != '': - _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') - url = rawUrl.replace('//t.me/', '//t.me/s/') - date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') - if (message := post.find('div', class_ = 'tgme_widget_message_text')): - content = self.get_post_text(message) - outlinks = [] - for link in post.find_all('a'): - if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): - # Author links at the top (avatar and name) - continue - if link['href'] == rawUrl or link['href'] == url: - # Generic filter of links to the post itself, catches videos, photos, and the date link - continue - if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): - # Individual photo or video link - continue - href = urllib.parse.urljoin(pageUrl, link['href']) - if href not in outlinks: - outlinks.append(href) - else: - content = None - outlinks = [] - linkPreview = None - if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): - kwargs = {} - kwargs['href'] = urllib.parse.urljoin(pageUrl, linkPreviewA['href']) - if (siteNameDiv := linkPreviewA.find('div', class_ = 'link_preview_site_name')): - kwargs['siteName'] = siteNameDiv.text - if (titleDiv := linkPreviewA.find('div', class_ = 'link_preview_title')): - kwargs['title'] = titleDiv.text - if (descriptionDiv := linkPreviewA.find('div', class_ = 'link_preview_description')): - kwargs['description'] = descriptionDiv.text - if (imageI := linkPreviewA.find('i', class_ = 'link_preview_image')): - if imageI['style'].startswith("background-image:url('"): - kwargs['image'] = imageI['style'][22 : imageI['style'].index("'", 22)] - else: - _logger.warning(f'Could not process link preview image on {url}') - linkPreview = LinkPreview(**kwargs) - yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview) - - def get_items(self): - r, soup = self._initial_page() - if '/s/' not in r.url: - _logger.warning('No public post list for this user') - return - while True: - yield from self._soup_to_items(soup, r.url) - pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) - if not pageLink: - break - nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) - r = self._get(nextPageUrl, headers = self._headers) - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - soup = bs4.BeautifulSoup(r.text, 'lxml') + name = 'telegram-channel' + + def __init__(self, name, **kwargs): + super().__init__(**kwargs) + self._name = name + self._headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'} + self._initialPage = None + self._initialPageSoup = None + + def _initial_page(self): + if self._initialPage is None: + r = self._get(f'https://t.me/s/{self._name}', headers = self._headers) + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + self._initialPage, self._initialPageSoup = r, bs4.BeautifulSoup(r.text, 'lxml') + return self._initialPage, self._initialPageSoup + + def _soup_to_items(self, soup, pageUrl, onlyUsername = False): + posts = soup.find_all('div', attrs = {'class': 'tgme_widget_message', 'data-post': True}) + for post in reversed(posts): + if onlyUsername: + yield post['data-post'].split('/')[0] + return + dateDiv = post.find('div', class_ = 'tgme_widget_message_footer').find('a', class_ = 'tgme_widget_message_date') + rawUrl = dateDiv['href'] + if not rawUrl.startswith('https://t.me/') or sum(x == '/' for x in rawUrl) != 4 or rawUrl.rsplit('/', 1)[1].strip('0123456789') != '': + _logger.warning(f'Possibly incorrect URL: {rawUrl!r}') + url = rawUrl.replace('//t.me/', '//t.me/s/') + date = datetime.datetime.strptime(dateDiv.find('time', datetime = True)['datetime'].replace('-', '', 2).replace(':', ''), '%Y%m%dT%H%M%S%z') + if (message := post.find('div', class_ = 'tgme_widget_message_text')): + content = self.get_post_text(message) + outlinks = [] + for link in post.find_all('a'): + if any(x in link.parent.attrs.get('class', []) for x in ('tgme_widget_message_user', 'tgme_widget_message_author')): + # Author links at the top (avatar and name) + continue + if link['href'] == rawUrl or link['href'] == url: + # Generic filter of links to the post itself, catches videos, photos, and the date link + continue + if _SINGLE_MEDIA_LINK_PATTERN.match(link['href']): + # Individual photo or video link + continue + href = urllib.parse.urljoin(pageUrl, link['href']) + if href not in outlinks: + outlinks.append(href) + else: + content = None + outlinks = [] + linkPreview = None + if (linkPreviewA := post.find('a', class_ = 'tgme_widget_message_link_preview')): + kwargs = {} + kwargs['href'] = urllib.parse.urljoin(pageUrl, linkPreviewA['href']) + if (siteNameDiv := linkPreviewA.find('div', class_ = 'link_preview_site_name')): + kwargs['siteName'] = siteNameDiv.text + if (titleDiv := linkPreviewA.find('div', class_ = 'link_preview_title')): + kwargs['title'] = titleDiv.text + if (descriptionDiv := linkPreviewA.find('div', class_ = 'link_preview_description')): + kwargs['description'] = descriptionDiv.text + if (imageI := linkPreviewA.find('i', class_ = 'link_preview_image')): + if imageI['style'].startswith("background-image:url('"): + kwargs['image'] = imageI['style'][22 : imageI['style'].index("'", 22)] + else: + _logger.warning(f'Could not process link preview image on {url}') + linkPreview = LinkPreview(**kwargs) + yield TelegramPost(url = url, date = date, content = content, outlinks = outlinks, linkPreview = linkPreview) + + def get_items(self): + r, soup = self._initial_page() + if '/s/' not in r.url: + _logger.warning('No public post list for this user') + return + while True: + yield from self._soup_to_items(soup, r.url) + pageLink = soup.find('a', attrs = {'class': 'tme_messages_more', 'data-before': True}) + if not pageLink: + break + nextPageUrl = urllib.parse.urljoin(r.url, pageLink['href']) + r = self._get(nextPageUrl, headers = self._headers) + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + soup = bs4.BeautifulSoup(r.text, 'lxml') @staticmethod def get_post_text(post) -> str: result = [] + # Using the features of the BS4 module itself for s in post.stripped_strings: result.append(s) return '\n'.join(result) - - def _get_entity(self): - kwargs = {} - # /channel has a more accurate member count and bigger profile picture - r = self._get(f'https://t.me/{self._name}', headers = self._headers) - if r.status_code != 200: - raise snscrape.base.ScraperException(f'Got status code {r.status_code}') - soup = bs4.BeautifulSoup(r.text, 'lxml') - membersDiv = soup.find('div', class_ = 'tgme_page_extra') - if membersDiv.text.endswith(' members'): - kwargs['members'] = int(membersDiv.text[:-8].replace(' ', '')) - kwargs['photo'] = soup.find('img', class_ = 'tgme_page_photo_image').attrs['src'] - - r, soup = self._initial_page() - if '/s/' not in r.url: # Redirect on channels without public posts - return - channelInfoDiv = soup.find('div', class_ = 'tgme_channel_info') - assert channelInfoDiv, 'channel info div not found' - titleDiv = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_title') - kwargs['title'] = titleDiv.find('span').text - kwargs['verified'] = bool(titleDiv.find('i', class_ = 'verified-icon')) - # The username in the channel info is not canonicalised, nor is the one on the /channel page anywhere. - # However, the post URLs are, so extract the first post and use that. - try: - kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername = True)) - except StopIteration: - # If there are no posts, fall back to the channel info div, although that should never happen due to the 'Channel created' entry. - _logger.warning('Could not find a post; extracting username from channel info div, which may not be capitalised correctly') - kwargs['username'] = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_username').text[1:] # Remove @ - if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): - kwargs['description'] = descriptionDiv.text - - def parse_num(s): - s = s.replace(' ', '') - if s.endswith('M'): - return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) - elif s.endswith('K'): - return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) - else: - return int(s), 1 - - for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): - value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) - type_ = div.find('span', class_ = 'counter_type').text - if type_ == 'members': - # Already extracted more accurately from /channel, skip - continue - elif type_ in ('photos', 'videos', 'links', 'files'): - kwargs[type_] = snscrape.base.IntWithGranularity(value, granularity) - - return Channel(**kwargs) - - @classmethod - def _cli_setup_parser(cls, subparser): - subparser.add_argument('channel', type = snscrape.base.nonempty_string('channel'), help = 'A channel name') - - @classmethod - def _cli_from_args(cls, args): - return cls._cli_construct(args, args.channel) + + def _get_entity(self): + kwargs = {} + # /channel has a more accurate member count and bigger profile picture + r = self._get(f'https://t.me/{self._name}', headers = self._headers) + if r.status_code != 200: + raise snscrape.base.ScraperException(f'Got status code {r.status_code}') + soup = bs4.BeautifulSoup(r.text, 'lxml') + membersDiv = soup.find('div', class_ = 'tgme_page_extra') + if membersDiv.text.endswith(' members'): + kwargs['members'] = int(membersDiv.text[:-8].replace(' ', '')) + kwargs['photo'] = soup.find('img', class_ = 'tgme_page_photo_image').attrs['src'] + + r, soup = self._initial_page() + if '/s/' not in r.url: # Redirect on channels without public posts + return + channelInfoDiv = soup.find('div', class_ = 'tgme_channel_info') + assert channelInfoDiv, 'channel info div not found' + titleDiv = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_title') + kwargs['title'] = titleDiv.find('span').text + kwargs['verified'] = bool(titleDiv.find('i', class_ = 'verified-icon')) + # The username in the channel info is not canonicalised, nor is the one on the /channel page anywhere. + # However, the post URLs are, so extract the first post and use that. + try: + kwargs['username'] = next(self._soup_to_items(soup, r.url, onlyUsername = True)) + except StopIteration: + # If there are no posts, fall back to the channel info div, although that should never happen due to the 'Channel created' entry. + _logger.warning('Could not find a post; extracting username from channel info div, which may not be capitalised correctly') + kwargs['username'] = channelInfoDiv.find('div', class_ = 'tgme_channel_info_header_username').text[1:] # Remove @ + if (descriptionDiv := channelInfoDiv.find('div', class_ = 'tgme_channel_info_description')): + kwargs['description'] = descriptionDiv.text + + def parse_num(s): + s = s.replace(' ', '') + if s.endswith('M'): + return int(float(s[:-1]) * 1e6), 10 ** (6 if '.' not in s else 6 - len(s[:-1].split('.')[1])) + elif s.endswith('K'): + return int(float(s[:-1]) * 1000), 10 ** (3 if '.' not in s else 3 - len(s[:-1].split('.')[1])) + else: + return int(s), 1 + + for div in channelInfoDiv.find_all('div', class_ = 'tgme_channel_info_counter'): + value, granularity = parse_num(div.find('span', class_ = 'counter_value').text) + type_ = div.find('span', class_ = 'counter_type').text + if type_ == 'members': + # Already extracted more accurately from /channel, skip + continue + elif type_ in ('photos', 'videos', 'links', 'files'): + kwargs[type_] = snscrape.base.IntWithGranularity(value, granularity) + + return Channel(**kwargs) + + @classmethod + def _cli_setup_parser(cls, subparser): + subparser.add_argument('channel', type = snscrape.base.nonempty_string('channel'), help = 'A channel name') + + @classmethod + def _cli_from_args(cls, args): + return cls._cli_construct(args, args.channel) \ No newline at end of file