From 3338ec0fedef2f7a6b56e881b05dd0bbd79eaa72 Mon Sep 17 00:00:00 2001 From: =?utf8?q?V=C3=A1s=C3=A1ry=20D=C3=A1niel?= Date: Wed, 31 Jan 2024 21:53:29 +0100 Subject: [PATCH] Transcoder implemented, test ok --- media-samurai/api.py | 8 +- media-samurai/app_config.py | 23 +++++ media-samurai/application.yaml | 3 + media-samurai/ftpupload.py | 27 +++--- media-samurai/hls.py | 83 +++++++++++-------- media-samurai/hls/003.bat | 35 ++++---- media-samurai/hls/004.bat | 19 +++++ media-samurai/hls/videoinfo.bat | 5 +- media-samurai/logger.yaml | 21 +++++ media-samurai/main.py | 29 +++---- media-samurai/requirements.txt | 16 ++-- media-samurai/transcode.py | 26 +++--- .../src/user/commons/MediaSamuraiAPI.java | 34 +++----- 13 files changed, 206 insertions(+), 123 deletions(-) create mode 100644 media-samurai/app_config.py create mode 100644 media-samurai/application.yaml create mode 100644 media-samurai/hls/004.bat create mode 100644 media-samurai/logger.yaml diff --git a/media-samurai/api.py b/media-samurai/api.py index 3fbb1730..062b28d5 100644 --- a/media-samurai/api.py +++ b/media-samurai/api.py @@ -1,14 +1,14 @@ from datetime import datetime from typing import Optional +import shortuuid from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi import BackgroundTasks, FastAPI from pydantic import BaseModel from hls import TranscodeJob -from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, simulate_background_task, \ - execute_transcode_task +from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, execute_transcode_task # https://docs.pydantic.dev/2.4/concepts/models/ @@ -34,8 +34,8 @@ app.mount("/video", StaticFiles(directory="hls"), name="VIDEO") @app.post("/submit") async def submit(item: TranscodeItem, background_tasks: BackgroundTasks): - task_id = str(len(background_tasks_results) + 1) - background_tasks.add_task(execute_transcode_task, task_id, item.delay) + task_id = shortuuid.uuid() + background_tasks.add_task(execute_transcode_task, task_id, item.source, item.target, item.frames) job = TranscodeJob() job.status = TranscodeStatus.RUNNING background_tasks_results[task_id] = job diff --git a/media-samurai/app_config.py b/media-samurai/app_config.py new file mode 100644 index 00000000..e21851a6 --- /dev/null +++ b/media-samurai/app_config.py @@ -0,0 +1,23 @@ +import yaml +from loguru_config import LoguruConfig + +LoguruConfig.load('logger.yaml') + +class AppConfig(object): + def __init__(self, file_name=None): + if file_name is not None: + with open(file_name) as config_file: + self.cfg = yaml.load(config_file, yaml.FullLoader) + self.load(self.cfg) + + def load(self, dict_values): + for key in dict_values: + if type(dict_values[key]) is dict: + val = AppConfig() + val.load(dict_values[key]) + setattr(self, key, val) + else: + setattr(self, key, dict_values[key]) + + +app_config = AppConfig('application.yaml') diff --git a/media-samurai/application.yaml b/media-samurai/application.yaml new file mode 100644 index 00000000..154501ca --- /dev/null +++ b/media-samurai/application.yaml @@ -0,0 +1,3 @@ +cache_dir: c:/temp/transcode +api: + port: 8181 diff --git a/media-samurai/ftpupload.py b/media-samurai/ftpupload.py index ba752b58..b2040f3d 100644 --- a/media-samurai/ftpupload.py +++ b/media-samurai/ftpupload.py @@ -1,5 +1,8 @@ import os.path, os from ftplib import FTP, error_perm +from urllib.parse import urlparse + +from loguru import logger def main(): @@ -9,17 +12,18 @@ def main(): def ftp_upload(local_source_dir, remote_target_dir): - host = 'localhost' - port = 21 ftp = FTP() - ftp.connect(host, port) - ftp.login('dani', 'dani') - cwd_create_all(ftp, remote_target_dir) + parsed_url = urlparse(remote_target_dir) + ftp.connect(parsed_url.hostname, parsed_url.port or 21) + ftp.login(parsed_url.username, parsed_url.password) + force_cwd_to(ftp, parsed_url.path) + logger.info(f'Storing local path {local_source_dir} recursively on remote site') place_files(ftp, local_source_dir) ftp.quit() -def cwd_create_all(ftp, remote_target_dir): +def force_cwd_to(ftp, remote_target_dir): + logger.info(f'Creating remote directory {remote_target_dir}') remote_dir_tokens = remote_target_dir.split('/') for remote_dir in remote_dir_tokens: if not remote_dir: @@ -41,23 +45,24 @@ def place_files(ftp, path): for name in os.listdir(path): local_path = os.path.join(path, name) if os.path.isfile(local_path): - print("STOR", name, local_path) + # print("STOR", name, local_path) ftp.storbinary('STOR ' + name, open(local_path, 'rb')) elif os.path.isdir(local_path): - print("MKD", name) + # print("MKD", name) try: ftp.mkd(name) # ignore "directory already exists" except error_perm as e: - if not e.args[0].startswith('550'): + if e.args[0].startswith('550'): + logger.warning(f'Directory {name} already exists under remote {ftp.pwd()}') + else: + logger.error(f'FTP upload error! Details: {e}') raise - print("CWD", name) ftp.cwd(name) place_files(ftp, local_path) - print("CWD", "..") ftp.cwd("..") diff --git a/media-samurai/hls.py b/media-samurai/hls.py index 53d8200a..c8d6ace4 100644 --- a/media-samurai/hls.py +++ b/media-samurai/hls.py @@ -1,12 +1,16 @@ +import os import shutil from enum import Enum +from pathlib import Path from ffmpeg.ffmpeg import FFmpeg, FFmpegError from ffmpeg.progress import Progress +from loguru import logger from pymediainfo import MediaInfo import subprocess import json +from app_config import app_config from ftpupload import ftp_upload class TranscodeJob: @@ -26,28 +30,29 @@ class TranscodeStatus(str, Enum): def main(): - local_source_file = 'd:/data/video/hls/ma.mkv' remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv' - local_target_directory = 'c:/temp/out' remote_target_directory = r'/data/video/hls/out/test' job = TranscodeJob() - transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job) + transcode_hls(remote_source_file, remote_target_directory, '1', job) -def transcode_hls(local_source_file, remote_source_file, local_target_directory, - remote_target_directory, job: TranscodeJob): - shutil.rmtree(local_target_directory, ignore_errors=True) - output_file = f'{local_target_directory}/%v/track_index.m3u8' - - video_info = get_video_info(remote_source_file) +def transcode_hls(remote_source_file, remote_target_directory, frames, task_id, job: TranscodeJob): + local_target_directory = Path(app_config.cache_dir) / task_id + file_name = os.path.splitext(os.path.basename(remote_source_file))[0] + encoded_target_directory = local_target_directory / '/'.join(list(file_name)) - frame_count = 0 - media_info = MediaInfo.parse(local_source_file) - for track in media_info.tracks: - if track.track_type == 'Video': - frame_count = int(track.frame_count) + if os.path.exists(local_target_directory) and os.path.isdir(encoded_target_directory): + logger.info(f'Removing existing {encoded_target_directory}') + shutil.rmtree(encoded_target_directory, ignore_errors=True) + # if not os.path.isdir(local_target_directory): + # logger.info(f'cache_dir {local_target_directory} not exists, creating it.') + # os.makedirs(local_target_directory) + # else: + output_file = f'{encoded_target_directory}/%v/track_index.m3u8' - output_options = get_transcode_options(video_info, local_target_directory) + video_info = get_video_info(remote_source_file, frames) + frame_count = video_info[0] + output_options = get_transcode_options(video_info[1], encoded_target_directory) ffmpeg = ( FFmpeg() @@ -59,9 +64,9 @@ def transcode_hls(local_source_file, remote_source_file, local_target_directory, ) ) - @ffmpeg.on("start") - def on_start(arguments: list[str]): - print("arguments:", arguments) + # @ffmpeg.on("start") + # def on_start(arguments: list[str]): + # print("arguments:", arguments) @ffmpeg.on("progress") def on_progress(progress: Progress): @@ -76,31 +81,44 @@ def transcode_hls(local_source_file, remote_source_file, local_target_directory, shutil.rmtree(local_target_directory, ignore_errors=True) -def get_video_info(file_path): - cmd = ['ffprobe', '-v', 'error', '-show_entries', - 'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path] - +def get_video_info(file_path, frames): + if not frames: + cmd = ['ffprobe', '-v', 'error', '-show_entries', + 'stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets', + '-count_packets', '-of', 'json', file_path] + else: + cmd = ['ffprobe', '-v', 'error', '-show_entries', + 'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path] try: result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) json_data = json.loads(result.stdout) - return json_data + + if not frames: + for stream in json_data['streams']: + if stream['codec_type'] == 'video': + frames = int(stream['nb_read_packets']) + break + return frames, json_data except subprocess.CalledProcessError as e: - print(f"Error running ffprobe: {e}") - return None + logger.error(f"Error running ffprobe: {e}") + return None, None def get_transcode_options(video_info, output_directory): output_options = { - 'preset': 'slow', - 'g': 48, - 'sc_threshold': 0, + 'preset': 'faster', 'map': [], 'var_stream_map': '', - 'master_pl_name': 'master.m3u8', + 'c:v:': 'libx264', + 'b:v': '1.5M', + 'vf': 'scale=720:576,format=yuv420p', + 'g': 48, + 'sc_threshold': 0, 'f': 'hls', - 'hls_time': 6, + 'hls_time': 5, 'hls_list_size': 0, - 'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts' + 'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts', + 'master_pl_name': 'master.m3u8' } audio_index = 0 @@ -108,9 +126,6 @@ def get_transcode_options(video_info, output_directory): index = stream['index'] if stream['codec_type'] == 'video': output_options['map'].append(f'0:{index}') - output_options['s:v:0'] = '960x540' - output_options['c:v:0'] = 'libx264' - output_options['b:v:0'] = '2000k' output_options['var_stream_map'] += ' v:0,agroup:audio' if stream['codec_type'] == 'audio': output_options['map'].append(f'0:{index}') diff --git a/media-samurai/hls/003.bat b/media-samurai/hls/003.bat index 5d5da674..5914b9f5 100644 --- a/media-samurai/hls/003.bat +++ b/media-samurai/hls/003.bat @@ -1,15 +1,22 @@ ffmpeg -y -i d:\data\video\hls\ma.mkv ^ --preset slow -g 48 -sc_threshold 0 ^ --map 0:0 -map 0:1 -map 0:8 ^ --s:v:0 960x540 -c:v:0 libx264 -b:v:0 2000k ^ --c:a:0 aac ^ --c:a:1 aac ^ --var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^ --f hls ^ --hls_time 6 ^ --hls_playlist_type vod ^ --hls_segment_type mpegts ^ --hls_flags independent_segments ^ --hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^ --master_pl_name master.m3u8 ^ -out/%%v/prog_index.m3u8 + -map 0:0 -map 0:1 -map 0:1 ^ + -c:v:0 libx264 ^ + -x264-params "nal-hrd=cbr:force-cfr=1" ^ + -profile:v main ^ + -level 3.1 ^ + -preset faster ^ + -g 48 ^ + -sc_threshold 0 ^ + -vf scale=720:576,format=yuv420p ^ + -b:v:0 1500k ^ + -c:a:0 aac -b:a:0 96k ^ + -c:a:1 aac -b:a:1 96k ^ + -var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^ + -f hls ^ + -hls_time 5 ^ + -hls_list_size 0 ^ + -hls_playlist_type vod ^ + -hls_flags independent_segments ^ + -hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^ + -master_pl_name master.m3u8 ^ + out/%%v/prog_index.m3u8 diff --git a/media-samurai/hls/004.bat b/media-samurai/hls/004.bat new file mode 100644 index 00000000..c06fdcfc --- /dev/null +++ b/media-samurai/hls/004.bat @@ -0,0 +1,19 @@ +ffmpeg -y -i d:\data\video\hls\ma.mkv ^ + -map 0:0 -map 0:1 -map 0:8 ^ + -var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^ + -c:v libx264 ^ + -profile:v main ^ + -level 3.1 ^ + -preset faster ^ + -g 48 ^ + -sc_threshold 0 ^ + -vf "scale=720:576,format=yuv420p" ^ + -b:v 1.5M ^ + -c:a:0 aac ^ + -c:a:1 aac ^ + -f hls ^ + -hls_time 5 ^ + -hls_list_size 0 ^ + -hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^ + -master_pl_name master.m3u8 ^ + out/%%v/prog_index.m3u8 diff --git a/media-samurai/hls/videoinfo.bat b/media-samurai/hls/videoinfo.bat index f5a43542..070dfa83 100644 --- a/media-samurai/hls/videoinfo.bat +++ b/media-samurai/hls/videoinfo.bat @@ -1,6 +1,9 @@ ffprobe -v error ^ -show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets -count_packets -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv +:ffprobe -v error ^ +:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_frames -count_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv + :ffprobe -v error -select_streams v:0 ^ :-show_entries stream=codec_type:stream=index:stream=codec_name:stream=nb_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv @@ -9,4 +12,4 @@ ffprobe -v error ^ :ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null /dev/null ::ffmpeg -i ftp://dani:dani@localhost/data/video/x.mxf -vcodec copy -acodec copy -f null -ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null \ No newline at end of file +:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null \ No newline at end of file diff --git a/media-samurai/logger.yaml b/media-samurai/logger.yaml new file mode 100644 index 00000000..3dddc5ed --- /dev/null +++ b/media-samurai/logger.yaml @@ -0,0 +1,21 @@ +handlers: + - sink: ext://sys.stdout + format: '{time:YYYY-MM-DD HH:mm:ss.SSS} {message}' + colorize: true + diagnose: true + - sink: media-samurai.log + enqueue: true + serialize: false + format: '{time:YYYY-MM-DD HH:mm:ss.SSS} {message}' + colorize: true + diagnose: true +levels: + - name: NEW + 'no': 13 + icon: ¤ + color: "" +extra: + common_to_all: default +activation: + - [ "my_module.secret", false ] + - [ "another_library.module", true ] \ No newline at end of file diff --git a/media-samurai/main.py b/media-samurai/main.py index 472b7800..08c5b3a3 100644 --- a/media-samurai/main.py +++ b/media-samurai/main.py @@ -1,27 +1,18 @@ +import json + import uvicorn +from loguru import logger + from api import app +from app_config import app_config from site_icon import site_icon_lizzard from nicegui import ui -def start_job(): - ui.notify("Start job!") - - -ui.label("MEDIA SAMURAI") -grid = ui.aggrid({ - "columnDefs": [ - {"headerName": "Name", "field": "name"}, - {"headerName": "Type", "field": "type"}, - ], - "rowData": [], -}) -grid.props('inline height=500px') - -with ui.row(): - ui.button('Start', on_click=start_job).props('small outline') - - +logger.info(f'MediaSamurai listening on port: {app_config.api.port}') +logger.info(app_config.cfg) +ui.label(f'MediaSamurai listening on port: {app_config.api.port}') +ui.link('Check documentation', '/docs') ui.run_with(app, favicon=site_icon_lizzard) if __name__ == "__main__": - uvicorn.run(app, port=8181) + uvicorn.run(app, port=8181, log_level='critical') diff --git a/media-samurai/requirements.txt b/media-samurai/requirements.txt index 07739c5d..e0062016 100644 --- a/media-samurai/requirements.txt +++ b/media-samurai/requirements.txt @@ -1,6 +1,10 @@ -uvicorn~=0.22.0 -fastapi~=0.104.1 -pydantic~=2.4.2 -nicegui~=1.4.2 -pymediainfo~=6.1.0 -python-ffmpeg~=2.0.4 \ No newline at end of file +uvicorn==0.22.0 +fastapi==0.104.1 +pydantic==2.4.2 +nicegui==1.4.2 +pymediainfo==6.1.0 +python-ffmpeg==2.0.4 +loguru==0.7.2 +loguru-config==0.1.0 +shortuuid==1.0.11 +urlparse3==1.1 \ No newline at end of file diff --git a/media-samurai/transcode.py b/media-samurai/transcode.py index c3b872c9..546c914a 100644 --- a/media-samurai/transcode.py +++ b/media-samurai/transcode.py @@ -1,6 +1,7 @@ from concurrent.futures import ThreadPoolExecutor from datetime import datetime +from loguru import logger from pydantic import BaseModel from enum import Enum import time @@ -12,8 +13,9 @@ executor = ThreadPoolExecutor(max_workers=5) class TranscodeItem(BaseModel): - name: str - delay: int + source: str + target: str + frames: int def simulate_background_task(task_id, duration): @@ -26,14 +28,18 @@ def simulate_background_task(task_id, duration): job.status = TranscodeStatus.COMPLETED -def execute_transcode_task(task_id, duration): +def execute_transcode_task(task_id, remote_source_file, remote_target_directory, frames): job = background_tasks_results[task_id] job.started = datetime.now() job.status = TranscodeStatus.RUNNING - local_source_file = 'd:/data/video/hls/ma.mkv' - remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv' - local_target_directory = 'c:/temp/out' - remote_target_directory = r'/data/video/hls/out/test' - transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job) - job.finished = datetime.now() - job.status = TranscodeStatus.COMPLETED + try: + logger.info(f'Start transcode job for {remote_source_file}, task ID {task_id}') + transcode_hls(remote_source_file, remote_target_directory, frames, task_id, job) + job.status = TranscodeStatus.COMPLETED + logger.info(f'Transcode job completed for {remote_source_file}, task ID {task_id}') + except Exception as e: + job.status = TranscodeStatus.ERROR + logger.error(f'Transcode job failed for {remote_source_file}, task ID {task_id}. Details: {e}') + finally: + job.finished = datetime.now() + diff --git a/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java b/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java index 4c6adaab..721b72ec 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java @@ -8,7 +8,6 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.commons.io.FilenameUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder; @@ -25,15 +24,14 @@ public class MediaSamuraiAPI { final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); - String inDir = "\\\\10.11.1.90\\input\\"; - String outDir = "\\\\10.11.1.100\\output\\"; -// String inputs[] = { "file-1.mov", "file-2.mov" }; - String inputs[] = { "file-1.mov" }; - - for (String currentInput : inputs) { +// String sources[] = { "ftp://dani:dani@localhost/data/video/hls/ma.mkv" }; + String sources[] = { "ftp://dani:dani@localhost/data/video/X.MXF" }; +// String sources[] = { "ftp://dani:dani@localhost/data/video/N.mxf" }; + String target = "ftp://dani:dani@localhost/data/video/hls/out/"; + for (String source : sources) { Runnable task = () -> { try { - transcode(inDir, outDir, currentInput); + transcode(source, target, 0); } catch (Exception e) { e.printStackTrace(); } @@ -44,26 +42,14 @@ public class MediaSamuraiAPI { executor.awaitTermination(1, TimeUnit.MINUTES); } - public static void main1(String[] args) throws Exception { - String inDir = "\\\\10.11.1.90\\input\\"; - String outDir = "\\\\10.11.1.100\\output\\"; - String inputs[] = { "file-1.mov" }; - - for (String currentInput : inputs) { - transcode(inDir, outDir, currentInput); - } - } - - private static void transcode(String inDir, String outDir, String input2) { - final String inputi = inDir + input2; - final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4"; - + private static void transcode(String source, String target, long frames) { try { log.info("Started"); MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/"); BasicDBObject job = new BasicDBObject(); - job.put("name", inputi); - job.put("delay", 1); + job.put("source", source); + job.put("target", target); + job.put("frames", frames); String taskId = api.submit(job); while (true) { Thread.sleep(3000); -- 2.54.0