17c1249e6b2108bb5ccc3066ed1e098cfe6eb759
[mediacube.git] /
1 package user.jobengine.server.steps;\r
2 \r
3 import java.io.File;\r
4 import java.io.IOException;\r
5 import java.nio.charset.Charset;\r
6 import java.nio.file.DirectoryStream;\r
7 import java.nio.file.Files;\r
8 import java.nio.file.Path;\r
9 import java.nio.file.Paths;\r
10 import java.util.ArrayList;\r
11 import java.util.Arrays;\r
12 import java.util.Date;\r
13 import java.util.HashMap;\r
14 import java.util.List;\r
15 import java.util.Map;\r
16 import java.util.Set;\r
17 \r
18 import org.apache.logging.log4j.LogManager;\r
19 import org.apache.logging.log4j.Logger;\r
20 \r
21 import com.ibm.nosql.json.api.BasicDBObject;\r
22 import com.ibm.nosql.json.api.DB;\r
23 import com.ibm.nosql.json.api.DBCollection;\r
24 import com.ibm.nosql.json.api.DBObject;\r
25 import com.ibm.nosql.json.api.QueryBuilder;\r
26 \r
27 import user.commons.IEntityBase;\r
28 import user.commons.nosql.NoSQLUtils;\r
29 import user.jobengine.db.IItemManager;\r
30 import user.jobengine.db.MediaFile;\r
31 import user.jobengine.db.MediaFileDAO;\r
32 import user.jobengine.server.IJobEngine;\r
33 import user.jobengine.server.IJobRuntime;\r
34 \r
35 public class ImportMORPHEUSMissingMaterialsStep extends JobStep {\r
36         private static final String STATUS = "Status";\r
37         private static final String IMPORTED = "Imported";\r
38         private static final Logger logger = LogManager.getLogger();\r
39         private static final String MATERIAL_ID = "Material ID";\r
40         private static final String CHANNEL = "Channel";\r
41         private static final String TIME_TO_AIR = "Time to Air";\r
42         private static final String DURATION = "Duration";\r
43         private static final String TITLE = "Title";\r
44         private static final String DEVICE_ID = "Device ID";\r
45         private static final String REASON = "Reason";\r
46         private static final String COLLECTION_NAME = "missing_materials";\r
47 \r
48         private static final String CSV_EXT = ".csv";\r
49         private static final String MXF_EXT = ".MXF";\r
50         private MediaFileDAO dao;\r
51         private String processedFolder;\r
52         private DB db;\r
53         private String targetPath;\r
54         private IJobRuntime jobRuntime;\r
55         private int overall;\r
56         private int current;\r
57 \r
58         private Map<String, Integer> buildMetadataMap(Path csvFilePath, String[] data) throws Exception {\r
59                 Map<String, Integer> result = new HashMap<>();\r
60                 List<String> dataList = Arrays.asList(data);\r
61                 storeMetadataPosition(csvFilePath, dataList, MATERIAL_ID, result);\r
62                 storeMetadataPosition(csvFilePath, dataList, CHANNEL, result);\r
63                 storeMetadataPosition(csvFilePath, dataList, TIME_TO_AIR, result);\r
64                 storeMetadataPosition(csvFilePath, dataList, DURATION, result);\r
65                 storeMetadataPosition(csvFilePath, dataList, TITLE, result);\r
66                 storeMetadataPosition(csvFilePath, dataList, DEVICE_ID, result);\r
67                 storeMetadataPosition(csvFilePath, dataList, REASON, result);\r
68                 return result;\r
69         }\r
70 \r
71         @StepEntry\r
72         public Object[] execute(String sourceCSVPath, String processedFolder, String targetPath, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {\r
73                 this.jobRuntime = jobRuntime;\r
74                 setAndCheck(sourceCSVPath, processedFolder, targetPath, jobEngine);\r
75 \r
76                 List<Path> filePaths = new ArrayList<>();\r
77                 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(sourceCSVPath))) {\r
78                         for (Path path : directoryStream) {\r
79                                 filePaths.add(path);\r
80                         }\r
81 \r
82                         processPathItems(filePaths);\r
83                 } catch (Exception e) {\r
84                         logger.catching(e);\r
85                         logger.error(jobRuntime.getMarker(), "Hiba a végrehajtás során. A rendszer üzenete: {}", e.getMessage());\r
86                 }\r
87                 return null;\r
88         }\r
89 \r
90         private void moveProcessedCSV(Path csvFilePath) throws IOException {\r
91                 EscortFiles.ensureUNCFolder(csvFilePath.getParent().toString(), processedFolder);\r
92                 String fileName = csvFilePath.getFileName() + "." + EscortFiles.composeKillDate(0);\r
93                 Path targetPath = Paths.get(csvFilePath.getParent().toString(), processedFolder, fileName);\r
94                 Files.move(csvFilePath, targetPath);\r
95         }\r
96 \r
97         private void processLine(String[] data, Map<String, Integer> metadatas) throws Exception {\r
98                 String channel = data[metadatas.get(CHANNEL)];\r
99                 String timeToAir = data[metadatas.get(TIME_TO_AIR)];\r
100                 String duration = data[metadatas.get(DURATION)];\r
101                 String materialID = data[metadatas.get(MATERIAL_ID)];\r
102                 String title = data[metadatas.get(TITLE)];\r
103                 String deviceID = data[metadatas.get(DEVICE_ID)];\r
104                 String reason = data[metadatas.get(REASON)];\r
105 \r
106                 DBObject query = QueryBuilder.start().and(QueryBuilder.start(MATERIAL_ID).is(materialID).get(), QueryBuilder.start(TIME_TO_AIR).is(timeToAir).get())\r
107                                 .get();\r
108                 DBCollection collection = db.getCollection(COLLECTION_NAME);\r
109                 BasicDBObject existingObject = (BasicDBObject) collection.findOne(query);\r
110                 if (existingObject != null) {\r
111                         logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már feldolgozásra került az {} időpontban.", materialID, existingObject.getDate(IMPORTED));\r
112                         return;\r
113                 }\r
114 \r
115                 BasicDBObject dbObject = new BasicDBObject(IMPORTED, new Date());\r
116                 dbObject.put(CHANNEL, channel);\r
117                 dbObject.put(TIME_TO_AIR.replace(" ", ""), timeToAir);\r
118                 dbObject.put(DURATION, duration);\r
119                 dbObject.put(MATERIAL_ID.replace(" ", ""), materialID);\r
120                 dbObject.put(TITLE, title);\r
121                 dbObject.put(DEVICE_ID.replace(" ", ""), deviceID);\r
122                 dbObject.put(REASON, reason);\r
123 \r
124                 String fileName = materialID + MXF_EXT;\r
125                 if (Files.exists(Paths.get(targetPath, fileName))) {\r
126                         logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már be van töltve.", materialID);\r
127                         dbObject.put(STATUS, "SKIPPED");\r
128                 } else {\r
129                         List<IEntityBase> medias = dao.getByHouseId(fileName);\r
130                         if (medias == null || medias.size() == 0) {\r
131                                 logger.warn(jobRuntime.getMarker(), "Az '{}' anyag nem található az archívumban.", materialID);\r
132                                 dbObject.put(STATUS, "UNAVAILABLE");\r
133                         } else if (medias.size() > 1) {\r
134                                 logger.warn(jobRuntime.getMarker(), "Az '{}' anyagból egynél több található az archívumban.", materialID);\r
135                                 dbObject.put(STATUS, "MULTIPLE");\r
136                         } else {\r
137                                 logger.info(jobRuntime.getMarker(), "Az '{}' anyag megtalálható az archívumban.", materialID);\r
138                                 dbObject.put(STATUS, "RESTORABLE");\r
139                         }\r
140 \r
141                 }\r
142 \r
143                 collection.insert(dbObject);\r
144         }\r
145 \r
146         //      Channel,Time to Air,Duration,Material ID,Title,Device ID,Reason,\r
147         //      TX02,10-JAN-2018 13:25:21:08,00:05:00:00,M009572A,Tiéd a pálya/26. - 1. seg - Eredeti ** mc ,ICELE-02,On Domain (ISILON, ICELE-01, ICELE-05) ,\r
148         private void processMissingMaterialCSV(Path csvFilePath, List<String> lines) throws Exception {\r
149                 if (lines == null | lines.size() == 0) {\r
150                         return;\r
151                 }\r
152 \r
153                 Map<String, Integer> metadatas = null;\r
154                 for (int i = 0; i < lines.size(); i++) {\r
155                         String line = lines.get(i);\r
156                         if (line == null)\r
157                                 continue;\r
158                         String[] data = line.split(",");\r
159                         if (i == 0)\r
160                                 metadatas = buildMetadataMap(csvFilePath, data);\r
161                         else\r
162                                 processLine(data, metadatas);\r
163 \r
164                         current++;\r
165                         setProgress(current * 100 / overall);\r
166                 }\r
167 \r
168         }\r
169 \r
170         private void processPathItem(Path csvFilePath, List<String> lines) {\r
171                 File csvFile = csvFilePath.toFile();\r
172 \r
173                 try {\r
174                         processMissingMaterialCSV(csvFilePath, lines);\r
175                         moveProcessedCSV(csvFilePath);\r
176                 } catch (Exception e) {\r
177                         logger.catching(e);\r
178                         logger.error(jobRuntime.getMarker(), "A {} MORPHEUS állomány mozgatásakor hiba történt. A rendszer hibaüzenete: {}.", csvFile.getName(),\r
179                                         e.getMessage());\r
180                 }\r
181         }\r
182 \r
183         private void processPathItems(List<Path> filePaths) throws IOException {\r
184                 overall = 0;\r
185                 Map<Path, List<String>> contents = new HashMap<>();\r
186                 for (Path filePath : filePaths) {\r
187                         File csvFile = filePath.toFile();\r
188                         if (csvFile.isDirectory() || !csvFile.getName().toLowerCase().endsWith(CSV_EXT.toLowerCase()))\r
189                                 continue;\r
190                         List<String> lines = Files.readAllLines(filePath, Charset.forName("UTF-8"));\r
191                         overall += lines.size();\r
192                         contents.put(filePath, lines);\r
193                 }\r
194 \r
195                 Set<Path> csvPaths = contents.keySet();\r
196                 for (Path csvPath : csvPaths) {\r
197                         processPathItem(csvPath, contents.get(csvPath));\r
198                 }\r
199         }\r
200 \r
201         private void setAndCheck(String sourcePath, String processedFolder, String targetPath, IJobEngine jobEngine) {\r
202                 if (jobEngine == null) {\r
203                         logger.error(jobRuntime.getMarker(), "Az folyamatkezelő réteg nem elérhető.");\r
204                         throw new NullPointerException("Internal error, missing JobEngine reference.");\r
205                 }\r
206 \r
207                 IItemManager manager = jobEngine.getItemManager();\r
208                 if (manager == null) {\r
209                         logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg nem elérhető.");\r
210                         throw new NullPointerException("Internal error, missing ItemManager reference.");\r
211                 }\r
212                 dao = (MediaFileDAO) manager.getBaseDAO(MediaFile.class);\r
213                 if (dao == null) {\r
214                         logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg MediaFile kezelöje nem elérhető.");\r
215                         throw new NullPointerException("Internal error, missing MediaFile DAO reference.");\r
216                 }\r
217                 if (sourcePath == null) {\r
218                         logger.error(jobRuntime.getMarker(), "A folyamat 'sourcePath' bemeneti paramétere üres.");\r
219                         throw new NullPointerException("System is not configured properly, 'sourcePath' input parameter missing.");\r
220                 }\r
221                 if (processedFolder == null) {\r
222                         logger.error(jobRuntime.getMarker(), "A folyamat 'processedFolder' bemeneti paramétere üres.");\r
223                         throw new NullPointerException("System is not configured properly, 'processedFolder' input parameter missing.");\r
224                 }\r
225                 this.processedFolder = processedFolder;\r
226 \r
227                 if (targetPath == null) {\r
228                         logger.error(jobRuntime.getMarker(), "A folyamat 'targetPath' bemeneti paramétere üres.");\r
229                         throw new NullPointerException("System is not configured properly, 'targetPath' input parameter missing.");\r
230                 }\r
231                 this.targetPath = targetPath;\r
232 \r
233                 db = NoSQLUtils.getNoSQLDB();\r
234                 if (db == null) {\r
235                         logger.error(jobRuntime.getMarker(), "Sikertelen kapcsolódás a NoSQL adatbázishoz.");\r
236                         throw new NullPointerException("Can not connect to NoSQL database.");\r
237                 }\r
238 \r
239         }\r
240 \r
241         private void storeMetadataPosition(Path csvFilePath, List<String> dataList, String name, Map<String, Integer> metadatas) throws Exception {\r
242                 int pos = dataList.indexOf(name);\r
243                 if (pos < 0)\r
244                         throw new Exception(String.format("A '%s' MORPHEUS állományban nem található a '%s' mező.", csvFilePath.getFileName(), MATERIAL_ID));\r
245                 metadatas.put(name, pos);\r
246         }\r
247 \r
248 }\r