HLS transcode service v1 (no param handling)
authorVásáry Dániel <vasary@elgekko.net>
Tue, 30 Jan 2024 15:00:42 +0000 (16:00 +0100)
committerVásáry Dániel <vasary@elgekko.net>
Tue, 30 Jan 2024 15:00:42 +0000 (16:00 +0100)
media-samurai/api.py
media-samurai/ftpupload.py [new file with mode: 0644]
media-samurai/hls.py
media-samurai/hls/videoinfo.bat [new file with mode: 0644]
media-samurai/requirements.txt
media-samurai/transcode.py
server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java

index 87ecb69f99e77091944472c175c05507930242e7..3fbb17301d3ca310823f1a465569198bf7fe38d3 100644 (file)
@@ -5,7 +5,10 @@ from fastapi.middleware.cors import CORSMiddleware
 from fastapi.staticfiles import StaticFiles
 from fastapi import BackgroundTasks, FastAPI
 from pydantic import BaseModel
-from transcode import TranscodeItem, TranscodeStatus, TranscodeJob, background_tasks_results, simulate_background_task
+
+from hls import TranscodeJob
+from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, simulate_background_task, \
+    execute_transcode_task
 
 
 # https://docs.pydantic.dev/2.4/concepts/models/
@@ -32,7 +35,7 @@ app.mount("/video", StaticFiles(directory="hls"), name="VIDEO")
 @app.post("/submit")
 async def submit(item: TranscodeItem, background_tasks: BackgroundTasks):
     task_id = str(len(background_tasks_results) + 1)
-    background_tasks.add_task(simulate_background_task, task_id, item.delay)
+    background_tasks.add_task(execute_transcode_task, task_id, item.delay)
     job = TranscodeJob()
     job.status = TranscodeStatus.RUNNING
     background_tasks_results[task_id] = job
diff --git a/media-samurai/ftpupload.py b/media-samurai/ftpupload.py
new file mode 100644 (file)
index 0000000..ba752b5
--- /dev/null
@@ -0,0 +1,65 @@
+import os.path, os
+from ftplib import FTP, error_perm
+
+
+def main():
+    local_source_dir = r'C:\temp\out'
+    remote_target_dir = r'/data/video/hls/out/test'
+    ftp_upload(local_source_dir, remote_target_dir)
+
+
+def ftp_upload(local_source_dir, remote_target_dir):
+    host = 'localhost'
+    port = 21
+    ftp = FTP()
+    ftp.connect(host, port)
+    ftp.login('dani', 'dani')
+    cwd_create_all(ftp, remote_target_dir)
+    place_files(ftp, local_source_dir)
+    ftp.quit()
+
+
+def cwd_create_all(ftp, remote_target_dir):
+    remote_dir_tokens = remote_target_dir.split('/')
+    for remote_dir in remote_dir_tokens:
+        if not remote_dir:
+            continue
+        cwd_create(ftp, remote_dir)
+
+
+def cwd_create(ftp, path):
+    try:
+        ftp.cwd(path)
+    except error_perm as e:
+        if not e.args[0].startswith('550'):
+            raise
+        ftp.mkd(path)
+        ftp.cwd(path)
+
+
+def place_files(ftp, path):
+    for name in os.listdir(path):
+        local_path = os.path.join(path, name)
+        if os.path.isfile(local_path):
+            print("STOR", name, local_path)
+            ftp.storbinary('STOR ' + name, open(local_path, 'rb'))
+        elif os.path.isdir(local_path):
+            print("MKD", name)
+
+            try:
+                ftp.mkd(name)
+
+            # ignore "directory already exists"
+            except error_perm as e:
+                if not e.args[0].startswith('550'):
+                    raise
+
+            print("CWD", name)
+            ftp.cwd(name)
+            place_files(ftp, local_path)
+            print("CWD", "..")
+            ftp.cwd("..")
+
+
+if __name__ == "__main__":
+    main()
index dde8a279c35a241d1d4a8767bb34be8b78e449b2..53d8200a7f57b350f8778559a47a5fa75ab29584 100644 (file)
@@ -1,44 +1,58 @@
 import shutil
+from enum import Enum
 
-from ffmpeg import FFmpeg, Progress, FFmpegError
+from ffmpeg.ffmpeg import FFmpeg, FFmpegError
+from ffmpeg.progress import Progress
 from pymediainfo import MediaInfo
