Transcoder implemented, test ok
authorVásáry Dániel <vasary@elgekko.net>
Wed, 31 Jan 2024 20:53:29 +0000 (21:53 +0100)
committerVásáry Dániel <vasary@elgekko.net>
Wed, 31 Jan 2024 20:53:29 +0000 (21:53 +0100)
13 files changed:
media-samurai/api.py
media-samurai/app_config.py [new file with mode: 0644]
media-samurai/application.yaml [new file with mode: 0644]
media-samurai/ftpupload.py
media-samurai/hls.py
media-samurai/hls/003.bat
media-samurai/hls/004.bat [new file with mode: 0644]
media-samurai/hls/videoinfo.bat
media-samurai/logger.yaml [new file with mode: 0644]
media-samurai/main.py
media-samurai/requirements.txt
media-samurai/transcode.py
server/user.jobengine.osgi.commons/src/user/commons/MediaSamuraiAPI.java

index 3fbb17301d3ca310823f1a465569198bf7fe38d3..062b28d5757b0da3b41e1790aecd84539b1b4b64 100644 (file)
@@ -1,14 +1,14 @@
 from datetime import datetime
 from typing import Optional
 
+import shortuuid
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.staticfiles import StaticFiles
 from fastapi import BackgroundTasks, FastAPI
 from pydantic import BaseModel
 
 from hls import TranscodeJob
-from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, simulate_background_task, \
-    execute_transcode_task
+from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, execute_transcode_task
 
 
 # https://docs.pydantic.dev/2.4/concepts/models/
@@ -34,8 +34,8 @@ app.mount("/video", StaticFiles(directory="hls"), name="VIDEO")
 
 @app.post("/submit")
 async def submit(item: TranscodeItem, background_tasks: BackgroundTasks):
-    task_id = str(len(background_tasks_results) + 1)
-    background_tasks.add_task(execute_transcode_task, task_id, item.delay)
+    task_id = shortuuid.uuid()
+    background_tasks.add_task(execute_transcode_task, task_id, item.source, item.target, item.frames)
     job = TranscodeJob()
     job.status = TranscodeStatus.RUNNING
     background_tasks_results[task_id] = job
diff --git a/media-samurai/app_config.py b/media-samurai/app_config.py
new file mode 100644 (file)
index 0000000..e21851a
--- /dev/null
@@ -0,0 +1,23 @@
+import yaml
+from loguru_config import LoguruConfig
+
+LoguruConfig.load('logger.yaml')
+
+class AppConfig(object):
+    def __init__(self, file_name=None):
+        if file_name is not None:
+            with open(file_name) as config_file:
+                self.cfg = yaml.load(config_file, yaml.FullLoader)
+                self.load(self.cfg)
+
+    def load(self, dict_values):
+        for key in dict_values:
+            if type(dict_values[key]) is dict:
+                val = AppConfig()
+                val.load(dict_values[key])
+                setattr(self, key, val)
+            else:
+                setattr(self, key, dict_values[key])
+
+
+app_config = AppConfig('application.yaml')
diff --git a/media-samurai/application.yaml b/media-samurai/application.yaml
new file mode 100644 (file)
index 0000000..154501c
--- /dev/null
@@ -0,0 +1,3 @@
+cache_dir: c:/temp/transcode
+api:
+  port: 8181
index ba752b58106b48f14d925730c588ddc5ceeabc98..b2040f3d11272e8ed11c837d8c751a27f44b8e02 100644 (file)
@@ -1,5 +1,8 @@
 import os.path, os
 from ftplib import FTP, error_perm
+from urllib.parse import urlparse
+
+from loguru import logger
 
 
 def main():
@@ -9,17 +12,18 @@ def main():
 
 
 def ftp_upload(local_source_dir, remote_target_dir):
-    host = 'localhost'
-    port = 21
     ftp = FTP()
-    ftp.connect(host, port)
-    ftp.login('dani', 'dani')
-    cwd_create_all(ftp, remote_target_dir)
+    parsed_url = urlparse(remote_target_dir)
+    ftp.connect(parsed_url.hostname, parsed_url.port or 21)
+    ftp.login(parsed_url.username, parsed_url.password)
+    force_cwd_to(ftp, parsed_url.path)
+    logger.info(f'Storing local path {local_source_dir} recursively on remote site')
     place_files(ftp, local_source_dir)
     ftp.quit()
 
 
