1 package user.jobengine.server.steps;
\r
4 import java.io.IOException;
\r
5 import java.nio.charset.Charset;
\r
6 import java.nio.file.DirectoryStream;
\r
7 import java.nio.file.Files;
\r
8 import java.nio.file.Path;
\r
9 import java.nio.file.Paths;
\r
10 import java.text.SimpleDateFormat;
\r
11 import java.util.ArrayList;
\r
12 import java.util.Arrays;
\r
13 import java.util.Date;
\r
14 import java.util.HashMap;
\r
15 import java.util.List;
\r
16 import java.util.Locale;
\r
17 import java.util.Map;
\r
18 import java.util.Set;
\r
20 import org.apache.logging.log4j.LogManager;
\r
21 import org.apache.logging.log4j.Logger;
\r
23 import com.ibm.nosql.json.api.BasicDBObject;
\r
24 import com.ibm.nosql.json.api.DB;
\r
25 import com.ibm.nosql.json.api.DBCollection;
\r
26 import com.ibm.nosql.json.api.DBObject;
\r
27 import com.ibm.nosql.json.api.QueryBuilder;
\r
29 import user.commons.IEntityBase;
\r
30 import user.commons.morpheus.MorpheusStrings;
\r
31 import user.commons.nosql.NoSQLUtils;
\r
32 import user.jobengine.db.IItemManager;
\r
33 import user.jobengine.db.MediaFile;
\r
34 import user.jobengine.db.MediaFileDAO;
\r
35 import user.jobengine.server.IJobEngine;
\r
36 import user.jobengine.server.IJobRuntime;
\r
37 import user.jobengine.server.steps.shared.EscortFiles;
\r
/**
 * Job step that reads MORPHEUS "missing material" CSV files from a source directory and stores
 * NoSQL documents for their data lines, each tagged with a status derived from the target
 * folder and the media archive (see execute and processLine).
 *
 * NOTE(review): the numbers embedded at the start of each line are not contiguous, so some
 * original lines (closing braces, some conditions, the declaration of 'db') are absent from
 * this listing.
 */
39 public class ImportMORPHEUSMissingMaterialsStep extends JobStep {
\r
40 private static final Logger logger = LogManager.getLogger();
\r
// File-name suffix that selects the CSV inputs; compared case-insensitively in processPathItems.
42 private static final String CSV_EXT = ".csv";
\r
// Suffix appended to a material ID to form the media file name in processLine.
43 private static final String MXF_EXT = ".MXF";
\r
// MediaFile DAO resolved from the job engine's item manager in setAndCheck.
44 private MediaFileDAO dao;
\r
// Name of the sub-folder (under the CSV's parent directory) that processed CSVs are moved into.
45 private String processedFolder;
\r
// Directory in which processLine looks for an already present <materialID>.MXF file.
47 private String targetPath;
\r
// Runtime of the current execution; supplies the session marker used in every log call.
48 private IJobRuntime jobRuntime;
\r
// Total number of lines of all CSV files read by processPathItems (progress denominator).
49 private int overall;
\r
// Progress numerator used in processMissingMaterialCSV; its update is not visible in this listing.
50 private int current;
\r
// Parses the "Time to Air" column with English month names. SimpleDateFormat is not
// thread-safe; NOTE(review): confirm a step instance is never executed concurrently.
51 private final SimpleDateFormat enDateFormat = new SimpleDateFormat(MorpheusStrings.FORMAT_TIME_TO_AIR, Locale.ENGLISH);
\r
/**
 * Builds a map from required column name to its index within the CSV header row.
 *
 * @param csvFilePath path of the CSV file; passed on to storeMetadataPosition, which uses the
 *                    file name in its error message
 * @param data        the header row already split into column names
 * @throws Exception  propagated from storeMetadataPosition
 *
 * NOTE(review): the return statement and closing brace are missing from this listing;
 * presumably 'result' is returned - confirm against the original file.
 */
53 private Map<String, Integer> buildMetadataMap(Path csvFilePath, String[] data) throws Exception {
\r
54 Map<String, Integer> result = new HashMap<>();
\r
// List view of the header so the column position can be found with indexOf.
55 List<String> dataList = Arrays.asList(data);
\r
56 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.MATERIAL_ID, result);
\r
57 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.CHANNEL, result);
\r
58 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TIME_TO_AIR, result);
\r
59 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DURATION, result);
\r
60 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TITLE, result);
\r
// The Device ID and Reason columns are currently not looked up (disabled code kept below).
61 // storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DEVICE_ID, result);
\r
62 // storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.REASON, result);
\r
/**
 * Entry point of the step: validates the inputs, lists the entries of the source directory
 * and hands them to processPathItems.
 *
 * @param sourceCSVPath   directory whose entries are listed
 * @param processedFolder folder name stored by setAndCheck for moveProcessedCSV
 * @param targetPath      directory stored by setAndCheck for the file-existence check in processLine
 * @param jobEngine       engine from which setAndCheck obtains the item manager and DAO
 * @param jobRuntime      runtime whose session marker is used for logging
 * @throws Exception      declared by the signature; setAndCheck throws NullPointerException on
 *                        missing inputs
 *
 * NOTE(review): the returned value and what follows the error log in the catch block are not
 * visible in this listing.
 */