+import subprocess
+import json
+
+from ftpupload import ftp_upload
+
+class TranscodeJob:
+    def __init__(self):
+        self.status = TranscodeStatus.NONE
+        self.progress = 0
+        self.started = None
+        self.finished = None
+
+
+class TranscodeStatus(str, Enum):
+    PENDING = "PENDING"
+    RUNNING = "RUNNING"
+    ERROR = "ERROR"
+    NONE = "NONE"
+    COMPLETED = "COMPLETED"
 
 
 def main():
-    input_file = r'd:\data\video\hls\ma.mkv'
+    local_source_file = 'd:/data/video/hls/ma.mkv'
+    remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
+    local_target_directory = 'c:/temp/out'
+    remote_target_directory = r'/data/video/hls/out/test'
+    job = TranscodeJob()
+    transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
+
 
-    media_info = MediaInfo.parse(input_file)
+def transcode_hls(local_source_file, remote_source_file, local_target_directory,
+                  remote_target_directory, job: TranscodeJob):
+    shutil.rmtree(local_target_directory, ignore_errors=True)
+    output_file = f'{local_target_directory}/%v/track_index.m3u8'
+
+    video_info = get_video_info(remote_source_file)
+
+    frame_count = 0
+    media_info = MediaInfo.parse(local_source_file)
     for track in media_info.tracks:
-        if track.track_type == "Video":
+        if track.track_type == 'Video':
             frame_count = int(track.frame_count)
-            # print("Duration (raw value):", track.duration)
 
