diff --git a/.gitignore b/.gitignore index bf0d076b..bc2ec3d0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,9 @@ __pycache__/ # C extensions *.so +# Telegram stuff +*.telegramFile +*.telegramToken # ignore .pem file *.pem @@ -154,3 +157,4 @@ crashlytics.properties crashlytics-build.properties fabric.properties +/asg-config.json \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 6f6d50e4..8dad5a09 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,6 @@ -FROM python:3.8.12-slim-buster - -# YOUR COMMANDS HERE -# .... -# .... - +FROM python:3.8-slim-bullseye +WORKDIR /botapp +LABEL app=bot +COPY . . +RUN pip install -r requirements.txt CMD ["python3", "bot.py"] \ No newline at end of file diff --git a/PR.Jenkinsfile b/PR.Jenkinsfile new file mode 100644 index 00000000..eba9cb91 --- /dev/null +++ b/PR.Jenkinsfile @@ -0,0 +1,39 @@ +pipeline { + agent any + + stages { + stage('Unittest') { + steps { + sh ''' + pip3 install -r requirements.txt + python3 -m pytest --junitxml results.xml tests + ''' + } + post { + always { + junit allowEmptyResults: true, testResults: 'results.xml' + } + } + } + stage('Functional test') { + steps { + echo "testing" + } + } + stage('Static code linting') { + steps { + sh 'python3 -m pylint -f parseable --reports=no *.py > pylint.log' + } + post { + always { + sh 'cat pylint.log' + recordIssues ( + enabledForFailure: true, + aggregatingResults: true, + tools: [pyLint(name: 'Pylint', pattern: '**/pylint.log')] + ) + } + } + } + } +} diff --git a/bot.py b/bot.py index 7e311287..157f84c7 100644 --- a/bot.py +++ b/bot.py @@ -1,6 +1,12 @@ +import json +import threading +import botocore from telegram.ext import Updater, MessageHandler, Filters -from utils import search_download_youtube_video from loguru import logger +import boto3 +from boto3.dynamodb.conditions import Key +from utils import calc_backlog_per_instance, search_youtube_video, send_videos_from_queue2 + class Bot: @@ -26,10 +32,13 @@ def send_video(self, update, context, file_path): """Sends video to a chat""" context.bot.send_video(chat_id=update.message.chat_id, video=open(file_path, 'rb'), supports_streaming=True) - def send_text(self, update, text, quote=False): + def send_text(self, update, text, chat_id=None, quote=False): """Sends text to a chat""" - # retry https://github.com/python-telegram-bot/python-telegram-bot/issues/1124 - update.message.reply_text(text, quote=quote) + if chat_id: + self.updater.bot.send_message(chat_id, text=text) + else: + # retry https://github.com/python-telegram-bot/python-telegram-bot/issues/1124 + update.message.reply_text(text, quote=quote) class QuoteBot(Bot): @@ -42,14 +51,68 @@ def _message_handler(self, update, context): self.send_text(update, f'Your original message: {update.message.text}', quote=to_quote) -class YoutubeBot(Bot): - pass +class YoutubeObjectDetectBot(Bot): + def __init__(self, token): + super().__init__(token) + threading.Thread( + target=calc_backlog_per_instance, + args=(workers_queue, asg, config.get("autoscaling_group_name"), config.get('aws_region')) + ).start() + threading.Thread( + target=send_videos_from_queue2, + args=(worker_to_bot_queue, config.get('bucket_name')) + ).start() + + def _message_handler(self, update, context): + try: + chat_id = str(update.effective_message.chat_id) + if update.message.text.startswith('/myvideos'): + response = table.query(KeyConditionExpression=Key('chatId').eq(chat_id)) + for key, value in response.items(): + if isinstance(value, list): + array_length = len(value) + for i in range(array_length): + temp_dict = value[i] + video_url = temp_dict['url'] + video = search_youtube_video(None, video_url) + self.send_text(update, f'Video Name: {video["title"]}, Video Link: {video["webpage_url"]}', chat_id=chat_id) + logger.info(f'sent videos information to client, chat_id: {chat_id}') + else: + response = workers_queue.send_message( + MessageBody=update.message.text, + MessageAttributes={ + 'chat_id': {'StringValue': chat_id, 'DataType': 'String'} + } + ) + logger.info(f'msg {response.get("MessageId")} has been sent to queue') + self.send_text(update, f'Your message is being processed...', chat_id=chat_id) + for video in search_youtube_video(update.message.text, None): + item = { + 'chatId': chat_id, + 'videoId': video['id'], + 'url': video['webpage_url'], + 'title': video['title'] + } + response2 = table.put_item(Item=item) + + except botocore.exceptions.ClientError as error: + logger.error(error) + self.send_text(update, f'Something went wrong, please try again...') if __name__ == '__main__': with open('.telegramToken') as f: _token = f.read() - my_bot = Bot(_token) - my_bot.start() + with open('config.json') as f: + config = json.load(f) + + sqs = boto3.resource('sqs', region_name=config.get('aws_region')) + workers_queue = sqs.get_queue_by_name(QueueName=config.get('bot_to_worker_queue_name')) + worker_to_bot_queue = sqs.get_queue_by_name(QueueName=config.get('worker_to_bot_queue_name')) + asg = boto3.client('autoscaling', region_name=config.get('aws_region')) + dynamodb = boto3.resource('dynamodb', region_name=config.get('aws_region')) + table = dynamodb.Table(config.get('table_name')) + my_bot = YoutubeObjectDetectBot(_token) + my_bot.start() diff --git a/config.json b/config.json new file mode 100644 index 00000000..0e0a30c2 --- /dev/null +++ b/config.json @@ -0,0 +1,9 @@ +{ + "aws_region": "eu-central-1", + "bot_to_worker_queue_name": "daniel-reuven-aws-ex1-polybot-queue", + "worker_to_bot_queue_name": "daniel-reuven-aws-ex1-polybot-queue2", + "autoscaling_group_name": "daniel-reuven-polybot-asg", + "cloudwatch_namespace": "cloudwatch-namespace", + "bucket_name": "daniel-reuven-aws-ex1-polybot-bucket", + "table_name": "daniel-reuven-aws-ex1-polybot-ddb-table1" +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 28cc6ee3..442653ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,9 @@ python-telegram-bot>=13.11 -youtube-dl>=2021.12.17 -loguru \ No newline at end of file +yt-dlp>=2022.6.29 +loguru~=0.6.0 +botocore~=1.27.13 +boto3~=1.24.13 +requests~=2.28.1 +unittest2~=1.1.0 +pytest +pylint \ No newline at end of file diff --git a/tests/test_autoscaling_metric.py b/tests/test_autoscaling_metric.py new file mode 100644 index 00000000..9b462a0b --- /dev/null +++ b/tests/test_autoscaling_metric.py @@ -0,0 +1,50 @@ +import unittest2 as unittest +from unittest.mock import Mock +from utils import calc_backlog_per_instance + +# run by `PYTHONPATH=. python3 -m pytest --junitxml results.xml tests` + + +class TestBacklogPerInstanceMetric(unittest.TestCase): + def setUp(self): + self.sqs_queue_client = Mock() + self.asg_client = Mock() + + def test_no_worker_full_queue(self): + self.sqs_queue_client.attributes = { + 'ApproximateNumberOfMessages': '100' + } + + self.asg_client.describe_auto_scaling_groups = Mock(return_value={ + 'AutoScalingGroups': [{ + 'DesiredCapacity': 0 + }] + }) + + self.assertEqual(calc_backlog_per_instance(self.sqs_queue_client, self.asg_client, None, None), 99) + + def test_no_workers_empty_queue(self): + self.sqs_queue_client.attributes = { + 'ApproximateNumberOfMessages': '0' + } + + self.asg_client.describe_auto_scaling_groups = Mock(return_value={ + 'AutoScalingGroups': [{ + 'DesiredCapacity': 0 + }] + }) + + self.assertEqual(calc_backlog_per_instance(self.sqs_queue_client, self.asg_client, None, None), 0) + + def test_2_workers_100_msgs_in_queue(self): + self.sqs_queue_client.attributes = { + 'ApproximateNumberOfMessages': '100' + } + + self.asg_client.describe_auto_scaling_groups = Mock(return_value={ + 'AutoScalingGroups': [{ + 'DesiredCapacity': 2 + }] + }) + + self.assertEqual(calc_backlog_per_instance(self.sqs_queue_client, self.asg_client, None, None), 50) diff --git a/utils.py b/utils.py index 0dcbc25c..4b481f70 100644 --- a/utils.py +++ b/utils.py @@ -1,15 +1,206 @@ -from youtube_dl import YoutubeDL +import json +import time +import os +import boto3 +import botocore +import requests +from botocore.exceptions import ClientError +from botocore.config import Config +from yt_dlp import YoutubeDL +from loguru import logger +from time import sleep -def search_download_youtube_video(video_name, num_results=1): +def search_download_youtube_video(video_name, num_results, s3_bucket_name): """ This function downloads the first num_results search results from Youtube + :param s3_bucket_name: string of the S3 bucket name :param video_name: string of the video name :param num_results: integer representing how many videos to download :return: list of paths to your downloaded video files """ - with YoutubeDL() as ydl: - videos = ydl.extract_info(f"ytsearch{num_results}:{video_name}", download=True)['entries'] + # Parameters for youtube_dl use + ydl = {'noplaylist': 'True', 'format': 'bestvideo[ext=mp4]+bestaudio[ext=mp4]/mp4', 'outtmpl': '%(id)s.%(ext)s'} + # Try to download and return list of video/s or error msg + with YoutubeDL(ydl) as ydl: + ydl.cache.remove() + # 1. get a list of video file names with download=false parameter + videos = ydl.extract_info(f"ytsearch{num_results}:{video_name}", download=False)['entries'] + for video in videos: + localprefix = video['id'] + '.mp4' + prefix = 'ytdlAppData/' + video['id'] + '.mp4' + # check aws s3 bucket for file, then locally and act accordingly,prefix != ydl.prepare_filename(video) + if not (check_s3_file(prefix, s3_bucket_name)): + if not (os.path.isfile(ydl.prepare_filename(video))): + video_url = video['webpage_url'] + ydl.extract_info(video_url, download=True) + upload_file(localprefix, s3_bucket_name) + os.remove(ydl.prepare_filename(video)) + else: + upload_file(localprefix, s3_bucket_name) + os.remove(ydl.prepare_filename(video)) + else: + if os.path.isfile(ydl.prepare_filename(video)): + # download_file(prefix, s3_bucket_name) + os.remove(ydl.prepare_filename(video)) + sleep(1) + return [ydl.prepare_filename(video) for video in videos] - return [ydl.prepare_filename(video) for video in videos] +def search_youtube_video(video_name, video_url): + """ + This function downloads the first num_results search results from YouTube + :param video_url: url of the video + :param video_name: string of the video name + :return: list of paths to your downloaded video files + """ + # Parameters for youtube_dl use + ydl = {'noplaylist': 'True', 'format': 'bestvideo[ext=mp4]+bestaudio[ext=mp4]/mp4', 'outtmpl': '%(id)s.%(ext)s'} + # Try to download and return list of video/s or error msg + with YoutubeDL(ydl) as ydl: + ydl.cache.remove() + if video_name is None: + videos = ydl.extract_info(video_url, download=False) + return videos + elif video_url is None: + videos = ydl.extract_info(f"ytsearch{1}:{video_name}", download=False)['entries'] + return videos + + +def calc_backlog_per_instance(sqs_queue_client, asg_client, asg_group_name, aws_region): + while True: + msgs_in_queue = int(sqs_queue_client.attributes.get('ApproximateNumberOfMessages')) + asg_size = asg_client.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_group_name])['AutoScalingGroups'][0]['DesiredCapacity'] + if msgs_in_queue == 0: + backlog_per_instance = 0 + elif asg_size == 0: + backlog_per_instance = 99 + else: + backlog_per_instance = msgs_in_queue / asg_size + logger.info(f'backlog per instance: {backlog_per_instance}') + # Create CloudWatch client + # cloudwatch = boto3.client('cloudwatch', aws_region) + # Put custom metrics + # cloudwatch.put_metric_data( + # Namespace='daniel-reuven-monitor-polybot-asg', + # MetricData=[ + # { + # 'MetricName': 'backlog_per_instance', + # 'Value': backlog_per_instance, + # 'Unit': 'Count' + # }, + # ] + #) + # time.sleep(60) + return backlog_per_instance + + +def send_videos_from_queue2(sqs_queue_client2, bucket_name): + i = 0 + while True: + i += 1 + try: + messages = sqs_queue_client2.receive_messages( + MessageAttributeNames=['All'], + MaxNumberOfMessages=10, + WaitTimeSeconds=5 + ) + logger.info(f'msgs in videos queue: {len(messages)}') + if messages: + logger.info(f'Attempting to send video to user via chat') + for msg in messages: + logger.info(f'processing message {msg}') + video_filename = msg.body + chat_id = msg.message_attributes.get('chat_id').get('StringValue') + video_presigned_url = generate_presigned_url(video_filename, bucket_name, None) + send_message(chat_id, f'The following download link will be available for the next few minutes: {video_presigned_url}') + # delete message from the queue after it was handled + response = sqs_queue_client2.delete_messages(Entries=[{ + 'Id': msg.message_id, + 'ReceiptHandle': msg.receipt_handle + }]) + if 'Successful' in response: + logger.info(f'msg {msg} has been handled successfully') + logger.info(f'file has been downloaded') + except botocore.exceptions.ClientError as err: + logger.exception(f"Couldn't receive messages {err}") + if i == 6: + logger.info(f'Process is running, checking queue every 10 seconds, this msg repeats every 60 seconds') + i = 0 + sleep(10) + + +def check_s3_file(key_filename, s3_bucket_name): + s3_prefix = 'ytdlAppData/' + key_filename + s3 = boto3.resource('s3') + try: + s3.Object(s3_bucket_name, s3_prefix).load() + except botocore.exceptions.ClientError as e: + if e.response['Error']['Code'] == "404": + # The object does not exist. + return False + else: + # Something else has gone wrong. + raise + else: + # The object does exist. + return True + + +def upload_file(key_filename, bucket, object_name=None): + s3_prefix = 'ytdlAppData/' + key_filename + # Upload the file + s3_client = boto3.client('s3') + # If S3 object_name was not specified, use key_filename + if object_name is None: + object_name = s3_prefix + try: + response = s3_client.upload_file(key_filename, bucket, s3_prefix) + except ClientError as e: + logger.error(e) + return False + return True + + +def download_file(key_filename, bucket, object_name=None): + s3_prefix = 'ytdlAppData/' + key_filename + # Upload the file + s3_client = boto3.client('s3') + # If S3 object_name was not specified, use key_filename + if object_name is None: + object_name = s3_prefix + try: + response = s3_client.download_file(bucket, object_name, s3_prefix) + except ClientError as e: + logger.error(e) + return False + return True + + +def generate_presigned_url(key_filename, bucket, object_name=None): + s3_prefix = 'ytdlAppData/' + key_filename + # Upload the file + s3_client = boto3.client("s3", 'eu-central-1', config=Config(signature_version='s3v4')) + + # If S3 object_name was not specified, use key_filename + if object_name is None: + object_name = s3_prefix + try: + response = s3_client.generate_presigned_url('get_object', Params={'Bucket': bucket, 'Key': s3_prefix}, ExpiresIn=1800) + except ClientError as e: + logger.error(e) + return False + return response + + +def send_message(chat_id, text): + with open('.telegramToken') as f: + _token = f.read() + url = f"https://api.telegram.org/bot{_token}/sendMessage" + params = { + "chat_id": chat_id, + "text": text, + } + resp = requests.get(url, params=params) + # Throw an exception if Telegram API fails + resp.raise_for_status() diff --git a/worker.py b/worker.py new file mode 100644 index 00000000..59c4abdf --- /dev/null +++ b/worker.py @@ -0,0 +1,65 @@ +import json +import time +import boto3 +import botocore +import os +from loguru import logger +from utils import search_download_youtube_video + + +def process_msg(msg): + video_filename = search_download_youtube_video(msg, 1, s3_bucket_name) + print(f'the video file name is : {video_filename}') + return video_filename + + +def main(): + while True: + try: + messages = queue.receive_messages( + MessageAttributeNames=['All'], + MaxNumberOfMessages=1, + WaitTimeSeconds=10 + ) + for msg in messages: + logger.info(f'processing message {msg}') + video_filename = process_msg(msg.body) + chat_id = msg.message_attributes.get('chat_id').get('StringValue') + response2 = worker_to_bot_queue.send_message( + MessageBody=video_filename[0], + MessageAttributes={'chat_id': {'StringValue': chat_id, 'DataType': 'String'} + } + ) + logger.info(f'msg {response2.get("MessageId")} has been sent to queue 2') + # delete message from the queue after it was handled + response = queue.delete_messages(Entries=[{ + 'Id': msg.message_id, + 'ReceiptHandle': msg.receipt_handle + }]) + if 'Successful' in response: + logger.info(f'msg {msg} has been handled successfully') + except botocore.exceptions.ClientError as err: + logger.exception(f"Couldn't receive messages {err}") + logger.info(f'Waiting for new msgs') + time.sleep(10) + + +if __name__ == '__main__': + with open('config.json') as f: + config = json.load(f) + + sqs = boto3.resource('sqs', region_name=config.get('aws_region')) + queue = sqs.get_queue_by_name(QueueName=config.get('bot_to_worker_queue_name')) + worker_to_bot_queue = sqs.get_queue_by_name(QueueName=config.get('worker_to_bot_queue_name')) + s3_bucket_name = config.get('bucket_name') + cwd = os.getcwd() + path = f"{cwd}/ytdlAppData" + # Check whether the specified path exists or not + isExist = os.path.exists(path) + + if not isExist: + # Create a new directory because it does not exist + os.makedirs(path) + print("The new directory is created!") + + main()