c5796eb4017b77cc3fa8dee331d2d54d02b8b809
[mediacube.git] /
1 package user.jobengine.server.steps;\r
2 \r
3 import java.io.File;\r
4 import java.io.IOException;\r
5 import java.nio.charset.Charset;\r
6 import java.nio.file.DirectoryStream;\r
7 import java.nio.file.Files;\r
8 import java.nio.file.Path;\r
9 import java.nio.file.Paths;\r
10 import java.text.SimpleDateFormat;\r
11 import java.util.ArrayList;\r
12 import java.util.Arrays;\r
13 import java.util.Date;\r
14 import java.util.HashMap;\r
15 import java.util.List;\r
16 import java.util.Locale;\r
17 import java.util.Map;\r
18 import java.util.Set;\r
19 \r
20 import org.apache.logging.log4j.LogManager;\r
21 import org.apache.logging.log4j.Logger;\r
22 \r
23 import com.ibm.nosql.json.api.BasicDBObject;\r
24 import com.ibm.nosql.json.api.DB;\r
25 import com.ibm.nosql.json.api.DBCollection;\r
26 import com.ibm.nosql.json.api.DBObject;\r
27 import com.ibm.nosql.json.api.QueryBuilder;\r
28 \r
29 import user.commons.IEntityBase;\r
30 import user.commons.morpheus.MorpheusStrings;\r
31 import user.commons.nosql.NoSQLUtils;\r
32 import user.jobengine.db.IItemManager;\r
33 import user.jobengine.db.MediaFile;\r
34 import user.jobengine.db.MediaFileDAO;\r
35 import user.jobengine.server.IJobEngine;\r
36 import user.jobengine.server.IJobRuntime;\r
37 \r
38 public class ImportMORPHEUSMissingMaterialsStep extends JobStep {\r
39         private static final Logger logger = LogManager.getLogger();\r
40 \r
41         private static final String CSV_EXT = ".csv";\r
42         private static final String MXF_EXT = ".MXF";\r
43         private MediaFileDAO dao;\r
44         private String processedFolder;\r
45         private DB db;\r
46         private String targetPath;\r
47         private IJobRuntime jobRuntime;\r
48         private int overall;\r
49         private int current;\r
50         private final SimpleDateFormat enDateFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss:S", Locale.ENGLISH);\r
51 \r
52         private Map<String, Integer> buildMetadataMap(Path csvFilePath, String[] data) throws Exception {\r
53                 Map<String, Integer> result = new HashMap<>();\r
54                 List<String> dataList = Arrays.asList(data);\r
55                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.MATERIAL_ID, result);\r
56                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.CHANNEL, result);\r
57                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TIME_TO_AIR, result);\r
58                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DURATION, result);\r
59                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TITLE, result);\r
60                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DEVICE_ID, result);\r
61                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.REASON, result);\r
62                 return result;\r
63         }\r
64 \r
65         @StepEntry\r
66         public Object[] execute(String sourceCSVPath, String processedFolder, String targetPath, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {\r
67                 this.jobRuntime = jobRuntime;\r
68                 setAndCheck(sourceCSVPath, processedFolder, targetPath, jobEngine);\r
69 \r
70                 List<Path> filePaths = new ArrayList<>();\r
71                 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(sourceCSVPath))) {\r
72                         for (Path path : directoryStream) {\r
73                                 filePaths.add(path);\r
74                         }\r
75 \r
76                         processPathItems(filePaths);\r
77                 } catch (Exception e) {\r
78                         logger.catching(e);\r
79                         logger.error(jobRuntime.getMarker(), "Hiba a végrehajtás során. A rendszer üzenete: {}", e.getMessage());\r
80                 }\r
81                 return null;\r
82         }\r
83 \r
84         private void moveProcessedCSV(Path csvFilePath) throws IOException {\r
85                 EscortFiles.ensureUNCFolder(csvFilePath.getParent().toString(), processedFolder);\r
86                 String fileName = csvFilePath.getFileName() + "." + EscortFiles.composeKillDate(0);\r
87                 Path targetPath = Paths.get(csvFilePath.getParent().toString(), processedFolder, fileName);\r
88                 Files.move(csvFilePath, targetPath);\r
89         }\r
90 \r
91         private void processLine(String[] data, Map<String, Integer> metadatas) throws Exception {\r
92                 String channel = data[metadatas.get(MorpheusStrings.CHANNEL)];\r
93                 String timeToAir = data[metadatas.get(MorpheusStrings.TIME_TO_AIR)];\r
94                 String duration = data[metadatas.get(MorpheusStrings.DURATION)];\r
95                 String materialID = data[metadatas.get(MorpheusStrings.MATERIAL_ID)];\r
96                 String title = data[metadatas.get(MorpheusStrings.TITLE)];\r
97                 String deviceID = data[metadatas.get(MorpheusStrings.DEVICE_ID)];\r
98                 String reason = data[metadatas.get(MorpheusStrings.REASON)];\r
99 \r
100                 DBObject query = QueryBuilder.start()\r
101                                 .and(QueryBuilder.start(MorpheusStrings.MATERIALID).is(materialID).get(), QueryBuilder.start(MorpheusStrings.TIMETOAIR).is(timeToAir).get())\r
102                                 .get();\r
103                 DBCollection collection = db.getCollection(MorpheusStrings.COLLECTION_NAME);\r
104                 BasicDBObject existingObject = (BasicDBObject) collection.findOne(query);\r
105                 if (existingObject != null) {\r
106                         logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már feldolgozásra került az {} időpontban.", materialID,\r
107                                         existingObject.getDate(MorpheusStrings.IMPORTED));\r
108                         return;\r
109                 }\r
110 \r
111                 BasicDBObject dbObject = new BasicDBObject(MorpheusStrings.IMPORTED, new Date());\r
112                 dbObject.put(MorpheusStrings.CHANNEL, channel);\r
113                 dbObject.put(MorpheusStrings.TIMETOAIR, enDateFormat.parse(timeToAir));\r
114                 dbObject.put(MorpheusStrings.DURATION, duration);\r
115                 dbObject.put(MorpheusStrings.MATERIALID, materialID);\r
116                 dbObject.put(MorpheusStrings.TITLE, title);\r
117                 dbObject.put(MorpheusStrings.DEVICEID, deviceID);\r
118                 dbObject.put(MorpheusStrings.REASON, reason);\r
119 \r
120                 String fileName = materialID + MXF_EXT;\r
121                 if (Files.exists(Paths.get(targetPath, fileName))) {\r
122                         logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már be van töltve.", materialID);\r
123                         dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_SKIPPED);\r
124                 } else {\r
125                         List<IEntityBase> medias = dao.getByHouseId(fileName);\r
126                         if (medias == null || medias.size() == 0) {\r
127                                 logger.warn(jobRuntime.getMarker(), "Az '{}' anyag nem található az archívumban.", materialID);\r
128                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_UNAVAILABLE);\r
129                         } else if (medias.size() > 1) {\r
130                                 logger.warn(jobRuntime.getMarker(), "Az '{}' anyagból egynél több található az archívumban.", materialID);\r
131                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_MULTIPLE);\r
132                         } else {\r
133                                 logger.info(jobRuntime.getMarker(), "Az '{}' anyag megtalálható az archívumban.", materialID);\r
134                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_RESTORABLE);\r
135                         }\r
136 \r
137                 }\r
138 \r
139                 collection.insert(dbObject);\r
140         }\r
141 \r
142         //      Channel,Time to Air,Duration,Material ID,Title,Device ID,Reason,\r
143         //      TX02,10-JAN-2018 13:25:21:08,00:05:00:00,M009572A,Tiéd a pálya/26. - 1. seg - Eredeti ** mc ,ICELE-02,On Domain (ISILON, ICELE-01, ICELE-05) ,\r
144         private void processMissingMaterialCSV(Path csvFilePath, List<String> lines) throws Exception {\r
145                 if (lines == null | lines.size() == 0) {\r
146                         return;\r
147                 }\r
148 \r
149                 Map<String, Integer> metadatas = null;\r
150                 for (int i = 0; i < lines.size(); i++) {\r
151                         String line = lines.get(i);\r
152                         if (line == null)\r
153                                 continue;\r
154                         String[] data = line.split(",");\r
155                         if (i == 0)\r
156                                 metadatas = buildMetadataMap(csvFilePath, data);\r
157                         else\r
158                                 processLine(data, metadatas);\r
159 \r
160                         current++;\r
161                         setProgress(current * 100 / overall);\r
162                 }\r
163 \r
164         }\r
165 \r
166         private void processPathItem(Path csvFilePath, List<String> lines) {\r
167                 File csvFile = csvFilePath.toFile();\r
168 \r
169                 try {\r
170                         processMissingMaterialCSV(csvFilePath, lines);\r
171                         moveProcessedCSV(csvFilePath);\r
172                 } catch (Exception e) {\r
173                         logger.catching(e);\r
174                         logger.error(jobRuntime.getMarker(), "A {} MORPHEUS állomány mozgatásakor hiba történt. A rendszer hibaüzenete: {}.", csvFile.getName(),\r
175                                         e.getMessage());\r
176                 }\r
177         }\r
178 \r
179         private void processPathItems(List<Path> filePaths) throws IOException {\r
180                 overall = 0;\r
181                 Map<Path, List<String>> contents = new HashMap<>();\r
182                 for (Path filePath : filePaths) {\r
183                         File csvFile = filePath.toFile();\r
184                         if (csvFile.isDirectory() || !csvFile.getName().toLowerCase().endsWith(CSV_EXT.toLowerCase()))\r
185                                 continue;\r
186                         List<String> lines = Files.readAllLines(filePath, Charset.forName("UTF-8"));\r
187                         overall += lines.size();\r
188                         contents.put(filePath, lines);\r
189                 }\r
190 \r
191                 Set<Path> csvPaths = contents.keySet();\r
192                 for (Path csvPath : csvPaths) {\r
193                         processPathItem(csvPath, contents.get(csvPath));\r
194                 }\r
195         }\r
196 \r
197         private void setAndCheck(String sourcePath, String processedFolder, String targetPath, IJobEngine jobEngine) {\r
198                 if (jobEngine == null) {\r
199                         logger.error(jobRuntime.getMarker(), "Az folyamatkezelő réteg nem elérhető.");\r
200                         throw new NullPointerException("Internal error, missing JobEngine reference.");\r
201                 }\r
202 \r
203                 IItemManager manager = jobEngine.getItemManager();\r
204                 if (manager == null) {\r
205                         logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg nem elérhető.");\r
206                         throw new NullPointerException("Internal error, missing ItemManager reference.");\r
207                 }\r
208                 dao = (MediaFileDAO) manager.getBaseDAO(MediaFile.class);\r
209                 if (dao == null) {\r
210                         logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg MediaFile kezelöje nem elérhető.");\r
211                         throw new NullPointerException("Internal error, missing MediaFile DAO reference.");\r
212                 }\r
213                 if (sourcePath == null) {\r
214                         logger.error(jobRuntime.getMarker(), "A folyamat 'sourcePath' bemeneti paramétere üres.");\r
215                         throw new NullPointerException("System is not configured properly, 'sourcePath' input parameter missing.");\r
216                 }\r
217                 if (processedFolder == null) {\r
218                         logger.error(jobRuntime.getMarker(), "A folyamat 'processedFolder' bemeneti paramétere üres.");\r
219                         throw new NullPointerException("System is not configured properly, 'processedFolder' input parameter missing.");\r
220                 }\r
221                 this.processedFolder = processedFolder;\r
222 \r
223                 if (targetPath == null) {\r
224                         logger.error(jobRuntime.getMarker(), "A folyamat 'targetPath' bemeneti paramétere üres.");\r
225                         throw new NullPointerException("System is not configured properly, 'targetPath' input parameter missing.");\r
226                 }\r
227                 this.targetPath = targetPath;\r
228 \r
229                 db = NoSQLUtils.getNoSQLDB();\r
230                 if (db == null) {\r
231                         logger.error(jobRuntime.getMarker(), "Sikertelen kapcsolódás a NoSQL adatbázishoz.");\r
232                         throw new NullPointerException("Can not connect to NoSQL database.");\r
233                 }\r
234 \r
235         }\r
236 \r
237         private void storeMetadataPosition(Path csvFilePath, List<String> dataList, String name, Map<String, Integer> metadatas) throws Exception {\r
238                 int pos = dataList.indexOf(name);\r
239                 if (pos < 0)\r
240                         throw new Exception(String.format("A '%s' MORPHEUS állományban nem található a '%s' mező.", csvFilePath.getFileName(), name));\r
241                 metadatas.put(name, pos);\r
242         }\r
243 \r
244 }\r