-    output_directory = 'hls/out/'
-    output_file = output_directory + r'%v/prog_index.m3u8'
-    shutil.rmtree(output_directory, ignore_errors=True)
-
-    output_options = {
-        'preset': 'slow',
-        'g': 48,
-        'sc_threshold': 0,
-        'map': ['0:0', '0:1', '0:8'],
-        's:v:0': '960x540',
-        'c:v:0': 'libx264',
-        'b:v:0': '2000k',
-        'c:a:0': 'aac',
-        'c:a:1': 'aac',
-        'var_stream_map': 'a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio',
-        'master_pl_name': 'master.m3u8',
-        'f': 'hls',
-        'hls_time': 6,
-        'hls_list_size': 0,
-        'hls_segment_filename': f'{output_directory}/%v/fileSequence%03d.ts',
-    }
+    output_options = get_transcode_options(video_info, local_target_directory)
 
     ffmpeg = (
         FFmpeg()
         .option("y")
-        .input(input_file)
+        .input(remote_source_file)
         .output(
             output_file,
             output_options
@@ -51,13 +65,80 @@ def main():
 
     @ffmpeg.on("progress")
     def on_progress(progress: Progress):
-        print(round(progress.frame * 100 / frame_count))
+        job.progress = round(progress.frame * 100 / frame_count)
 
     try:
         ffmpeg.execute()
     except FFmpegError as e:
         print(e.args)
 
+    ftp_upload(local_target_directory, remote_target_directory)
+    shutil.rmtree(local_target_directory, ignore_errors=True)
+
+
+def get_video_info(file_path):
+    cmd = ['ffprobe', '-v', 'error', '-show_entries',
+           'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path]
+
+    try:
+        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
+        json_data = json.loads(result.stdout)
+        return json_data
+    except subprocess.CalledProcessError as e:
+        print(f"Error running ffprobe: {e}")
+        return None
+
+
+def get_transcode_options(video_info, output_directory):
+    output_options = {
+        'preset': 'slow',
+        'g': 48,
+        'sc_threshold': 0,
+        'map': [],
+        'var_stream_map': '',
+        'master_pl_name': 'master.m3u8',
+        'f': 'hls',
+        'hls_time': 6,
+        'hls_list_size': 0,
+        'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts'
+    }
+
+    audio_index = 0
+    for stream in video_info['streams']:
+        index = stream['index']
+        if stream['codec_type'] == 'video':
+            output_options['map'].append(f'0:{index}')
+            output_options['s:v:0'] = '960x540'
+            output_options['c:v:0'] = 'libx264'
+            output_options['b:v:0'] = '2000k'
+            output_options['var_stream_map'] += ' v:0,agroup:audio'
+        if stream['codec_type'] == 'audio':
+            output_options['map'].append(f'0:{index}')
+            output_options[f'c:a:{audio_index}'] = 'aac'
+            output_options['var_stream_map'] += f' a:{audio_index},agroup:audio'
+            audio_index += 1
+
+    output_options['var_stream_map'] = output_options['var_stream_map'].strip()
+    return output_options
+
+    # output_options1 = {
+    #     'preset': 'slow',
+    #     'g': 48,
+    #     'sc_threshold': 0,
+    #     'map': ['0:0', '0:1', '0:8'],
+    #     's:v:0': '960x540',
+    #     'c:v:0': 'libx264',
+    #     'b:v:0': '2000k',
+    #     'c:a:0': 'aac',
+    #     'c:a:1': 'aac',
+    #     'var_stream_map': 'a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio',
+    #     'master_pl_name': 'master.m3u8',
+    #     'f': 'hls',
+    #     'hls_time': 6,
+    #     'hls_list_size': 0,
+    #     'hls_segment_filename': f'{output_directory}/%v/fileSequence%03d.ts',
+    # }
+
 
 if __name__ == "__main__":
     main()
diff --git a/media-samurai/hls/videoinfo.bat b/media-samurai/hls/videoinfo.bat
new file mode 100644 (file)
index 0000000..f5a4354
--- /dev/null
@@ -0,0 +1,12 @@
+ffprobe -v error ^
+-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets -count_packets -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
+:ffprobe -v error -select_streams v:0 ^
+:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=nb_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
+:ffprobe -v error -show_format -show_streams ^
+:-of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
+:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null /dev/null
+::ffmpeg -i ftp://dani:dani@localhost/data/video/x.mxf -vcodec copy -acodec copy -f null
+ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null
\ No newline at end of file
index 4439ec33144fc839f6db0809aba1e3fe3dd90360..07739c5d5dbcc30f72fafade4efa2a1d4034876e 100644 (file)
@@ -3,5 +3,4 @@ fastapi~=0.104.1
 pydantic~=2.4.2
 nicegui~=1.4.2
 pymediainfo~=6.1.0
-tqdm~=4.66.1
-gevent~=23.9.1
\ No newline at end of file
+python-ffmpeg~=2.0.4
\ No newline at end of file
index adc20ddf7e134ce2a84073ebddd21b0ccc2bc497..c3b872c9affb28b03f6c643eb8fb3aa02035a81b 100644 (file)
@@ -5,6 +5,8 @@ from pydantic import BaseModel
 from enum import Enum
 import time
 
+from hls import transcode_hls, TranscodeStatus
+
 background_tasks_results = {}
 executor = ThreadPoolExecutor(max_workers=5)
 
@@ -14,22 +16,6 @@ class TranscodeItem(BaseModel):
     delay: int
 
 
-class TranscodeStatus(str, Enum):
-    PENDING = "PENDING"
-    RUNNING = "RUNNING"
-    ERROR = "ERROR"
-    NONE = "NONE"
-    COMPLETED = "COMPLETED"
-
-
-class TranscodeJob:
-    def __init__(self):
-        self.status = TranscodeStatus.NONE
-        self.progress = 0
-        self.started = None
-        self.finished = None
-
-
 def simulate_background_task(task_id, duration):
     job = background_tasks_results[task_id]
     job.started = datetime.now()
@@ -38,3 +24,16 @@ def simulate_background_task(task_id, duration):
         time.sleep(duration)
     job.finished = datetime.now()
     job.status = TranscodeStatus.COMPLETED
+
+
+def execute_transcode_task(task_id, duration):
+    job = background_tasks_results[task_id]
+    job.started = datetime.now()
+    job.status = TranscodeStatus.RUNNING
+    local_source_file = 'd:/data/video/hls/ma.mkv'
+    remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
+    local_target_directory = 'c:/temp/out'
+    remote_target_directory = r'/data/video/hls/out/test'
+    transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
+    job.finished = datetime.now()
+    job.status = TranscodeStatus.COMPLETED
index c24dd8638b79461b1c26d9031c47896d7603ca44..4c6adaab9c1528e5f76533d630aa1c0cadb44e60 100644 (file)
@@ -1,6 +1,5 @@
 package user.commons;
 
-import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -22,89 +21,67 @@ public class MediaSamuraiAPI {
 
        private static final Logger log = LogManager.getLogger();
 
-       public static void main1(String[] args) throws Exception {
+       public static void main(String[] args) throws Exception {
+
+               final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
+
+               String inDir = "\\\\10.11.1.90\\input\\";
+               String outDir = "\\\\10.11.1.100\\output\\";
+//             String inputs[] = { "file-1.mov", "file-2.mov" };
+               String inputs[] = { "file-1.mov" };
 
-               final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
-
-               String inDir = "\\\\10.11.1.90\\data\\";
-               String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
-
-               String inputs[] = { "13-02009-0000-1.mov" };
-               for (String input2 : inputs) {
-                       final String inputi = inDir + input2;
-                       final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
-
-                       Runnable task = new Runnable() {
-                               private String input = inputi;
-                               private String output = outputi;
-
-                               @Override
-                               public void run() {
-                                       try {
-                                               log.info("Started");
-                                               MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
-                                                       System.out.println(output + " progress: " + p);
-                                               });
-                                               BasicDBObject job = new BasicDBObject();
-                                               job.put("name", inputi);
-                                               job.put("delay", 1);
-                                               String taskId = api.submit(job);
-
-                                               for (int i = 0; i < 10; i++) {
-                                                       Thread.sleep(3000);
-                                                       BasicDBObject status = api.getStatus(taskId);
-                                                       log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), status.getString("status"));
-                                               }
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                       }
+               for (String currentInput : inputs) {
+                       Runnable task = () -> {
+                               try {
+                                       transcode(inDir, outDir, currentInput);
+                               } catch (Exception e) {
+                                       e.printStackTrace();
                                }
                        };
-
                        executor.execute(task);
                }
                executor.shutdown();
-               executor.awaitTermination(1, TimeUnit.HOURS);
+               executor.awaitTermination(1, TimeUnit.MINUTES);
        }
 
-       public static void main(String[] args) throws Exception {
-               String inDir = "\\\\10.11.1.90\\data\\";
-               String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
-
-               String inputs[] = { "13-02009-0000-1.mov" };
-               for (String input2 : inputs) {
-                       final String inputi = inDir + input2;
-                       final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
-
-                       try {
-                               log.info("Started");
-                               MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
-                                       System.out.println(outputi + " progress: " + p);
-                               });
-                               BasicDBObject job = new BasicDBObject();
-                               job.put("name", inputi);
-                               job.put("delay", 1);
-                               String taskId = api.submit(job);
-                               while (true) {
-                                       Thread.sleep(3000);
-                                       BasicDBObject status = api.getStatus(taskId);
-                                       String taskStatus = status.getString("status");
-                                       log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus);
-                                       if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) {
-                                               break;
-                                       }
+       public static void main1(String[] args) throws Exception {
+               String inDir = "\\\\10.11.1.90\\input\\";
+               String outDir = "\\\\10.11.1.100\\output\\";
+               String inputs[] = { "file-1.mov" };
+
+               for (String currentInput : inputs) {
+                       transcode(inDir, outDir, currentInput);
+               }
+       }
+
+       private static void transcode(String inDir, String outDir, String input2) {
+               final String inputi = inDir + input2;
+               final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
+
+               try {
+                       log.info("Started");
+                       MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/");
+                       BasicDBObject job = new BasicDBObject();
+                       job.put("name", inputi);
+                       job.put("delay", 1);
+                       String taskId = api.submit(job);
+                       while (true) {
+                               Thread.sleep(3000);
+                               BasicDBObject status = api.getStatus(taskId);
+                               String taskStatus = status.getString("status");
+                               log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus);
+                               if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) {
+                                       break;
                                }
-                       } catch (Exception e) {
-                               e.printStackTrace();
                        }
+               } catch (Exception e) {
+                       e.printStackTrace();
                }
        }
 
        private ResteasyWebTarget webTarget;
-       private IProgressChangedListener listener;
 
-       public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) {
-               this.listener = listener;
+       public MediaSamuraiAPI(String apiAddress) {
                try {
 
                        webTarget = new ResteasyClientBuilder().build().target(apiAddress);
@@ -138,9 +115,6 @@ public class MediaSamuraiAPI {
                        }
                        String json = apiResponse.readEntity(String.class);
                        status = (BasicDBObject) JSONUtil.jsonToDbObject(json);
-                       if (Objects.nonNull(listener)) {
-                               listener.onProgressChanged(status.getInt("progress"));
-                       }
                } catch (Exception e) {
                        log.error(e.getClass() + " " + e.getMessage());
                }