from datetime import datetime
from typing import Optional
+import shortuuid
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel
from hls import TranscodeJob
-from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, simulate_background_task, \
- execute_transcode_task
+from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, execute_transcode_task
# https://docs.pydantic.dev/2.4/concepts/models/
@app.post("/submit")
async def submit(item: TranscodeItem, background_tasks: BackgroundTasks):
- task_id = str(len(background_tasks_results) + 1)
- background_tasks.add_task(execute_transcode_task, task_id, item.delay)
+ task_id = shortuuid.uuid()
+ background_tasks.add_task(execute_transcode_task, task_id, item.source, item.target, item.frames)
job = TranscodeJob()
job.status = TranscodeStatus.RUNNING
background_tasks_results[task_id] = job
--- /dev/null
+import yaml
+from loguru_config import LoguruConfig
+
+LoguruConfig.load('logger.yaml')
+
+class AppConfig(object):
+ def __init__(self, file_name=None):
+ if file_name is not None:
+ with open(file_name) as config_file:
+ self.cfg = yaml.load(config_file, yaml.FullLoader)
+ self.load(self.cfg)
+
+ def load(self, dict_values):
+ for key in dict_values:
+ if type(dict_values[key]) is dict:
+ val = AppConfig()
+ val.load(dict_values[key])
+ setattr(self, key, val)
+ else:
+ setattr(self, key, dict_values[key])
+
+
+app_config = AppConfig('application.yaml')
--- /dev/null
+cache_dir: c:/temp/transcode
+api:
+ port: 8181
import os.path, os
from ftplib import FTP, error_perm
+from urllib.parse import urlparse
+
+from loguru import logger
def main():
def ftp_upload(local_source_dir, remote_target_dir):
- host = 'localhost'
- port = 21
ftp = FTP()
- ftp.connect(host, port)
- ftp.login('dani', 'dani')
- cwd_create_all(ftp, remote_target_dir)
+ parsed_url = urlparse(remote_target_dir)
+ ftp.connect(parsed_url.hostname, parsed_url.port or 21)
+ ftp.login(parsed_url.username, parsed_url.password)
+ force_cwd_to(ftp, parsed_url.path)
+ logger.info(f'Storing local path {local_source_dir} recursively on remote site')
place_files(ftp, local_source_dir)
ftp.quit()
-def cwd_create_all(ftp, remote_target_dir):
+def force_cwd_to(ftp, remote_target_dir):
+ logger.info(f'Creating remote directory {remote_target_dir}')
remote_dir_tokens = remote_target_dir.split('/')
for remote_dir in remote_dir_tokens:
if not remote_dir:
for name in os.listdir(path):
local_path = os.path.join(path, name)
if os.path.isfile(local_path):
- print("STOR", name, local_path)
+ # print("STOR", name, local_path)
ftp.storbinary('STOR ' + name, open(local_path, 'rb'))
elif os.path.isdir(local_path):
- print("MKD", name)
+ # print("MKD", name)
try:
ftp.mkd(name)
# ignore "directory already exists"
except error_perm as e:
- if not e.args[0].startswith('550'):
+ if e.args[0].startswith('550'):
+ logger.warning(f'Directory {name} already exists under remote {ftp.pwd()}')
+ else:
+ logger.error(f'FTP upload error! Details: {e}')
raise
- print("CWD", name)
ftp.cwd(name)
place_files(ftp, local_path)
- print("CWD", "..")
ftp.cwd("..")
+import os
import shutil
from enum import Enum
+from pathlib import Path
from ffmpeg.ffmpeg import FFmpeg, FFmpegError
from ffmpeg.progress import Progress
+from loguru import logger
from pymediainfo import MediaInfo
import subprocess
import json
+from app_config import app_config
from ftpupload import ftp_upload
class TranscodeJob:
def main():
- local_source_file = 'd:/data/video/hls/ma.mkv'
remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
- local_target_directory = 'c:/temp/out'
remote_target_directory = r'/data/video/hls/out/test'
job = TranscodeJob()
- transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
+ transcode_hls(remote_source_file, remote_target_directory, '1', job)
-def transcode_hls(local_source_file, remote_source_file, local_target_directory,
- remote_target_directory, job: TranscodeJob):
- shutil.rmtree(local_target_directory, ignore_errors=True)
- output_file = f'{local_target_directory}/%v/track_index.m3u8'
-
- video_info = get_video_info(remote_source_file)
+def transcode_hls(remote_source_file, remote_target_directory, frames, task_id, job: TranscodeJob):
+ local_target_directory = Path(app_config.cache_dir) / task_id
+ file_name = os.path.splitext(os.path.basename(remote_source_file))[0]
+ encoded_target_directory = local_target_directory / '/'.join(list(file_name))
- frame_count = 0
- media_info = MediaInfo.parse(local_source_file)
- for track in media_info.tracks:
- if track.track_type == 'Video':
- frame_count = int(track.frame_count)
+ if os.path.exists(local_target_directory) and os.path.isdir(encoded_target_directory):
+ logger.info(f'Removing existing {encoded_target_directory}')
+ shutil.rmtree(encoded_target_directory, ignore_errors=True)
+ # if not os.path.isdir(local_target_directory):
+ # logger.info(f'cache_dir {local_target_directory} not exists, creating it.')
+ # os.makedirs(local_target_directory)
+ # else:
+ output_file = f'{encoded_target_directory}/%v/track_index.m3u8'
- output_options = get_transcode_options(video_info, local_target_directory)
+ video_info = get_video_info(remote_source_file, frames)
+ frame_count = video_info[0]
+ output_options = get_transcode_options(video_info[1], encoded_target_directory)
ffmpeg = (
FFmpeg()
)
)
- @ffmpeg.on("start")
- def on_start(arguments: list[str]):
- print("arguments:", arguments)
+ # @ffmpeg.on("start")
+ # def on_start(arguments: list[str]):
+ # print("arguments:", arguments)
@ffmpeg.on("progress")
def on_progress(progress: Progress):
shutil.rmtree(local_target_directory, ignore_errors=True)
-def get_video_info(file_path):
- cmd = ['ffprobe', '-v', 'error', '-show_entries',
- 'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path]
-
+def get_video_info(file_path, frames):
+ if not frames:
+ cmd = ['ffprobe', '-v', 'error', '-show_entries',
+ 'stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets',
+ '-count_packets', '-of', 'json', file_path]
+ else:
+ cmd = ['ffprobe', '-v', 'error', '-show_entries',
+ 'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
json_data = json.loads(result.stdout)
- return json_data
+
+ if not frames:
+ for stream in json_data['streams']:
+ if stream['codec_type'] == 'video':
+ frames = int(stream['nb_read_packets'])
+ break
+ return frames, json_data
except subprocess.CalledProcessError as e:
- print(f"Error running ffprobe: {e}")
- return None
+ logger.error(f"Error running ffprobe: {e}")
+ return None, None
def get_transcode_options(video_info, output_directory):
output_options = {
- 'preset': 'slow',
- 'g': 48,
- 'sc_threshold': 0,
+ 'preset': 'faster',
'map': [],
'var_stream_map': '',
- 'master_pl_name': 'master.m3u8',
+ 'c:v:': 'libx264',
+ 'b:v': '1.5M',
+ 'vf': 'scale=720:576,format=yuv420p',
+ 'g': 48,
+ 'sc_threshold': 0,
'f': 'hls',
- 'hls_time': 6,
+ 'hls_time': 5,
'hls_list_size': 0,
- 'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts'
+ 'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts',
+ 'master_pl_name': 'master.m3u8'
}
audio_index = 0
index = stream['index']
if stream['codec_type'] == 'video':
output_options['map'].append(f'0:{index}')
- output_options['s:v:0'] = '960x540'
- output_options['c:v:0'] = 'libx264'
- output_options['b:v:0'] = '2000k'
output_options['var_stream_map'] += ' v:0,agroup:audio'
if stream['codec_type'] == 'audio':
output_options['map'].append(f'0:{index}')
ffmpeg -y -i d:\data\video\hls\ma.mkv ^
--preset slow -g 48 -sc_threshold 0 ^
--map 0:0 -map 0:1 -map 0:8 ^
--s:v:0 960x540 -c:v:0 libx264 -b:v:0 2000k ^
--c:a:0 aac ^
--c:a:1 aac ^
--var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^
--f hls ^
--hls_time 6 ^
--hls_playlist_type vod ^
--hls_segment_type mpegts ^
--hls_flags independent_segments ^
--hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^
--master_pl_name master.m3u8 ^
-out/%%v/prog_index.m3u8
+ -map 0:0 -map 0:1 -map 0:1 ^
+ -c:v:0 libx264 ^
+ -x264-params "nal-hrd=cbr:force-cfr=1" ^
+ -profile:v main ^
+ -level 3.1 ^
+ -preset faster ^
+ -g 48 ^
+ -sc_threshold 0 ^
+ -vf scale=720:576,format=yuv420p ^
+ -b:v:0 1500k ^
+ -c:a:0 aac -b:a:0 96k ^
+ -c:a:1 aac -b:a:1 96k ^
+ -var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^
+ -f hls ^
+ -hls_time 5 ^
+ -hls_list_size 0 ^
+ -hls_playlist_type vod ^
+ -hls_flags independent_segments ^
+ -hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^
+ -master_pl_name master.m3u8 ^
+ out/%%v/prog_index.m3u8
--- /dev/null
+ffmpeg -y -i d:\data\video\hls\ma.mkv ^
+ -map 0:0 -map 0:1 -map 0:8 ^
+ -var_stream_map "a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio" ^
+ -c:v libx264 ^
+ -profile:v main ^
+ -level 3.1 ^
+ -preset faster ^
+ -g 48 ^
+ -sc_threshold 0 ^
+ -vf "scale=720:576,format=yuv420p" ^
+ -b:v 1.5M ^
+ -c:a:0 aac ^
+ -c:a:1 aac ^
+ -f hls ^
+ -hls_time 5 ^
+ -hls_list_size 0 ^
+ -hls_segment_filename "out/%%v/fileSequence%%03d.ts" ^
+ -master_pl_name master.m3u8 ^
+ out/%%v/prog_index.m3u8
ffprobe -v error ^
-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets -count_packets -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+:ffprobe -v error ^
+:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_frames -count_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
:ffprobe -v error -select_streams v:0 ^
:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=nb_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null /dev/null
::ffmpeg -i ftp://dani:dani@localhost/data/video/x.mxf -vcodec copy -acodec copy -f null
-ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null
\ No newline at end of file
+:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null
\ No newline at end of file
--- /dev/null
+handlers:
+ - sink: ext://sys.stdout
+ format: '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> <level>{message}</level>'
+ colorize: true
+ diagnose: true
+ - sink: media-samurai.log
+ enqueue: true
+ serialize: false
+ format: '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> <level>{message}</level>'
+ colorize: true
+ diagnose: true
+levels:
+ - name: NEW
+ 'no': 13
+ icon: ¤
+ color: ""
+extra:
+ common_to_all: default
+activation:
+ - [ "my_module.secret", false ]
+ - [ "another_library.module", true ]
\ No newline at end of file
+import json
+
import uvicorn
+from loguru import logger
+
from api import app
+from app_config import app_config
from site_icon import site_icon_lizzard
from nicegui import ui
-def start_job():
- ui.notify("Start job!")
-
-
-ui.label("MEDIA SAMURAI")
-grid = ui.aggrid({
- "columnDefs": [
- {"headerName": "Name", "field": "name"},
- {"headerName": "Type", "field": "type"},
- ],
- "rowData": [],
-})
-grid.props('inline height=500px')
-
-with ui.row():
- ui.button('Start', on_click=start_job).props('small outline')
-
-
+logger.info(f'MediaSamurai listening on port: {app_config.api.port}')
+logger.info(app_config.cfg)
+ui.label(f'MediaSamurai listening on port: {app_config.api.port}')
+ui.link('Check documentation', '/docs')
ui.run_with(app, favicon=site_icon_lizzard)
if __name__ == "__main__":
- uvicorn.run(app, port=8181)
+ uvicorn.run(app, port=8181, log_level='critical')
-uvicorn~=0.22.0
-fastapi~=0.104.1
-pydantic~=2.4.2
-nicegui~=1.4.2
-pymediainfo~=6.1.0
-python-ffmpeg~=2.0.4
\ No newline at end of file
+uvicorn==0.22.0
+fastapi==0.104.1
+pydantic==2.4.2
+nicegui==1.4.2
+pymediainfo==6.1.0
+python-ffmpeg==2.0.4
+loguru==0.7.2
+loguru-config==0.1.0
+shortuuid==1.0.11
+urlparse3==1.1
\ No newline at end of file
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
+from loguru import logger
from pydantic import BaseModel
from enum import Enum
import time
class TranscodeItem(BaseModel):
- name: str
- delay: int
+ source: str
+ target: str
+ frames: int
def simulate_background_task(task_id, duration):
job.status = TranscodeStatus.COMPLETED
-def execute_transcode_task(task_id, duration):
+def execute_transcode_task(task_id, remote_source_file, remote_target_directory, frames):
job = background_tasks_results[task_id]
job.started = datetime.now()
job.status = TranscodeStatus.RUNNING
- local_source_file = 'd:/data/video/hls/ma.mkv'
- remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
- local_target_directory = 'c:/temp/out'
- remote_target_directory = r'/data/video/hls/out/test'
- transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
- job.finished = datetime.now()
- job.status = TranscodeStatus.COMPLETED
+ try:
+ logger.info(f'Start transcode job for {remote_source_file}, task ID {task_id}')
+ transcode_hls(remote_source_file, remote_target_directory, frames, task_id, job)
+ job.status = TranscodeStatus.COMPLETED
+ logger.info(f'Transcode job completed for {remote_source_file}, task ID {task_id}')
+ except Exception as e:
+ job.status = TranscodeStatus.ERROR
+ logger.error(f'Transcode job failed for {remote_source_file}, task ID {task_id}. Details: {e}')
+ finally:
+ job.finished = datetime.now()
+
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.commons.io.FilenameUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
- String inDir = "\\\\10.11.1.90\\input\\";
- String outDir = "\\\\10.11.1.100\\output\\";
-// String inputs[] = { "file-1.mov", "file-2.mov" };
- String inputs[] = { "file-1.mov" };
-
- for (String currentInput : inputs) {
+// String sources[] = { "ftp://dani:dani@localhost/data/video/hls/ma.mkv" };
+ String sources[] = { "ftp://dani:dani@localhost/data/video/X.MXF" };
+// String sources[] = { "ftp://dani:dani@localhost/data/video/N.mxf" };
+ String target = "ftp://dani:dani@localhost/data/video/hls/out/";
+ for (String source : sources) {
Runnable task = () -> {
try {
- transcode(inDir, outDir, currentInput);
+ transcode(source, target, 0);
} catch (Exception e) {
e.printStackTrace();
}
executor.awaitTermination(1, TimeUnit.MINUTES);
}
- public static void main1(String[] args) throws Exception {
- String inDir = "\\\\10.11.1.90\\input\\";
- String outDir = "\\\\10.11.1.100\\output\\";
- String inputs[] = { "file-1.mov" };
-
- for (String currentInput : inputs) {
- transcode(inDir, outDir, currentInput);
- }
- }
-
- private static void transcode(String inDir, String outDir, String input2) {
- final String inputi = inDir + input2;
- final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
-
+ private static void transcode(String source, String target, long frames) {
try {
log.info("Started");
MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/");
BasicDBObject job = new BasicDBObject();
- job.put("name", inputi);
- job.put("delay", 1);
+ job.put("source", source);
+ job.put("target", target);
+ job.put("frames", frames);
String taskId = api.submit(job);
while (true) {
Thread.sleep(3000);