4ca332cf6ddacd828fb9df3ee0a02e066d63fc77
[mediacube.git] /
1 package user.jobengine.server.steps;\r
2 \r
3 import java.io.File;\r
4 import java.io.IOException;\r
5 import java.nio.charset.Charset;\r
6 import java.nio.file.DirectoryStream;\r
7 import java.nio.file.Files;\r
8 import java.nio.file.Path;\r
9 import java.nio.file.Paths;\r
10 import java.text.SimpleDateFormat;\r
11 import java.util.ArrayList;\r
12 import java.util.Arrays;\r
13 import java.util.Date;\r
14 import java.util.HashMap;\r
15 import java.util.List;\r
16 import java.util.Locale;\r
17 import java.util.Map;\r
18 import java.util.Set;\r
19 \r
20 import org.apache.logging.log4j.LogManager;\r
21 import org.apache.logging.log4j.Logger;\r
22 \r
23 import com.ibm.nosql.json.api.BasicDBObject;\r
24 import com.ibm.nosql.json.api.DB;\r
25 import com.ibm.nosql.json.api.DBCollection;\r
26 import com.ibm.nosql.json.api.DBObject;\r
27 import com.ibm.nosql.json.api.QueryBuilder;\r
28 \r
29 import user.commons.IEntityBase;\r
30 import user.commons.morpheus.MorpheusStrings;\r
31 import user.commons.nosql.NoSQLUtils;\r
32 import user.jobengine.db.IItemManager;\r
33 import user.jobengine.db.MediaFile;\r
34 import user.jobengine.db.MediaFileDAO;\r
35 import user.jobengine.server.IJobEngine;\r
36 import user.jobengine.server.IJobRuntime;\r
37 \r
38 public class ImportMORPHEUSMissingMaterialsStep extends JobStep {\r
39         private static final Logger logger = LogManager.getLogger();\r
40 \r
41         private static final String CSV_EXT = ".csv";\r
42         private static final String MXF_EXT = ".MXF";\r
43         private MediaFileDAO dao;\r
44         private String processedFolder;\r
45         private DB db;\r
46         private String targetPath;\r
47         private IJobRuntime jobRuntime;\r
48         private int overall;\r
49         private int current;\r
50         private final SimpleDateFormat enDateFormat = new SimpleDateFormat(MorpheusStrings.FORMAT_TIME_TO_AIR, Locale.ENGLISH);\r
51 \r
52         private Map<String, Integer> buildMetadataMap(Path csvFilePath, String[] data) throws Exception {\r
53                 Map<String, Integer> result = new HashMap<>();\r
54                 List<String> dataList = Arrays.asList(data);\r
55                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.MATERIAL_ID, result);\r
56                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.CHANNEL, result);\r
57                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TIME_TO_AIR, result);\r
58                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DURATION, result);\r
59                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TITLE, result);\r
60                 //              storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DEVICE_ID, result);\r
61                 //              storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.REASON, result);\r
62                 return result;\r
63         }\r
64 \r
65         @StepEntry\r
66         public Object[] execute(String sourceCSVPath, String processedFolder, String targetPath, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {\r
67                 this.jobRuntime = jobRuntime;\r
68                 setAndCheck(sourceCSVPath, processedFolder, targetPath, jobEngine);\r
69 \r
70                 List<Path> filePaths = new ArrayList<>();\r
71                 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(sourceCSVPath))) {\r
72                         for (Path path : directoryStream) {\r
73                                 filePaths.add(path);\r
74                         }\r
75 \r
76                         processPathItems(filePaths);\r
77                 } catch (Exception e) {\r
78                         logger.catching(e);\r
79                         logger.error(jobRuntime.getMarker(), "Hiba a végrehajtás során. A rendszer üzenete: {}", e.getMessage());\r
80                 }\r
81                 return null;\r
82         }\r
83 \r
84         private void moveProcessedCSV(Path csvFilePath) throws IOException {\r
85                 EscortFiles.ensureUNCFolder(csvFilePath.getParent().toString(), processedFolder);\r
86                 String fileName = csvFilePath.getFileName() + "." + EscortFiles.composeKillDate(0);\r
87                 Path targetPath = Paths.get(csvFilePath.getParent().toString(), processedFolder, fileName);\r
88                 Files.move(csvFilePath, targetPath);\r
89         }\r
90 \r
91         private void processLine(String[] data, Map<String, Integer> metadatas) throws Exception {\r
92                 String channel = data[metadatas.get(MorpheusStrings.CHANNEL)];\r
93                 String timeToAir = data[metadatas.get(MorpheusStrings.TIME_TO_AIR)];\r
94                 String duration = data[metadatas.get(MorpheusStrings.DURATION)];\r
95                 String materialID = data[metadatas.get(MorpheusStrings.MATERIAL_ID)];\r
96                 String title = data[metadatas.get(MorpheusStrings.TITLE)];\r
97                 //              String deviceID = data[metadatas.get(MorpheusStrings.DEVICE_ID)];\r
98                 //              String reason = data[metadatas.get(MorpheusStrings.REASON)];\r
99 \r
100                 DBObject query = QueryBuilder.start()\r
101                                 .and(QueryBuilder.start(MorpheusStrings.MATERIALID).is(materialID).get(), QueryBuilder.start(MorpheusStrings.TIMETOAIR).is(timeToAir).get())\r
102                                 .get();\r
103                 DBCollection collection = db.getCollection(MorpheusStrings.COLLECTION_NAME);\r
104                 BasicDBObject existingObject = (BasicDBObject) collection.findOne(query);\r
105                 if (existingObject != null) {\r
106                         logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már feldolgozásra került az {} időpontban.", materialID,\r
107                                         existingObject.getDate(MorpheusStrings.IMPORTED));\r
108                         return;\r
109                 }\r
110 \r
111                 BasicDBObject dbObject = new BasicDBObject(MorpheusStrings.IMPORTED, new Date());\r
112                 dbObject.put(MorpheusStrings.CHANNEL, channel);\r
113                 dbObject.put(MorpheusStrings.TIMETOAIR, enDateFormat.parse(timeToAir));\r
114                 dbObject.put(MorpheusStrings.DURATION, duration);\r
115                 dbObject.put(MorpheusStrings.MATERIALID, materialID);\r
116                 dbObject.put(MorpheusStrings.TITLE, title);\r
117                 //              dbObject.put(MorpheusStrings.DEVICEID, deviceID);\r
118                 //              dbObject.put(MorpheusStrings.REASON, reason);\r
119 \r
120                 String fileName = materialID + MXF_EXT;\r
121 \r
122                 Path targetFilePath = Paths.get(targetPath, fileName);\r
123                 boolean exists = Files.exists(targetFilePath);\r
124                 if (exists && targetFilePath.toFile().length() > 0) {\r
125                         logger.warn(jobRuntime.getMarker(), "Az '{}' anyag már be van töltve.", materialID);\r
126                         dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_SKIPPED);\r
127                 } else {\r
128                         List<IEntityBase> medias = dao.getByHouseId(fileName);\r
129                         if (medias == null || medias.size() == 0) {\r
130                                 logger.warn(jobRuntime.getMarker(), "Az '{}' anyag nem található az archívumban.", materialID);\r
131                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_UNAVAILABLE);\r
132                         } else if (medias.size() > 1) {\r
133                                 logger.warn(jobRuntime.getMarker(), "Az '{}' anyagból egynél több található az archívumban.", materialID);\r
134                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_MULTIPLE);\r
135                         } else {\r
136                                 logger.info(jobRuntime.getMarker(), "Az '{}' anyag megtalálható az archívumban.", materialID);\r
137                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_RESTORABLE);\r
138 \r
139                                 MediaFile mf = (MediaFile) medias.get(0);\r
140                                 dbObject.put(MorpheusStrings.MEDIAID, mf.getMediaId());\r
141                         }\r
142 \r
143                 }\r
144 \r
145                 collection.insert(dbObject);\r
146         }\r
147 \r
148         //      Channel,Time to Air,Duration,Material ID,Title,Device ID,Reason,\r
149         //      TX02,10-JAN-2018 13:25:21:08,00:05:00:00,M009572A,Tiéd a pálya/26. - 1. seg - Eredeti ** mc ,ICELE-02,On Domain (ISILON, ICELE-01, ICELE-05) ,\r
150         private void processMissingMaterialCSV(Path csvFilePath, List<String> lines) throws Exception {\r
151                 if (lines == null | lines.size() == 0) {\r
152                         return;\r
153                 }\r
154 \r
155                 Map<String, Integer> metadatas = null;\r
156                 for (int i = 0; i < lines.size(); i++) {\r
157                         String line = lines.get(i);\r
158                         if (line == null)\r
159                                 continue;\r
160                         String[] data = line.split(",");\r
161                         if (i == 0)\r
162                                 metadatas = buildMetadataMap(csvFilePath, data);\r
163                         else\r
164                                 processLine(data, metadatas);\r
165 \r
166                         current++;\r
167                         setProgress(current * 100 / overall);\r
168                 }\r
169 \r
170         }\r
171 \r
172         private void processPathItem(Path csvFilePath, List<String> lines) {\r
173                 File csvFile = csvFilePath.toFile();\r
174 \r
175                 try {\r
176                         processMissingMaterialCSV(csvFilePath, lines);\r
177                         moveProcessedCSV(csvFilePath);\r
178                 } catch (Exception e) {\r
179                         logger.catching(e);\r
180                         logger.error(jobRuntime.getMarker(), "A {} MORPHEUS állomány mozgatásakor hiba történt. A rendszer hibaüzenete: {}.", csvFile.getName(),\r
181                                         e.getMessage());\r
182                 }\r
183         }\r
184 \r
185         private void processPathItems(List<Path> filePaths) throws IOException {\r
186                 overall = 0;\r
187                 Map<Path, List<String>> contents = new HashMap<>();\r
188                 for (Path filePath : filePaths) {\r
189                         File csvFile = filePath.toFile();\r
190                         if (csvFile.isDirectory() || !csvFile.getName().toLowerCase().endsWith(CSV_EXT.toLowerCase()))\r
191                                 continue;\r
192                         List<String> lines = Files.readAllLines(filePath, Charset.forName("UTF-8"));\r
193                         overall += lines.size();\r
194                         contents.put(filePath, lines);\r
195                 }\r
196 \r
197                 Set<Path> csvPaths = contents.keySet();\r
198                 for (Path csvPath : csvPaths) {\r
199                         processPathItem(csvPath, contents.get(csvPath));\r
200                 }\r
201         }\r
202 \r
203         private void setAndCheck(String sourcePath, String processedFolder, String targetPath, IJobEngine jobEngine) {\r
204                 if (jobEngine == null) {\r
205                         logger.error(jobRuntime.getMarker(), "Az folyamatkezelő réteg nem elérhető.");\r
206                         throw new NullPointerException("Internal error, missing JobEngine reference.");\r
207                 }\r
208 \r
209                 IItemManager manager = jobEngine.getItemManager();\r
210                 if (manager == null) {\r
211                         logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg nem elérhető.");\r
212                         throw new NullPointerException("Internal error, missing ItemManager reference.");\r
213                 }\r
214                 dao = (MediaFileDAO) manager.getBaseDAO(MediaFile.class);\r
215                 if (dao == null) {\r
216                         logger.error(jobRuntime.getMarker(), "Az adatbáziskezelő réteg MediaFile kezelöje nem elérhető.");\r
217                         throw new NullPointerException("Internal error, missing MediaFile DAO reference.");\r
218                 }\r
219                 if (sourcePath == null) {\r
220                         logger.error(jobRuntime.getMarker(), "A folyamat 'sourcePath' bemeneti paramétere üres.");\r
221                         throw new NullPointerException("System is not configured properly, 'sourcePath' input parameter missing.");\r
222                 }\r
223                 if (processedFolder == null) {\r
224                         logger.error(jobRuntime.getMarker(), "A folyamat 'processedFolder' bemeneti paramétere üres.");\r
225                         throw new NullPointerException("System is not configured properly, 'processedFolder' input parameter missing.");\r
226                 }\r
227                 this.processedFolder = processedFolder;\r
228 \r
229                 if (targetPath == null) {\r
230                         logger.error(jobRuntime.getMarker(), "A folyamat 'targetPath' bemeneti paramétere üres.");\r
231                         throw new NullPointerException("System is not configured properly, 'targetPath' input parameter missing.");\r
232                 }\r
233                 this.targetPath = targetPath;\r
234 \r
235                 db = NoSQLUtils.getNoSQLDB();\r
236                 if (db == null) {\r
237                         logger.error(jobRuntime.getMarker(), "Sikertelen kapcsolódás a NoSQL adatbázishoz.");\r
238                         throw new NullPointerException("Can not connect to NoSQL database.");\r
239                 }\r
240 \r
241         }\r
242 \r
243         private void storeMetadataPosition(Path csvFilePath, List<String> dataList, String name, Map<String, Integer> metadatas) throws Exception {\r
244                 int pos = dataList.indexOf(name);\r
245                 if (pos < 0)\r
246                         throw new Exception(String.format("A '%s' MORPHEUS állományban nem található a '%s' mező.", csvFilePath.getFileName(), name));\r
247                 metadatas.put(name, pos);\r
248         }\r
249 \r
250 }\r