1 package user.jobengine.server.steps;
\r
4 import java.io.IOException;
\r
5 import java.nio.charset.Charset;
\r
6 import java.nio.file.DirectoryStream;
\r
7 import java.nio.file.Files;
\r
8 import java.nio.file.Path;
\r
9 import java.nio.file.Paths;
\r
10 import java.util.ArrayList;
\r
11 import java.util.Arrays;
\r
12 import java.util.Date;
\r
13 import java.util.HashMap;
\r
14 import java.util.List;
\r
15 import java.util.Map;
\r
16 import java.util.Set;
\r
18 import org.apache.logging.log4j.LogManager;
\r
19 import org.apache.logging.log4j.Logger;
\r
21 import com.ibm.nosql.json.api.BasicDBObject;
\r
22 import com.ibm.nosql.json.api.DB;
\r
23 import com.ibm.nosql.json.api.DBCollection;
\r
24 import com.ibm.nosql.json.api.DBObject;
\r
25 import com.ibm.nosql.json.api.QueryBuilder;
\r
27 import user.commons.IEntityBase;
\r
28 import user.commons.nosql.NoSQLUtils;
\r
29 import user.jobengine.db.IItemManager;
\r
30 import user.jobengine.db.MediaFile;
\r
31 import user.jobengine.db.MediaFileDAO;
\r
32 import user.jobengine.server.IJobEngine;
\r
33 import user.jobengine.server.IJobRuntime;
\r
35 public class ImportMORPHEUSMissingMaterialsStep extends JobStep {
\r
36 private static final String STATUS = "Status";
\r
37 private static final String IMPORTED = "Imported";
\r
38 private static final Logger logger = LogManager.getLogger();
\r
39 private static final String MATERIAL_ID = "Material ID";
\r
40 private static final String CHANNEL = "Channel";
\r
41 private static final String TIME_TO_AIR = "Time to Air";
\r
42 private static final String DURATION = "Duration";
\r
43 private static final String TITLE = "Title";
\r
44 private static final String DEVICE_ID = "Device ID";
\r
45 private static final String REASON = "Reason";
\r
46 private static final String COLLECTION_NAME = "missing_materials";
\r
48 private static final String CSV_EXT = ".csv";
\r
49 private static final String MXF_EXT = ".MXF";
\r
50 private MediaFileDAO dao;
\r
51 private String processedFolder;
\r
53 private String targetPath;
\r
54 private IJobRuntime jobRuntime;
\r
55 private int overall;
\r
56 private int current;
\r
58 private Map<String, Integer> buildMetadataMap(Path csvFilePath, String[] data) throws Exception {
\r
59 Map<String, Integer> result = new HashMap<>();
\r
60 List<String> dataList = Arrays.asList(data);
\r
61 storeMetadataPosition(csvFilePath, dataList, MATERIAL_ID, result);
\r
62 storeMetadataPosition(csvFilePath, dataList, CHANNEL, result);
\r
63 storeMetadataPosition(csvFilePath, dataList, TIME_TO_AIR, result);
\r
64 storeMetadataPosition(csvFilePath, dataList, DURATION, result);
\r
65 storeMetadataPosition(csvFilePath, dataList, TITLE, result);
\r
66 storeMetadataPosition(csvFilePath, dataList, DEVICE_ID, result);
\r
67 storeMetadataPosition(csvFilePath, dataList, REASON, result);
\r
72 public Object[] execute(String sourceCSVPath, String processedFolder, String targetPath, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {
\r
73 this.jobRuntime = jobRuntime;
\r
74 setAndCheck(sourceCSVPath, processedFolder, targetPath, jobEngine);
\r
76 List<Path> filePaths = new ArrayList<>();
\r
77 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(sourceCSVPath))) {
\r
78 for (Path path : directoryStream) {
\r
79 filePaths.add(path);
\r
82 processPathItems(filePaths);
\r
83 } catch (Exception e) {
\r
85 logger.error(jobRuntime.getMarker(), "Hiba a végrehajtás során. A rendszer üzenete: {}", e.getMessage());
\r
90 private void moveProcessedCSV(Path csvFilePath) throws IOException {
\r
91 EscortFiles.ensureUNCFolder(csvFilePath.getParent().toString(), processedFolder);
\r
92 String fileName = csvFilePath.getFileName() + "." + EscortFiles.composeKillDate(0);
\r
93 Path targetPath = Paths.get(csvFilePath.getParent().toString(), processedFolder, fileName);
\r
94 Files.move(csvFilePath, targetPath);
\r
97 private void processLine(String[] data, Map<String, Integer> metadatas) throws Exception {
\r
98 String channel = data[metadatas.get(CHANNEL)];
\r
99 String timeToAir = data[metadatas.get(TIME_TO_AIR)];
\r
100 String duration = data[metadatas.get(DURATION)];
\r
101 String materialID = data[metadatas.get(MATERIAL_ID)];
\r
102 String title = data[metadatas.get(TITLE)];
\r
103 String deviceID = data[metadatas.get(DEVICE_ID)];
\r
104 String reason = data[metadatas.get(REASON)];
\r
106 DBObject query = QueryBuilder.start().and(QueryBuilder.start(MATERIAL_ID).is(materialID).get(), QueryBuilder.start(TIME_TO_AIR).is(timeToAir).get())
\r
108 DBCollection collection = db.getCollection(COLLECTION_NAME);
\r
109 BasicDBObject existingObject = (BasicDBObject) collection.findOne(query);
\r
110 if (existingObject != null) {
\r
111 logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már feldolgozásra került az {} időpontban.", materialID, existingObject.getDate(IMPORTED));
\r
115 BasicDBObject dbObject = new BasicDBObject(IMPORTED, new Date());
\r
116 dbObject.put(CHANNEL, channel);
\r
117 dbObject.put(TIME_TO_AIR.replace(" ", ""), timeToAir);
\r
118 dbObject.put(DURATION, duration);
\r
119 dbObject.put(MATERIAL_ID.replace(" ", ""), materialID);
\r
120 dbObject.put(TITLE, title);
\r
121 dbObject.put(DEVICE_ID.replace(" ", ""), deviceID);
\r
122 dbObject.put(REASON, reason);
\r
124 String fileName = materialID + MXF_EXT;
\r
125 if (Files.exists(Paths.get(targetPath, fileName))) {
\r
126 logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már be van töltve.", materialID);
\r
127 dbObject.put(STATUS, "SKIPPED");
\r
129 List<IEntityBase> medias = dao.getByHouseId(fileName);
\r
130 if (medias == null || medias.size() == 0) {
\r
131 logger.warn(jobRuntime.getMarker(), "Az '{}' anyag nem található az archívumban.", materialID);
\r
132 dbObject.put(STATUS, "UNAVAILABLE");
\r
133 } else if (medias.size() > 1) {
\r
134 logger.warn(jobRuntime.getMarker(), "Az '{}' anyagból egynél több található az archívumban.", materialID);
\r
135 dbObject.put(STATUS, "MULTIPLE");
\r
137 logger.info(jobRuntime.getMarker(), "Az '{}' anyag megtalálható az archívumban.", materialID);
\r
138 dbObject.put(STATUS, "RESTORABLE");
\r
143 collection.insert(dbObject);
\r
146 // Channel,Time to Air,Duration,Material ID,Title,Device ID,Reason,
\r
147 // TX02,10-JAN-2018 13:25:21:08,00:05:00:00,M009572A,Tiéd a pálya/26. - 1. seg - Eredeti ** mc ,ICELE-02,On Domain (ISILON, ICELE-01, ICELE-05) ,
\r
148 private void processMissingMaterialCSV(Path csvFilePath, List<String> lines) throws Exception {
\r
149 if (lines == null | lines.size() == 0) {
\r
153 Map<String, Integer> metadatas = null;
\r
154 for (int i = 0; i < lines.size(); i++) {
\r
155 String line = lines.get(i);
\r
158 String[] data = line.split(",");
\r
160 metadatas = buildMetadataMap(csvFilePath, data);
\r
162 processLine(data, metadatas);
\r
165 setProgress(current * 100 / overall);
\r
170 private void processPathItem(Path csvFilePath, List<String> lines) {
\r
171 File csvFile = csvFilePath.toFile();
\r
174 processMissingMaterialCSV(csvFilePath, lines);
\r
175 moveProcessedCSV(csvFilePath);
\r
176 } catch (Exception e) {
\r
177 logger.catching(e);
\r
178 logger.error(jobRuntime.getMarker(), "A {} MORPHEUS állomány mozgatásakor hiba történt. A rendszer hibaüzenete: {}.", csvFile.getName(),
\r
183 private void processPathItems(List<Path> filePaths) throws IOException {
\r
185 Map<Path, List<String>> contents = new HashMap<>();
\r
186 for (Path filePath : filePaths) {
\r
187 File csvFile = filePath.toFile();
\r
188 if (csvFile.isDirectory() || !csvFile.getName().toLowerCase().endsWith(CSV_EXT.toLowerCase()))
\r
190 List<String> lines = Files.readAllLines(filePath, Charset.forName("UTF-8"));
\r
191 overall += lines.size();
\r
192 contents.put(filePath, lines);
\r
195 Set<Path> csvPaths = contents.keySet();
\r
196 for (Path csvPath : csvPaths) {
\r
197 processPathItem(csvPath, contents.get(csvPath));
\r
201 private void setAndCheck(String sourcePath, String processedFolder, String targetPath, IJobEngine jobEngine) {
\r
202 if (jobEngine == null) {
\r
203 logger.error(jobRuntime.getMarker(), "Az folyamatkezelő réteg nem elérhető.");
\r
204 throw new NullPointerException("Internal error, missing JobEngine reference.");
\r
207 IItemManager manager = jobEngine.getItemManager();
\r
208 if (manager == null) {
\r
209 logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg nem elérhető.");
\r
210 throw new NullPointerException("Internal error, missing ItemManager reference.");
\r
212 dao = (MediaFileDAO) manager.getBaseDAO(MediaFile.class);
\r
214 logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg MediaFile kezelöje nem elérhető.");
\r
215 throw new NullPointerException("Internal error, missing MediaFile DAO reference.");
\r
217 if (sourcePath == null) {
\r
218 logger.error(jobRuntime.getMarker(), "A folyamat 'sourcePath' bemeneti paramétere üres.");
\r
219 throw new NullPointerException("System is not configured properly, 'sourcePath' input parameter missing.");
\r
221 if (processedFolder == null) {
\r
222 logger.error(jobRuntime.getMarker(), "A folyamat 'processedFolder' bemeneti paramétere üres.");
\r
223 throw new NullPointerException("System is not configured properly, 'processedFolder' input parameter missing.");
\r
225 this.processedFolder = processedFolder;
\r
227 if (targetPath == null) {
\r
228 logger.error(jobRuntime.getMarker(), "A folyamat 'targetPath' bemeneti paramétere üres.");
\r
229 throw new NullPointerException("System is not configured properly, 'targetPath' input parameter missing.");
\r
231 this.targetPath = targetPath;
\r
233 db = NoSQLUtils.getNoSQLDB();
\r
235 logger.error(jobRuntime.getMarker(), "Sikertelen kapcsolódás a NoSQL adatbázishoz.");
\r
236 throw new NullPointerException("Can not connect to NoSQL database.");
\r
241 private void storeMetadataPosition(Path csvFilePath, List<String> dataList, String name, Map<String, Integer> metadatas) throws Exception {
\r
242 int pos = dataList.indexOf(name);
\r
244 throw new Exception(String.format("A '%s' MORPHEUS állományban nem található a '%s' mező.", csvFilePath.getFileName(), MATERIAL_ID));
\r
245 metadatas.put(name, pos);
\r