From 06fd0d71dfa26aacf111635c0c2cbf9aa6175aa1 Mon Sep 17 00:00:00 2001 From: elgekko Date: Mon, 23 May 2022 11:14:36 +0200 Subject: [PATCH] A javitasok elesitve --- .../jobs/executors.xml | 6 + .../jobs/executors.yaml | 5 - .../jobs/steps | 1 + .../jobs/templates/create-content-list.xml | 21 + .../settings/application.yaml | 4 +- .../tests/CreateContentListTest.java | 87 + .../executors/tests/GGGClassLoader.java | 7 - .../mediacube/executors/tests/SmallTests.java | 7 + .../user/mediacube/executors/tests/Test1.java | 8 - .../org.eclipse.core.resources.prefs | 1 + .../server/steps/ArchiveRecursive.java | 100 +- .../jobengine/server/steps/HLSProxyStep.java | 28 +- .../server/steps/MetadataTransformStep.java | 101 +- .../server/steps/shared/EscortFiles.java | 64 +- .../configuration/SystemConfiguration.java | 2 +- .../user/jobengine/db/EntityBaseDAO.java | 1643 ++++++------ .../src/user/jobengine/db/ItemManager.java | 39 +- .../jobengine/db/SqlIntegrityErrorParser.java | 44 + .../jobengine/server/DynamicStepsLoader.java | 27 +- .../src/user/jobengine/server/JobEngine.java | 2221 ++++++++--------- .../server/JobEngineConfiguration.java | 46 +- .../jobengine/zk/util/SessionListener.class | Bin 1794 -> 0 bytes 22 files changed, 2324 insertions(+), 2138 deletions(-) create mode 100644 server/hu.user.mediacube.executors.tests/jobs/executors.xml delete mode 100644 server/hu.user.mediacube.executors.tests/jobs/executors.yaml create mode 120000 server/hu.user.mediacube.executors.tests/jobs/steps create mode 100644 server/hu.user.mediacube.executors.tests/jobs/templates/create-content-list.xml create mode 100644 server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/CreateContentListTest.java delete mode 100644 server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/GGGClassLoader.java delete mode 100644 server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/Test1.java create mode 100644 server/user.jobengine.osgi.db/src/user/jobengine/db/SqlIntegrityErrorParser.java delete mode 100644 server/user.mediacube.gui/bin/user/jobengine/zk/util/SessionListener.class diff --git a/server/hu.user.mediacube.executors.tests/jobs/executors.xml b/server/hu.user.mediacube.executors.tests/jobs/executors.xml new file mode 100644 index 00000000..c48ea255 --- /dev/null +++ b/server/hu.user.mediacube.executors.tests/jobs/executors.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/server/hu.user.mediacube.executors.tests/jobs/executors.yaml b/server/hu.user.mediacube.executors.tests/jobs/executors.yaml deleted file mode 100644 index 8cba80bd..00000000 --- a/server/hu.user.mediacube.executors.tests/jobs/executors.yaml +++ /dev/null @@ -1,5 +0,0 @@ -executors: - - className: user.jobengine.server.steps.ArchiveListBuilderStep - maxConcurrent: 1 - - className: user.jobengine.server.steps.ArchiveMaterialSubmitStep - maxConcurrent: 2 diff --git a/server/hu.user.mediacube.executors.tests/jobs/steps b/server/hu.user.mediacube.executors.tests/jobs/steps new file mode 120000 index 00000000..486f1059 --- /dev/null +++ b/server/hu.user.mediacube.executors.tests/jobs/steps @@ -0,0 +1 @@ +c:/work/user/mediacube/server/user.jobengine.executors/src/user/jobengine/server/steps \ No newline at end of file diff --git a/server/hu.user.mediacube.executors.tests/jobs/templates/create-content-list.xml b/server/hu.user.mediacube.executors.tests/jobs/templates/create-content-list.xml new file mode 100644 index 00000000..ff82745b --- /dev/null +++ b/server/hu.user.mediacube.executors.tests/jobs/templates/create-content-list.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/server/hu.user.mediacube.executors.tests/settings/application.yaml b/server/hu.user.mediacube.executors.tests/settings/application.yaml index 020389ea..5149acf7 100644 --- a/server/hu.user.mediacube.executors.tests/settings/application.yaml +++ b/server/hu.user.mediacube.executors.tests/settings/application.yaml @@ -1,6 +1,6 @@ datasource: mediacube: - url: jdbc:db2://192.168.224.128:50000/mc + url: jdbc:db2://localvm:50000/mc user: db2admin password: password external-indexer: false @@ -8,7 +8,7 @@ datasource: login-timeout: 3 pool-size: 10 mediacube-nosql: - url: jdbc:db2://192.168.224.128:50000/mc + url: jdbc:db2://localvm:50000/mc user: db2admin password: password schema: test diff --git a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/CreateContentListTest.java b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/CreateContentListTest.java new file mode 100644 index 00000000..79533ca6 --- /dev/null +++ b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/CreateContentListTest.java @@ -0,0 +1,87 @@ +package hu.user.mediacube.executors.tests; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Semaphore; + +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import user.commons.JobStatus; +import user.commons.ListUtils; +import user.commons.configuration.IConfiguration; +import user.commons.configuration.SystemConfiguration; +import user.jobengine.db.ItemManager; +import user.jobengine.server.IJobRuntime; +import user.jobengine.server.JobEngine; +import user.jobengine.server.JobEngineConfiguration; +import user.tsm.client.TSMException; + +public class CreateContentListTest { + + private static final Path targetFilePath = Paths.get("/work/test.txt"); + protected static ItemManager manager = null; + protected static JobEngine engine = null; + + @BeforeClass + static public void setUpConnection() throws TSMException { + + IConfiguration configuration = SystemConfiguration.getInstance(); + JobEngineConfiguration jobengineConfig = JobEngineConfiguration.getInstance(); + jobengineConfig.bindSystemConfiguration(configuration); + + manager = ItemManager.getInstance(); + manager.bindSystemConfiguration(jobengineConfig); + manager.connect(); + engine = JobEngine.getInstance(); + engine.setItemManager(manager); + engine.bindJobEngineConfiguration(jobengineConfig); + engine.startup(); + + } + + @AfterClass + static public void tearDownConnection() throws IOException { + engine.shutdown(); + manager.disconnect(); + Files.delete(targetFilePath); + } + + @Test + public void execute() throws Exception { + Semaphore lock = new Semaphore(1); + lock.acquire(); + Map params = ListUtils.asMap("sourcePath", "/data/video", "targetFile", targetFilePath.toString()); + IJobRuntime runtime = engine.submit("create-content-list.xml", "create-content-list.xml", params); + runtime.addEventListener(e -> { + if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) + lock.release(); + }); + lock.acquire(); + + printWhiteList(); + } + + void printWhiteList() throws IOException { + Set result = new LinkedHashSet<>(); + List lines = FileUtils.readLines(targetFilePath.toFile()); + + for (String line : lines) { + String[] tokens = line.trim().split("\t"); + if (tokens.length == 0) + continue; + result.add(tokens[0]); + } + + result.forEach(l -> System.out.println(l)); + } + +} diff --git a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/GGGClassLoader.java b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/GGGClassLoader.java deleted file mode 100644 index 1c9109fd..00000000 --- a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/GGGClassLoader.java +++ /dev/null @@ -1,7 +0,0 @@ -package hu.user.mediacube.executors.tests; - -import groovy.lang.GroovyClassLoader; - -public class GGGClassLoader extends GroovyClassLoader { - -} diff --git a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/SmallTests.java b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/SmallTests.java index 07d9501d..9e5bb23d 100644 --- a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/SmallTests.java +++ b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/SmallTests.java @@ -1573,4 +1573,11 @@ public class SmallTests { System.out.println(octopusAddress); } + @Test + public void test9999993() throws Exception { + Path sut = Paths.get("/opt/test/media.mxf"); + System.out.println(EscortFiles.constructMetadataPath(sut)); + + } + } diff --git a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/Test1.java b/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/Test1.java deleted file mode 100644 index 15b8de2d..00000000 --- a/server/hu.user.mediacube.executors.tests/src/hu/user/mediacube/executors/tests/Test1.java +++ /dev/null @@ -1,8 +0,0 @@ -package hu.user.mediacube.executors.tests; - -public class Test1 { - - public void execute() { - System.out.println("Execute..."); - } -} diff --git a/server/user.jobengine.executors/.settings/org.eclipse.core.resources.prefs b/server/user.jobengine.executors/.settings/org.eclipse.core.resources.prefs index fd375d99..b446aa44 100644 --- a/server/user.jobengine.executors/.settings/org.eclipse.core.resources.prefs +++ b/server/user.jobengine.executors/.settings/org.eclipse.core.resources.prefs @@ -2,6 +2,7 @@ eclipse.preferences.version=1 encoding//src/user/jobengine/server/steps/ArchiveMaterialSubmitStep.java=UTF-8 encoding//src/user/jobengine/server/steps/ArchiveRecursive.java=UTF-8 encoding//src/user/jobengine/server/steps/CalculateMD5Step.java=UTF-8 +encoding//src/user/jobengine/server/steps/CreateContentListStep.java=UTF-8 encoding//src/user/jobengine/server/steps/DummyTestStep1.java=UTF-8 encoding//src/user/jobengine/server/steps/ForkDownloadStep.java=UTF-8 encoding/=UTF-8 diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/ArchiveRecursive.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/ArchiveRecursive.java index cb4db0c2..3b1bdd00 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/ArchiveRecursive.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/ArchiveRecursive.java @@ -38,19 +38,11 @@ import user.mediacube.metadata.interfaces.PlanAirMetadataListOptions; public class ArchiveRecursive extends JobStep implements FileVisitor { private static final Logger logger = LogManager.getLogger(); private static final String JOBTEMPLATE = "archive-material.xml"; - private static final String ITEM_TITLE = "itemTitle"; - private static final String ITEM_HOUSEID = "itemHouseId"; - private static final String MEDIA_HOUSEID = "mediaHouseId"; - private static final String MEDIA_TITLE = "mediaTitle"; - private static final String MEDIA_DESCRIPTION = "mediaDescription"; - private static final String MEDIA_TYPE = "mediaType"; private static final String ARCHIVE = "Archiválás"; private static final String ARCHIVE_ITEM = "archiveItem"; private static final String KILL_DATE_DAYS = "killDateDays"; -// private SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd"); + private List skipPathNames = Arrays.asList("!ARCHIVALAS_ALATT", "EBREDJ_VELUNK_ARCHIV", EscortFiles.STATUSFOLDER, EscortFiles.CONFLICTFOLDER); - private List skipPathNames = Arrays.asList("!ARCHIVALAS_ALATT", EscortFiles.STATUSFOLDER, - EscortFiles.CONFLICTFOLDER); private int limit; private int submitted; private int killDateDays; @@ -127,15 +119,14 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { this.disableProxy = disableProxy; logger.info(getSessionMarker(), "Starting in {}", sourcePath); try { - String location = "/opt/test-mediacube/file_list_original.txt"; - includeList = loadIncludeList(location); + String location = "/opt/test-mediacube/archive-recursive-files.txt"; + includeList = loadWhiteList(location); if (getJobRuntime().forkPrepare()) { Files.walkFileTree(Paths.get(sourcePath), this); } } catch (Exception e) { - logger.error(getSessionMarker(), "A(z) '{}' mappa elérése sikertelen. A rendszer hibaüzenete: {}", - sourcePath, e.getMessage()); + logger.error(getSessionMarker(), "A(z) '{}' mappa elérése sikertelen. A rendszer hibaüzenete: {}", sourcePath, e.getMessage()); } finally { if (submitted > 0) getJobRuntime().forkWaitComplete(); @@ -192,8 +183,7 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { if (metadata == null) logger.info(getSessionMarker(), "Metadata not available for {}", mediaHouseId); else { - logger.info(getSessionMarker(), "Metadata is available for {}: {}", mediaHouseId, - metadata.getMediaTitle()); + logger.info(getSessionMarker(), "Metadata is available for {}: {}", mediaHouseId, metadata.getMediaTitle()); if (mediaHouseId.equals(metadata.getForeignMediaId())) result = metadata; @@ -229,15 +219,13 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { result = file.delete(); } } catch (Exception e) { - logger.error(getSessionMarker(), "A {} fájl nem törölhető. A rendszer hibaüzenete: {}", filePath, - e.getMessage()); + logger.error(getSessionMarker(), "A {} fájl nem törölhető. A rendszer hibaüzenete: {}", filePath, e.getMessage()); } return result; } @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes paramBasicFileAttributes) - throws IOException { + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes paramBasicFileAttributes) throws IOException { Path dirName = dir.getFileName(); if (skipPathNames.contains(dirName.toString())) { @@ -257,38 +245,35 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { return false; } - if (limit != 0 && submitted == limit) { - logger.info(getSessionMarker(), "Limit reached {}, canceling", limit); - return false; - } - String fileName = mediaPath.getFileName().toString(); if (fileName.startsWith(".") || fileName.endsWith(".nomd")) return false; - logger.info(getSessionMarker(), "Processing {}", mediaPath); + if (limit != 0 && submitted == limit) { + logger.info(getSessionMarker(), "Limit reached {}, canceling", limit); + return false; + } File mediaFile = mediaPath.toFile(); if (mediaFile.isDirectory()) { return false; } - Path nomdFile = Paths.get(mediaPath.toString() + ".nomd"); - - if (nomdFile.toFile().exists()) { - // toroljuk es ujbol letrehozzuk ha kell - if (!removeFile(nomdFile)) { - logger.info(getSessionMarker(), "Can't delete nomd file for {}, skipping", nomdFile); - return false; - } - if (EscortFiles.isMediaCatched(mediaPath)) { - Path catchedFile = EscortFiles.createMediaCatchFilePath(mediaPath); - if (!removeFile(catchedFile)) { - logger.info(getSessionMarker(), "Can't delete catched file for {}, skipping", nomdFile); - return false; - } - } - } +// +// if (nomdFile.toFile().exists()) { +// // toroljuk es ujbol letrehozzuk ha kell +// if (!removeFile(nomdFile)) { +// logger.info(getSessionMarker(), "Can't delete nomd file for {}, skipping", nomdFile); +// return false; +// } +// if (EscortFiles.isMediaCatched(mediaPath)) { +// Path catchedFile = EscortFiles.createMediaCatchFilePath(mediaPath); +// if (!removeFile(catchedFile)) { +// logger.info(getSessionMarker(), "Can't delete catched file for {}, skipping", nomdFile); +// return false; +// } +// } +// } if (EscortFiles.isMediaCatched(mediaPath)) { // logger.info(getSessionMarker(), "Skipping already catched {}", mediaPath); @@ -311,13 +296,20 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { // Message msg = new ParameterizedMessage("Nincs metaadat!"); // logger.info(new MediaCubeMarker("vasary@elgekko.net,muszak@mediavivantis.hu", // "Értesítés problémás " + mediaPath.getFileName().toString() + " archiválásról"), msg); + Path nomdFile = Paths.get(mediaPath.toString() + ".nomd"); Files.createFile(nomdFile); return false; } + logger.info(getSessionMarker(), "Archiving {}", mediaPath); + try { checkArchiveItem(archiveItem); + EscortFiles.createMediaCatch(mediaPath); + String metadata = archiveItemJSON(archiveItem); + EscortFiles.createMetadata(mediaPath.getParent().toString(), mediaPath.getFileName().toString(), metadata); + Map parameters = ListUtils.asMap(ARCHIVE_ITEM, archiveItem, KILL_DATE_DAYS, killDateDays); IJobRuntime runtime = getEngine().submit(getJobRuntime(), e -> { if (e.getStatus().equals(JobStatus.CANCELED) || e.getStatus().equals(JobStatus.SUSPENDED)) @@ -326,14 +318,11 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { if (runtime == null) throw new Exception("Submit returned null runtime"); runtime.setRelated(mediaPath.toString()); - EscortFiles.createMediaCatch(mediaPath); - String metadata = archiveItemJSON(archiveItem); - EscortFiles.createMetadata(mediaPath.getParent().toString(), mediaPath.getFileName().toString(), metadata); + submitted++; } catch (Exception e) { - logger.error(getSessionMarker(), - "A(z) '{}' állomány archiválási kísérlete sikertelen. A rendszer üzenete: {}", mediaPath, - e.getMessage()); + logger.catching(e); + logger.error(getSessionMarker(), "A(z) '{}' állomány archiválási kísérlete sikertelen. A rendszer üzenete: {}", mediaPath, e.getMessage()); return false; } @@ -344,7 +333,8 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { public FileVisitResult visitFile(Path filePath, BasicFileAttributes paramBasicFileAttributes) throws IOException { // logger.info(getSessionMarker(), "Will checked {}", filePath); try { - processPathItem(filePath); + if (!processPathItem(filePath)) + return FileVisitResult.TERMINATE; } catch (Exception e) { logger.catching(e); } @@ -399,4 +389,18 @@ public class ArchiveRecursive extends JobStep implements FileVisitor { return result; } + private Set loadWhiteList(String location) throws IOException { + Set result = new LinkedHashSet<>(); + Path path = Paths.get(location); + List lines = FileUtils.readLines(path.toFile()); + + for (String line : lines) { + String[] tokens = line.trim().split("\t"); + if (tokens.length == 0) + continue; + result.add(tokens[0]); + } + return result; + } + } diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/HLSProxyStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/HLSProxyStep.java index e54a5a86..7d2704c4 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/HLSProxyStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/HLSProxyStep.java @@ -27,12 +27,12 @@ public class HLSProxyStep extends JobStep { Path sourceFilePath = Paths.get(archiveItem.getMediaFile()); try { String fileName = sourceFilePath.getFileName().toString(); - String proxyName = fileName.substring(0, fileName.lastIndexOf(".")) + "S01" - + fileName.substring(fileName.lastIndexOf(".")); - Path lowresSourcePath = Paths.get(sourceFilePath.getParent().toString(), EscortFiles.STATUSFOLDER, - proxyName); - if (!lowresSourcePath.toFile().exists()) - throw new Exception("File does not exist: " + lowresSourcePath); + String proxyName = fileName.substring(0, fileName.lastIndexOf(".")) + "S01" + fileName.substring(fileName.lastIndexOf(".")); + Path lowresSourcePath = Paths.get(sourceFilePath.getParent().toString(), EscortFiles.STATUSFOLDER, proxyName); + if (!lowresSourcePath.toFile().exists()) { + logger.info(marker, "HLS proxy not found for {}", archiveItem.getMediaFile()); + return null; + } Store lowresStore = getManager().getCurrentLowresStore(); StoreUri lowresStoreUri = lowresStore.getTargetStoreUri(RemoteStoreProtocol.LOCAL); @@ -40,8 +40,7 @@ public class HLSProxyStep extends JobStep { Path subdirPath = null; if (proxyName.indexOf(".") > 2) { - subdirPath = Paths.get(proxyName.substring(0, 1), proxyName.substring(1, 2), proxyName.substring(2, 3), - proxyName); + subdirPath = Paths.get(proxyName.substring(0, 1), proxyName.substring(1, 2), proxyName.substring(2, 3), proxyName); } else { subdirPath = Paths.get(proxyName); } @@ -60,20 +59,17 @@ public class HLSProxyStep extends JobStep { lowresTargetPath = Paths.get(webPath, subDir); // Files.move(lowresSourcePath, lowresTargetPath); - FFMpeg.hls_audio4ch(lowresSourcePath.toAbsolutePath().toString(), - lowresTargetPath.toAbsolutePath().toString(), p -> { - setProgress((int) p); - }); + FFMpeg.hls_audio4ch(lowresSourcePath.toAbsolutePath().toString(), lowresTargetPath.toAbsolutePath().toString(), p -> { + setProgress((int) p); + }); Path lowresHTTPPath = Paths.get(subDir, "index.m3u8"); - MediaFile mediaFile = getManager().createMediaFile(lowresHTTPPath.toString(), LOWRES_FILETYPE, - lowresStore.getName()); + MediaFile mediaFile = getManager().createMediaFile(lowresHTTPPath.toString(), LOWRES_FILETYPE, lowresStore.getName()); mediaFile.setMediaId(mediaCubeMedia.getId()); getManager().add(mediaFile); } catch (Exception e) { logger.catching(e); - logger.error(marker, "A HLS proxy létrehozása sikertelen a '{}' fájlból. A rendszer üzenete: {}", - sourceFilePath, e.getMessage()); + logger.error(marker, "A HLS proxy létrehozása sikertelen a '{}' fájlból. A rendszer üzenete: {}", sourceFilePath, e.getMessage()); } return null; } diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/MetadataTransformStep.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/MetadataTransformStep.java index d1938203..4bd54415 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/MetadataTransformStep.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/MetadataTransformStep.java @@ -4,12 +4,9 @@ import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; -import java.util.Date; -import java.util.Set; +import java.util.List; +import org.apache.commons.io.FilenameUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Marker; @@ -17,11 +14,9 @@ import org.apache.logging.log4j.Marker; import com.ibm.nosql.json.api.BasicDBList; import user.jobengine.db.Item; -import user.jobengine.db.ItemManager; import user.jobengine.db.ItemType; import user.jobengine.db.Media; -import user.jobengine.server.IJobEngine; -import user.jobengine.server.IJobRuntime; +import user.jobengine.server.steps.shared.EscortFiles; /** * Itemek es mediak krealasa a ArchiveItem objektum alapjan. @@ -31,10 +26,7 @@ import user.jobengine.server.IJobRuntime; public class MetadataTransformStep extends JobStep { private static final String CONFLICT = ".CONFLICT"; private static final Logger logger = LogManager.getLogger(); - private static final String ITEM_MANAGER_IS_NULL = "ItemManager is null"; public static final String DEFAULT_MEDIATYPE = "Generic"; - private ItemManager itemManager; - private Marker marker;; private void addTags(ArchiveItem archiveItem, Media mediaCubeMedia) { @@ -44,7 +36,7 @@ public class MetadataTransformStep extends JobStep { try { String tagText = String.valueOf(tag); - itemManager.addMediaTag(tagText, mediaCubeMedia.getId()); + getManager().addMediaTag(tagText, mediaCubeMedia.getId()); System.out.println(); } catch (Exception e) { @@ -54,48 +46,32 @@ public class MetadataTransformStep extends JobStep { } } - private void checkDuplicates(ArchiveItem archiveItem, String sourceFileName) throws Exception { - if (itemManager.isMediaFileExists(sourceFileName)) { - try { - Path sourcePath = Paths.get(archiveItem.getMediaFile()); - Path parent = sourcePath.getParent(); - Path conflictPath = Paths.get(parent.toString(), CONFLICT); - File folder = conflictPath.toFile(); - if (!folder.exists() || !folder.isDirectory()) { - Set perms = PosixFilePermissions.fromString("rwxrwxrwx"); - FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); - try { - Files.createDirectories(conflictPath, attr); - } catch (Exception e) { - try { - Files.createDirectory(conflictPath); - } catch (Exception e1) { - logger.catching(e); - throw e; - } - } - } - - Files.move(sourcePath, Paths.get(conflictPath.toString(), sourceFileName + (new Date()).getTime())); - } catch (Exception e1) { - logger.catching(e1); - logger.error(marker, "Hiba az '{}' állomány mappába másolásakor. A rendszer üzenete: {}", CONFLICT, e1.getMessage()); - } - throw new Exception("Az '" + sourceFileName + "' állomány már megtalálható az archívumban, archiválása nem lehetséges."); + private void moveToDuplicates(ArchiveItem archiveItem, String sourceFileName) { + try { + Path sourcePath = Paths.get(archiveItem.getMediaFile()); + Path conflictPath = Paths.get(sourcePath.getParent().toString(), CONFLICT); + EscortFiles.ensureUNCFolder(conflictPath); + Files.move(sourcePath, Paths.get(conflictPath.toString(), sourceFileName)); + Path metadataPath = EscortFiles.constructMetadataPath(sourcePath); + if (metadataPath.toFile().exists()) + Files.move(metadataPath, Paths.get(conflictPath.toString(), metadataPath.getFileName().toString())); + } catch (Exception e1) { + logger.catching(e1); + logger.error(marker, "Hiba az '{}' állomány mappába mozgatásakor. A rendszer üzenete: {}", CONFLICT, e1.getMessage()); } } private Item createItem(ArchiveItem archiveItem) { Item mediaCubeItem = getExistingItem(archiveItem.getItemHouseId(), archiveItem.getItemTitle()); if (mediaCubeItem == null) - mediaCubeItem = itemManager.createItem(DEFAULT_MEDIATYPE, archiveItem.getItemTitle(), archiveItem.getItemDescription(), + mediaCubeItem = getManager().createItem(DEFAULT_MEDIATYPE, archiveItem.getItemTitle(), archiveItem.getItemDescription(), archiveItem.getItemHouseId()); return mediaCubeItem; } private Media createMedia(ArchiveItem archiveItem, Item mediaCubeItem, String mediaType) { Media mediaCubeMedia; - mediaCubeMedia = itemManager.createMedia(mediaType, archiveItem.getMediaTitle(), archiveItem.getMediaDescription(), archiveItem.getMediaHouseId()); + mediaCubeMedia = getManager().createMedia(mediaType, archiveItem.getMediaTitle(), archiveItem.getMediaDescription(), archiveItem.getMediaHouseId()); mediaCubeMedia.setLength(archiveItem.getDuration()); mediaCubeItem.appendMedia(mediaCubeMedia); @@ -103,24 +79,30 @@ public class MetadataTransformStep extends JobStep { } @StepEntry - public Object[] execute(ArchiveItem archiveItem, IJobEngine jobEngine, IJobRuntime jobRuntime) throws Exception { - marker = jobRuntime.getSessionMarker(); + public Object[] execute(ArchiveItem archiveItem) throws Exception { + marker = getSessionMarker(); Media mediaCubeMedia = null; - itemManager = (ItemManager) jobEngine.getItemManager(); - if (itemManager == null) - throw new NullPointerException(ITEM_MANAGER_IS_NULL); try { File sourceMediaFile = new File(archiveItem.getMediaFile()); String sourceFileName = sourceMediaFile.getName(); - checkDuplicates(archiveItem, sourceFileName); + + getJobRuntime().setRelated(FilenameUtils.removeExtension(sourceFileName)); + + if (getManager().isMediaFileExists(sourceFileName)) { + cleanUpOnDuplicate(archiveItem, sourceFileName); + logger.warn(marker, "Az '" + sourceFileName + "' állomány már megtalálható az archívumban, archiválása nem lehetséges."); + cancel(); + return null; + } + Item mediaCubeItem = createItem(archiveItem); - jobRuntime.incrementProgress(50); + setProgress(50); String mediaType = getCreateType(archiveItem); mediaCubeMedia = createMedia(archiveItem, mediaCubeItem, mediaType); - //ha itemid 0 akkor merge, egyebkent media insert + // ha itemid 0 akkor merge, egyebkent media insert if (mediaCubeItem.getId() == 0) - itemManager.mergeItemStructure(mediaCubeItem); + getManager().mergeItemStructure(mediaCubeItem); else { mediaCubeMedia.setItemId(mediaCubeItem.getId()); mediaCubeMedia.add(); @@ -137,19 +119,26 @@ public class MetadataTransformStep extends JobStep { logger.error(marker, "Az '{}' állomány .catched jelző állománya nem törölhető.", fileName); throw e; } finally { - jobRuntime.incrementProgress(100); + setProgress(100); } return new Object[] { mediaCubeMedia }; } + private void cleanUpOnDuplicate(ArchiveItem archiveItem, String sourceFileName) { + moveToDuplicates(archiveItem, sourceFileName); + archiveItem.removeCatchedFile(); + List killDateFiles = EscortFiles.getKillDateFiles(Paths.get(archiveItem.getMediaFile())); + EscortFiles.remove(killDateFiles); + } + private String getCreateType(ArchiveItem archiveItem) { String mediaType = archiveItem.getMediaType(); if (mediaType == null || mediaType.length() == 0) mediaType = DEFAULT_MEDIATYPE; else { - ItemType mediaItemType = itemManager.getItemType(mediaType); + ItemType mediaItemType = getManager().getItemType(mediaType); if (mediaItemType == null) - itemManager.createItemType(mediaType, mediaType).add(); + getManager().createItemType(mediaType, mediaType).add(); } return mediaType; } @@ -157,9 +146,9 @@ public class MetadataTransformStep extends JobStep { private Item getExistingItem(String itemHouseId, String itemTitle) { Item[] result = new Item[] { null }; String sql = String.format("select id from item where houseid='%s' and title='%s'", itemHouseId, itemTitle); - itemManager.executeQuery(sql, rs -> { + getManager().executeQuery(sql, rs -> { long id = rs.getLong("id"); - result[0] = itemManager.getItem(id); + result[0] = getManager().getItem(id); return true; }, null); return result[0]; diff --git a/server/user.jobengine.executors/src/user/jobengine/server/steps/shared/EscortFiles.java b/server/user.jobengine.executors/src/user/jobengine/server/steps/shared/EscortFiles.java index 8e890031..673cc5e6 100644 --- a/server/user.jobengine.executors/src/user/jobengine/server/steps/shared/EscortFiles.java +++ b/server/user.jobengine.executors/src/user/jobengine/server/steps/shared/EscortFiles.java @@ -7,6 +7,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.StringWriter; import java.io.UnsupportedEncodingException; +import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -14,8 +15,11 @@ import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; import java.util.Date; +import java.util.List; import java.util.Set; import javax.xml.parsers.DocumentBuilder; @@ -60,6 +64,7 @@ public class EscortFiles { private static final String EXTENDEDID = "extendedId"; private static final String ID = "ID"; private static final String KILLDATE_FILENAME = "%s.%s.killdate"; + private static final String KILLDATE_EXT = ".killdate"; private static final String FORMAT_KILLDATENAME = "yyyyMMdd"; public static final String STATUSFOLDER = ".STATUS"; public static final String CONFLICTFOLDER = ".CONFLICT"; @@ -114,8 +119,12 @@ public class EscortFiles { Files.write(metadataPath, metadata.getBytes()); } - public static boolean createMetadataIfNotExists(String filePath, String fileName, String metadata) - throws IOException { + public static Path constructMetadataPath(Path filePath) throws IOException { + String metadataFileName = filePath.getFileName().toString() + DOT_JSON; + return Paths.get(filePath.getParent().toString(), STATUSFOLDER, metadataFileName); + } + + public static boolean createMetadataIfNotExists(String filePath, String fileName, String metadata) throws IOException { boolean result = false; if (!EscortFiles.isMetadataExists(filePath, fileName)) { EscortFiles.createMetadata(filePath, fileName, metadata); @@ -143,17 +152,14 @@ public class EscortFiles { // 07-13-2020 (19:36:52) // 05-18-2013 (18:52:24) SimpleDateFormat df = new SimpleDateFormat("MM-dd-yyyy (HH:mm:ss)"); - root.appendChild(xmlDocument.createElement(MODIFIEDTIMESTAMP)) - .appendChild(xmlDocument.createTextNode(df.format(modified))); - root.appendChild(xmlDocument.createElement(RECORDTIMESTAMP)) - .appendChild(xmlDocument.createTextNode(df.format(recorded))); + root.appendChild(xmlDocument.createElement(MODIFIEDTIMESTAMP)).appendChild(xmlDocument.createTextNode(df.format(modified))); + root.appendChild(xmlDocument.createElement(RECORDTIMESTAMP)).appendChild(xmlDocument.createTextNode(df.format(recorded))); xmlDocument.appendChild(root); return xmDocumentToString(xmlDocument); } - public static byte[] createNEXIOKillDateFile(String fileName, Date killDate, String description, String agency) - throws Exception { + public static byte[] createNEXIOKillDateFile(String fileName, Date killDate, String description, String agency) throws Exception { DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); DocumentBuilder db = dbf.newDocumentBuilder(); DOMImplementation impl = db.getDOMImplementation(); @@ -167,8 +173,7 @@ public class EscortFiles { } if (StringUtils.isNotBlank(description)) - root.appendChild(xmlDocument.createElement(EXTENDEDDESCRIPTION)) - .appendChild(xmlDocument.createTextNode(description)); + root.appendChild(xmlDocument.createElement(EXTENDEDDESCRIPTION)).appendChild(xmlDocument.createTextNode(description)); if (StringUtils.isNotBlank(agency)) root.appendChild(xmlDocument.createElement(EXTENDEDAGENCY)).appendChild(xmlDocument.createTextNode(agency)); xmlDocument.appendChild(root); @@ -288,10 +293,35 @@ public class EscortFiles { } } + public static void remove(List files) { + if (files != null) + files.forEach(f -> remove(f)); + } + public static void removeCatchedFile(Path escortFile) { remove(Paths.get(escortFile.toString() + DOT_CATCHED)); } + public static List getKillDateFiles(Path filePath) { + String killDateFilePattern = String.format("%s.*%s", filePath.getFileName().toString(), KILLDATE_EXT); + List result = new ArrayList<>(); + try { + Path statusPath = Paths.get(filePath.getParent().toString(), STATUSFOLDER); + File statusPathFile = statusPath.toFile(); + if (statusPathFile.exists() && statusPathFile.isDirectory()) { + try (DirectoryStream stream = Files.newDirectoryStream(statusPath, killDateFilePattern)) { + stream.forEach(p -> result.add(p)); + } catch (Exception e) { + logger.catching(e); + } + } + Collections.sort(result); + } catch (Exception e) { + logger.catching(e); + } + return result; + } + /*** * A media eleresi utjan alapjan a .STATUS almappabol torli a .catch fajlt. * @@ -303,8 +333,7 @@ public class EscortFiles { remove(catchedFile); } - public static void setNEXIOKillDate(int killDateDays, String targetFileName, String nexioAgency, StoreUri targetUri) - throws Exception { + public static void setNEXIOKillDate(int killDateDays, String targetFileName, String nexioAgency, StoreUri targetUri) throws Exception { OutputStream outStream = null; try { FTPClient targetFTP = ((FtpDirectoryLister) targetUri.getLister()).connect(); @@ -312,14 +341,12 @@ public class EscortFiles { killDate.add(Calendar.DAY_OF_YEAR, killDateDays); if (targetFileName.toLowerCase().contains(".mxf")) targetFileName = targetFileName.substring(0, targetFileName.lastIndexOf('.')); - byte[] killDateFile = EscortFiles.createNEXIOKillDateFile(targetFileName, killDate.getTime(), null, - nexioAgency); + byte[] killDateFile = EscortFiles.createNEXIOKillDateFile(targetFileName, killDate.getTime(), null, nexioAgency); String xml = targetFileName + ".xml"; outStream = targetFTP.storeFileStream(xml); if (outStream == null) { throw new NullPointerException( - "Can not open: " + targetFileName.substring(0, targetFileName.lastIndexOf('.')) + ".xml" - + " Reply:" + targetFTP.getReplyString()); + "Can not open: " + targetFileName.substring(0, targetFileName.lastIndexOf('.')) + ".xml" + " Reply:" + targetFTP.getReplyString()); } outStream.write(killDateFile); outStream.flush(); @@ -332,8 +359,8 @@ public class EscortFiles { } } - private static byte[] xmDocumentToString(Document xmlDocument) throws TransformerFactoryConfigurationError, - TransformerConfigurationException, TransformerException, IOException, UnsupportedEncodingException { + private static byte[] xmDocumentToString(Document xmlDocument) + throws TransformerFactoryConfigurationError, TransformerConfigurationException, TransformerException, IOException, UnsupportedEncodingException { DOMSource domSource = new DOMSource(xmlDocument); TransformerFactory tf = TransformerFactory.newInstance(); Transformer transformer = tf.newTransformer(); @@ -347,4 +374,5 @@ public class EscortFiles { sw.close(); return result.getBytes("UTF-16"); } + } diff --git a/server/user.jobengine.osgi.commons/src/user/commons/configuration/SystemConfiguration.java b/server/user.jobengine.osgi.commons/src/user/commons/configuration/SystemConfiguration.java index 10111159..100f2535 100644 --- a/server/user.jobengine.osgi.commons/src/user/commons/configuration/SystemConfiguration.java +++ b/server/user.jobengine.osgi.commons/src/user/commons/configuration/SystemConfiguration.java @@ -46,7 +46,7 @@ public class SystemConfiguration implements IConfiguration { @Override public String getConfig(String relativeConfigName) throws FileNotFoundException { Path result = Paths.get(System.getProperty("user.dir", ""), relativeConfigName); - if (Files.exists(result)) + if (Files.exists(result) || Files.isSymbolicLink(result)) return result.toString(); result = Paths.get(System.getProperty("system.config.root", ""), relativeConfigName); if (Files.exists(result)) diff --git a/server/user.jobengine.osgi.db/generated/user/jobengine/db/EntityBaseDAO.java b/server/user.jobengine.osgi.db/generated/user/jobengine/db/EntityBaseDAO.java index f5286275..1829f596 100644 --- a/server/user.jobengine.osgi.db/generated/user/jobengine/db/EntityBaseDAO.java +++ b/server/user.jobengine.osgi.db/generated/user/jobengine/db/EntityBaseDAO.java @@ -1,822 +1,821 @@ -/*@lineinfo:filename=EntityBaseDAO*//*@lineinfo:user-code*//*@lineinfo:1^1*/package user.jobengine.db; - -import user.commons.IEntityBase; -import user.commons.logging.LogUtils; -import java.sql.SQLException; -import java.sql.ResultSet; -import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import sqlj.runtime.ref.DefaultContext; -import sqlj.runtime.ref.ResultSetIterImpl; -/** - * Az entitasok alap DAO osztalya. - */ -@SuppressWarnings("unused") -public abstract class EntityBaseDAO implements IEntityBaseDAO { - protected ItemManager manager; - protected static final Logger logger = LogManager.getLogger(); - - public EntityBaseDAO(ItemManager manager) { - this.manager = manager; - } - - @SuppressWarnings("rawtypes") - protected void checkNull(Object obj, Class type) { - if (obj == null) - throw new IllegalArgumentException(type.getName() + " can not be null."); - } - - @Override - public void add(IEntityBase entity) { - DefaultContext context = manager.getDbContext(); - try { - add(context, entity); - } catch (Exception e) { - //TODO reszletezes - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - } - - public void add(DefaultContext context, IEntityBase entity) { - manager.traceIn(); - try { - checkNull(entity, IEntityBase.class); - beforeAdd(context, entity); - long id = entity.getId(); - - insert(context, entity); - if (id == 0) { - /*@lineinfo:generated-code*//*@lineinfo:52^4*/ - -// ************************************************************ -// #sql [context] { SELECT IDENTITY_VAL_LOCAL() INTO :id, FROM SYSIBM.SYSDUMMY1 }; -// ************************************************************ - -{ - sqlj.runtime.profile.RTResultSet __sJT_rtRs; - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 0); - try - { - __sJT_rtRs = __sJT_execCtx.executeQuery(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } - try - { - sqlj.runtime.ref.ResultSetIterImpl.checkColumns(__sJT_rtRs, 1); - if (!__sJT_rtRs.next()) - { - sqlj.runtime.error.RuntimeRefErrors.raise_NO_ROW_SELECT_INTO(); - } - id = __sJT_rtRs.getLongNoNull(1); - if (__sJT_rtRs.next()) - { - sqlj.runtime.error.RuntimeRefErrors.raise_MULTI_ROW_SELECT_INTO(); - } - } - finally - { - __sJT_rtRs.close(); - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:52^80*/ - entity.setId(id); - } - afterAdd(context, entity); - /*@lineinfo:generated-code*//*@lineinfo:56^3*/ - -// ************************************************************ -// #sql [context] { COMMIT }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 1); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:56^27*/ - if (useMemoryCache()) - manager.storeCached(entity.getId(), entity); - - } catch (Exception e) { - try { - /*@lineinfo:generated-code*//*@lineinfo:62^4*/ - -// ************************************************************ -// #sql [context] { ROLLBACK }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 2); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:62^30*/ - } catch(Exception e1) { - } - manager.throwError(e); - } - manager.traceOut(); - } - - @Override - public IEntityBase get(long id) { - IEntityBase entity = null; - DefaultContext context = manager.getDbContext(); - try { - entity = get(context, id); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - return entity; - } - - public IEntityBase get(DefaultContext context, long id) { - manager.traceIn(); - IEntityBase entity = null; - try { - if (id == 0) - throw new IllegalArgumentException("Cannot get EntityBase with 0 ID."); - if (useMemoryCache()) - entity = manager.retrieveCached(getCacheType(), id); - if (entity == null) { - ResultSetIterImpl iter = selectByKey(context, id); - checkNull(iter, ResultSetIterImpl.class); - entity = getEntity(context, iter); - if (useMemoryCache()) - manager.storeCached(id, entity); - } - afterGet(context, entity); - } catch (Exception e) { - throw new ItemManagerException(e); - } - manager.traceOut(); - return entity; - } - - @Override - public List getAll() { - List result = null; - DefaultContext context = manager.getDbContext(); - try { - result = getAll(context); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - return result; - } - - @Override - public List getAll(IDAOIterProvider provider) { - List result = null; - DefaultContext context = manager.getDbContext(); - try { - result = getAll(context, provider); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - return result; - } - - @Override - public IEntityBase get(IDAOIterProvider provider) { - IEntityBase entity = null; - DefaultContext context = manager.getDbContext(); - try { - entity = get(context, provider); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - return entity; - } - - private IEntityBase get(DefaultContext context, IDAOIterProvider provider) { - manager.traceIn(); - IEntityBase entity = null; - try { - ResultSetIterImpl iter = provider.get(context, this); - checkNull(iter, ResultSetIterImpl.class); - entity = getEntity(context, iter); - afterGet(context, entity); - } catch (Exception e) { - throw new ItemManagerException(e); - } - manager.traceOut(); - return entity; - } - - public List getAll(DefaultContext context, IDAOIterProvider provider) { - manager.traceIn(); - List result = null; - try { - ResultSetIterImpl iter = provider.get(context, this); - checkNull(iter, ResultSetIterImpl.class); - result = getList(context, iter, false, true); - } catch (Exception e) { - throw new ItemManagerException(e); - } - manager.traceOut(); - return result; - } - - public List getAll(DefaultContext context) { - manager.traceIn(); - List result = null; - try { - ResultSetIterImpl iter = selectAll(context); - checkNull(iter, ResultSetIterImpl.class); - result = getList(context, iter, false, true); - } catch (Exception e) { - throw new ItemManagerException(e); - } - manager.traceOut(); - return result; - } - - @Override - public List getAll(long id) { - List result = null; - DefaultContext context = manager.getDbContext(); - try { - result = getAll(context, id); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - return result; - } - - public List getAll(DefaultContext context, long id) { - manager.traceIn(); - List result = null; - try { - ResultSetIterImpl iter = selectByForeignKey(context, id); - checkNull(iter, ResultSetIterImpl.class); - result = getList(context, iter, false, true); - } catch (Exception e) { - throw new ItemManagerException(e); - } - manager.traceOut(); - return result; - } - - protected IEntityBase getEntity(DefaultContext context, ResultSetIterImpl iter) { - manager.traceIn(); - IEntityBase entity = null; - try { - List result = getList(context, iter, false, false); - if (result != null) { - if (result.size() != 1) - throw new IllegalStateException("Expected one, found " + result.size() + "."); - entity = result.get(0); - } - } catch (Exception e) { - throw new ItemManagerException(e); - } - manager.traceOut(); - return entity; - } - -// protected List getList(ResultSetIterImpl iter) { -// List result = null; -// DefaultContext context = manager.getDbContext(); -// try { -// result = getList(context, iter); -// } catch (Exception e) { -// throw new ItemManagerException(e); -// } finally { -// manager.putDbContext(context); -// } -// return result; -// } - - protected List getList(ResultSetIterImpl iter, boolean isChunked) { - List result = null; - DefaultContext context = manager.getDbContext(); - try { - result = getList(context, iter, isChunked, true); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - return result; - } - - protected List getList(DefaultContext context, ResultSetIterImpl iter, boolean isChunked) { - return getList(context, iter, isChunked, true); - } - - protected List getList(DefaultContext context, ResultSetIterImpl iter, boolean isChunked, boolean enablecache) { - manager.traceIn(); - List result = null; - try { - if (useMemoryCache() && enablecache) - result = manager.getAllCached(getCacheType()); - - if (result == null) { - result = entities(context, iter, isChunked); - if (useMemoryCache() && enablecache) - manager.setAllCached(getCacheType(), result); - } - - /*@lineinfo:generated-code*//*@lineinfo:280^3*/ - -// ************************************************************ -// #sql [context] { COMMIT }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 3); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:280^27*/ - } catch (Exception e) { - try { - /*@lineinfo:generated-code*//*@lineinfo:283^4*/ - -// ************************************************************ -// #sql [context] { ROLLBACK }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 4); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:283^30*/ - } catch(Exception e1) { - } - manager.throwError(e); - } - manager.traceOut(); - return result; - } - - @Override - public void modify(IEntityBase entity) { - DefaultContext context = manager.getDbContext(); - try { - modify(context, entity); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - } - - @Override - public void merge(IEntityBase entity) { - DefaultContext context = manager.getDbContext(); - try { - if (entity.getId() == 0) - add(entity); - else - modify(context, entity); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - } - - public void modify(DefaultContext context, IEntityBase entity) { - manager.traceIn(); - try { - checkNull(entity, IEntityBase.class); - long id = entity.getId(); - entity.checkParameter("ID", id, false); - update(context, entity); - afterModify(context, entity); - /*@lineinfo:generated-code*//*@lineinfo:327^3*/ - -// ************************************************************ -// #sql [context] { COMMIT }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 5); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:327^27*/ - if (useMemoryCache()) - manager.storeCached(entity.getId(), entity); - } catch (Exception e) { - try { - /*@lineinfo:generated-code*//*@lineinfo:332^4*/ - -// ************************************************************ -// #sql [context] { ROLLBACK }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 6); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:332^30*/ - } catch(Exception e1) { - } - manager.throwError(e); - } - manager.traceOut(); - } - - @Override - public void remove(IEntityBase entity) { - DefaultContext context = manager.getDbContext(); - try { - remove(context, entity); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - } - - public void remove(DefaultContext context, IEntityBase entity) { - manager.traceIn(); - try { - checkNull(entity, IEntityBase.class); - long id = entity.getId(); - entity.checkParameter("ID", id, false); - beforeDelete(context, entity); - delete(context, id); - afterDelete(context, entity); - /*@lineinfo:generated-code*//*@lineinfo:361^3*/ - -// ************************************************************ -// #sql [context] { COMMIT }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 7); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:361^27*/ - if (useMemoryCache()) - manager.removeCached(getCacheType(), entity.getId()); - } catch (Exception e) { - try { - /*@lineinfo:generated-code*//*@lineinfo:366^4*/ - -// ************************************************************ -// #sql [context] { ROLLBACK }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 8); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:366^30*/ - } catch(Exception e1) { - } - manager.throwError(e); - } - manager.traceOut(); - } - - public void remove(List ids) { - DefaultContext context = manager.getDbContext(); - try { - remove(context, ids); - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - } - - protected void remove(DefaultContext context, List ids) { - manager.traceIn(); - try { - for (long id : ids) - delete(context, id); - /*@lineinfo:generated-code*//*@lineinfo:390^3*/ - -// ************************************************************ -// #sql [context] { COMMIT }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 9); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:390^27*/ - if (useMemoryCache()) { - for (long id : ids) - manager.removeCached(getCacheType(), id); - } - } catch (Exception e) { - try { - /*@lineinfo:generated-code*//*@lineinfo:397^4*/ - -// ************************************************************ -// #sql [context] { ROLLBACK }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 10); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:397^30*/ - } catch(Exception e1) { - } - manager.throwError(e); - } - manager.traceOut(); - } - - @Override - public void truncate() { - manager.traceIn(); - DefaultContext context = manager.getDbContext(); - try { - truncateTable(context); - /*@lineinfo:generated-code*//*@lineinfo:411^3*/ - -// ************************************************************ -// #sql [context] { COMMIT }; -// ************************************************************ - -{ - sqlj.runtime.ConnectionContext __sJT_connCtx = context; - if (__sJT_connCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); - sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); - if (__sJT_execCtx == null) sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); - synchronized (__sJT_execCtx) { - sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 11); - try - { - __sJT_execCtx.executeUpdate(); - } - finally - { - __sJT_execCtx.releaseStatement(); - } - } -} - - -// ************************************************************ - -/*@lineinfo:user-code*//*@lineinfo:411^27*/ - } catch (Exception e) { - throw new ItemManagerException(e); - } finally { - manager.putDbContext(context); - } - manager.traceOut(); - } - - protected void afterGet(DefaultContext context, IEntityBase entity) throws SQLException { } - protected void beforeAdd(DefaultContext context, IEntityBase entity) throws SQLException { } - protected void afterAdd(DefaultContext context, IEntityBase entity) throws SQLException { } - protected void afterModify(DefaultContext context, IEntityBase entity) throws SQLException { } - protected void beforeDelete(DefaultContext context, IEntityBase entity) throws SQLException { } - protected void afterDelete(DefaultContext context, IEntityBase entity) throws SQLException { } - - protected List entities(DefaultContext context, ResultSetIterImpl iter, boolean isChunked) throws SQLException { - return null; - } - - public List entities(DefaultContext context, ResultSet rs) throws SQLException { - return null; - } - - protected ResultSetIterImpl selectByKey(DefaultContext context, long id) throws SQLException { - return null; - } - - protected ResultSetIterImpl selectByForeignKey(DefaultContext context, long id) throws SQLException { - return null; - } - - protected ResultSetIterImpl selectAll(DefaultContext context) throws SQLException { - return null; - } - - protected void delete(DefaultContext context, long id) throws SQLException { } - - protected void truncateTable(DefaultContext context) throws SQLException { } - - protected void insert(DefaultContext context, IEntityBase entity) throws SQLException { } - - protected void update(DefaultContext context, IEntityBase entity) throws SQLException { } - - protected boolean useMemoryCache() { return false; } - - protected Class getCacheType() { return null; } -}/*@lineinfo:generated-code*/class EntityBaseDAO_SJProfileKeys -{ - private java.lang.Object[] keys; - private final sqlj.runtime.profile.Loader loader = sqlj.runtime.RuntimeContext.getRuntime().getLoaderForClass(getClass()); - private static EntityBaseDAO_SJProfileKeys inst = null; - public static java.lang.Object getKey(int keyNum) - throws java.sql.SQLException - { - synchronized(user.jobengine.db.EntityBaseDAO_SJProfileKeys.class) { - if (inst == null) - { - inst = new EntityBaseDAO_SJProfileKeys(); - } - } - return inst.keys[keyNum]; - } - private EntityBaseDAO_SJProfileKeys() - throws java.sql.SQLException - { - keys = new java.lang.Object[1]; - keys[0] = DefaultContext.getProfileKey(loader, "user.jobengine.db.EntityBaseDAO_SJProfile0"); - } -} +/*@lineinfo:filename=EntityBaseDAO*//*@lineinfo:user-code*//*@lineinfo:1^1*/ +package user.jobengine.db; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import sqlj.runtime.ref.DefaultContext; +import sqlj.runtime.ref.ResultSetIterImpl; +import user.commons.IEntityBase; + +/** + * Az entitasok alap DAO osztalya. + */ +@SuppressWarnings("unused") +public abstract class EntityBaseDAO implements IEntityBaseDAO { + protected ItemManager manager; + protected static final Logger logger = LogManager.getLogger(); + + public EntityBaseDAO(ItemManager manager) { + this.manager = manager; + } + + @SuppressWarnings("rawtypes") + protected void checkNull(Object obj, Class type) { + if (obj == null) + throw new IllegalArgumentException(type.getName() + " can not be null."); + } + + @Override + public void add(IEntityBase entity) { + DefaultContext context = manager.getDbContext(); + try { + add(context, entity); + } catch (Exception e) { + + // TODO reszletezes + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + } + + public void add(DefaultContext context, IEntityBase entity) { + manager.traceIn(); + try { + checkNull(entity, IEntityBase.class); + beforeAdd(context, entity); + long id = entity.getId(); + + insert(context, entity); + if (id == 0) { + /* @lineinfo:generated-code */ + /* @lineinfo:52^4 */ + +// ************************************************************ +// #sql [context] { SELECT IDENTITY_VAL_LOCAL() INTO :id, FROM SYSIBM.SYSDUMMY1 }; +// ************************************************************ + + { + sqlj.runtime.profile.RTResultSet __sJT_rtRs; + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 0); + try { + __sJT_rtRs = __sJT_execCtx.executeQuery(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + try { + sqlj.runtime.ref.ResultSetIterImpl.checkColumns(__sJT_rtRs, 1); + if (!__sJT_rtRs.next()) { + sqlj.runtime.error.RuntimeRefErrors.raise_NO_ROW_SELECT_INTO(); + } + id = __sJT_rtRs.getLongNoNull(1); + if (__sJT_rtRs.next()) { + sqlj.runtime.error.RuntimeRefErrors.raise_MULTI_ROW_SELECT_INTO(); + } + } finally { + __sJT_rtRs.close(); + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:52^80 */ + entity.setId(id); + } + afterAdd(context, entity); + /* @lineinfo:generated-code *//* @lineinfo:56^3 */ + +// ************************************************************ +// #sql [context] { COMMIT }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 1); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:56^27 */ + if (useMemoryCache()) + manager.storeCached(entity.getId(), entity); + + } catch (Exception e) { + try { + /* @lineinfo:generated-code */ + /* @lineinfo:62^4 */ + +// ************************************************************ +// #sql [context] { ROLLBACK }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 2); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:62^30 */ + } catch (Exception e1) { + } + manager.throwError(e); + } + manager.traceOut(); + } + + @Override + public IEntityBase get(long id) { + IEntityBase entity = null; + DefaultContext context = manager.getDbContext(); + try { + entity = get(context, id); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + return entity; + } + + public IEntityBase get(DefaultContext context, long id) { + manager.traceIn(); + IEntityBase entity = null; + try { + if (id == 0) + throw new IllegalArgumentException("Cannot get EntityBase with 0 ID."); + if (useMemoryCache()) + entity = manager.retrieveCached(getCacheType(), id); + if (entity == null) { + ResultSetIterImpl iter = selectByKey(context, id); + checkNull(iter, ResultSetIterImpl.class); + entity = getEntity(context, iter); + if (useMemoryCache()) + manager.storeCached(id, entity); + } + afterGet(context, entity); + } catch (Exception e) { + throw new ItemManagerException(e); + } + manager.traceOut(); + return entity; + } + + @Override + public List getAll() { + List result = null; + DefaultContext context = manager.getDbContext(); + try { + result = getAll(context); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + return result; + } + + @Override + public List getAll(IDAOIterProvider provider) { + List result = null; + DefaultContext context = manager.getDbContext(); + try { + result = getAll(context, provider); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + return result; + } + + @Override + public IEntityBase get(IDAOIterProvider provider) { + IEntityBase entity = null; + DefaultContext context = manager.getDbContext(); + try { + entity = get(context, provider); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + return entity; + } + + private IEntityBase get(DefaultContext context, IDAOIterProvider provider) { + manager.traceIn(); + IEntityBase entity = null; + try { + ResultSetIterImpl iter = provider.get(context, this); + checkNull(iter, ResultSetIterImpl.class); + entity = getEntity(context, iter); + afterGet(context, entity); + } catch (Exception e) { + throw new ItemManagerException(e); + } + manager.traceOut(); + return entity; + } + + public List getAll(DefaultContext context, IDAOIterProvider provider) { + manager.traceIn(); + List result = null; + try { + ResultSetIterImpl iter = provider.get(context, this); + checkNull(iter, ResultSetIterImpl.class); + result = getList(context, iter, false, true); + } catch (Exception e) { + throw new ItemManagerException(e); + } + manager.traceOut(); + return result; + } + + public List getAll(DefaultContext context) { + manager.traceIn(); + List result = null; + try { + ResultSetIterImpl iter = selectAll(context); + checkNull(iter, ResultSetIterImpl.class); + result = getList(context, iter, false, true); + } catch (Exception e) { + throw new ItemManagerException(e); + } + manager.traceOut(); + return result; + } + + @Override + public List getAll(long id) { + List result = null; + DefaultContext context = manager.getDbContext(); + try { + result = getAll(context, id); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + return result; + } + + public List getAll(DefaultContext context, long id) { + manager.traceIn(); + List result = null; + try { + ResultSetIterImpl iter = selectByForeignKey(context, id); + checkNull(iter, ResultSetIterImpl.class); + result = getList(context, iter, false, true); + } catch (Exception e) { + throw new ItemManagerException(e); + } + manager.traceOut(); + return result; + } + + protected IEntityBase getEntity(DefaultContext context, ResultSetIterImpl iter) { + manager.traceIn(); + IEntityBase entity = null; + try { + List result = getList(context, iter, false, false); + if (result != null) { + if (result.size() != 1) + throw new IllegalStateException("Expected one, found " + result.size() + "."); + entity = result.get(0); + } + } catch (Exception e) { + throw new ItemManagerException(e); + } + manager.traceOut(); + return entity; + } + +// protected List getList(ResultSetIterImpl iter) { +// List result = null; +// DefaultContext context = manager.getDbContext(); +// try { +// result = getList(context, iter); +// } catch (Exception e) { +// throw new ItemManagerException(e); +// } finally { +// manager.putDbContext(context); +// } +// return result; +// } + + protected List getList(ResultSetIterImpl iter, boolean isChunked) { + List result = null; + DefaultContext context = manager.getDbContext(); + try { + result = getList(context, iter, isChunked, true); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + return result; + } + + protected List getList(DefaultContext context, ResultSetIterImpl iter, boolean isChunked) { + return getList(context, iter, isChunked, true); + } + + protected List getList(DefaultContext context, ResultSetIterImpl iter, boolean isChunked, boolean enablecache) { + manager.traceIn(); + List result = null; + try { + if (useMemoryCache() && enablecache) + result = manager.getAllCached(getCacheType()); + + if (result == null) { + result = entities(context, iter, isChunked); + if (useMemoryCache() && enablecache) + manager.setAllCached(getCacheType(), result); + } + + /* @lineinfo:generated-code *//* @lineinfo:280^3 */ + +// ************************************************************ +// #sql [context] { COMMIT }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 3); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:280^27 */ + } catch (Exception e) { + try { + /* @lineinfo:generated-code */ + /* @lineinfo:283^4 */ + +// ************************************************************ +// #sql [context] { ROLLBACK }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 4); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:283^30 */ + } catch (Exception e1) { + } + manager.throwError(e); + } + manager.traceOut(); + return result; + } + + @Override + public void modify(IEntityBase entity) { + DefaultContext context = manager.getDbContext(); + try { + modify(context, entity); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + } + + @Override + public void merge(IEntityBase entity) { + DefaultContext context = manager.getDbContext(); + try { + if (entity.getId() == 0) + add(entity); + else + modify(context, entity); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + } + + public void modify(DefaultContext context, IEntityBase entity) { + manager.traceIn(); + try { + checkNull(entity, IEntityBase.class); + long id = entity.getId(); + entity.checkParameter("ID", id, false); + update(context, entity); + afterModify(context, entity); + /* @lineinfo:generated-code *//* @lineinfo:327^3 */ + +// ************************************************************ +// #sql [context] { COMMIT }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 5); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:327^27 */ + if (useMemoryCache()) + manager.storeCached(entity.getId(), entity); + } catch (Exception e) { + try { + /* @lineinfo:generated-code */ + /* @lineinfo:332^4 */ + +// ************************************************************ +// #sql [context] { ROLLBACK }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 6); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:332^30 */ + } catch (Exception e1) { + } + manager.throwError(e); + } + manager.traceOut(); + } + + @Override + public void remove(IEntityBase entity) { + DefaultContext context = manager.getDbContext(); + try { + remove(context, entity); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + } + + public void remove(DefaultContext context, IEntityBase entity) { + manager.traceIn(); + try { + checkNull(entity, IEntityBase.class); + long id = entity.getId(); + entity.checkParameter("ID", id, false); + beforeDelete(context, entity); + delete(context, id); + afterDelete(context, entity); + /* @lineinfo:generated-code *//* @lineinfo:361^3 */ + +// ************************************************************ +// #sql [context] { COMMIT }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 7); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:361^27 */ + if (useMemoryCache()) + manager.removeCached(getCacheType(), entity.getId()); + } catch (Exception e) { + try { + /* @lineinfo:generated-code */ + /* @lineinfo:366^4 */ + +// ************************************************************ +// #sql [context] { ROLLBACK }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 8); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:366^30 */ + } catch (Exception e1) { + } + manager.throwError(e); + } + manager.traceOut(); + } + + @Override + public void remove(List ids) { + DefaultContext context = manager.getDbContext(); + try { + remove(context, ids); + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + } + + protected void remove(DefaultContext context, List ids) { + manager.traceIn(); + try { + for (long id : ids) + delete(context, id); + /* @lineinfo:generated-code *//* @lineinfo:390^3 */ + +// ************************************************************ +// #sql [context] { COMMIT }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 9); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:390^27 */ + if (useMemoryCache()) { + for (long id : ids) + manager.removeCached(getCacheType(), id); + } + } catch (Exception e) { + try { + /* @lineinfo:generated-code */ + /* @lineinfo:397^4 */ + +// ************************************************************ +// #sql [context] { ROLLBACK }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 10); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:397^30 */ + } catch (Exception e1) { + } + manager.throwError(e); + } + manager.traceOut(); + } + + @Override + public void truncate() { + manager.traceIn(); + DefaultContext context = manager.getDbContext(); + try { + truncateTable(context); + /* @lineinfo:generated-code *//* @lineinfo:411^3 */ + +// ************************************************************ +// #sql [context] { COMMIT }; +// ************************************************************ + + { + sqlj.runtime.ConnectionContext __sJT_connCtx = context; + if (__sJT_connCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_DEFAULT_CONN_CTX(); + sqlj.runtime.ExecutionContext __sJT_execCtx = __sJT_connCtx.getExecutionContext(); + if (__sJT_execCtx == null) + sqlj.runtime.error.RuntimeRefErrors.raise_NULL_EXEC_CTX(); + synchronized (__sJT_execCtx) { + sqlj.runtime.profile.RTStatement __sJT_stmt = __sJT_execCtx.registerStatement(__sJT_connCtx, EntityBaseDAO_SJProfileKeys.getKey(0), 11); + try { + __sJT_execCtx.executeUpdate(); + } finally { + __sJT_execCtx.releaseStatement(); + } + } + } + +// ************************************************************ + + /* @lineinfo:user-code *//* @lineinfo:411^27 */ + } catch (Exception e) { + throw new ItemManagerException(e); + } finally { + manager.putDbContext(context); + } + manager.traceOut(); + } + + protected void afterGet(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected void beforeAdd(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected void afterAdd(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected void afterModify(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected void beforeDelete(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected void afterDelete(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected List entities(DefaultContext context, ResultSetIterImpl iter, boolean isChunked) throws SQLException { + return null; + } + + public List entities(DefaultContext context, ResultSet rs) throws SQLException { + return null; + } + + protected ResultSetIterImpl selectByKey(DefaultContext context, long id) throws SQLException { + return null; + } + + protected ResultSetIterImpl selectByForeignKey(DefaultContext context, long id) throws SQLException { + return null; + } + + protected ResultSetIterImpl selectAll(DefaultContext context) throws SQLException { + return null; + } + + protected void delete(DefaultContext context, long id) throws SQLException { + } + + protected void truncateTable(DefaultContext context) throws SQLException { + } + + protected void insert(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected void update(DefaultContext context, IEntityBase entity) throws SQLException { + } + + protected boolean useMemoryCache() { + return false; + } + + protected Class getCacheType() { + return null; + } +} + +/* @lineinfo:generated-code */class EntityBaseDAO_SJProfileKeys { + private java.lang.Object[] keys; + private final sqlj.runtime.profile.Loader loader = sqlj.runtime.RuntimeContext.getRuntime().getLoaderForClass(getClass()); + private static EntityBaseDAO_SJProfileKeys inst = null; + + public static java.lang.Object getKey(int keyNum) throws java.sql.SQLException { + synchronized (user.jobengine.db.EntityBaseDAO_SJProfileKeys.class) { + if (inst == null) { + inst = new EntityBaseDAO_SJProfileKeys(); + } + } + return inst.keys[keyNum]; + } + + private EntityBaseDAO_SJProfileKeys() throws java.sql.SQLException { + keys = new java.lang.Object[1]; + keys[0] = DefaultContext.getProfileKey(loader, "user.jobengine.db.EntityBaseDAO_SJProfile0"); + } +} diff --git a/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManager.java b/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManager.java index b4c0b443..0641a583 100644 --- a/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManager.java +++ b/server/user.jobengine.osgi.db/src/user/jobengine/db/ItemManager.java @@ -60,10 +60,12 @@ public class ItemManager extends MemoryCache implements IItemManager { private static final Logger logger = LogManager.getLogger(); - static ItemManager currentInstance = null; + static ItemManager instance = null; static public ItemManager getInstance() { - return currentInstance; + if (instance == null) + instance = new ItemManager(); + return instance; } private IConfiguration systemConfig; @@ -78,6 +80,11 @@ public class ItemManager extends MemoryCache implements IItemManager { private Connection connection; public ItemManager() { + if (instance == null) + instance = this; + else + throw new RuntimeException("Multiple ItemManager instances are not supported!"); + initializeItemManager(signalServer); } @@ -1139,7 +1146,6 @@ public class ItemManager extends MemoryCache implements IItemManager { } void initialize() { - currentInstance = this; super.setItemManager(this); baseDAOs.put(MetadataType.class, new MetadataTypeDAO(this)); @@ -1379,16 +1385,39 @@ public class ItemManager extends MemoryCache implements IItemManager { public void throwError(Exception e) { if (e != null) { String message = e.getMessage(); + + if ("SqlIntegrityConstraintViolationException".equals(e.getClass().getSimpleName())) { + SqlIntegrityErrorParser parser = new SqlIntegrityErrorParser(message); + if (parser.isParsed()) { + String name = getDBColumnName(parser.getTbSpaceId(), parser.getTbId(), parser.getColNr()); + message += String.format("\r\nInvolved column name is %s.", name); + } + } + if (message == null || message.length() == 0) message = e.getClass().getName(); StackTraceElement element = Thread.currentThread().getStackTrace()[2]; message += String.format(", source: %1$s.%2$s", element.getClassName(), element.getMethodName()); - // if (logger != null) - // logger.error(message, e); + throw new ItemManagerException(message); } } + private String getDBColumnName(String tbSpaceId, String tbId, String colNr) { + String sql = "SELECT T.TABNAME || '.' || C.COLNAME FROM SYSCAT.TABLES AS T, SYSCAT.COLUMNS AS C " + "WHERE T.TBSPACEID = ? " + "AND T.TABLEID = ? " + + "AND C.COLNO = ? " + "AND C.TABSCHEMA = T.TABSCHEMA " + "AND C.TABNAME = T.TABNAME"; + String[] result = new String[] { null }; + executeQuery(sql, rs -> { + result[0] = rs.getString(1); + return false; + }, d -> { + d.setString(1, tbSpaceId); + d.setString(2, tbId); + d.setString(3, colNr); + }); + return result[0]; + } + @Override public void traceIn() { // StackTraceElement element = diff --git a/server/user.jobengine.osgi.db/src/user/jobengine/db/SqlIntegrityErrorParser.java b/server/user.jobengine.osgi.db/src/user/jobengine/db/SqlIntegrityErrorParser.java new file mode 100644 index 00000000..8ddaf0bf --- /dev/null +++ b/server/user.jobengine.osgi.db/src/user/jobengine/db/SqlIntegrityErrorParser.java @@ -0,0 +1,44 @@ +package user.jobengine.db; + +public class SqlIntegrityErrorParser { + private static final String COLNO = "COLNO="; + private static final String TABLEID = "TABLEID="; + private static final String TBSPACEID = "TBSPACEID="; + private String tbSpaceId; + private String tbId; + private String colNr; + private boolean parsed; + + public SqlIntegrityErrorParser(String m) { + try { + if (m.contains(TBSPACEID) && m.contains(TABLEID) && m.contains(COLNO)) { + tbSpaceId = getValue(TBSPACEID, m); + tbId = getValue(TABLEID, m); + colNr = getValue(COLNO, m); + parsed = true; + } + } catch (Exception e) { + } + } + + private String getValue(String col, String m) { + int i = m.indexOf(col); + return m.substring(i + col.length(), m.indexOf(" ", i)).replace(",", "").replace("\"", ""); + } + + public String getTbSpaceId() { + return tbSpaceId; + } + + public String getTbId() { + return tbId; + } + + public String getColNr() { + return colNr; + } + + public boolean isParsed() { + return parsed; + } +} diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/DynamicStepsLoader.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/DynamicStepsLoader.java index 5e8a3b40..87b53274 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/DynamicStepsLoader.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/DynamicStepsLoader.java @@ -29,23 +29,13 @@ public class DynamicStepsLoader { public Class loadClassFromSourceCode(GroovyClassLoader gcl, String fileName) throws IOException { Class result = null; - - Path path = Paths.get(stepsDir, fileName); - File classFile = null; - - // Windows alatt az eleresi utak irasmodja egyforma kell legyen. - // A GroovyClassLoader - // ld. createGroovyClassLoader a new File(path.toFile().getCanonicalPath())-al - // operal. - classFile = new File(path.toFile().getCanonicalPath()); - logger.info("Class path is normalized to {}", classFile.getCanonicalPath()); - - if (!classFile.exists()) - return result; - - result = gcl.parseClass(classFile); - if (result != null) - logger.info("Class for {} successfully loaded", fileName); + Path path = Paths.get(stepsDir, fileName).toFile().getCanonicalFile().toPath(); + logger.info("Class path is normalized to {}", path.toString()); + if (Files.exists(path)) { + result = gcl.parseClass(path.toFile()); + if (result != null) + logger.info("Class for {} successfully loaded", fileName); + } return result; } @@ -74,8 +64,7 @@ public class DynamicStepsLoader { private Set getJavaFileList(Path path, String excludeFileName) { Set result = new HashSet<>(); if (path.toFile().isDirectory()) { - Predicate filter = file -> !Files.isDirectory(file) - && file.getFileName().toString().endsWith(".java"); + Predicate filter = file -> !Files.isDirectory(file) && file.getFileName().toString().endsWith(".java"); try (Stream stream = Files.list(path)) { stream.filter(filter).forEach(p -> { diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java index 85ae209e..9d91156a 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngine.java @@ -1,1113 +1,1108 @@ -package user.jobengine.server; - -import java.sql.Timestamp; -import java.time.LocalDate; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.lang.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import com.ibm.nosql.json.api.BasicDBList; -import com.ibm.nosql.json.api.BasicDBObject; - -import user.commons.Job; -import user.commons.JobStatus; -import user.commons.cluster.ClusteredJob; -import user.jobengine.db.IItemManager; -import user.jobengine.db.ItemManagerData.SignalType; -import user.jobengine.server.actions.IStatusMachine; -import user.jobengine.server.actions.JobAction; -import user.jobengine.server.actions.StatusMachine; -import user.jobengine.server.ast.JobTemplate; -import user.jobengine.server.instructions.CallJobStepInstruction; -import user.jobengine.server.instructions.IInstruction; -import user.jobengine.server.messagequeue.IUserMessage; -import user.jobengine.server.messagequeue.IUserMessageQueues; -import user.jobengine.server.messagequeue.UserMessageQueues; -import user.jobengine.server.messages.IJobMessage; -import user.jobengine.server.messages.JobStepCompletedMessage; -import user.jobengine.server.messages.UserReplyMessage; -import user.jobengine.server.scheduler.ScheduledJob; -import user.jobengine.server.scheduler.SchedulerService; -import user.tsm.client.TSMClient; -import user.tsm.client.TSMException; - -/** - * MAM motor implementáció. - */ -public class JobEngine implements IJobEngine { - - private class JobChangedListenerChecker extends Thread { - private volatile boolean shutdown = false; - - @Override - public void run() { - try { - startUpBarrier.await(); - } catch (Exception e) { - } - - while (!shutdown) { - try { - Thread.sleep(1000); - removeJobChangedListenerGarbage(); - } catch (InterruptedException e) { - shutdown = true; - } - } - } - - void shutDown() { - shutdown = true; - try { - join(); - } catch (InterruptedException e) { - } - } - } - - private class MessageDispatcher extends Thread { - private volatile boolean shutdown = false; - - @Override - public void run() { - try { - startUpBarrier.await(); - } catch (Exception e) { - } - - while (!shutdown) { - try { - IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); - if (message != null) - message.process(JobEngine.this); - - } catch (InterruptedException e) { - shutdown = true; - } - } - // a leallitas utan az osszes fuggo uzenet vegrehajtasa - while (!messageQueue.isEmpty()) { - try { - IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); - if (message != null) - message.process(JobEngine.this); - } catch (InterruptedException e) { - shutdown = true; - } - } - } - - void shutDown() { - shutdown = true; - try { - join(); - } catch (InterruptedException e) { - } - } - } - - private class VM extends Thread { - private volatile boolean shutdown = false; - private IInstruction ir = null; - - @Override - public void run() { - try { - startUpBarrier.await(); - } catch (Exception e) { - } - - while (!shutdown) { - try { - Thread.sleep(QUEUE_POLL_INTERVAL_MS); - // IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, - // TimeUnit.MILLISECONDS); - IJobRuntime jobRuntime = runQueue.poll(); - if (jobRuntime != null) { - logger.debug("Processing {}", jobRuntime.getId()); - // varakozo esetben vegrehajtjuk a kovetkezo utasitast - if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) { - ir = jobRuntime.getNextInstruction(); - ir.execute(JobEngine.this, jobRuntime); - } else { - // normal esetben elfutunk a kovetkezo job step-ig, vagy vegig - while (jobRuntime.hasNextInstruction() && jobRuntime.isRunnable()) { - ir = jobRuntime.getNextInstruction(); - ir.execute(JobEngine.this, jobRuntime); - } - } - - if (!jobRuntime.hasNextInstruction() && jobRuntime.isRunnable()) - jobCleanup(jobRuntime); - - } - } catch (Exception e) { - logger.error("Critical VM error!", e); - // shutdown = true; - } - } - } - - void shutDown() { - shutdown = true; - try { - join(); - } catch (InterruptedException e) { - } - } - } - - private static final Logger logger = LogManager.getLogger(); - static private IItemManager itemManager; - private static IJobEngine instance; - - public static IJobEngine getInstance() { - return instance; - } - - static public IItemManager getStaticItemManager() { - return itemManager; - } - - private volatile boolean isRunning; - - private volatile boolean isAllExecutionDisabled; - private volatile boolean isScheduledExecutionDisabled; - private final BlockingQueue runQueue; - private final BlockingQueue messageQueue; - private final Map submittedJobs; - private VM vm; - private MessageDispatcher dispatcher; - private JobChangedListenerChecker jobChangedListenerChecker; - - private IUserMessageQueues userMessageQueues; - - private final CyclicBarrier startUpBarrier; - - private final IStatusMachine statusMachine; - - private AtomicLong nextJobId; - private SchedulerService schedulerService = null; - private Set jobChangedListenerList = ConcurrentHashMap.newKeySet(); - private Map remoteWorkers; - private String masterServerAddress = System.getProperty("jobengine.master.server", ""); - private final JobEngineRemote remoteEngine; - - private ConcurrentHashMap keepAliveJobChangedListeners = new ConcurrentHashMap<>(); - - private IJobEngineConfiguration jobEngineConfiguration; - - /** - * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az - * ütemező szál, az üzenet kezelő szál. - */ - public JobEngine() { - - runQueue = new PriorityBlockingQueue(); - messageQueue = new LinkedBlockingQueue(); - submittedJobs = createJobs(); - startUpBarrier = new CyclicBarrier(4); - nextJobId = new AtomicLong(0); - statusMachine = createStatusMachine(); - if (instance != null) - throw new RuntimeException("Multiple JobEngine instances are not supported!"); - instance = this; - - remoteWorkers = new ConcurrentHashMap<>(); - // logger.info("JobEngine created"); - if (isWorker()) - remoteEngine = createRemoteEngine(); - else - remoteEngine = null; - } - - public void activate() { - } - - @Override - public boolean addJobChangedEventListener(IJobChangedListener listener) { - boolean result = !jobChangedListenerList.contains(listener); - if (listener != null && result) - jobChangedListenerList.add(listener); - - return result; - } - - @Override - public void addToExecutorQueue(IJobRuntime jobRuntime) { - try { - Object typeName = jobRuntime.popFromStack(); - if (typeName == null) - throw new Exception( - jobRuntime.toString() + " illegal execution state detected: executor name is null."); - String executorName = String.valueOf(typeName); - if (!jobEngineConfiguration.getExecutors().containsKey(executorName)) - throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName); - - // a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van - // benne - // ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es - // meghivodik a fork - List jobs = spawnJobs(jobRuntime, executorName); - jobEngineConfiguration.getExecutors().get(executorName).submit(jobs.toArray(new IJobRuntime[] {})); - jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE))); - - } catch (Exception e) { - logger.catching(e); - suspendWaitExecutorJob(e, jobRuntime); - } - } - - @Override - public void addToRunQueue(IJobRuntime jobRuntime) { - try { - logger.debug("{} adding to run queue", jobRuntime); - if (jobRuntime.getIp() == 0 && !jobRuntime.isService() && jobRuntime.getParentJobId() == 0) - logger.info(jobRuntime.getSessionMarker(), "A '{}' folyamat elindult.", jobRuntime.getName()); - runQueue.put(jobRuntime); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - - /** - * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok - * soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy - * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default - * prioritasut a korabban atrendezett listaba. - * - * @param jobRuntime - */ - @Override - public void applyPriorityChange(IJobRuntime jobRuntime) { - logger.info("rePrioritization start for {}", jobRuntime.getId()); - // synchronized(this.runQueue){ - - // job main queue reorder - - if (this.runQueue.contains(jobRuntime)) { - logger.info("runQueue"); - this.runQueue.remove(jobRuntime); - try { - this.runQueue.put(jobRuntime); - } catch (InterruptedException e) { - } - } - - // JobStepExecutor reorder - if (jobEngineConfiguration.getExecutors() != null) { - for (IJobStepExecutor exec : jobEngineConfiguration.getExecutors().values()) { - if (exec.containsRuntime(jobRuntime)) { - logger.info("executor"); - exec.changePriority(jobRuntime); - } - } - } - - // } logger.info("rePrioritization end"); - } - - @Override - public synchronized void bindItemManagerService(IItemManager service) { - setItemManager(service); - } - - public synchronized void bindJobEngineConfiguration(Object service) { - if (service instanceof JobEngineConfiguration) { - jobEngineConfiguration = (IJobEngineConfiguration) service; - isScheduledExecutionDisabled = jobEngineConfiguration.isScheduledExecutionDisabled(); - logger.info("IJobEngineConfiguration service binded"); - } - } - - private void bootstrap() throws JobEngineException { - // submit("fake-noparams.xml", "Bootstrap", null); - } - - private void closeSessionLog(IJobRuntime jobRuntime) { - if (!jobRuntime.isService() && jobRuntime.getParentJobId() == 0) { - if (JobStatus.FINISHED.equals(jobRuntime.getStatus())) - logger.info(jobRuntime.getFinishMarker(), "A(z) '{}' folyamat futása sikeresen véget ért.", - jobRuntime.getName()); - else - logger.error(jobRuntime.getFinishMarker(), "A(z) '{}' folyamat futása megszakadt.", - jobRuntime.getName()); - } - } - - protected Map createJobs() { - return new ConcurrentHashMap(); - } - - protected JobEngineRemote createRemoteEngine() { - return new JobEngineRemote(masterServerAddress); - } - - protected IStatusMachine createStatusMachine() { - return new StatusMachine(this); - } - - public void deactivate() { - } - - @Override - public void executeAssignVariableInstruction(IJobRuntime jobRuntime) { - Object value = jobRuntime.popFromStack(); - String name = (String) jobRuntime.popFromStack(); - - // a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk - // TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket - if (jobRuntime.getParentJobId() > 0) { - IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId()); - parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value); - } - - jobRuntime.setVariable(name, value); - } - - /*** - * Fuggetlen (beagyazott) alfolyamat letrehozasa - */ - @Override - public void executeCallConcurrentJobStepInstruction(IJobRuntime jobRuntime, IProgram subProgram) { - JobRuntime c = new JobRuntime(this, jobRuntime, subProgram); - jobRuntime.addChild(c); - submit(c); - } - - @Override - public void executeCallJobStepInstruction(IJobRuntime jobRuntime) { - statusMachine.processAction(JobAction.EXECUTE, jobRuntime); - } - - @Override - public void executeCheckParameterInstruction(IJobRuntime jobRuntime) { - try { - jobRuntime.checkStackParameter(); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - @Override - public void executeDeclareVariableInstruction(IJobRuntime jobRuntime) { - Class type = (Class) jobRuntime.popFromStack(); - String name = (String) jobRuntime.popFromStack(); - jobRuntime.addVariable(name, type); - } - - @Override - public void executeGetParameterInstruction(IJobRuntime jobRuntime) { - try { - String name = (String) jobRuntime.popFromStack(); - Object value = jobRuntime.getParameter(name); - jobRuntime.pushToStack(value); - } catch (Exception e) { - logger.error(e.getMessage()); - jobRuntime.setDescription(e.getMessage()); - suspendWaitExecutorJob(e, jobRuntime); - } - } - - @Override - public void executeGetVariableInstruction(IJobRuntime jobRuntime) { - try { - String name = (String) jobRuntime.popFromStack(); - Object value = jobRuntime.getVariable(name); - jobRuntime.pushToStack(value); - } catch (Exception e) { - logger.error(e.getMessage()); - jobRuntime.setDescription(e.getMessage()); - suspendWaitExecutorJob(e, jobRuntime); - } - } - - @Override - public void executePushToStackInstruction(IJobRuntime jobRuntime, Object item) { - jobRuntime.pushToStack(item); - } - - @Override - public void executeSendMessageToUserInstruction(IJobRuntime jobRuntime) { - statusMachine.processAction(JobAction.MESSAGE, jobRuntime); - - int messageNumber = (Integer) jobRuntime.popFromStack(); - String catalogName = (String) jobRuntime.popFromStack(); - Object[] inputs = getInputsFromStack(jobRuntime); - getUserMessageQueues().addMessage(jobRuntime, catalogName, messageNumber, true, inputs); - } - - @Override - public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) { - // logger.info("Processing {} {}", jobRuntime.getId(), - // jobRuntime.canContinueExecution()); - - if (jobRuntime.canContinueExecution()) { - jobRuntime.setStatus(JobStatus.RUNNABLE); - } else { - if (!JobStatus.WAIT_FINISH.equals(jobRuntime.getStatus())) { - jobRuntime.setStatus(JobStatus.WAIT_FINISH); - } - jobRuntime.decrementInstructionPointer(); - } - addToRunQueue(jobRuntime); - } - - @Override - public void fireJobChangedEvent(JobChangedEvent event) { - for (IJobChangedListener listener : jobChangedListenerList) { - listener.jobChanged(event); - } - } - - @Override - public Object[] getInputsFromStack(IJobRuntime jobRuntime) { - jobRuntime.swapStack(); - Integer parameterCount = (Integer) jobRuntime.popFromStack(); - Object[] result = new Object[parameterCount]; - for (int index = 0; index < parameterCount; index++) { - result[index] = jobRuntime.popFromStack(); - } - return result; - } - - @Override - public IItemManager getItemManager() { - return itemManager; - } - - @Override - public Job getJob(long id) { - Job result = null; - if (submittedJobs.containsKey(id)) - result = (Job) submittedJobs.get(id); - return result; - } - - @Override - public IJobRuntime getJobById(long jobId) { - if (!submittedJobs.containsKey(jobId)) { - logger.warn("Job does not exist anymore {}. Possibly canceled already.", jobId); - } - return submittedJobs.get(jobId); - } - - @Override - public IJobEngineConfiguration getJobEngineConfiguration() { - return jobEngineConfiguration; - } - - @Override - public Map getJobs() { - return submittedJobs; - } - - protected long getNextJobId() { - return nextJobId.incrementAndGet(); - } - - @Override - public IProgram getProgram(String name) { - return jobEngineConfiguration.getPrograms().get(name); - } - - @Override - public JobEngineRemote getRemoteEngine() { - return remoteEngine; - } - - @Override - public ScheduledJob getScheduledJob(String template) throws Exception { - ArrayList> schedules = jobEngineConfiguration.getSchedules(); - BasicDBObject schedule = null; - boolean templateFirstOccurence = false; - - for (int index = 0; index < schedules.size(); index++) { - if (schedules.get(index).getKey().equals(template)) { - if (!templateFirstOccurence) { - templateFirstOccurence = true; - schedule = schedules.get(index).getValue(); - } else { - throw new Exception("Key " + template + " is already contained in scheduled jobs!"); - } - } - } - - if (schedule == null) - return null; - return jobEngineConfiguration.createScheduledJob(schedule, this); - } - - @Override - public SchedulerService getScheduler() { - return schedulerService; - } - - @Override - public IUserMessageQueues getUserMessageQueues() { - return userMessageQueues; - } - - @Override - public boolean isAllExecutionDisabled() { - return isAllExecutionDisabled; - } - - private void isRunnable(IProgram program) throws JobEngineException { - JobTemplate template = program.getTemplate(); - // A JOB xml-ben beállítható, hogy futhatnak-e párhuzamosan. - if (template.isMultiInstance()) - return; - - // Collection values = getJobs().values(); - - // vegigmenni a submittedJobs-on! - - Set ids = submittedJobs.keySet(); - for (long id : ids) { - if (!submittedJobs.containsKey(id)) - continue; - - IJobRuntime runtime = submittedJobs.get(id); - if (runtime == null) - continue; - - if (!template.getFileName().equals(runtime.getTemplate())) - continue; - - // 'EXECUTING', 'WAIT_EXECUTOR', 'WAIT_SUSPEND', 'RUNNABLE' - if (runtime.isExecuting() || runtime.isRunnable() || runtime.isWaitingSuspend() - || runtime.isWaitingExecutor()) - throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", - template.getFileName(), template.getName())); - } - /* - * List runningJobs = itemManager.getRunningJobs(template.getFileName()); - * if (runningJobs != null) { for (Job job : runningJobs) { Job runningJob = - * getJob(job.getId()); - * - * if (runningJob != null && runningJob.getStatus() != JobStatus.SUSPENDED) - * throw new JobEngineException(String. - * format("Can not submit job. Job with %s.%s already running", - * template.getFileName(), template.getName())); } } - */ - } - - @Override - public boolean isRunning() { - return isRunning; - } - - @Override - public boolean isScheduledExecutionDisabled() { - return isScheduledExecutionDisabled; - } - - @Override - public boolean isWorker() { - return masterServerAddress.length() > 0; - } - - @Override - public void jobCleanup(IJobRuntime jobRuntime) { - logger.info("Cleanup {}", jobRuntime.getId()); - statusMachine.processAction(JobAction.FINISH, jobRuntime); - } - - @Override - public boolean keepAliveJobChangedListener(IJobChangedListener listener) { - boolean result = false; - if (keepAliveJobChangedListeners != null) { - long now = System.currentTimeMillis(); - // ha mar hozza van adva, nem adja hozza - result = addJobChangedEventListener(listener); - keepAliveJobChangedListeners.put(listener, now); - logger.debug("Refreshing listener {}, now {} ({})", listener, now, keepAliveJobChangedListeners.size()); - } - - return result; - } - - @Deprecated - @Override - public void keepAliveWorker(String remoteAddr) { - remoteWorkers.put(remoteAddr, LocalDate.now()); - } - - @Override - public void processCancelRequest(IJobMessage message) { - IJobRuntime jobRuntime = getJobById(message.getJobId()); - - if (jobRuntime == null) { - logger.warn("Job with id {} does not exist!", message.getJobId()); - return; - } - - // a gyerekek miatt nem az! - if (!jobRuntime.isCancelable()) - return; - - // Ha a jobs tartalmazta ezt a job-ot, akkor kikerül belőle. - // if (jobs.containsKey(message.getJobId())) - // jobs.remove(message.getJobId()); - statusMachine.processAction(JobAction.CANCEL, jobRuntime); - } - - @Override - public void processJobStepCompletedMessage(IJobMessage message) { - IJobRuntime jobRuntime = getJobById(message.getJobId()); - - if (jobRuntime == null) { - logger.warn("Job with id {} does not exist!", message.getJobId()); - return; - } - - // TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket - - // if (jobRuntime.getParentJobId() > 0) - // removeSpanwChild(jobRuntime); - - JobStepCompletedMessage m = (JobStepCompletedMessage) message; - // kesz vagyunk, jelezni - if (isWorker()) { - statusMachine.processAction(JobAction.DONE, jobRuntime); - return; - } - - // a cancel hamarabb megjott? - // ha remote akkor tuti - if (jobRuntime == null) { - - } - Object[] outputs = m.getOutputs(); - putOutputsToStack(jobRuntime, outputs); - - statusMachine.processAction(JobAction.DONE, jobRuntime); - } - - @Override - public void processJobStepSkippedMessage(IJobMessage message) { - IJobRuntime jobRuntime = getJobById(message.getJobId()); - statusMachine.processAction(JobAction.SKIP, jobRuntime); - } - - @Override - public void processReplyMessage(IJobMessage message) { - IJobRuntime jobRuntime = getJobById(message.getJobId()); - UserReplyMessage m = (UserReplyMessage) message; - IUserMessage userMessage = getUserMessageQueues().getUserMessages().get(m.getMessageId()); - userMessage.setReply((Integer) m.getReply()); - userMessage.setMustReply(false); - jobRuntime.pushToStack(m.getReply()); - statusMachine.processAction(JobAction.REPLY, jobRuntime); - } - - @Override - public void processRestartRequest(IJobMessage message) { - Job job = (Job) itemManager.get(Job.class, message.getJobId()); - if (job != null) { - IProgram program = getProgram(job.getTemplate()); - if (program != null) { - IJobRuntime result = new JobRuntime(this, program, job, itemManager); - result.setStatus(JobStatus.CANCELED); - statusMachine.processAction(JobAction.RESTART, result); - } - } else { - logger.info("JobEngine processRestartReqest: Job is null"); - } - } - - @Override - public void processResumeRequest(IJobMessage message) { - IJobRuntime jobRuntime = getJobById(message.getJobId()); - statusMachine.processAction(JobAction.RESUME, jobRuntime); - } - - @Override - public void processSuspendRequest(IJobMessage message) { - IJobRuntime jobRuntime = getJobById(message.getJobId()); - statusMachine.processAction(JobAction.SUSPEND, jobRuntime); - } - - @Override - public void putOutputsToStack(IJobRuntime jobRuntime, Object[] outputs) { - if (outputs != null) { - for (Object object : outputs) { - jobRuntime.pushToStack(object); - } - jobRuntime.arrangeStack(); - } - } - - @Override - public void reloadGracefully() throws Exception { - if (schedulerService != null) - schedulerService.shutdown(); - - jobEngineConfiguration.resetStepClassLoader(); - jobEngineConfiguration.load(this, true); - - schedulerService = new SchedulerService(this); - schedulerService.startup(); - logger.info("JobEngine gracefully reloaded"); - isRunning = true; - } - - @Override - public void removeFromExecutorQueue(IJobRuntime jobRuntime) { - for (IJobStepExecutor executor : jobEngineConfiguration.getExecutors().values()) - executor.revoke(jobRuntime); - } - - @Override - public void removeFromRunQueue(IJobRuntime jobRuntime) { - runQueue.remove(jobRuntime); - } - - @Override - public void removeGarbage() { - List removeId = new ArrayList<>(); - for (Long id : submittedJobs.keySet()) { - IJobRuntime runtime = submittedJobs.get(id); - if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) - || JobStatus.CANCELED.equals(runtime.getStatus()))) - removeId.add(id); - } - for (Long id : removeId) - submittedJobs.remove(id); - } - - @Override - public void removeJob(long id) { - if (submittedJobs.containsKey(id)) { - IJobRuntime jobRuntime = submittedJobs.remove(id); - fireJobChangedEvent(new JobChangedEvent(jobRuntime, SignalType.DELETE)); - logger.debug("--- {} removed from VM", jobRuntime); - closeSessionLog(jobRuntime); - } - - } - - @Override - public void removeJobChangedEventListener(IJobChangedListener listener) { - if (listener != null) { - jobChangedListenerList.remove(listener); - } - } - - synchronized protected void removeJobChangedListenerGarbage() { - List toBeRemoved = new ArrayList<>(); - for (IJobChangedListener listener : keepAliveJobChangedListeners.keySet()) { - long lastMod = keepAliveJobChangedListeners.get(listener); - long now = System.currentTimeMillis(); - if ((now - lastMod) > 5 * 1000) { - - toBeRemoved.add(listener); - logger.debug("{} refreshed {}, now {}", listener, lastMod, now); - } - } - toBeRemoved.forEach(r -> { - logger.debug("Removing listener {}", r); - removeJobChangedEventListener(r); - keepAliveJobChangedListeners.remove(r); - }); - } - - @Override - public void removeSpanwChild(IJobRuntime jobRuntime) { - IJobRuntime parent = getJobById(jobRuntime.getParentJobId()); - if (parent == null) - return; - - parent.removeSpanwChild(jobRuntime.getId()); - - } - - @Override - public ClusteredJob requestJob(String className) throws Exception { - IJobStepExecutor executor = jobEngineConfiguration.getExecutors().get(className); - if (executor == null) - throw new Exception("Unregistered executor request: " + className); - if (!executor.isRemoteEnabled()) - throw new Exception("Job is not registered for remote workers: " + className); - - ClusteredJob job = executor.steelJob(); - return job; - } - - @Override - public void restartGracefully() throws Exception { - isRunning = false; - if (schedulerService != null) - schedulerService.shutdown(); - - vm.shutDown(); - shutdownExecutors(); - - startUpBarrier.reset(); - logger.info("JobEngine gracefully stopped"); - - jobEngineConfiguration.resetStepClassLoader(); - jobEngineConfiguration.load(this, false); - - vm.start(); - - startupExecutors(); - startUpBarrier.await(); - - schedulerService = new SchedulerService(this); - schedulerService.startup(); - logger.info("JobEngine gracefully started"); - isRunning = true; - } - - @Override - public void sendMessage(IJobMessage jobMessage) { - messageQueue.add(jobMessage); - } - - @Override - public void setAllExecutionDisabled(boolean isDisabled) { - this.isAllExecutionDisabled = isDisabled; - } - - @Override - public void setItemManager(IItemManager _itemManager) { - itemManager = _itemManager; - } - - @Override - public void setScheduledExecutionDisabled(boolean isDisabled) { - this.isScheduledExecutionDisabled = isDisabled; - } - - @Override - public void shutdown() { - isRunning = false; - if (schedulerService != null) - schedulerService.shutdown(); - - vm.shutDown(); - shutdownExecutors(); - dispatcher.shutDown(); - jobChangedListenerChecker.shutDown(); - - try { - TSMClient.CleanUpMultithread(); - } catch (TSMException e) { - logger.catching(e); - } - logger.info("JobEngine stopped"); - } - - private void shutdownExecutors() { - Collection executors = jobEngineConfiguration.getExecutors().values(); - for (IJobStepExecutor executor : executors) { - logger.trace("Notify executor {}", executor.getStepUnitName()); - executor.shutdown(); - } - for (IJobStepExecutor executor : executors) { - logger.info("Stopping executor {}", executor.getStepUnitName()); - executor.waitShutdown(); - } - - } - - private List spawnJobs(IJobRuntime jobRuntime, String executorName) throws InterruptedException { - List result = new ArrayList<>(); - - CallJobStepInstruction currentInstruction = jobRuntime.getCurrentCallJobStepInstruction(); - if (currentInstruction != null) { - String forEach = currentInstruction.getForEach(); - if (StringUtils.isNotBlank(forEach)) { - Object parameter = jobRuntime.getParameter(forEach); - if (parameter == null) - parameter = jobRuntime.getVariable(forEach); - - // a sima array helyett ezt jobb hasznalni - if (parameter != null && parameter instanceof BasicDBList) { - - BasicDBList iter = (BasicDBList) parameter; - for (int i = 1; i < iter.size(); i++) { - IJobRuntime jobRuntimeCopy = new JobRuntime(jobRuntime); - jobRuntimeCopy.setSpawnOrder(i); - jobRuntimeCopy.add(); - - jobRuntime.addSpawnChild(jobRuntimeCopy); - - storeJob(jobRuntimeCopy); - result.add(jobRuntimeCopy); - } - } - } - - } - - result.add(jobRuntime); - return result; - } - - @Override - public void startup() { - try { - removeGarbage(); - - jobEngineConfiguration.load(this, false); - - vm = new VM(); - dispatcher = new MessageDispatcher(); - userMessageQueues = new UserMessageQueues(); - jobChangedListenerChecker = new JobChangedListenerChecker(); - - vm.start(); - dispatcher.start(); - jobChangedListenerChecker.start(); - - startupExecutors(); - startUpBarrier.await(); - - schedulerService = new SchedulerService(this); - schedulerService.startup(); - bootstrap(); - isRunning = true; - TSMClient.SetUpMultithread(); - } catch (Exception e) { - logger.error(e.getMessage()); - logger.error(e); - } - logger.info("JobEngine started"); - - } - - private void startupExecutors() { - for (IJobStepExecutor executor : jobEngineConfiguration.getExecutors().values()) - executor.startup(); - } - - @Override - public void storeJob(IJobRuntime runtime) { - submittedJobs.put(runtime.getId(), runtime); - logger.debug("+++ {} stored in VM ", runtime); - } - - private void submit(IJobRuntime runtime) { - runtime.setSubmitted(new Timestamp(System.currentTimeMillis())); - runtime.add(); - addToRunQueue(runtime); - submittedJobs.put(runtime.getId(), runtime); - logger.debug("+++ {} added to VM ", runtime); - } - - @Override - public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, - String name, int priority, String owner, Map parameters) throws JobEngineException { - IJobRuntime result = null; - IProgram program = getProgram(template); - if (program != null) { - isRunnable(program); - result = new JobRuntime(this, program); - result.setPersister(itemManager); - result.setTemplate(template); - result.setName(name); - result.setParameters(parameters); - result.setPriority(priority); - result.setService(program.getTemplate().isService()); - if (parent != null) - result.setParentJobId(parent.getId()); - if (statusListener != null) - result.addEventListener(statusListener); - submit(result); - } else { - logger.error("Unknown template {}", template); - throw new JobEngineException("Unknown template " + template); - } - return result; - } - - @Override - public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, - String name, Map parameters) throws JobEngineException { - IJobRuntime result = null; - result = submit(parent, statusListener, template, name, 0, DEFAULT_OWNER, parameters); - return result; - } - - @Override - public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority, - Map parameters) throws JobEngineException { - IJobRuntime result = null; - result = submit(parent, null, template, name, 0, DEFAULT_OWNER, parameters); - return result; - } - - IJobRuntime submit(IProgram program, IJobStatusChangedListener listener) { - IJobRuntime jobRuntime = new JobRuntime(this, program); - jobRuntime.addEventListener(listener); - addToRunQueue(jobRuntime); - return jobRuntime; - } - - @Override - public IJobRuntime submit(String template, String name, Map parameters) throws JobEngineException { - // Az ütemezett task-okat configból a Quartz futtatja - if (isAllExecutionDisabled) { - logger.info("JobEngine is disabled, can not submit job '{}'", name); - return null; - } - IJobRuntime result = null; - IProgram program = getProgram(template); - result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER, - parameters); - return result; - } - - @Override - public IJobRuntime submit(String template, String name, Map parameters, String owner) - throws JobEngineException { - IJobRuntime result = null; - result = submit(null, null, template, name, 0, owner, parameters); - return result; - } - - @Override - public void suspendExecutingJob(Throwable t, IJobRuntime jobRuntime) { - String description = t.getClass().getSimpleName() + " : " + t.getMessage(); - jobRuntime.setDescription(description); - logger.error(description); - // TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND - statusMachine.processAction(JobAction.FINISH, jobRuntime); - closeSessionLog(jobRuntime); - } - - @Override - public void suspendWaitExecutorJob(Throwable t, IJobRuntime jobRuntime) { - String description = t.getMessage(); - jobRuntime.setDescription(description); - logger.error(description); - statusMachine.processAction(JobAction.SUSPEND, jobRuntime); - closeSessionLog(jobRuntime); - } - - public synchronized void unbindItemManagerService(IItemManager service) { - try { - shutdown(); - } catch (Exception e) { - logger.error("Couldn't shutdown jobEngine", e); - } - } - -} +package user.jobengine.server; + +import java.sql.Timestamp; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.ibm.nosql.json.api.BasicDBList; +import com.ibm.nosql.json.api.BasicDBObject; + +import user.commons.Job; +import user.commons.JobStatus; +import user.commons.cluster.ClusteredJob; +import user.jobengine.db.IItemManager; +import user.jobengine.db.ItemManagerData.SignalType; +import user.jobengine.server.actions.IStatusMachine; +import user.jobengine.server.actions.JobAction; +import user.jobengine.server.actions.StatusMachine; +import user.jobengine.server.ast.JobTemplate; +import user.jobengine.server.instructions.CallJobStepInstruction; +import user.jobengine.server.instructions.IInstruction; +import user.jobengine.server.messagequeue.IUserMessage; +import user.jobengine.server.messagequeue.IUserMessageQueues; +import user.jobengine.server.messagequeue.UserMessageQueues; +import user.jobengine.server.messages.IJobMessage; +import user.jobengine.server.messages.JobStepCompletedMessage; +import user.jobengine.server.messages.UserReplyMessage; +import user.jobengine.server.scheduler.ScheduledJob; +import user.jobengine.server.scheduler.SchedulerService; +import user.tsm.client.TSMClient; +import user.tsm.client.TSMException; + +/** + * MAM motor implementáció. + */ +public class JobEngine implements IJobEngine { + + private class JobChangedListenerChecker extends Thread { + private volatile boolean shutdown = false; + + @Override + public void run() { + try { + startUpBarrier.await(); + } catch (Exception e) { + } + + while (!shutdown) { + try { + Thread.sleep(1000); + removeJobChangedListenerGarbage(); + } catch (InterruptedException e) { + shutdown = true; + } + } + } + + void shutDown() { + shutdown = true; + try { + join(); + } catch (InterruptedException e) { + } + } + } + + private class MessageDispatcher extends Thread { + private volatile boolean shutdown = false; + + @Override + public void run() { + try { + startUpBarrier.await(); + } catch (Exception e) { + } + + while (!shutdown) { + try { + IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + if (message != null) + message.process(JobEngine.this); + + } catch (InterruptedException e) { + shutdown = true; + } + } + // a leallitas utan az osszes fuggo uzenet vegrehajtasa + while (!messageQueue.isEmpty()) { + try { + IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + if (message != null) + message.process(JobEngine.this); + } catch (InterruptedException e) { + shutdown = true; + } + } + } + + void shutDown() { + shutdown = true; + try { + join(); + } catch (InterruptedException e) { + } + } + } + + private class VM extends Thread { + private volatile boolean shutdown = false; + private IInstruction ir = null; + + @Override + public void run() { + try { + startUpBarrier.await(); + } catch (Exception e) { + } + + while (!shutdown) { + try { + Thread.sleep(QUEUE_POLL_INTERVAL_MS); + // IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, + // TimeUnit.MILLISECONDS); + IJobRuntime jobRuntime = runQueue.poll(); + if (jobRuntime != null) { + logger.debug("Processing {}", jobRuntime.getId()); + // varakozo esetben vegrehajtjuk a kovetkezo utasitast + if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) { + ir = jobRuntime.getNextInstruction(); + ir.execute(JobEngine.this, jobRuntime); + } else { + // normal esetben elfutunk a kovetkezo job step-ig, vagy vegig + while (jobRuntime.hasNextInstruction() && jobRuntime.isRunnable()) { + ir = jobRuntime.getNextInstruction(); + ir.execute(JobEngine.this, jobRuntime); + } + } + + if (!jobRuntime.hasNextInstruction() && jobRuntime.isRunnable()) + jobCleanup(jobRuntime); + + } + } catch (Exception e) { + logger.error("Critical VM error!", e); + // shutdown = true; + } + } + } + + void shutDown() { + shutdown = true; + try { + join(); + } catch (InterruptedException e) { + } + } + } + + private static final Logger logger = LogManager.getLogger(); + static private IItemManager itemManager; + private static JobEngine instance; + + public static JobEngine getInstance() { + if (instance == null) + instance = new JobEngine(); + return instance; + } + + static public IItemManager getStaticItemManager() { + return itemManager; + } + + private volatile boolean isRunning; + + private volatile boolean isAllExecutionDisabled; + private volatile boolean isScheduledExecutionDisabled; + private final BlockingQueue runQueue; + private final BlockingQueue messageQueue; + private final Map submittedJobs; + private VM vm; + private MessageDispatcher dispatcher; + private JobChangedListenerChecker jobChangedListenerChecker; + + private IUserMessageQueues userMessageQueues; + + private final CyclicBarrier startUpBarrier; + + private final IStatusMachine statusMachine; + + private AtomicLong nextJobId; + private SchedulerService schedulerService = null; + private Set jobChangedListenerList = ConcurrentHashMap.newKeySet(); + private Map remoteWorkers; + private String masterServerAddress = System.getProperty("jobengine.master.server", ""); + private final JobEngineRemote remoteEngine; + + private ConcurrentHashMap keepAliveJobChangedListeners = new ConcurrentHashMap<>(); + + private IJobEngineConfiguration jobEngineConfiguration; + + /** + * A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az + * ütemező szál, az üzenet kezelő szál. + */ + public JobEngine() { + + runQueue = new PriorityBlockingQueue(); + messageQueue = new LinkedBlockingQueue(); + submittedJobs = createJobs(); + startUpBarrier = new CyclicBarrier(4); + nextJobId = new AtomicLong(0); + statusMachine = createStatusMachine(); + + if (instance == null) + instance = this; + else + throw new RuntimeException("Multiple JobEngine instances are not supported!"); + + remoteWorkers = new ConcurrentHashMap<>(); + // logger.info("JobEngine created"); + if (isWorker()) + remoteEngine = createRemoteEngine(); + else + remoteEngine = null; + } + + public void activate() { + } + + @Override + public boolean addJobChangedEventListener(IJobChangedListener listener) { + boolean result = !jobChangedListenerList.contains(listener); + if (listener != null && result) + jobChangedListenerList.add(listener); + + return result; + } + + @Override + public void addToExecutorQueue(IJobRuntime jobRuntime) { + try { + Object typeName = jobRuntime.popFromStack(); + if (typeName == null) + throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null."); + String executorName = String.valueOf(typeName); + if (!jobEngineConfiguration.getExecutors().containsKey(executorName)) + throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName); + + // a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van + // benne + // ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es + // meghivodik a fork + List jobs = spawnJobs(jobRuntime, executorName); + jobEngineConfiguration.getExecutors().get(executorName).submit(jobs.toArray(new IJobRuntime[] {})); + jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE))); + + } catch (Exception e) { + logger.catching(e); + suspendWaitExecutorJob(e, jobRuntime); + } + } + + @Override + public void addToRunQueue(IJobRuntime jobRuntime) { + try { + logger.debug("{} adding to run queue", jobRuntime); + if (jobRuntime.getIp() == 0 && !jobRuntime.isService() && jobRuntime.getParentJobId() == 0) + logger.info(jobRuntime.getSessionMarker(), "A '{}' folyamat elindult.", jobRuntime.getName()); + runQueue.put(jobRuntime); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + /** + * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok + * soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy + * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default + * prioritasut a korabban atrendezett listaba. + * + * @param jobRuntime + */ + @Override + public void applyPriorityChange(IJobRuntime jobRuntime) { + logger.info("rePrioritization start for {}", jobRuntime.getId()); + // synchronized(this.runQueue){ + + // job main queue reorder + + if (this.runQueue.contains(jobRuntime)) { + logger.info("runQueue"); + this.runQueue.remove(jobRuntime); + try { + this.runQueue.put(jobRuntime); + } catch (InterruptedException e) { + } + } + + // JobStepExecutor reorder + if (jobEngineConfiguration.getExecutors() != null) { + for (IJobStepExecutor exec : jobEngineConfiguration.getExecutors().values()) { + if (exec.containsRuntime(jobRuntime)) { + logger.info("executor"); + exec.changePriority(jobRuntime); + } + } + } + + // } logger.info("rePrioritization end"); + } + + @Override + public synchronized void bindItemManagerService(IItemManager service) { + setItemManager(service); + } + + public synchronized void bindJobEngineConfiguration(Object service) { + if (service instanceof JobEngineConfiguration) { + jobEngineConfiguration = (IJobEngineConfiguration) service; + isScheduledExecutionDisabled = jobEngineConfiguration.isScheduledExecutionDisabled(); + logger.info("IJobEngineConfiguration service binded"); + } + } + + private void bootstrap() throws JobEngineException { + // submit("fake-noparams.xml", "Bootstrap", null); + } + + private void closeSessionLog(IJobRuntime jobRuntime) { + if (!jobRuntime.isService() && jobRuntime.getParentJobId() == 0) { + if (JobStatus.FINISHED.equals(jobRuntime.getStatus())) + logger.info(jobRuntime.getFinishMarker(), "A(z) '{}' folyamat futása sikeresen véget ért.", jobRuntime.getName()); + else + logger.error(jobRuntime.getFinishMarker(), "A(z) '{}' folyamat futása megszakadt.", jobRuntime.getName()); + } + } + + protected Map createJobs() { + return new ConcurrentHashMap(); + } + + protected JobEngineRemote createRemoteEngine() { + return new JobEngineRemote(masterServerAddress); + } + + protected IStatusMachine createStatusMachine() { + return new StatusMachine(this); + } + + public void deactivate() { + } + + @Override + public void executeAssignVariableInstruction(IJobRuntime jobRuntime) { + Object value = jobRuntime.popFromStack(); + String name = (String) jobRuntime.popFromStack(); + + // a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk + // TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket + if (jobRuntime.getParentJobId() > 0) { + IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId()); + parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value); + } + + jobRuntime.setVariable(name, value); + } + + /*** + * Fuggetlen (beagyazott) alfolyamat letrehozasa + */ + @Override + public void executeCallConcurrentJobStepInstruction(IJobRuntime jobRuntime, IProgram subProgram) { + JobRuntime c = new JobRuntime(this, jobRuntime, subProgram); + jobRuntime.addChild(c); + submit(c); + } + + @Override + public void executeCallJobStepInstruction(IJobRuntime jobRuntime) { + statusMachine.processAction(JobAction.EXECUTE, jobRuntime); + } + + @Override + public void executeCheckParameterInstruction(IJobRuntime jobRuntime) { + try { + jobRuntime.checkStackParameter(); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + @Override + public void executeDeclareVariableInstruction(IJobRuntime jobRuntime) { + Class type = (Class) jobRuntime.popFromStack(); + String name = (String) jobRuntime.popFromStack(); + jobRuntime.addVariable(name, type); + } + + @Override + public void executeGetParameterInstruction(IJobRuntime jobRuntime) { + try { + String name = (String) jobRuntime.popFromStack(); + Object value = jobRuntime.getParameter(name); + jobRuntime.pushToStack(value); + } catch (Exception e) { + logger.error(e.getMessage()); + jobRuntime.setDescription(e.getMessage()); + suspendWaitExecutorJob(e, jobRuntime); + } + } + + @Override + public void executeGetVariableInstruction(IJobRuntime jobRuntime) { + try { + String name = (String) jobRuntime.popFromStack(); + Object value = jobRuntime.getVariable(name); + jobRuntime.pushToStack(value); + } catch (Exception e) { + logger.error(e.getMessage()); + jobRuntime.setDescription(e.getMessage()); + suspendWaitExecutorJob(e, jobRuntime); + } + } + + @Override + public void executePushToStackInstruction(IJobRuntime jobRuntime, Object item) { + jobRuntime.pushToStack(item); + } + + @Override + public void executeSendMessageToUserInstruction(IJobRuntime jobRuntime) { + statusMachine.processAction(JobAction.MESSAGE, jobRuntime); + + int messageNumber = (Integer) jobRuntime.popFromStack(); + String catalogName = (String) jobRuntime.popFromStack(); + Object[] inputs = getInputsFromStack(jobRuntime); + getUserMessageQueues().addMessage(jobRuntime, catalogName, messageNumber, true, inputs); + } + + @Override + public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) { + // logger.info("Processing {} {}", jobRuntime.getId(), + // jobRuntime.canContinueExecution()); + + if (jobRuntime.canContinueExecution()) { + jobRuntime.setStatus(JobStatus.RUNNABLE); + } else { + if (!JobStatus.WAIT_FINISH.equals(jobRuntime.getStatus())) { + jobRuntime.setStatus(JobStatus.WAIT_FINISH); + } + jobRuntime.decrementInstructionPointer(); + } + addToRunQueue(jobRuntime); + } + + @Override + public void fireJobChangedEvent(JobChangedEvent event) { + for (IJobChangedListener listener : jobChangedListenerList) { + listener.jobChanged(event); + } + } + + @Override + public Object[] getInputsFromStack(IJobRuntime jobRuntime) { + jobRuntime.swapStack(); + Integer parameterCount = (Integer) jobRuntime.popFromStack(); + Object[] result = new Object[parameterCount]; + for (int index = 0; index < parameterCount; index++) { + result[index] = jobRuntime.popFromStack(); + } + return result; + } + + @Override + public IItemManager getItemManager() { + return itemManager; + } + + @Override + public Job getJob(long id) { + Job result = null; + if (submittedJobs.containsKey(id)) + result = (Job) submittedJobs.get(id); + return result; + } + + @Override + public IJobRuntime getJobById(long jobId) { + if (!submittedJobs.containsKey(jobId)) { + logger.warn("Job does not exist anymore {}. Possibly canceled already.", jobId); + } + return submittedJobs.get(jobId); + } + + @Override + public IJobEngineConfiguration getJobEngineConfiguration() { + return jobEngineConfiguration; + } + + @Override + public Map getJobs() { + return submittedJobs; + } + + protected long getNextJobId() { + return nextJobId.incrementAndGet(); + } + + @Override + public IProgram getProgram(String name) { + return jobEngineConfiguration.getPrograms().get(name); + } + + @Override + public JobEngineRemote getRemoteEngine() { + return remoteEngine; + } + + @Override + public ScheduledJob getScheduledJob(String template) throws Exception { + ArrayList> schedules = jobEngineConfiguration.getSchedules(); + BasicDBObject schedule = null; + boolean templateFirstOccurence = false; + + for (int index = 0; index < schedules.size(); index++) { + if (schedules.get(index).getKey().equals(template)) { + if (!templateFirstOccurence) { + templateFirstOccurence = true; + schedule = schedules.get(index).getValue(); + } else { + throw new Exception("Key " + template + " is already contained in scheduled jobs!"); + } + } + } + + if (schedule == null) + return null; + return jobEngineConfiguration.createScheduledJob(schedule, this); + } + + @Override + public SchedulerService getScheduler() { + return schedulerService; + } + + @Override + public IUserMessageQueues getUserMessageQueues() { + return userMessageQueues; + } + + @Override + public boolean isAllExecutionDisabled() { + return isAllExecutionDisabled; + } + + private void isRunnable(IProgram program) throws JobEngineException { + JobTemplate template = program.getTemplate(); + // A JOB xml-ben beállítható, hogy futhatnak-e párhuzamosan. + if (template.isMultiInstance()) + return; + + // Collection values = getJobs().values(); + + // vegigmenni a submittedJobs-on! + + Set ids = submittedJobs.keySet(); + for (long id : ids) { + if (!submittedJobs.containsKey(id)) + continue; + + IJobRuntime runtime = submittedJobs.get(id); + if (runtime == null) + continue; + + if (!template.getFileName().equals(runtime.getTemplate())) + continue; + + // 'EXECUTING', 'WAIT_EXECUTOR', 'WAIT_SUSPEND', 'RUNNABLE' + if (runtime.isExecuting() || runtime.isRunnable() || runtime.isWaitingSuspend() || runtime.isWaitingExecutor()) + throw new JobEngineException(String.format("Can not submit job. Job with %s.%s already running", template.getFileName(), template.getName())); + } + /* + * List runningJobs = itemManager.getRunningJobs(template.getFileName()); + * if (runningJobs != null) { for (Job job : runningJobs) { Job runningJob = + * getJob(job.getId()); + * + * if (runningJob != null && runningJob.getStatus() != JobStatus.SUSPENDED) + * throw new JobEngineException(String. + * format("Can not submit job. Job with %s.%s already running", + * template.getFileName(), template.getName())); } } + */ + } + + @Override + public boolean isRunning() { + return isRunning; + } + + @Override + public boolean isScheduledExecutionDisabled() { + return isScheduledExecutionDisabled; + } + + @Override + public boolean isWorker() { + return masterServerAddress.length() > 0; + } + + @Override + public void jobCleanup(IJobRuntime jobRuntime) { + logger.info("Cleanup {}", jobRuntime.getId()); + statusMachine.processAction(JobAction.FINISH, jobRuntime); + } + + @Override + public boolean keepAliveJobChangedListener(IJobChangedListener listener) { + boolean result = false; + if (keepAliveJobChangedListeners != null) { + long now = System.currentTimeMillis(); + // ha mar hozza van adva, nem adja hozza + result = addJobChangedEventListener(listener); + keepAliveJobChangedListeners.put(listener, now); + logger.debug("Refreshing listener {}, now {} ({})", listener, now, keepAliveJobChangedListeners.size()); + } + + return result; + } + + @Deprecated + @Override + public void keepAliveWorker(String remoteAddr) { + remoteWorkers.put(remoteAddr, LocalDate.now()); + } + + @Override + public void processCancelRequest(IJobMessage message) { + IJobRuntime jobRuntime = getJobById(message.getJobId()); + + if (jobRuntime == null) { + logger.warn("Job with id {} does not exist!", message.getJobId()); + return; + } + + // a gyerekek miatt nem az! + if (!jobRuntime.isCancelable()) + return; + + // Ha a jobs tartalmazta ezt a job-ot, akkor kikerül belőle. + // if (jobs.containsKey(message.getJobId())) + // jobs.remove(message.getJobId()); + statusMachine.processAction(JobAction.CANCEL, jobRuntime); + } + + @Override + public void processJobStepCompletedMessage(IJobMessage message) { + IJobRuntime jobRuntime = getJobById(message.getJobId()); + + if (jobRuntime == null) { + logger.warn("Job with id {} does not exist!", message.getJobId()); + return; + } + + // TODO ez hibat okoz az archivalasnal, mert hamarabb eltavolitja a childUd-ket + + // if (jobRuntime.getParentJobId() > 0) + // removeSpanwChild(jobRuntime); + + JobStepCompletedMessage m = (JobStepCompletedMessage) message; + // kesz vagyunk, jelezni + if (isWorker()) { + statusMachine.processAction(JobAction.DONE, jobRuntime); + return; + } + + // a cancel hamarabb megjott? + // ha remote akkor tuti + if (jobRuntime == null) { + + } + Object[] outputs = m.getOutputs(); + putOutputsToStack(jobRuntime, outputs); + + statusMachine.processAction(JobAction.DONE, jobRuntime); + } + + @Override + public void processJobStepSkippedMessage(IJobMessage message) { + IJobRuntime jobRuntime = getJobById(message.getJobId()); + statusMachine.processAction(JobAction.SKIP, jobRuntime); + } + + @Override + public void processReplyMessage(IJobMessage message) { + IJobRuntime jobRuntime = getJobById(message.getJobId()); + UserReplyMessage m = (UserReplyMessage) message; + IUserMessage userMessage = getUserMessageQueues().getUserMessages().get(m.getMessageId()); + userMessage.setReply((Integer) m.getReply()); + userMessage.setMustReply(false); + jobRuntime.pushToStack(m.getReply()); + statusMachine.processAction(JobAction.REPLY, jobRuntime); + } + + @Override + public void processRestartRequest(IJobMessage message) { + Job job = (Job) itemManager.get(Job.class, message.getJobId()); + if (job != null) { + IProgram program = getProgram(job.getTemplate()); + if (program != null) { + IJobRuntime result = new JobRuntime(this, program, job, itemManager); + result.setStatus(JobStatus.CANCELED); + statusMachine.processAction(JobAction.RESTART, result); + } + } else { + logger.info("JobEngine processRestartReqest: Job is null"); + } + } + + @Override + public void processResumeRequest(IJobMessage message) { + IJobRuntime jobRuntime = getJobById(message.getJobId()); + statusMachine.processAction(JobAction.RESUME, jobRuntime); + } + + @Override + public void processSuspendRequest(IJobMessage message) { + IJobRuntime jobRuntime = getJobById(message.getJobId()); + statusMachine.processAction(JobAction.SUSPEND, jobRuntime); + } + + @Override + public void putOutputsToStack(IJobRuntime jobRuntime, Object[] outputs) { + if (outputs != null) { + for (Object object : outputs) { + jobRuntime.pushToStack(object); + } + jobRuntime.arrangeStack(); + } + } + + @Override + public void reloadGracefully() throws Exception { + if (schedulerService != null) + schedulerService.shutdown(); + + jobEngineConfiguration.resetStepClassLoader(); + jobEngineConfiguration.load(this, true); + + schedulerService = new SchedulerService(this); + schedulerService.startup(); + logger.info("JobEngine gracefully reloaded"); + isRunning = true; + } + + @Override + public void removeFromExecutorQueue(IJobRuntime jobRuntime) { + for (IJobStepExecutor executor : jobEngineConfiguration.getExecutors().values()) + executor.revoke(jobRuntime); + } + + @Override + public void removeFromRunQueue(IJobRuntime jobRuntime) { + runQueue.remove(jobRuntime); + } + + @Override + public void removeGarbage() { + List removeId = new ArrayList<>(); + for (Long id : submittedJobs.keySet()) { + IJobRuntime runtime = submittedJobs.get(id); + if (runtime != null && (JobStatus.SUSPENDED.equals(runtime.getStatus()) || JobStatus.CANCELED.equals(runtime.getStatus()))) + removeId.add(id); + } + for (Long id : removeId) + submittedJobs.remove(id); + } + + @Override + public void removeJob(long id) { + if (submittedJobs.containsKey(id)) { + IJobRuntime jobRuntime = submittedJobs.remove(id); + fireJobChangedEvent(new JobChangedEvent(jobRuntime, SignalType.DELETE)); + logger.debug("--- {} removed from VM", jobRuntime); + closeSessionLog(jobRuntime); + } + + } + + @Override + public void removeJobChangedEventListener(IJobChangedListener listener) { + if (listener != null) { + jobChangedListenerList.remove(listener); + } + } + + synchronized protected void removeJobChangedListenerGarbage() { + List toBeRemoved = new ArrayList<>(); + for (IJobChangedListener listener : keepAliveJobChangedListeners.keySet()) { + long lastMod = keepAliveJobChangedListeners.get(listener); + long now = System.currentTimeMillis(); + if ((now - lastMod) > 5 * 1000) { + + toBeRemoved.add(listener); + logger.debug("{} refreshed {}, now {}", listener, lastMod, now); + } + } + toBeRemoved.forEach(r -> { + logger.debug("Removing listener {}", r); + removeJobChangedEventListener(r); + keepAliveJobChangedListeners.remove(r); + }); + } + + @Override + public void removeSpanwChild(IJobRuntime jobRuntime) { + IJobRuntime parent = getJobById(jobRuntime.getParentJobId()); + if (parent == null) + return; + + parent.removeSpanwChild(jobRuntime.getId()); + + } + + @Override + public ClusteredJob requestJob(String className) throws Exception { + IJobStepExecutor executor = jobEngineConfiguration.getExecutors().get(className); + if (executor == null) + throw new Exception("Unregistered executor request: " + className); + if (!executor.isRemoteEnabled()) + throw new Exception("Job is not registered for remote workers: " + className); + + ClusteredJob job = executor.steelJob(); + return job; + } + + @Override + public void restartGracefully() throws Exception { + isRunning = false; + if (schedulerService != null) + schedulerService.shutdown(); + + vm.shutDown(); + shutdownExecutors(); + + startUpBarrier.reset(); + logger.info("JobEngine gracefully stopped"); + + jobEngineConfiguration.resetStepClassLoader(); + jobEngineConfiguration.load(this, false); + + vm.start(); + + startupExecutors(); + startUpBarrier.await(); + + schedulerService = new SchedulerService(this); + schedulerService.startup(); + logger.info("JobEngine gracefully started"); + isRunning = true; + } + + @Override + public void sendMessage(IJobMessage jobMessage) { + messageQueue.add(jobMessage); + } + + @Override + public void setAllExecutionDisabled(boolean isDisabled) { + this.isAllExecutionDisabled = isDisabled; + } + + @Override + public void setItemManager(IItemManager _itemManager) { + itemManager = _itemManager; + } + + @Override + public void setScheduledExecutionDisabled(boolean isDisabled) { + this.isScheduledExecutionDisabled = isDisabled; + } + + @Override + public void shutdown() { + isRunning = false; + if (schedulerService != null) + schedulerService.shutdown(); + + vm.shutDown(); + shutdownExecutors(); + dispatcher.shutDown(); + jobChangedListenerChecker.shutDown(); + + try { + TSMClient.CleanUpMultithread(); + } catch (TSMException e) { + logger.catching(e); + } + logger.info("JobEngine stopped"); + } + + private void shutdownExecutors() { + Collection executors = jobEngineConfiguration.getExecutors().values(); + for (IJobStepExecutor executor : executors) { + logger.trace("Notify executor {}", executor.getStepUnitName()); + executor.shutdown(); + } + for (IJobStepExecutor executor : executors) { + logger.info("Stopping executor {}", executor.getStepUnitName()); + executor.waitShutdown(); + } + + } + + private List spawnJobs(IJobRuntime jobRuntime, String executorName) throws InterruptedException { + List result = new ArrayList<>(); + + CallJobStepInstruction currentInstruction = jobRuntime.getCurrentCallJobStepInstruction(); + if (currentInstruction != null) { + String forEach = currentInstruction.getForEach(); + if (StringUtils.isNotBlank(forEach)) { + Object parameter = jobRuntime.getParameter(forEach); + if (parameter == null) + parameter = jobRuntime.getVariable(forEach); + + // a sima array helyett ezt jobb hasznalni + if (parameter != null && parameter instanceof BasicDBList) { + + BasicDBList iter = (BasicDBList) parameter; + for (int i = 1; i < iter.size(); i++) { + IJobRuntime jobRuntimeCopy = new JobRuntime(jobRuntime); + jobRuntimeCopy.setSpawnOrder(i); + jobRuntimeCopy.add(); + + jobRuntime.addSpawnChild(jobRuntimeCopy); + + storeJob(jobRuntimeCopy); + result.add(jobRuntimeCopy); + } + } + } + + } + + result.add(jobRuntime); + return result; + } + + @Override + public void startup() { + try { + removeGarbage(); + + jobEngineConfiguration.load(this, false); + + vm = new VM(); + dispatcher = new MessageDispatcher(); + userMessageQueues = new UserMessageQueues(); + jobChangedListenerChecker = new JobChangedListenerChecker(); + + vm.start(); + dispatcher.start(); + jobChangedListenerChecker.start(); + + startupExecutors(); + startUpBarrier.await(); + + schedulerService = new SchedulerService(this); + schedulerService.startup(); + bootstrap(); + isRunning = true; + TSMClient.SetUpMultithread(); + } catch (Exception e) { + logger.error(e.getMessage()); + logger.error(e); + } + logger.info("JobEngine started"); + + } + + private void startupExecutors() { + for (IJobStepExecutor executor : jobEngineConfiguration.getExecutors().values()) + executor.startup(); + } + + @Override + public void storeJob(IJobRuntime runtime) { + submittedJobs.put(runtime.getId(), runtime); + logger.debug("+++ {} stored in VM ", runtime); + } + + private void submit(IJobRuntime runtime) { + runtime.setSubmitted(new Timestamp(System.currentTimeMillis())); + runtime.add(); + addToRunQueue(runtime); + submittedJobs.put(runtime.getId(), runtime); + logger.debug("+++ {} added to VM ", runtime); + } + + @Override + public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, int priority, String owner, + Map parameters) throws JobEngineException { + IJobRuntime result = null; + IProgram program = getProgram(template); + if (program != null) { + isRunnable(program); + result = new JobRuntime(this, program); + result.setPersister(itemManager); + result.setTemplate(template); + result.setName(name); + result.setParameters(parameters); + result.setPriority(priority); + result.setService(program.getTemplate().isService()); + if (parent != null) + result.setParentJobId(parent.getId()); + if (statusListener != null) + result.addEventListener(statusListener); + submit(result); + } else { + logger.error("Unknown template {}", template); + throw new JobEngineException("Unknown template " + template); + } + return result; + } + + @Override + public IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map parameters) + throws JobEngineException { + IJobRuntime result = null; + result = submit(parent, statusListener, template, name, 0, DEFAULT_OWNER, parameters); + return result; + } + + @Override + public IJobRuntime submit(IJobRuntime parent, String template, String name, int priority, Map parameters) throws JobEngineException { + IJobRuntime result = null; + result = submit(parent, null, template, name, 0, DEFAULT_OWNER, parameters); + return result; + } + + IJobRuntime submit(IProgram program, IJobStatusChangedListener listener) { + IJobRuntime jobRuntime = new JobRuntime(this, program); + jobRuntime.addEventListener(listener); + addToRunQueue(jobRuntime); + return jobRuntime; + } + + @Override + public IJobRuntime submit(String template, String name, Map parameters) throws JobEngineException { + // Az ütemezett task-okat configból a Quartz futtatja + if (isAllExecutionDisabled) { + logger.info("JobEngine is disabled, can not submit job '{}'", name); + return null; + } + IJobRuntime result = null; + IProgram program = getProgram(template); + result = submit(null, null, template, name == null ? program.getTemplate().getName() : name, 0, DEFAULT_OWNER, parameters); + return result; + } + + @Override + public IJobRuntime submit(String template, String name, Map parameters, String owner) throws JobEngineException { + IJobRuntime result = null; + result = submit(null, null, template, name, 0, owner, parameters); + return result; + } + + @Override + public void suspendExecutingJob(Throwable t, IJobRuntime jobRuntime) { + String description = t.getClass().getSimpleName() + " : " + t.getMessage(); + jobRuntime.setDescription(description); + logger.error(description); + // TODO itt miert FINISH a kovetkezo allapot, miert nem SUSPEND + statusMachine.processAction(JobAction.FINISH, jobRuntime); + closeSessionLog(jobRuntime); + } + + @Override + public void suspendWaitExecutorJob(Throwable t, IJobRuntime jobRuntime) { + String description = t.getMessage(); + jobRuntime.setDescription(description); + logger.error(description); + statusMachine.processAction(JobAction.SUSPEND, jobRuntime); + closeSessionLog(jobRuntime); + } + + public synchronized void unbindItemManagerService(IItemManager service) { + try { + shutdown(); + } catch (Exception e) { + logger.error("Couldn't shutdown jobEngine", e); + } + } + +} diff --git a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineConfiguration.java b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineConfiguration.java index ccae13d4..7ff3e5d1 100644 --- a/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineConfiguration.java +++ b/server/user.jobengine.osgi.server/src/user/jobengine/server/JobEngineConfiguration.java @@ -73,6 +73,14 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { private GroovyClassLoader groovyClassLoader; private DynamicStepsLoader dynamicStepsLoader; + static JobEngineConfiguration currentInstance = null; + + static public JobEngineConfiguration getInstance() { + if (currentInstance == null) + currentInstance = new JobEngineConfiguration(); + return currentInstance; + } + public synchronized void bindSystemConfiguration(Object service) { if (service instanceof IConfiguration) { systemConfig = (IConfiguration) service; @@ -91,8 +99,7 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { public IJobStep createJobStep(String stepUnitName) throws Exception { IJobStep result = null; - boolean isGroovyClass = stepUnitName.toLowerCase().endsWith(".java") - || stepUnitName.toLowerCase().endsWith(".groovy"); + boolean isGroovyClass = stepUnitName.toLowerCase().endsWith(".java") || stepUnitName.toLowerCase().endsWith(".groovy"); logger.info("Looking for {} step ClassLoader requirement", stepUnitName); if (resetStepClassLoader) { @@ -125,8 +132,7 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { if (isGroovyClass) { if (dynamicStepsLoader != null) - stepClass = (Class) dynamicStepsLoader.loadClassFromSourceCode(groovyClassLoader, - stepUnitName); + stepClass = (Class) dynamicStepsLoader.loadClassFromSourceCode(groovyClassLoader, stepUnitName); } else stepClass = (Class) stepsClassLoader.loadClass(stepUnitName); @@ -278,8 +284,7 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { int newMaxConcurrent = executor.getMaxConcurrent(); if (currentMaxConcurrent != newMaxConcurrent) { stepExecutor.setMaxConcurrent(newMaxConcurrent); - logger.info("Executor maxConcurrent changed from {} to {}", currentMaxConcurrent, - newMaxConcurrent); + logger.info("Executor maxConcurrent changed from {} to {}", currentMaxConcurrent, newMaxConcurrent); } } @@ -301,8 +306,16 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { // a kulcs nem lehet a template, mert kulonbozo parameterekkel is benne lehet! schedules.clear(); - String configFilePath = systemConfig.getConfig(JobEngineConfiguration.CONF_SCHEDULES); - logger.info("Loading scheduler configuration file {}", configFilePath); + String configFilePath = null; + + try { + configFilePath = systemConfig.getConfig(JobEngineConfiguration.CONF_SCHEDULES); + logger.info("Loading scheduler configuration file {}", configFilePath); + } catch (Exception e) { + logger.warn(e.getMessage()); + return; + } + String jsonConfig = new String(Files.readAllBytes(Paths.get(configFilePath))); BasicDBObject dbo = (BasicDBObject) JSONUtil.jsonToDbObject(jsonConfig); BasicDBList scheduledJobs = NoSQLUtils.asDBList(dbo, "joblist"); @@ -413,16 +426,16 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { BasicDBObject temp = schedules.get(i).getValue(); temp.remove("xml"); temp.remove("nextTime"); - if(temp.get("active") == null) { + if (temp.get("active") == null) { temp.remove("active"); } - if(temp.get("executeimmediate") == null) { + if (temp.get("executeimmediate") == null) { temp.remove("executeimmediate"); } jobList.add(temp); } dbo.put("joblist", jobList); - + return dbo; } @@ -457,14 +470,12 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { if (!Files.isDirectory(templatesPath)) throw new FileNotFoundException(templatesPath + " is not a directory!"); if (templateToSave.getFileName() != null) { - filePath = templatesPath.toString() + FileSystems.getDefault().getSeparator() - + templateToSave.getFileName(); + filePath = templatesPath.toString() + FileSystems.getDefault().getSeparator() + templateToSave.getFileName(); if (Files.isWritable(Paths.get(filePath))) { logger.info("{} is writable", filePath); } else { logger.info("{} is not writable", filePath); - DosFileAttributeView dos = Files.getFileAttributeView(Paths.get(filePath), - DosFileAttributeView.class); + DosFileAttributeView dos = Files.getFileAttributeView(Paths.get(filePath), DosFileAttributeView.class); if (dos != null) { try { dos.setReadOnly(false); @@ -520,8 +531,7 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { duplicateFileName = FilenameUtils.getName(duplicateFileName).replace(".xml", "").concat("-copy.xml"); filePath = templatesPath.toString() + FileSystems.getDefault().getSeparator() + duplicateFileName; } - InputStream is = new FileInputStream( - templatesPath.toString() + FileSystems.getDefault().getSeparator() + selectedJob.getString("template")); + InputStream is = new FileInputStream(templatesPath.toString() + FileSystems.getDefault().getSeparator() + selectedJob.getString("template")); Files.copy(is, Paths.get(filePath)); } catch (FileNotFoundException e) { logger.error("File not found: {}", filePath); @@ -529,7 +539,7 @@ public class JobEngineConfiguration implements IJobEngineConfiguration { logger.error("IOException: {}", e.getCause()); } return duplicateFileName; - + } @Override diff --git a/server/user.mediacube.gui/bin/user/jobengine/zk/util/SessionListener.class b/server/user.mediacube.gui/bin/user/jobengine/zk/util/SessionListener.class deleted file mode 100644 index bd5a88812f4443765dc89c151c03ef7ae00c6745..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1794 zcmb7^%~Bgj6otPljTj-x81olMYz)|zB?Jr!Bykvw4VYgGydcFQS&c-O7%b0FqlSV! zN?yQgZ=8@t$}2DO8hMY9n$ai_g#u?aHMeh{bMEQW{l{Ow{SM5sZITf3M-3HN2Yy9) zHP=(tPp_>b?bfXw)o8fBS9Tj(c`7h5gzf|9*sJK4pdc}bO`yddSW$Ha9mf~ z+SmHfTG#rp4MFIbcRg1x2+2ZmS1^|SJ(VFzHit=CxLtNVwSDxaqJr<8N?i#(Wxwjw zcb&iu|Kp;e_uYn2D1VxATj**uhglAkqt%{JD3q@ky>hHPZ5MYl^wMwACp`Fs4LJt* zOvu!fj)xb<3dI|^Y?C3O_rlwild3w@uI~wnh7um#0MHaG+}hDj_4Sr>7#r0ckNrY5 zbl{(=y)^fQ?15jY$ETWQlm{kb!tnLu<#@;=Ay-p+QR~319BC!wn>IERv7-amtJ%$I zFNef7dEA;Wiq>KfIHzH-GUO@d7^ftpU9AE~`++bJTh{E}i0gKiNi35op(pYx!fLJN znC45t@YIPG`dSE=vTB>m2*tM0gsD}5wXx|}miHa6ruJHi&+s+hntUVNzk1K6Us;Ln zOpYgfCuD0%Z+H#ucvU4lEEL-VdWoyK(avJ?)MQ>5xtb%?)goEonQ$*qZ~S9*AvZew zmS$ntVKiMcVo4}3Le{L1W*amHMNu=-s1`TWldl}oHJh!LsGM}|Bm zMS)I=^fS&7C8n6*2_=@8W|f&p{cL3A+CS>oV)ZGW@`4o-^fSpSYb2Q>!#W!nY{pJD n*Its0rY&KdGe(DrnDm;An&dH=g#L4KB%1YbF><6F9n#j{UdGHI -- 2.54.0