-def cwd_create_all(ftp, remote_target_dir):
+def force_cwd_to(ftp, remote_target_dir):
+    logger.info(f'Creating remote directory {remote_target_dir}')
     remote_dir_tokens = remote_target_dir.split('/')
     for remote_dir in remote_dir_tokens:
         if not remote_dir:
@@ -41,23 +45,24 @@ def place_files(ftp, path):
     for name in os.listdir(path):
         local_path = os.path.join(path, name)
         if os.path.isfile(local_path):
-            print("STOR", name, local_path)
+            print("STOR", name, local_path)
             ftp.storbinary('STOR ' + name, open(local_path, 'rb'))
         elif os.path.isdir(local_path):
-            print("MKD", name)
+            print("MKD", name)
 
             try:
                 ftp.mkd(name)
 
             # ignore "directory already exists"
             except error_perm as e:
-                if not e.args[0].startswith('550'):
+                if e.args[0].startswith('550'):
+                    logger.warning(f'Directory {name} already exists under remote {ftp.pwd()}')
+                else:
+                    logger.error(f'FTP upload error! Details: {e}')
                     raise
 
-            print("CWD", name)
             ftp.cwd(name)
             place_files(ftp, local_path)
-            print("CWD", "..")
             ftp.cwd("..")
 
 
index 53d8200a7f57b350f8778559a47a5fa75ab29584..c8d6ace479db87d79544f63a895e4264258d64ac 100644 (file)
@@ -1,12 +1,16 @@
+import os
 import shutil
 from enum import Enum
+from pathlib import Path
 
 from ffmpeg.ffmpeg import FFmpeg, FFmpegError
 from ffmpeg.progress import Progress
+from loguru import logger
 from pymediainfo import MediaInfo
 import subprocess
 import json
 
+from app_config import app_config
 from ftpupload import ftp_upload
 
 class TranscodeJob:
@@ -26,28 +30,29 @@ class TranscodeStatus(str, Enum):
 
 
 def main():
-    local_source_file = 'd:/data/video/hls/ma.mkv'
     remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
-    local_target_directory = 'c:/temp/out'
     remote_target_directory = r'/data/video/hls/out/test'
     job = TranscodeJob()
-    transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
+    transcode_hls(remote_source_file, remote_target_directory, '1', job)
 
 
-def transcode_hls(local_source_file, remote_source_file, local_target_directory,
-                  remote_target_directory, job: TranscodeJob):
-    shutil.rmtree(local_target_directory, ignore_errors=True)
-    output_file = f'{local_target_directory}/%v/track_index.m3u8'
-
-    video_info = get_video_info(remote_source_file)
+def transcode_hls(remote_source_file, remote_target_directory, frames, task_id, job: TranscodeJob):
+    local_target_directory = Path(app_config.cache_dir) / task_id
+    file_name = os.path.splitext(os.path.basename(remote_source_file))[0]
+    encoded_target_directory = local_target_directory / '/'.join(list(file_name))
 
-    frame_count = 0
-    media_info = MediaInfo.parse(local_source_file)
-    for track in media_info.tracks:
-        if track.track_type == 'Video':
-            frame_count = int(track.frame_count)
+    if os.path.exists(local_target_directory) and os.path.isdir(encoded_target_directory):
+        logger.info(f'Removing existing {encoded_target_directory}')
+        shutil.rmtree(encoded_target_directory, ignore_errors=True)
+    #     if not os.path.isdir(local_target_directory):
+    #         logger.info(f'cache_dir {local_target_directory} not exists, creating it.')
+    #         os.makedirs(local_target_directory)
+    # else:
+    output_file = f'{encoded_target_directory}/%v/track_index.m3u8'
 
-    output_options = get_transcode_options(video_info, local_target_directory)
+    video_info = get_video_info(remote_source_file, frames)
+    frame_count = video_info[0]
+    output_options = get_transcode_options(video_info[1], encoded_target_directory)
 
     ffmpeg = (
         FFmpeg()
@@ -59,9 +64,9 @@ def transcode_hls(local_source_file, remote_source_file, local_target_directory,
         )
     )
 
