f1df9e946a8ecb08a9c9e8c292ffbf21e30705e8
[mediacube.git] /
1 package user.jobengine.server.steps;\r
2 \r
3 import java.io.File;\r
4 import java.io.IOException;\r
5 import java.nio.charset.Charset;\r
6 import java.nio.file.DirectoryStream;\r
7 import java.nio.file.Files;\r
8 import java.nio.file.Path;\r
9 import java.nio.file.Paths;\r
10 import java.text.SimpleDateFormat;\r
11 import java.util.ArrayList;\r
12 import java.util.Arrays;\r
13 import java.util.Date;\r
14 import java.util.HashMap;\r
15 import java.util.List;\r
16 import java.util.Locale;\r
17 import java.util.Map;\r
18 import java.util.Set;\r
19 \r
20 import org.apache.logging.log4j.LogManager;\r
21 import org.apache.logging.log4j.Logger;\r
22 \r
23 import com.ibm.nosql.json.api.BasicDBObject;\r
24 import com.ibm.nosql.json.api.DB;\r
25 import com.ibm.nosql.json.api.DBCollection;\r
26 import com.ibm.nosql.json.api.DBObject;\r
27 import com.ibm.nosql.json.api.QueryBuilder;\r
28 \r
29 import user.commons.IEntityBase;\r
30 import user.commons.morpheus.MorpheusStrings;\r
31 import user.commons.nosql.NoSQLUtils;\r
32 import user.jobengine.db.IItemManager;\r
33 import user.jobengine.db.MediaFile;\r
34 import user.jobengine.db.MediaFileDAO;\r
35 import user.jobengine.server.IJobEngine;\r
36 import user.jobengine.server.IJobRuntime;\r
37 import user.jobengine.server.steps.shared.EscortFiles;\r
38 \r
39 public class ImportMORPHEUSMissingMaterialsStep extends JobStep {\r
40         private static final Logger logger = LogManager.getLogger();\r
41 \r
42         private static final String CSV_EXT = ".csv";\r
43         private static final String MXF_EXT = ".MXF";\r
44         private MediaFileDAO dao;\r
45         private String processedFolder;\r
46         private DB db;\r
47         private String targetPath;\r
48         private IJobRuntime jobRuntime;\r
49         private int overall;\r
50         private int current;\r
51         private final SimpleDateFormat enDateFormat = new SimpleDateFormat(MorpheusStrings.FORMAT_TIME_TO_AIR, Locale.ENGLISH);\r
52 \r
53         private Map<String, Integer> buildMetadataMap(Path csvFilePath, String[] data) throws Exception {\r
54                 Map<String, Integer> result = new HashMap<>();\r
55                 List<String> dataList = Arrays.asList(data);\r
56                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.MATERIAL_ID, result);\r
57                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.CHANNEL, result);\r
58                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TIME_TO_AIR, result);\r
59                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DURATION, result);\r
60                 storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.TITLE, result);\r
61                 //              storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.DEVICE_ID, result);\r
62                 //              storeMetadataPosition(csvFilePath, dataList, MorpheusStrings.REASON, result);\r
63                 return result;\r
64         }\r
65 \r
66         @StepEntry\r
67         public Object[] execute(String sourceCSVPath, String processedFolder, String targetPath, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception {\r
68                 this.jobRuntime = jobRuntime;\r
69                 setAndCheck(sourceCSVPath, processedFolder, targetPath, jobEngine);\r
70 \r
71                 List<Path> filePaths = new ArrayList<>();\r
72                 try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(sourceCSVPath))) {\r
73                         for (Path path : directoryStream) {\r
74                                 filePaths.add(path);\r
75                         }\r
76 \r
77                         processPathItems(filePaths);\r
78                 } catch (Exception e) {\r
79                         logger.catching(e);\r
80                         logger.error(jobRuntime.getSessionMarker(), "Hiba a végrehajtás során. A rendszer üzenete: {}", e.getMessage());\r
81                 }\r
82                 return null;\r
83         }\r
84 \r
85         private void moveProcessedCSV(Path csvFilePath) throws IOException {\r
86                 EscortFiles.ensureUNCFolder(csvFilePath.getParent().toString(), processedFolder);\r
87                 String fileName = csvFilePath.getFileName() + "." + EscortFiles.composeKillDate(0);\r
88                 Path targetPath = Paths.get(csvFilePath.getParent().toString(), processedFolder, fileName);\r
89                 Files.move(csvFilePath, targetPath);\r
90         }\r
91 \r
92         private void processLine(String[] data, Map<String, Integer> metadatas) throws Exception {\r
93                 String channel = data[metadatas.get(MorpheusStrings.CHANNEL)];\r
94                 String timeToAir = data[metadatas.get(MorpheusStrings.TIME_TO_AIR)];\r
95                 String duration = data[metadatas.get(MorpheusStrings.DURATION)];\r
96                 String materialID = data[metadatas.get(MorpheusStrings.MATERIAL_ID)];\r
97                 String title = data[metadatas.get(MorpheusStrings.TITLE)];\r
98                 //              String deviceID = data[metadatas.get(MorpheusStrings.DEVICE_ID)];\r
99                 //              String reason = data[metadatas.get(MorpheusStrings.REASON)];\r
100 \r
101                 DBObject query = QueryBuilder.start()\r
102                                 .and(QueryBuilder.start(MorpheusStrings.MATERIALID).is(materialID).get(), QueryBuilder.start(MorpheusStrings.TIMETOAIR).is(timeToAir).get())\r
103                                 .get();\r
104                 DBCollection collection = db.getCollection(MorpheusStrings.COLLECTION_NAME);\r
105                 BasicDBObject existingObject = (BasicDBObject) collection.findOne(query);\r
106                 if (existingObject != null) {\r
107                         logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyag már feldolgozásra került az {} időpontban.", materialID,\r
108                                         existingObject.getDate(MorpheusStrings.IMPORTED));\r
109                         return;\r
110                 }\r
111 \r
112                 BasicDBObject dbObject = new BasicDBObject(MorpheusStrings.IMPORTED, new Date());\r
113                 dbObject.put(MorpheusStrings.CHANNEL, channel);\r
114                 dbObject.put(MorpheusStrings.TIMETOAIR, enDateFormat.parse(timeToAir));\r
115                 dbObject.put(MorpheusStrings.DURATION, duration);\r
116                 dbObject.put(MorpheusStrings.MATERIALID, materialID);\r
117                 dbObject.put(MorpheusStrings.TITLE, title);\r
118                 //              dbObject.put(MorpheusStrings.DEVICEID, deviceID);\r
119                 //              dbObject.put(MorpheusStrings.REASON, reason);\r
120 \r
121                 String fileName = materialID + MXF_EXT;\r
122 \r
123                 Path targetFilePath = Paths.get(targetPath, fileName);\r
124                 boolean exists = Files.exists(targetFilePath);\r
125                 if (exists && targetFilePath.toFile().length() > 0) {\r
126                         logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyag már be van töltve.", materialID);\r
127                         dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_SKIPPED);\r
128                 } else {\r
129                         List<IEntityBase> medias = dao.getByHouseId(fileName);\r
130                         if (medias == null || medias.size() == 0) {\r
131                                 logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyag nem található az archívumban.", materialID);\r
132                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_UNAVAILABLE);\r
133                         } else if (medias.size() > 1) {\r
134                                 logger.warn(jobRuntime.getSessionMarker(), "Az '{}' anyagból egynél több található az archívumban.", materialID);\r
135                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_MULTIPLE);\r
136                         } else {\r
137                                 logger.info(jobRuntime.getSessionMarker(), "Az '{}' anyag megtalálható az archívumban.", materialID);\r
138                                 dbObject.put(MorpheusStrings.STATUS, MorpheusStrings.STATUS_RESTORABLE);\r
139 \r
140                                 MediaFile mf = (MediaFile) medias.get(0);\r
141                                 dbObject.put(MorpheusStrings.MEDIAID, mf.getMediaId());\r
142                         }\r
143 \r
144                 }\r
145 \r
146                 collection.insert(dbObject);\r
147         }\r
148 \r
149         //      Channel,Time to Air,Duration,Material ID,Title,Device ID,Reason,\r
150         //      TX02,10-JAN-2018 13:25:21:08,00:05:00:00,M009572A,Tiéd a pálya/26. - 1. seg - Eredeti ** mc ,ICELE-02,On Domain (ISILON, ICELE-01, ICELE-05) ,\r
151         private void processMissingMaterialCSV(Path csvFilePath, List<String> lines) throws Exception {\r
152                 if (lines == null | lines.size() == 0) {\r
153                         return;\r
154                 }\r
155 \r
156                 Map<String, Integer> metadatas = null;\r
157                 for (int i = 0; i < lines.size(); i++) {\r
158                         String line = lines.get(i);\r
159                         if (line == null)\r
160                                 continue;\r
161                         String[] data = line.split(",");\r
162                         if (i == 0)\r
163                                 metadatas = buildMetadataMap(csvFilePath, data);\r
164                         else\r
165                                 processLine(data, metadatas);\r
166 \r
167                         current++;\r
168                         setProgress(current * 100 / overall);\r
169                 }\r
170 \r
171         }\r
172 \r
173         private void processPathItem(Path csvFilePath, List<String> lines) {\r
174                 File csvFile = csvFilePath.toFile();\r
175 \r
176                 try {\r
177                         processMissingMaterialCSV(csvFilePath, lines);\r
178                         moveProcessedCSV(csvFilePath);\r
179                 } catch (Exception e) {\r
180                         logger.catching(e);\r
181                         logger.error(jobRuntime.getSessionMarker(), "A {} MORPHEUS állomány mozgatásakor hiba történt. A rendszer hibaüzenete: {}.", csvFile.getName(),\r
182                                         e.getMessage());\r
183                 }\r
184         }\r
185 \r
186         private void processPathItems(List<Path> filePaths) throws IOException {\r
187                 overall = 0;\r
188                 Map<Path, List<String>> contents = new HashMap<>();\r
189                 for (Path filePath : filePaths) {\r
190                         File csvFile = filePath.toFile();\r
191                         if (csvFile.isDirectory() || !csvFile.getName().toLowerCase().endsWith(CSV_EXT.toLowerCase()))\r
192                                 continue;\r
193                         List<String> lines = Files.readAllLines(filePath, Charset.forName("UTF-8"));\r
194                         overall += lines.size();\r
195                         contents.put(filePath, lines);\r
196                 }\r
197 \r
198                 Set<Path> csvPaths = contents.keySet();\r
199                 for (Path csvPath : csvPaths) {\r
200                         processPathItem(csvPath, contents.get(csvPath));\r
201                 }\r
202         }\r
203 \r
204         private void setAndCheck(String sourcePath, String processedFolder, String targetPath, IJobEngine jobEngine) {\r
205                 if (jobEngine == null) {\r
206                         logger.error(jobRuntime.getSessionMarker(), "Az folyamatkezelő réteg nem elérhető.");\r
207                         throw new NullPointerException("Internal error, missing JobEngine reference.");\r
208                 }\r
209 \r
210                 IItemManager manager = jobEngine.getItemManager();\r
211                 if (manager == null) {\r
212                         logger.error(jobRuntime.getSessionMarker(), "Az adatbáziskezelő réteg nem elérhető.");\r
213                         throw new NullPointerException("Internal error, missing ItemManager reference.");\r
214                 }\r
215                 dao = (MediaFileDAO) manager.getBaseDAO(MediaFile.class);\r
216                 if (dao == null) {\r
217                         logger.error(jobRuntime.getSessionMarker(), "Az adatbáziskezelő réteg MediaFile kezelöje nem elérhető.");\r
218                         throw new NullPointerException("Internal error, missing MediaFile DAO reference.");\r
219                 }\r
220                 if (sourcePath == null) {\r
221                         logger.error(jobRuntime.getSessionMarker(), "A folyamat 'sourcePath' bemeneti paramétere üres.");\r
222                         throw new NullPointerException("System is not configured properly, 'sourcePath' input parameter missing.");\r
223                 }\r
224                 if (processedFolder == null) {\r
225                         logger.error(jobRuntime.getSessionMarker(), "A folyamat 'processedFolder' bemeneti paramétere üres.");\r
226                         throw new NullPointerException("System is not configured properly, 'processedFolder' input parameter missing.");\r
227                 }\r
228                 this.processedFolder = processedFolder;\r
229 \r
230                 if (targetPath == null) {\r
231                         logger.error(jobRuntime.getSessionMarker(), "A folyamat 'targetPath' bemeneti paramétere üres.");\r
232                         throw new NullPointerException("System is not configured properly, 'targetPath' input parameter missing.");\r
233                 }\r
234                 this.targetPath = targetPath;\r
235 \r
236                 db = NoSQLUtils.getNoSQLDB();\r
237                 if (db == null) {\r
238                         logger.error(jobRuntime.getSessionMarker(), "Sikertelen kapcsolódás a NoSQL adatbázishoz.");\r
239                         throw new NullPointerException("Can not connect to NoSQL database.");\r
240                 }\r
241 \r
242         }\r
243 \r
244         private void storeMetadataPosition(Path csvFilePath, List<String> dataList, String name, Map<String, Integer> metadatas) throws Exception {\r
245                 int pos = dataList.indexOf(name);\r
246                 if (pos < 0)\r
247                         throw new Exception(String.format("A '%s' MORPHEUS állományban nem található a '%s' mező.", csvFilePath.getFileName(), name));\r
248                 metadatas.put(name, pos);\r
249         }\r
250 \r
251 }\r