From: Vásáry Dániel Date: Tue, 30 Jan 2024 15:00:42 +0000 (+0100) Subject: HLS transcode service v1 (no param handling) X-Git-Url: http://git.useribm.hu/?a=commitdiff_plain;h=c778bbcd0e3e775500054a934206e2511c2f85e8;p=mediacube.git HLS transcode service v1 (no param handling) --- diff --git a/media-samurai/api.py b/media-samurai/api.py index 87ecb69f..3fbb1730 100644 --- a/media-samurai/api.py +++ b/media-samurai/api.py @@ -5,7 +5,10 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi import BackgroundTasks, FastAPI from pydantic import BaseModel -from transcode import TranscodeItem, TranscodeStatus, TranscodeJob, background_tasks_results, simulate_background_task + +from hls import TranscodeJob +from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, simulate_background_task, \ + execute_transcode_task # https://docs.pydantic.dev/2.4/concepts/models/ @@ -32,7 +35,7 @@ app.mount("/video", StaticFiles(directory="hls"), name="VIDEO") @app.post("/submit") async def submit(item: TranscodeItem, background_tasks: BackgroundTasks): task_id = str(len(background_tasks_results) + 1) - background_tasks.add_task(simulate_background_task, task_id, item.delay) + background_tasks.add_task(execute_transcode_task, task_id, item.delay) job = TranscodeJob() job.status = TranscodeStatus.RUNNING background_tasks_results[task_id] = job diff --git a/media-samurai/ftpupload.py b/media-samurai/ftpupload.py new file mode 100644 index 00000000..ba752b58 --- /dev/null +++ b/media-samurai/ftpupload.py @@ -0,0 +1,65 @@ +import os.path, os +from ftplib import FTP, error_perm + + +def main(): + local_source_dir = r'C:\temp\out' + remote_target_dir = r'/data/video/hls/out/test' + ftp_upload(local_source_dir, remote_target_dir) + + +def ftp_upload(local_source_dir, remote_target_dir): + host = 'localhost' + port = 21 + ftp = FTP() + ftp.connect(host, port) + ftp.login('dani', 'dani') + cwd_create_all(ftp, remote_target_dir) + place_files(ftp, local_source_dir) + ftp.quit() + + +def cwd_create_all(ftp, remote_target_dir): + remote_dir_tokens = remote_target_dir.split('/') + for remote_dir in remote_dir_tokens: + if not remote_dir: + continue + cwd_create(ftp, remote_dir) + + +def cwd_create(ftp, path): + try: + ftp.cwd(path) + except error_perm as e: + if not e.args[0].startswith('550'): + raise + ftp.mkd(path) + ftp.cwd(path) + + +def place_files(ftp, path): + for name in os.listdir(path): + local_path = os.path.join(path, name) + if os.path.isfile(local_path): + print("STOR", name, local_path) + ftp.storbinary('STOR ' + name, open(local_path, 'rb')) + elif os.path.isdir(local_path): + print("MKD", name) + + try: + ftp.mkd(name) + + # ignore "directory already exists" + except error_perm as e: + if not e.args[0].startswith('550'): + raise + + print("CWD", name) + ftp.cwd(name) + place_files(ftp, local_path) + print("CWD", "..") + ftp.cwd("..") + + +if __name__ == "__main__": + main() diff --git a/media-samurai/hls.py b/media-samurai/hls.py index dde8a279..53d8200a 100644 --- a/media-samurai/hls.py +++ b/media-samurai/hls.py @@ -1,44 +1,58 @@ import shutil +from enum import Enum -from ffmpeg import FFmpeg, Progress, FFmpegError +from ffmpeg.ffmpeg import FFmpeg, FFmpegError +from ffmpeg.progress import Progress from pymediainfo import MediaInfo +import subprocess +import json + +from ftpupload import ftp_upload + +class TranscodeJob: + def __init__(self): + self.status = TranscodeStatus.NONE + self.progress = 0 + self.started = None + self.finished = None + + +class TranscodeStatus(str, Enum): + PENDING = "PENDING" + RUNNING = "RUNNING" + ERROR = "ERROR" + NONE = "NONE" + COMPLETED = "COMPLETED" def main(): - input_file = r'd:\data\video\hls\ma.mkv' + local_source_file = 'd:/data/video/hls/ma.mkv' + remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv' + local_target_directory = 'c:/temp/out' + remote_target_directory = r'/data/video/hls/out/test' + job = TranscodeJob() + transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job) + - media_info = MediaInfo.parse(input_file) +def transcode_hls(local_source_file, remote_source_file, local_target_directory, + remote_target_directory, job: TranscodeJob): + shutil.rmtree(local_target_directory, ignore_errors=True) + output_file = f'{local_target_directory}/%v/track_index.m3u8' + + video_info = get_video_info(remote_source_file) + + frame_count = 0 + media_info = MediaInfo.parse(local_source_file) for track in media_info.tracks: - if track.track_type == "Video": + if track.track_type == 'Video': frame_count = int(track.frame_count) - # print("Duration (raw value):", track.duration) - output_directory = 'hls/out/' - output_file = output_directory + r'%v/prog_index.m3u8' - shutil.rmtree(output_directory, ignore_errors=True) - - output_options = { - 'preset': 'slow', - 'g': 48, - 'sc_threshold': 0, - 'map': ['0:0', '0:1', '0:8'], - 's:v:0': '960x540', - 'c:v:0': 'libx264', - 'b:v:0': '2000k', - 'c:a:0': 'aac', - 'c:a:1': 'aac', - 'var_stream_map': 'a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio', - 'master_pl_name': 'master.m3u8', - 'f': 'hls', - 'hls_time': 6, - 'hls_list_size': 0, - 'hls_segment_filename': f'{output_directory}/%v/fileSequence%03d.ts', - } + output_options = get_transcode_options(video_info, local_target_directory) ffmpeg = ( FFmpeg() .option("y") - .input(input_file) + .input(remote_source_file) .output( output_file, output_options @@ -51,13 +65,80 @@ def main(): @ffmpeg.on("progress") def on_progress(progress: Progress): - print(round(progress.frame * 100 / frame_count)) + job.progress = round(progress.frame * 100 / frame_count) try: ffmpeg.execute() except FFmpegError as e: print(e.args) + ftp_upload(local_target_directory, remote_target_directory) + shutil.rmtree(local_target_directory, ignore_errors=True) + + +def get_video_info(file_path): + cmd = ['ffprobe', '-v', 'error', '-show_entries', + 'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path] + + try: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) + json_data = json.loads(result.stdout) + return json_data + except subprocess.CalledProcessError as e: + print(f"Error running ffprobe: {e}") + return None + + +def get_transcode_options(video_info, output_directory): + output_options = { + 'preset': 'slow', + 'g': 48, + 'sc_threshold': 0, + 'map': [], + 'var_stream_map': '', + 'master_pl_name': 'master.m3u8', + 'f': 'hls', + 'hls_time': 6, + 'hls_list_size': 0, + 'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts' + } + + audio_index = 0 + for stream in video_info['streams']: + index = stream['index'] + if stream['codec_type'] == 'video': + output_options['map'].append(f'0:{index}') + output_options['s:v:0'] = '960x540' + output_options['c:v:0'] = 'libx264' + output_options['b:v:0'] = '2000k' + output_options['var_stream_map'] += ' v:0,agroup:audio' + if stream['codec_type'] == 'audio': + output_options['map'].append(f'0:{index}') + output_options[f'c:a:{audio_index}'] = 'aac' + output_options['var_stream_map'] += f' a:{audio_index},agroup:audio' + audio_index += 1 + + output_options['var_stream_map'] = output_options['var_stream_map'].strip() + return output_options + + # output_options1 = { + # 'preset': 'slow', + # 'g': 48, + # 'sc_threshold': 0, + # 'map': ['0:0', '0:1', '0:8'], + # 's:v:0': '960x540', + # 'c:v:0': 'libx264', + # 'b:v:0': '2000k', + # 'c:a:0': 'aac', + # 'c:a:1': 'aac', + # 'var_stream_map': 'a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio', + # 'master_pl_name': 'master.m3u8', + # 'f': 'hls', + # 'hls_time': 6, + # 'hls_list_size': 0, + # 'hls_segment_filename': f'{output_directory}/%v/fileSequence%03d.ts', + # } + if __name__ == "__main__": main() diff --git a/media-samurai/hls/videoinfo.bat b/media-samurai/hls/videoinfo.bat new file mode 100644 index 00000000..f5a43542 --- /dev/null +++ b/media-samurai/hls/videoinfo.bat @@ -0,0 +1,12 @@ +ffprobe -v error ^ +-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets -count_packets -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv + +:ffprobe -v error -select_streams v:0 ^ +:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=nb_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv + +:ffprobe -v error -show_format -show_streams ^ +:-of json ftp://dani:dani@localhost/data/video/hls/ma.mkv + +:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null /dev/null +::ffmpeg -i ftp://dani:dani@localhost/data/video/x.mxf -vcodec copy -acodec copy -f null +ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null \ No newline at end of file diff --git a/media-samurai/requirements.txt b/media-samurai/requirements.txt index 4439ec33..07739c5d 100644 --- a/media-samurai/requirements.txt +++ b/media-samurai/requirements.txt @@ -3,5 +3,4 @@ fastapi~=0.104.1 pydantic~=2.4.2 nicegui~=1.4.2 pymediainfo~=6.1.0 -tqdm~=4.66.1 -gevent~=23.9.1 \ No newline at end of file +python-ffmpeg~=2.0.4 \ No newline at end of file diff --git a/media-samurai/transcode.py b/media-samurai/transcode.py index adc20ddf..c3b872c9 100644 --- a/media-samurai/transcode.py +++ b/media-samurai/transcode.py @@ -5,6 +5,8 @@ from pydantic import BaseModel from enum import Enum import time +from hls import transcode_hls, TranscodeStatus + background_tasks_results = {} executor = ThreadPoolExecutor(max_workers=5) @@ -14,22 +16,6 @@ class TranscodeItem(BaseModel): delay: int -class TranscodeStatus(str, Enum): - PENDING = "PENDING" - RUNNING = "RUNNING" - ERROR = "ERROR" - NONE = "NONE" - COMPLETED = "COMPLETED" - - -class TranscodeJob: - def __init__(self): - self.status = TranscodeStatus.NONE - self.progress = 0 - self.started = None - self.finished = None - - def simulate_background_task(task_id, duration): job = background_tasks_results[task_id] job.started = datetime.now() @@ -38,3 +24,16 @@ def simulate_background_task(task_id, duration): time.sleep(duration) job.finished = datetime.now() job.status = TranscodeStatus.COMPLETED + + +def execute_transcode_task(task_id, duration): + job = background_tasks_results[task_id] + job.started = datetime.now() + job.status = TranscodeStatus.RUNNING + local_source_file = 'd:/data/video/hls/ma.mkv' + remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv' + local_target_directory = 'c:/temp/out' + remote_target_directory = r'/data/video/hls/out/test' + transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job) + job.finished = datetime.now() + job.status = TranscodeStatus.COMPLETED diff --git a/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java b/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java index c24dd863..4c6adaab 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java @@ -1,6 +1,5 @@ package user.commons; -import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -22,89 +21,67 @@ public class MediaSamuraiAPI { private static final Logger log = LogManager.getLogger(); - public static void main1(String[] args) throws Exception { + public static void main(String[] args) throws Exception { + + final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); + + String inDir = "\\\\10.11.1.90\\input\\"; + String outDir = "\\\\10.11.1.100\\output\\"; +// String inputs[] = { "file-1.mov", "file-2.mov" }; + String inputs[] = { "file-1.mov" }; - final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); - - String inDir = "\\\\10.11.1.90\\data\\"; - String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\"; - - String inputs[] = { "13-02009-0000-1.mov" }; - for (String input2 : inputs) { - final String inputi = inDir + input2; - final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4"; - - Runnable task = new Runnable() { - private String input = inputi; - private String output = outputi; - - @Override - public void run() { - try { - log.info("Started"); - MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> { - System.out.println(output + " progress: " + p); - }); - BasicDBObject job = new BasicDBObject(); - job.put("name", inputi); - job.put("delay", 1); - String taskId = api.submit(job); - - for (int i = 0; i < 10; i++) { - Thread.sleep(3000); - BasicDBObject status = api.getStatus(taskId); - log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), status.getString("status")); - } - } catch (Exception e) { - e.printStackTrace(); - } + for (String currentInput : inputs) { + Runnable task = () -> { + try { + transcode(inDir, outDir, currentInput); + } catch (Exception e) { + e.printStackTrace(); } }; - executor.execute(task); } executor.shutdown(); - executor.awaitTermination(1, TimeUnit.HOURS); + executor.awaitTermination(1, TimeUnit.MINUTES); } - public static void main(String[] args) throws Exception { - String inDir = "\\\\10.11.1.90\\data\\"; - String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\"; - - String inputs[] = { "13-02009-0000-1.mov" }; - for (String input2 : inputs) { - final String inputi = inDir + input2; - final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4"; - - try { - log.info("Started"); - MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> { - System.out.println(outputi + " progress: " + p); - }); - BasicDBObject job = new BasicDBObject(); - job.put("name", inputi); - job.put("delay", 1); - String taskId = api.submit(job); - while (true) { - Thread.sleep(3000); - BasicDBObject status = api.getStatus(taskId); - String taskStatus = status.getString("status"); - log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus); - if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) { - break; - } + public static void main1(String[] args) throws Exception { + String inDir = "\\\\10.11.1.90\\input\\"; + String outDir = "\\\\10.11.1.100\\output\\"; + String inputs[] = { "file-1.mov" }; + + for (String currentInput : inputs) { + transcode(inDir, outDir, currentInput); + } + } + + private static void transcode(String inDir, String outDir, String input2) { + final String inputi = inDir + input2; + final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4"; + + try { + log.info("Started"); + MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/"); + BasicDBObject job = new BasicDBObject(); + job.put("name", inputi); + job.put("delay", 1); + String taskId = api.submit(job); + while (true) { + Thread.sleep(3000); + BasicDBObject status = api.getStatus(taskId); + String taskStatus = status.getString("status"); + log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus); + if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) { + break; } - } catch (Exception e) { - e.printStackTrace(); } + } catch (Exception e) { + e.printStackTrace(); } } private ResteasyWebTarget webTarget; - private IProgressChangedListener listener; - public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) { - this.listener = listener; + public MediaSamuraiAPI(String apiAddress) { try { webTarget = new ResteasyClientBuilder().build().target(apiAddress); @@ -138,9 +115,6 @@ public class MediaSamuraiAPI { } String json = apiResponse.readEntity(String.class); status = (BasicDBObject) JSONUtil.jsonToDbObject(json); - if (Objects.nonNull(listener)) { - listener.onProgressChanged(status.getInt("progress")); - } } catch (Exception e) { log.error(e.getClass() + " " + e.getMessage()); }