-    @ffmpeg.on("start")
-    def on_start(arguments: list[str]):
-        print("arguments:", arguments)
+    @ffmpeg.on("start")
+    def on_start(arguments: list[str]):
+        print("arguments:", arguments)
 
     @ffmpeg.on("progress")
     def on_progress(progress: Progress):
@@ -76,31 +81,44 @@ def transcode_hls(local_source_file, remote_source_file, local_target_directory,
     shutil.rmtree(local_target_directory, ignore_errors=True)
 
 
-def get_video_info(file_path):
-    cmd = ['ffprobe', '-v', 'error', '-show_entries',
-           'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path]
-
+def get_video_info(file_path, frames):
+    if not frames:
+        cmd = ['ffprobe', '-v', 'error', '-show_entries',
+               'stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets',
+               '-count_packets', '-of', 'json', file_path]
+    else:
+        cmd = ['ffprobe', '-v', 'error', '-show_entries',
+               'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path]
     try:
         result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
         json_data = json.loads(result.stdout)
-        return json_data
+
+        if not frames:
+            for stream in json_data['streams']:
+                if stream['codec_type'] == 'video':
+                    frames = int(stream['nb_read_packets'])
+                    break
+        return frames, json_data
     except subprocess.CalledProcessError as e:
-        print(f"Error running ffprobe: {e}")
-        return None
+        logger.error(f"Error running ffprobe: {e}")
+        return None, None
 
 
 def get_transcode_options(video_info, output_directory):
     output_options = {
-        'preset': 'slow',
-        'g': 48,
-        'sc_threshold': 0,
+        'preset': 'faster',
         'map': [],
         'var_stream_map': '',
-        'master_pl_name': 'master.m3u8',
+        'c:v:': 'libx264',
+        'b:v': '1.5M',
+        'vf': 'scale=720:576,format=yuv420p',
+        'g': 48,
+        'sc_threshold': 0,
         'f': 'hls',
-        'hls_time': 6,
+        'hls_time': 5,
         'hls_list_size': 0,
-        'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts'
+        'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts',
+        'master_pl_name': 'master.m3u8'
     }
 
     audio_index = 0
@@ -108,9 +126,6 @@ def get_transcode_options(video_info, output_directory):
         index = stream['index']
         if stream['codec_type'] == 'video':
             output_options['map'].append(f'0:{index}')
-            output_options['s:v:0'] = '960x540'
-            output_options['c:v:0'] = 'libx264'
-            output_options['b:v:0'] = '2000k'
             output_options['var_stream_map'] += ' v:0,agroup:audio'
         if stream['codec_type'] == 'audio':
             output_options['map'].append(f'0:{index}')
index 5d5da674e13c36738ece4e8295daa48076b73b07..5914b9f5a85a30d347dc0bdb830b1b985171e2b8 100644 (file)
@@ -1,15 +1,22 @@
 ffmpeg -y -i d:\data\video\hls\ma.mkv ^
--preset slow -g 48 -sc_threshold 0 ^
--map 0:0 -map 0:1 -map 0:8 ^
--s:v:0 960x540 -c:v:0 libx264 -b:v:0 2000k  ^
--c:a:0 aac ^
--c:a:1 aac ^
--var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^
--f hls ^
--hls_time 6 ^
--hls_playlist_type vod ^
--hls_segment_type mpegts ^
--hls_flags independent_segments ^
--hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^
--master_pl_name master.m3u8 ^
-out/%%v/prog_index.m3u8
+    -map 0:0 -map 0:1 -map 0:1 ^
+    -c:v:0 libx264 ^
+    -x264-params "nal-hrd=cbr:force-cfr=1" ^
+    -profile:v main ^
+    -level 3.1 ^
+    -preset faster ^
+    -g 48 ^
+    -sc_threshold 0 ^
+    -vf scale=720:576,format=yuv420p ^
+    -b:v:0 1500k ^
+    -c:a:0 aac -b:a:0 96k ^
+    -c:a:1 aac -b:a:1 96k ^
+    -var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^
+    -f hls ^
+    -hls_time 5 ^
+    -hls_list_size 0 ^
+    -hls_playlist_type vod ^
+    -hls_flags independent_segments ^
+    -hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^
+    -master_pl_name master.m3u8 ^
+    out/%%v/prog_index.m3u8
diff --git a/media-samurai/hls/004.bat b/media-samurai/hls/004.bat
new file mode 100644 (file)
index 0000000..c06fdcf
--- /dev/null
@@ -0,0 +1,19 @@
+ffmpeg -y -i d:\data\video\hls\ma.mkv ^
+    -map 0:0 -map 0:1 -map 0:8 ^
+    -var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^
+    -c:v libx264 ^
+    -profile:v main ^
+    -level 3.1 ^
+    -preset faster ^
+    -g 48 ^
+    -sc_threshold 0 ^
+    -vf "scale=720:576,format=yuv420p" ^
+    -b:v 1.5M ^
+    -c:a:0 aac ^
+    -c:a:1 aac ^
+    -f hls ^
+    -hls_time 5 ^
+    -hls_list_size 0 ^
+    -hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^
+    -master_pl_name master.m3u8 ^
+    out/%%v/prog_index.m3u8
index f5a43542872ef27ad23c8cb6cd48b691ffbb090c..070dfa836f24e4648aa225c4912b6efb575f0566 100644 (file)
@@ -1,6 +1,9 @@
 ffprobe -v error ^
 -show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets -count_packets -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
 
+:ffprobe -v error ^
+:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_frames -count_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
 :ffprobe -v error -select_streams v:0 ^
 :-show_entries stream=codec_type:stream=index:stream=codec_name:stream=nb_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
 
@@ -9,4 +12,4 @@ ffprobe -v error ^
 
 :ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null /dev/null
 ::ffmpeg -i ftp://dani:dani@localhost/data/video/x.mxf -vcodec copy -acodec copy -f null
-ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null
\ No newline at end of file
+:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null
\ No newline at end of file
diff --git a/media-samurai/logger.yaml b/media-samurai/logger.yaml
new file mode 100644 (file)
index 0000000..3dddc5e
--- /dev/null
@@ -0,0 +1,21 @@
+handlers:
+  - sink: ext://sys.stdout
+    format: '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> <level>{message}</level>'
+    colorize: true
+    diagnose: true
+  - sink: media-samurai.log
+    enqueue: true
+    serialize: false
+    format: '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> <level>{message}</level>'
+    colorize: true
+    diagnose: true
+levels:
+  - name: NEW
+    'no': 13
+    icon: ¤
+    color: ""
+extra:
+  common_to_all: default
+activation:
+  - [ "my_module.secret", false ]
+  - [ "another_library.module", true ]
\ No newline at end of file
index 472b7800362dfabb04a0bed45ba5d74314f48473..08c5b3a37ff87778f3c08e87e04eae0b2705092a 100644 (file)
@@ -1,27 +1,18 @@
+import json
+
 import uvicorn
+from loguru import logger
+
 from api import app
+from app_config import app_config
 from site_icon import site_icon_lizzard
 from nicegui import ui
 
 
-def start_job():
-    ui.notify("Start job!")
-
-
-ui.label("MEDIA SAMURAI")
-grid = ui.aggrid({
-    "columnDefs": [
-        {"headerName": "Name", "field": "name"},
-        {"headerName": "Type", "field": "type"},
-    ],
-    "rowData": [],
-})
-grid.props('inline height=500px')
-
-with ui.row():
-    ui.button('Start', on_click=start_job).props('small outline')
-
-
+logger.info(f'MediaSamurai listening on port: {app_config.api.port}')
+logger.info(app_config.cfg)
+ui.label(f'MediaSamurai listening on port: {app_config.api.port}')
+ui.link('Check documentation', '/docs')
 ui.run_with(app, favicon=site_icon_lizzard)
 if __name__ == "__main__":
-    uvicorn.run(app, port=8181)
+    uvicorn.run(app, port=8181, log_level='critical')
index 07739c5d5dbcc30f72fafade4efa2a1d4034876e..e0062016ea408d75966dd33cf02043cd02405dda 100644 (file)
@@ -1,6 +1,10 @@
-uvicorn~=0.22.0
-fastapi~=0.104.1
-pydantic~=2.4.2
-nicegui~=1.4.2
-pymediainfo~=6.1.0
-python-ffmpeg~=2.0.4
\ No newline at end of file
+uvicorn==0.22.0
+fastapi==0.104.1
+pydantic==2.4.2
+nicegui==1.4.2
+pymediainfo==6.1.0
+python-ffmpeg==2.0.4
+loguru==0.7.2
+loguru-config==0.1.0
+shortuuid==1.0.11
+urlparse3==1.1
\ No newline at end of file
index c3b872c9affb28b03f6c643eb8fb3aa02035a81b..546c914a8eb196aecf4fd1fd3d419028b826b02d 100644 (file)
@@ -1,6 +1,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
 
+from loguru import logger
 from pydantic import BaseModel
 from enum import Enum
 import time
@@ -12,8 +13,9 @@ executor = ThreadPoolExecutor(max_workers=5)
 
 
 class TranscodeItem(BaseModel):
-    name: str
-    delay: int
+    source: str
+    target: str
+    frames: int
 
 
 def simulate_background_task(task_id, duration):
@@ -26,14 +28,18 @@ def simulate_background_task(task_id, duration):
     job.status = TranscodeStatus.COMPLETED
 
 
-def execute_transcode_task(task_id, duration):
+def execute_transcode_task(task_id, remote_source_file, remote_target_directory, frames):
     job = background_tasks_results[task_id]
     job.started = datetime.now()
     job.status = TranscodeStatus.RUNNING
-    local_source_file = 'd:/data/video/hls/ma.mkv'
-    remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
-    local_target_directory = 'c:/temp/out'
-    remote_target_directory = r'/data/video/hls/out/test'
-    transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
-    job.finished = datetime.now()
-    job.status = TranscodeStatus.COMPLETED
+    try:
+        logger.info(f'Start transcode job for {remote_source_file}, task ID {task_id}')
+        transcode_hls(remote_source_file, remote_target_directory, frames, task_id, job)
+        job.status = TranscodeStatus.COMPLETED
+        logger.info(f'Transcode job completed for {remote_source_file}, task ID {task_id}')
+    except Exception as e:
+        job.status = TranscodeStatus.ERROR
+        logger.error(f'Transcode job failed for {remote_source_file}, task ID {task_id}. Details: {e}')
+    finally:
+        job.finished = datetime.now()
+
index 4c6adaab9c1528e5f76533d630aa1c0cadb44e60..721b72ecfd466ef15a84c35ea12c4a05b246bd79 100644 (file)
@@ -8,7 +8,6 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.apache.commons.io.FilenameUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
@@ -25,15 +24,14 @@ public class MediaSamuraiAPI {
 
                final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
 
-               String inDir = "\\\\10.11.1.90\\input\\";
-               String outDir = "\\\\10.11.1.100\\output\\";
-//             String inputs[] = { "file-1.mov", "file-2.mov" };
-               String inputs[] = { "file-1.mov" };
-
-               for (String currentInput : inputs) {
+//             String sources[] = { "ftp://dani:dani@localhost/data/video/hls/ma.mkv" };
+               String sources[] = { "ftp://dani:dani@localhost/data/video/X.MXF" };
+//             String sources[] = { "ftp://dani:dani@localhost/data/video/N.mxf" };
+               String target = "ftp://dani:dani@localhost/data/video/hls/out/";
+               for (String source : sources) {
                        Runnable task = () -> {
                                try {
-                                       transcode(inDir, outDir, currentInput);
+                                       transcode(source, target, 0);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
@@ -44,26 +42,14 @@ public class MediaSamuraiAPI {
                executor.awaitTermination(1, TimeUnit.MINUTES);
        }
 
-       public static void main1(String[] args) throws Exception {
-               String inDir = "\\\\10.11.1.90\\input\\";
-               String outDir = "\\\\10.11.1.100\\output\\";
-               String inputs[] = { "file-1.mov" };
-
-               for (String currentInput : inputs) {
-                       transcode(inDir, outDir, currentInput);
-               }
-       }
-
-       private static void transcode(String inDir, String outDir, String input2) {
-               final String inputi = inDir + input2;
-               final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
-
+       private static void transcode(String source, String target, long frames) {
                try {
                        log.info("Started");
                        MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/");
                        BasicDBObject job = new BasicDBObject();
-                       job.put("name", inputi);
-                       job.put("delay", 1);
+                       job.put("source", source);
+                       job.put("target", target);
+                       job.put("frames", frames);
                        String taskId = api.submit(job);
                        while (true) {
                                Thread.sleep(3000);