from fastapi.staticfiles import StaticFiles
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel
-from transcode import TranscodeItem, TranscodeStatus, TranscodeJob, background_tasks_results, simulate_background_task
+
+from hls import TranscodeJob
+from transcode import TranscodeItem, TranscodeStatus, background_tasks_results, simulate_background_task, \
+ execute_transcode_task
# https://docs.pydantic.dev/2.4/concepts/models/
@app.post("/submit")
async def submit(item: TranscodeItem, background_tasks: BackgroundTasks):
task_id = str(len(background_tasks_results) + 1)
- background_tasks.add_task(simulate_background_task, task_id, item.delay)
+ background_tasks.add_task(execute_transcode_task, task_id, item.delay)
job = TranscodeJob()
job.status = TranscodeStatus.RUNNING
background_tasks_results[task_id] = job
--- /dev/null
+import os.path, os
+from ftplib import FTP, error_perm
+
+
+def main():
+ local_source_dir = r'C:\temp\out'
+ remote_target_dir = r'/data/video/hls/out/test'
+ ftp_upload(local_source_dir, remote_target_dir)
+
+
+def ftp_upload(local_source_dir, remote_target_dir):
+ host = 'localhost'
+ port = 21
+ ftp = FTP()
+ ftp.connect(host, port)
+ ftp.login('dani', 'dani')
+ cwd_create_all(ftp, remote_target_dir)
+ place_files(ftp, local_source_dir)
+ ftp.quit()
+
+
+def cwd_create_all(ftp, remote_target_dir):
+ remote_dir_tokens = remote_target_dir.split('/')
+ for remote_dir in remote_dir_tokens:
+ if not remote_dir:
+ continue
+ cwd_create(ftp, remote_dir)
+
+
+def cwd_create(ftp, path):
+ try:
+ ftp.cwd(path)
+ except error_perm as e:
+ if not e.args[0].startswith('550'):
+ raise
+ ftp.mkd(path)
+ ftp.cwd(path)
+
+
+def place_files(ftp, path):
+ for name in os.listdir(path):
+ local_path = os.path.join(path, name)
+ if os.path.isfile(local_path):
+ print("STOR", name, local_path)
+ ftp.storbinary('STOR ' + name, open(local_path, 'rb'))
+ elif os.path.isdir(local_path):
+ print("MKD", name)
+
+ try:
+ ftp.mkd(name)
+
+ # ignore "directory already exists"
+ except error_perm as e:
+ if not e.args[0].startswith('550'):
+ raise
+
+ print("CWD", name)
+ ftp.cwd(name)
+ place_files(ftp, local_path)
+ print("CWD", "..")
+ ftp.cwd("..")
+
+
+if __name__ == "__main__":
+ main()
import shutil
+from enum import Enum
-from ffmpeg import FFmpeg, Progress, FFmpegError
+from ffmpeg.ffmpeg import FFmpeg, FFmpegError
+from ffmpeg.progress import Progress
from pymediainfo import MediaInfo
+import subprocess
+import json
+
+from ftpupload import ftp_upload
+
+class TranscodeJob:
+ def __init__(self):
+ self.status = TranscodeStatus.NONE
+ self.progress = 0
+ self.started = None
+ self.finished = None
+
+
+class TranscodeStatus(str, Enum):
+ PENDING = "PENDING"
+ RUNNING = "RUNNING"
+ ERROR = "ERROR"
+ NONE = "NONE"
+ COMPLETED = "COMPLETED"
def main():
- input_file = r'd:\data\video\hls\ma.mkv'
+ local_source_file = 'd:/data/video/hls/ma.mkv'
+ remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
+ local_target_directory = 'c:/temp/out'
+ remote_target_directory = r'/data/video/hls/out/test'
+ job = TranscodeJob()
+ transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
+
- media_info = MediaInfo.parse(input_file)
+def transcode_hls(local_source_file, remote_source_file, local_target_directory,
+ remote_target_directory, job: TranscodeJob):
+ shutil.rmtree(local_target_directory, ignore_errors=True)
+ output_file = f'{local_target_directory}/%v/track_index.m3u8'
+
+ video_info = get_video_info(remote_source_file)
+
+ frame_count = 0
+ media_info = MediaInfo.parse(local_source_file)
for track in media_info.tracks:
- if track.track_type == "Video":
+ if track.track_type == 'Video':
frame_count = int(track.frame_count)
- # print("Duration (raw value):", track.duration)
- output_directory = 'hls/out/'
- output_file = output_directory + r'%v/prog_index.m3u8'
- shutil.rmtree(output_directory, ignore_errors=True)
-
- output_options = {
- 'preset': 'slow',
- 'g': 48,
- 'sc_threshold': 0,
- 'map': ['0:0', '0:1', '0:8'],
- 's:v:0': '960x540',
- 'c:v:0': 'libx264',
- 'b:v:0': '2000k',
- 'c:a:0': 'aac',
- 'c:a:1': 'aac',
- 'var_stream_map': 'a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio',
- 'master_pl_name': 'master.m3u8',
- 'f': 'hls',
- 'hls_time': 6,
- 'hls_list_size': 0,
- 'hls_segment_filename': f'{output_directory}/%v/fileSequence%03d.ts',
- }
+ output_options = get_transcode_options(video_info, local_target_directory)
ffmpeg = (
FFmpeg()
.option("y")
- .input(input_file)
+ .input(remote_source_file)
.output(
output_file,
output_options
@ffmpeg.on("progress")
def on_progress(progress: Progress):
- print(round(progress.frame * 100 / frame_count))
+ job.progress = round(progress.frame * 100 / frame_count)
try:
ffmpeg.execute()
except FFmpegError as e:
print(e.args)
+ ftp_upload(local_target_directory, remote_target_directory)
+ shutil.rmtree(local_target_directory, ignore_errors=True)
+
+
+def get_video_info(file_path):
+ cmd = ['ffprobe', '-v', 'error', '-show_entries',
+ 'stream=codec_type:stream=index:stream=codec_name:stream=duration', '-of', 'json', file_path]
+
+ try:
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
+ json_data = json.loads(result.stdout)
+ return json_data
+ except subprocess.CalledProcessError as e:
+ print(f"Error running ffprobe: {e}")
+ return None
+
+
+def get_transcode_options(video_info, output_directory):
+ output_options = {
+ 'preset': 'slow',
+ 'g': 48,
+ 'sc_threshold': 0,
+ 'map': [],
+ 'var_stream_map': '',
+ 'master_pl_name': 'master.m3u8',
+ 'f': 'hls',
+ 'hls_time': 6,
+ 'hls_list_size': 0,
+ 'hls_segment_filename': f'{output_directory}/%v/chunk%03d.ts'
+ }
+
+ audio_index = 0
+ for stream in video_info['streams']:
+ index = stream['index']
+ if stream['codec_type'] == 'video':
+ output_options['map'].append(f'0:{index}')
+ output_options['s:v:0'] = '960x540'
+ output_options['c:v:0'] = 'libx264'
+ output_options['b:v:0'] = '2000k'
+ output_options['var_stream_map'] += ' v:0,agroup:audio'
+ if stream['codec_type'] == 'audio':
+ output_options['map'].append(f'0:{index}')
+ output_options[f'c:a:{audio_index}'] = 'aac'
+ output_options['var_stream_map'] += f' a:{audio_index},agroup:audio'
+ audio_index += 1
+
+ output_options['var_stream_map'] = output_options['var_stream_map'].strip()
+ return output_options
+
+ # output_options1 = {
+ # 'preset': 'slow',
+ # 'g': 48,
+ # 'sc_threshold': 0,
+ # 'map': ['0:0', '0:1', '0:8'],
+ # 's:v:0': '960x540',
+ # 'c:v:0': 'libx264',
+ # 'b:v:0': '2000k',
+ # 'c:a:0': 'aac',
+ # 'c:a:1': 'aac',
+ # 'var_stream_map': 'a:0,agroup:audio,default:yes a:1,agroup:audio v:0,agroup:audio',
+ # 'master_pl_name': 'master.m3u8',
+ # 'f': 'hls',
+ # 'hls_time': 6,
+ # 'hls_list_size': 0,
+ # 'hls_segment_filename': f'{output_directory}/%v/fileSequence%03d.ts',
+ # }
+
if __name__ == "__main__":
main()
--- /dev/null
+ffprobe -v error ^
+-show_entries stream=codec_type:stream=index:stream=codec_name:stream=duration:stream=nb_read_packets -count_packets -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
+:ffprobe -v error -select_streams v:0 ^
+:-show_entries stream=codec_type:stream=index:stream=codec_name:stream=nb_frames -of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
+:ffprobe -v error -show_format -show_streams ^
+:-of json ftp://dani:dani@localhost/data/video/hls/ma.mkv
+
+:ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null /dev/null
+::ffmpeg -i ftp://dani:dani@localhost/data/video/x.mxf -vcodec copy -acodec copy -f null
+ffmpeg -i ftp://dani:dani@localhost/data/video/hls/ma.mkv -vcodec copy -acodec copy -f null
\ No newline at end of file
pydantic~=2.4.2
nicegui~=1.4.2
pymediainfo~=6.1.0
-tqdm~=4.66.1
-gevent~=23.9.1
\ No newline at end of file
+python-ffmpeg~=2.0.4
\ No newline at end of file
from enum import Enum
import time
+from hls import transcode_hls, TranscodeStatus
+
background_tasks_results = {}
executor = ThreadPoolExecutor(max_workers=5)
delay: int
-class TranscodeStatus(str, Enum):
- PENDING = "PENDING"
- RUNNING = "RUNNING"
- ERROR = "ERROR"
- NONE = "NONE"
- COMPLETED = "COMPLETED"
-
-
-class TranscodeJob:
- def __init__(self):
- self.status = TranscodeStatus.NONE
- self.progress = 0
- self.started = None
- self.finished = None
-
-
def simulate_background_task(task_id, duration):
job = background_tasks_results[task_id]
job.started = datetime.now()
time.sleep(duration)
job.finished = datetime.now()
job.status = TranscodeStatus.COMPLETED
+
+
+def execute_transcode_task(task_id, duration):
+ job = background_tasks_results[task_id]
+ job.started = datetime.now()
+ job.status = TranscodeStatus.RUNNING
+ local_source_file = 'd:/data/video/hls/ma.mkv'
+ remote_source_file = 'ftp://dani:dani@localhost/data/video/hls/ma.mkv'
+ local_target_directory = 'c:/temp/out'
+ remote_target_directory = r'/data/video/hls/out/test'
+ transcode_hls(local_source_file, remote_source_file, local_target_directory, remote_target_directory, job)
+ job.finished = datetime.now()
+ job.status = TranscodeStatus.COMPLETED
package user.commons;
-import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
private static final Logger log = LogManager.getLogger();
- public static void main1(String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
+
+ final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
+
+ String inDir = "\\\\10.11.1.90\\input\\";
+ String outDir = "\\\\10.11.1.100\\output\\";
+// String inputs[] = { "file-1.mov", "file-2.mov" };
+ String inputs[] = { "file-1.mov" };
- final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
-
- String inDir = "\\\\10.11.1.90\\data\\";
- String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
-
- String inputs[] = { "13-02009-0000-1.mov" };
- for (String input2 : inputs) {
- final String inputi = inDir + input2;
- final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
-
- Runnable task = new Runnable() {
- private String input = inputi;
- private String output = outputi;
-
- @Override
- public void run() {
- try {
- log.info("Started");
- MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
- System.out.println(output + " progress: " + p);
- });
- BasicDBObject job = new BasicDBObject();
- job.put("name", inputi);
- job.put("delay", 1);
- String taskId = api.submit(job);
-
- for (int i = 0; i < 10; i++) {
- Thread.sleep(3000);
- BasicDBObject status = api.getStatus(taskId);
- log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), status.getString("status"));
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
+ for (String currentInput : inputs) {
+ Runnable task = () -> {
+ try {
+ transcode(inDir, outDir, currentInput);
+ } catch (Exception e) {
+ e.printStackTrace();
}
};
-
executor.execute(task);
}
executor.shutdown();
- executor.awaitTermination(1, TimeUnit.HOURS);
+ executor.awaitTermination(1, TimeUnit.MINUTES);
}
- public static void main(String[] args) throws Exception {
- String inDir = "\\\\10.11.1.90\\data\\";
- String outDir = "\\\\10.11.1.100\\Promise\\TRANSCODER\\FFASTRANSCODER\\Out\\";
-
- String inputs[] = { "13-02009-0000-1.mov" };
- for (String input2 : inputs) {
- final String inputi = inDir + input2;
- final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
-
- try {
- log.info("Started");
- MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/", p -> {
- System.out.println(outputi + " progress: " + p);
- });
- BasicDBObject job = new BasicDBObject();
- job.put("name", inputi);
- job.put("delay", 1);
- String taskId = api.submit(job);
- while (true) {
- Thread.sleep(3000);
- BasicDBObject status = api.getStatus(taskId);
- String taskStatus = status.getString("status");
- log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus);
- if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) {
- break;
- }
+ public static void main1(String[] args) throws Exception {
+ String inDir = "\\\\10.11.1.90\\input\\";
+ String outDir = "\\\\10.11.1.100\\output\\";
+ String inputs[] = { "file-1.mov" };
+
+ for (String currentInput : inputs) {
+ transcode(inDir, outDir, currentInput);
+ }
+ }
+
+ private static void transcode(String inDir, String outDir, String input2) {
+ final String inputi = inDir + input2;
+ final String outputi = outDir + FilenameUtils.removeExtension(input2) + ".MP4";
+
+ try {
+ log.info("Started");
+ MediaSamuraiAPI api = new MediaSamuraiAPI("http://localhost:8181/");
+ BasicDBObject job = new BasicDBObject();
+ job.put("name", inputi);
+ job.put("delay", 1);
+ String taskId = api.submit(job);
+ while (true) {
+ Thread.sleep(3000);
+ BasicDBObject status = api.getStatus(taskId);
+ String taskStatus = status.getString("status");
+ log.info("TaskId {} {}% {}", taskId, status.getInt("progress"), taskStatus);
+ if ("ERROR".equals(taskStatus) || "COMPLETED".equals(taskStatus)) {
+ break;
}
- } catch (Exception e) {
- e.printStackTrace();
}
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
private ResteasyWebTarget webTarget;
- private IProgressChangedListener listener;
- public MediaSamuraiAPI(String apiAddress, IProgressChangedListener listener) {
- this.listener = listener;
+ public MediaSamuraiAPI(String apiAddress) {
try {
webTarget = new ResteasyClientBuilder().build().target(apiAddress);
}
String json = apiResponse.readEntity(String.class);
status = (BasicDBObject) JSONUtil.jsonToDbObject(json);
- if (Objects.nonNull(listener)) {
- listener.onProgressChanged(status.getInt("progress"));
- }
} catch (Exception e) {
log.error(e.getClass() + " " + e.getMessage());
}