67 public Object[] execute(String sourceCSVPath, String processedFolder, String targetPath, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {
\r
// Stored first because setAndCheck logs through jobRuntime.getSessionMarker().
68 this.jobRuntime = jobRuntime;
\r
69 setAndCheck(sourceCSVPath, processedFolder, targetPath, jobEngine);
\r
71 List<Path> filePaths = new ArrayList<>();
\r
// try-with-resources closes the directory stream once the entries have been collected.
72 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(sourceCSVPath))) {
\r
73 for (Path path : directoryStream) {
\r
// Every entry is collected here; filtering to CSV files happens in processPathItems.
74 filePaths.add(path);
\r
77 processPathItems(filePaths);
\r
// NOTE(review): this catch appears to belong to a 'try' whose opening line is missing here.
78 } catch (Exception e) {
\r
// Log text (Hungarian): "Error during execution. System message: {}". Only the message is
// logged, not the stack trace.
80 logger.error(jobRuntime.getSessionMarker(), "Hiba a végrehajtás során. A rendszer üzenete: {}", e.getMessage());
\r
/**
 * Moves a CSV file into the 'processedFolder' sub-folder of its own parent directory,
 * appending a suffix to the file name.
 *
 * @param csvFilePath path of the CSV file to move
 * @throws IOException declared by the signature; Files.move fails, for example, when the
 *                     target already exists because no REPLACE_EXISTING option is passed
 */
85 private void moveProcessedCSV(Path csvFilePath) throws IOException {
\r
// Presumably creates <parent>/<processedFolder> when it is missing - verify in EscortFiles.
86 EscortFiles.ensureUNCFolder(csvFilePath.getParent().toString(), processedFolder);
\r
// New name is "<original name>.<composeKillDate(0)>"; the suffix format is defined in EscortFiles.
87 String fileName = csvFilePath.getFileName() + "." + EscortFiles.composeKillDate(0);
\r
// This local 'targetPath' shadows the field of the same name.
88 Path targetPath = Paths.get(csvFilePath.getParent().toString(), processedFolder, fileName);
\r
89 Files.move(csvFilePath, targetPath);
\r
/**
 * Handles one CSV data row: looks for an existing document with the same material ID and
 * time to air, otherwise builds a new document, derives its status from the target folder
 * and the archive, and inserts it into the MORPHEUS collection.
 *
 * @param data      the row split into column values
 * @param metadatas column name to index map produced by buildMetadataMap
 * @throws Exception declared by the signature; enDateFormat.parse throws ParseException on a
 *                   malformed time value
 *
 * NOTE(review): several lines (closing braces, the end of the query expression, 'else'
 * branches and the statement after the duplicate warning) are missing from this listing, so
 * the exact branch structure must be confirmed against the original file.
 */
92 private void processLine(String[] data, Map<String, Integer> metadatas) throws Exception {
\r
// Column values are picked by the positions recorded for the header row.
93 String channel = data[metadatas.get(MorpheusStrings.CHANNEL)];
\r
94 String timeToAir = data[metadatas.get(MorpheusStrings.TIME_TO_AIR)];
\r
95 String duration = data[metadatas.get(MorpheusStrings.DURATION)];
\r
96 String materialID = data[metadatas.get(MorpheusStrings.MATERIAL_ID)];
\r
97 String title = data[metadatas.get(MorpheusStrings.TITLE)];
\r
98 // String deviceID = data[metadatas.get(MorpheusStrings.DEVICE_ID)];
\r
99 // String reason = data[metadatas.get(MorpheusStrings.REASON)];
\r
// Duplicate check: material ID and time to air must both match.
// NOTE(review): TIMETOAIR is matched against the raw string here, while the document
// inserted below stores the parsed Date - confirm that this query can ever match.
101 DBObject query = QueryBuilder.start()
\r
102 .and(QueryBuilder.start(MorpheusStrings.MATERIALID).is(materialID).get(), QueryBuilder.start(MorpheusStrings.TIMETOAIR).is(timeToAir).get())
\r
// 'db' is assigned in setAndCheck; its declaration is not visible in this listing.
104 DBCollection collection = db.getCollection(MorpheusStrings.COLLECTION_NAME);
\r
105 BasicDBObject existingObject = (BasicDBObject) collection.findOne(query);
\r
106 if (existingObject != null) {
\r
// Log text (Hungarian): "Material '{}' was already processed at {}."
107 logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyag már feldolgozásra került az {} időpontban.", materialID,
\r
108 existingObject.getDate(MorpheusStrings.IMPORTED));
\r
// New document, stamped with the import time.
112 BasicDBObject dbObject = new BasicDBObject(MorpheusStrings.IMPORTED, new Date());
\r
113 dbObject.put(MorpheusStrings.CHANNEL, channel);
\r
114 dbObject.put(MorpheusStrings.TIMETOAIR, enDateFormat.parse(timeToAir));
\r
115 dbObject.put(MorpheusStrings.DURATION, duration);
\r
116 dbObject.put(MorpheusStrings.MATERIALID, materialID);
\r
117 dbObject.put(MorpheusStrings.TITLE, title);
\r
118 // dbObject.put(MorpheusStrings.DEVICEID, deviceID);
\r
119 // dbObject.put(MorpheusStrings.REASON, reason);
\r
// The media file name is the material ID plus the ".MXF" suffix.
121 String fileName = materialID + MXF_EXT;
\r
123 Path targetFilePath = Paths.get(targetPath, fileName);
\r
124 boolean exists = Files.exists(targetFilePath);
\r
// A non-empty file already in the target folder marks the material as skipped.
125 if (exists && targetFilePath.toFile().length() > 0) {
\r
// Log text (Hungarian): "Material '{}' is already loaded."
126 logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyag már be van töltve.", materialID);
\r
127 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_SKIPPED);
\r
// Archive lookup by file name (material ID including the ".MXF" suffix).
129 List<IEntityBase> medias = dao.getByHouseId(fileName);
\r
130 if (medias == null || medias.size() == 0) {
\r
// Log text (Hungarian): "Material '{}' was not found in the archive."
131 logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyag nem található az archívumban.", materialID);
\r
132 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_UNAVAILABLE);
\r
133 } else if (medias.size() > 1) {
\r
// Log text (Hungarian): "More than one copy of material '{}' is in the archive."
134 logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyagból egynél több található az archívumban.", materialID);
\r
135 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_MULTIPLE);
\r
// Log text (Hungarian): "Material '{}' is available in the archive."
137 logger.info(jobRuntime.getSessionMarker(), "Az '{}' anyag megtalálható az archívumban.", materialID);
\r
138 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_RESTORABLE);
\r
// The archive entry's media ID is stored next to the restorable status.
140 MediaFile mf = (MediaFile) medias.get(0);
\r
141 dbObject.put(MorpheusStrings.MEDIAID, mf.getMediaId());
\r
146 collection.insert(dbObject);
\r
149 // Channel,Time to Air,Duration,Material ID,Title,Device ID,Reason,
\r
150 // TX02,10-JAN-2018 13:25:21:08,00:05:00:00,M009572A,Tiéd a pálya/26. - 1. seg - Eredeti ** mc ,ICELE-02,On Domain (ISILON, ICELE-01, ICELE-05) ,
\r
/**
 * Processes the lines of one MORPHEUS CSV file: the header row is turned into a column
 * position map by buildMetadataMap, data rows are handed to processLine, and the step
 * progress is updated from the 'current' and 'overall' counters.
 *
 * @param csvFilePath path of the CSV file; used by buildMetadataMap for error messages
 * @param lines       the file content, one entry per line; may be null or empty
 * @throws Exception  propagated from buildMetadataMap and processLine
 *
 * NOTE(review): the embedded line numbers skip inside this method, so the body of the
 * empty-input guard, the conditions that choose between buildMetadataMap and processLine,
 * and the update of 'current' are missing from this listing.
 */
151 private void processMissingMaterialCSV(Path csvFilePath, List<String> lines) throws Exception {
\r
// Short-circuit '||' is required: the previous non-short-circuit '|' still evaluated
// lines.size() when 'lines' was null and threw a NullPointerException instead of taking
// the empty-input guard.
152 if (lines == null || lines.size() == 0) {
\r
// Stays null until the header row has been mapped.
156 Map<String, Integer> metadatas = null;
\r
157 for (int i = 0; i < lines.size(); i++) {
\r
158 String line = lines.get(i);
\r
// Plain comma split: there is no quoting support, so commas inside a field also split it.
161 String[] data = line.split(",");
\r
163 metadatas = buildMetadataMap(csvFilePath, data);
\r
165 processLine(data, metadatas);
\r
// Integer percentage over all lines of all CSV files counted by processPathItems.
168 setProgress(current * 100 / overall);
\r
/**
 * Processes one CSV file and then moves it to the processed folder; failures are logged
 * by the catch block below.
 *
 * @param csvFilePath path of the CSV file
 * @param lines       content of the file as read by processPathItems
 *
 * NOTE(review): the 'try' opener and the end of the catch block are missing from this
 * listing, and an import of java.io.File is not visible among the listed imports.
 */
173 private void processPathItem(Path csvFilePath, List<String> lines) {
\r
// Only used for the file name in the error log below.
174 File csvFile = csvFilePath.toFile();
\r
177 processMissingMaterialCSV(csvFilePath, lines);
\r
// Reached only when processing did not throw.
178 moveProcessedCSV(csvFilePath);
\r
179 } catch (Exception e) {
\r
180 logger.catching(e);
\r
// Log text (Hungarian): "An error occurred while moving MORPHEUS file {}. System error
// message: {}." NOTE(review): the message mentions moving only, but the catch follows the
// processing call as well.
181 logger.error(jobRuntime.getSessionMarker(), "A {} MORPHEUS állomány mozgatásakor hiba történt. A rendszer hibaüzenete: {}.", csvFile.getName(),
\r
/**
 * Reads every CSV file among the given paths into memory, accumulates the total line
 * count for progress reporting, then processes the files one by one.
 *
 * @param filePaths entries of the source directory (files and directories)
 * @throws IOException when a file cannot be read by Files.readAllLines
 *
 * NOTE(review): the statement controlled by the 'if' below and the closing braces are
 * missing from this listing; presumably non-CSV entries are skipped - confirm.
 */
186 private void processPathItems(List<Path> filePaths) throws IOException {
\r
// HashMap: the iteration order in the second loop is unspecified.
188 Map<Path, List<String>> contents = new HashMap<>();
\r
189 for (Path filePath : filePaths) {
\r
190 File csvFile = filePath.toFile();
\r
// Matches directories and names that do not end in ".csv" (case-insensitive).
191 if (csvFile.isDirectory() || !csvFile.getName().toLowerCase().endsWith(CSV_EXT.toLowerCase()))
\r
// The whole file is decoded as UTF-8 and kept in memory until it is processed.
193 List<String> lines = Files.readAllLines(filePath, Charset.forName("UTF-8"));
\r
// 'overall' is only added to here; no reset is visible in this listing.
194 overall += lines.size();
\r
195 contents.put(filePath, lines);
\r
198 Set<Path> csvPaths = contents.keySet();
\r
199 for (Path csvPath : csvPaths) {
\r
200 processPathItem(csvPath, contents.get(csvPath));
\r
/**
 * Validates the step inputs and resolves the collaborators used later: the MediaFile DAO,
 * the processed-folder name, the target path and the NoSQL database handle. Each failed
 * check is logged and reported with a NullPointerException.
 *
 * @param sourcePath      source directory; only checked for null here, not stored
 * @param processedFolder stored in the field of the same name
 * @param targetPath      stored in the field of the same name
 * @param jobEngine       provides the item manager from which the DAO is obtained
 * @throws NullPointerException when a required input or collaborator is missing
 *
 * NOTE(review): the null checks guarding the DAO and NoSQL error branches, and the
 * closing braces, are missing from this listing. The method logs through 'jobRuntime',
 * which execute assigns before calling it.
 */
204 private void setAndCheck(String sourcePath, String processedFolder, String targetPath, IJobEngine jobEngine) {
\r
205 if (jobEngine == null) {
\r
// Log text (Hungarian): "The process management layer is not available."
206 logger.error(jobRuntime.getSessionMarker(), "Az folyamatkezelő réteg nem elérhető.");
\r
207 throw new NullPointerException("Internal error, missing JobEngine reference.");
\r
210 IItemManager manager = jobEngine.getItemManager();
\r
211 if (manager == null) {
\r
// Log text (Hungarian): "The database layer is not available."
212 logger.error(jobRuntime.getSessionMarker(), "Az adatbáziskezelő réteg nem elérhető.");
\r
213 throw new NullPointerException("Internal error, missing ItemManager reference.");
\r
215 dao = (MediaFileDAO) manager.getBaseDAO(MediaFile.class);
\r
// Log text (Hungarian): "The MediaFile handler of the database layer is not available."
217 logger.error(jobRuntime.getSessionMarker(), "Az adatbáziskezelő réteg MediaFile kezelöje nem elérhető.");
\r
218 throw new NullPointerException("Internal error, missing MediaFile DAO reference.");
\r
220 if (sourcePath == null) {
\r
// Log text (Hungarian): "The process's 'sourcePath' input parameter is empty."
221 logger.error(jobRuntime.getSessionMarker(), "A folyamat 'sourcePath' bemeneti paramétere üres.");
\r
222 throw new NullPointerException("System is not configured properly, 'sourcePath' input parameter missing.");
\r
224 if (processedFolder == null) {
\r
// Log text (Hungarian): "The process's 'processedFolder' input parameter is empty."
225 logger.error(jobRuntime.getSessionMarker(), "A folyamat 'processedFolder' bemeneti paramétere üres.");
\r
226 throw new NullPointerException("System is not configured properly, 'processedFolder' input parameter missing.");
\r
228 this.processedFolder = processedFolder;
\r
230 if (targetPath == null) {
\r
// Log text (Hungarian): "The process's 'targetPath' input parameter is empty."
231 logger.error(jobRuntime.getSessionMarker(), "A folyamat 'targetPath' bemeneti paramétere üres.");
\r
232 throw new NullPointerException("System is not configured properly, 'targetPath' input parameter missing.");
\r
234 this.targetPath = targetPath;
\r
// 'db' is assigned here; its declaration is not visible in this listing.
236 db = NoSQLUtils.getNoSQLDB();
\r
// Log text (Hungarian): "Failed to connect to the NoSQL database."
238 logger.error(jobRuntime.getSessionMarker(), "Sikertelen kapcsolódás a NoSQL adatbázishoz.");
\r
239 throw new NullPointerException("Can not connect to NoSQL database.");
\r
/**
 * Looks up one column name in the header row and records its index in the given map.
 *
 * @param csvFilePath path of the CSV file; only its file name is used, in the error message
 * @param dataList    header row as a list of column names
 * @param name        column name to look up
 * @param metadatas   map that receives name -> index
 * @throws Exception  with a Hungarian message meaning "Field '<name>' was not found in
 *                    MORPHEUS file '<file>'"
 *
 * NOTE(review): the condition guarding the throw is missing from this listing; presumably
 * it tests for a negative index - confirm against the original file.
 */
244 private void storeMetadataPosition(Path csvFilePath, List<String> dataList, String name, Map<String, Integer> metadatas) throws Exception {
\r
// List.indexOf returns -1 when the name is absent; matching is exact (equals).
245 int pos = dataList.indexOf(name);
\r
247 throw new Exception(String.format("A '%s' MORPHEUS állományban nem található a '%s' mező.", csvFilePath.getFileName(), name));
\r
248 metadatas.put(name, pos);
\r