Bundle-Name: Server
Bundle-SymbolicName: user.jobengine.osgi.server;singleton:=true
Bundle-Version: 1.0.0
-Service-Component: OSGI-INF/component.xml, OSGI-INF/componentBinder.xml
+Service-Component: OSGI-INF/jobEngine.xml, OSGI-INF/jobEngineConfig.xml, OSGI-INF/componentBinder.xml
Import-Package: com.fasterxml.jackson.databind;version="2.4.5",
com.fasterxml.jackson.jaxrs.cfg;version="2.4.5",
com.fasterxml.jackson.jaxrs.json;version="2.4.5",
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="user.jobengine.server.ComponentBinder">\r
<implementation class="user.jobengine.osgi.server.ComponentBinder"/>\r
<reference cardinality="1..1" interface="user.commons.nexio.INexioAPI" name="INexioAPI" policy="static" bind="bindNexioService" unbind="unbindNexioService"/>\r
- <reference bind="bindSystemConfiguration" cardinality="1..1" interface="user.commons.configuration.IConfiguration" policy="static"/>\r
</scr:component>\r
<?xml version="1.0" encoding="UTF-8"?>\r
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" immediate="true" name="user.jobengine.server.JobEngine" activate="startup" deactivate="shutdown">\r
<implementation class="user.jobengine.server.JobEngine"/>\r
- <!-- \r
- <reference bind="bindService" cardinality="1..1" interface="user.jobengine.server.IJobEngine" name="IJobEngine" policy="static" unbind="unbindService"/>\r
- <reference bind="bindService" cardinality="1..1" interface="user.jobengine.db.IItemManager" name="IItemManager" policy="static" unbind="unbindService"/>\r
- --> \r
<reference bind="bindItemManagerService" cardinality="1..1" interface="user.jobengine.db.IItemManager" name="IItemManager" policy="static" unbind="unbindItemManagerService"/>\r
+ <reference bind="bindJobEngineConfiguration" cardinality="1..1" interface="user.commons.configuration.IJobEngineConfiguration" policy="static"/>\r
<service>\r
<provide interface="user.jobengine.server.IJobEngine"/>\r
</service>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>\r
+<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" immediate="true" name="user.jobengine.server.JobEngineConfiguration">\r
+ <implementation class="user.jobengine.server.JobEngineConfiguration"/>\r
+ <reference bind="bindSystemConfiguration" cardinality="1..1" interface="user.commons.configuration.IConfiguration" policy="static"/>\r
+ <service>\r
+ <provide interface="user.jobengine.server.IJobEngineConfiguration"/>\r
+ </service>\r
+</scr:component>\r
import org.apache.logging.log4j.LogManager;\r
import org.apache.logging.log4j.Logger;\r
\r
-import user.commons.configuration.IConfiguration;\r
import user.commons.nexio.INexioAPI;\r
import user.mediacube.metadata.interfaces.IMetadataProviderFactory;\r
\r
private static INexioAPI nexioAPI;\r
private static IMetadataProviderFactory metadataProviderFactory;\r
\r
- private static IConfiguration systemConfig;\r
-\r
public static IMetadataProviderFactory getMetadataProviderFactory() {\r
return metadataProviderFactory;\r
}\r
return nexioAPI;\r
}\r
\r
- public static IConfiguration getSystemConfig() {\r
- return systemConfig;\r
- }\r
-\r
public synchronized void bindMetadataProviderFactory(Object service) {\r
if (service instanceof IMetadataProviderFactory) {\r
metadataProviderFactory = (IMetadataProviderFactory) service;\r
}\r
}\r
\r
- public synchronized void bindSystemConfiguration(Object service) {\r
- if (service instanceof IConfiguration) {\r
- systemConfig = (IConfiguration) service;\r
- logger.info("IConfiguration service binded");\r
- }\r
- }\r
-\r
public synchronized void unbindMetadataProviderFactory(Object service) {\r
metadataProviderFactory = null;\r
logger.info("IMetadataProviderFactory service unbinded");\r
IJobRuntime getJobById(long jobId);
- JobEngineConfiguration getJobEngineConfiguration();
+ IJobEngineConfiguration getJobEngineConfiguration();
Map<Long, IJobRuntime> getJobs();
--- /dev/null
+package user.jobengine.server;\r
+\r
+import java.util.List;\r
+import java.util.Map;\r
+\r
+import user.jobengine.server.ast.JobTemplate;\r
+import user.jobengine.server.scheduler.ScheduledJob;\r
+import user.jobengine.server.steps.IJobStep;\r
+\r
+public interface IJobEngineConfiguration {\r
+\r
+ IJobStep createJobStep(String stepUnitName) throws Exception;\r
+\r
+ Map<String, IJobStepExecutor> getExecutors();\r
+\r
+ Map<String, IProgram> getPrograms();\r
+\r
+ Map<String, ScheduledJob> getSchedules();\r
+\r
+ List<JobTemplate> getTemplates();\r
+\r
+ void load(IJobEngine jobEngine) throws Exception;\r
+\r
+ void loadExecutors(IJobEngine jobEngine);\r
+\r
+ void loadSchedules() throws Exception;\r
+\r
+}\r
import org.apache.logging.log4j.Logger;
import com.ibm.nosql.json.api.BasicDBList;
-import com.ibm.nosql.json.api.BasicDBObject;
import user.commons.Job;
import user.commons.JobStatus;
import user.commons.cluster.ClusteredJob;
-import user.commons.nosql.NoSQLUtils;
import user.jobengine.db.IItemManager;
import user.jobengine.db.ItemManagerData.SignalType;
-import user.jobengine.osgi.server.ComponentBinder;
import user.jobengine.server.actions.IStatusMachine;
import user.jobengine.server.actions.JobAction;
import user.jobengine.server.actions.StatusMachine;
}
private volatile boolean isRunning;
+
private volatile boolean isAllExecutionDisabled;
private volatile boolean isScheduledExecutionDisabled;
-
private final BlockingQueue<IJobRuntime> runQueue;
private final BlockingQueue<IJobMessage> messageQueue;
private final Map<Long, IJobRuntime> submittedJobs;
private VM vm;
private MessageDispatcher dispatcher;
private JobChangedListenerChecker jobChangedListenerChecker;
+
private IUserMessageQueues userMessageQueues;
+
private final CyclicBarrier startUpBarrier;
private final IStatusMachine statusMachine;
private AtomicLong nextJobId;
-
private SchedulerService schedulerService = null;
-
private List<IJobChangedListener> jobChangedListenerList = new CopyOnWriteArrayList<>();
private Map<String, LocalDate> remoteWorkers;
private String masterServerAddress = System.getProperty("jobengine.master.server", "");
private final JobEngineRemote remoteEngine;
+
private ConcurrentHashMap<IJobChangedListener, Long> keepAliveJobChangedListeners = new ConcurrentHashMap<>();
- private JobEngineConfiguration jobEngineConfiguration = new JobEngineConfiguration();
+
+ private IJobEngineConfiguration jobEngineConfiguration;
/**
* A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál.
*/
public JobEngine() {
- isScheduledExecutionDisabled = ComponentBinder.getSystemConfig().value("jobs.scheduled-execution-disabled", false);
+
+ // isScheduledExecutionDisabled = ComponentBinder.getSystemConfig().value("jobs.scheduled-execution-disabled", false);
+ isScheduledExecutionDisabled = true;
runQueue = new PriorityBlockingQueue<IJobRuntime>();
messageQueue = new LinkedBlockingQueue<IJobMessage>();
setItemManager(service);
}
+ public synchronized void bindJobEngineConfiguration(Object service) {
+ if (service instanceof JobEngineConfiguration) {
+ jobEngineConfiguration = (IJobEngineConfiguration) service;
+ logger.info("IJobEngineConfiguration service binded");
+ }
+ }
+
private void bootstrap() throws JobEngineException {
//submit("fake-noparams.xml", "Bootstrap", null);
}
}
@Override
- public JobEngineConfiguration getJobEngineConfiguration() {
+ public IJobEngineConfiguration getJobEngineConfiguration() {
return jobEngineConfiguration;
}
@Override
public ScheduledJob getScheduledJob(String template) {
- List<BasicDBObject> jobs = NoSQLUtils.asList(schedulerService.getScheduleJobs());
- BasicDBObject importJob = null;
- for (BasicDBObject job : jobs) {
- if (template.equals(job.getString("template"))) {
- importJob = job;
- break;
- }
- }
- if (importJob == null)
+ Map<String, ScheduledJob> schedules = jobEngineConfiguration.getSchedules();
+ ScheduledJob scheduledJob = schedules.get(template);
+ if (scheduledJob == null)
throw new NullPointerException("Missing template: " + template);
- return schedulerService.createScheduledJob(importJob);
+ return scheduledJob;
}
@Override
if (schedulerService != null)
schedulerService.shutdown();
- jobEngineConfiguration.loadTemplates();
- jobEngineConfiguration.loadExecutors(this);
+ jobEngineConfiguration.load(this);
schedulerService = new SchedulerService(this);
schedulerService.startup();
startUpBarrier.reset();
logger.info("JobEngine gracefully stopped");
- jobEngineConfiguration.loadTemplates();
- jobEngineConfiguration.loadExecutors(this);
+ jobEngineConfiguration.load(this);
vm.start();
try {
removeGarbage();
- jobEngineConfiguration.loadTemplates();
- jobEngineConfiguration.loadExecutors(this);
+ jobEngineConfiguration.load(this);
vm = new VM();
dispatcher = new MessageDispatcher();
logger.error("Couldn't shutdown jobEngine", e);
}
}
+
}
import java.util.function.Predicate;\r
import java.util.stream.Stream;\r
\r
+import org.apache.commons.lang.StringUtils;\r
import org.apache.logging.log4j.LogManager;\r
import org.apache.logging.log4j.Logger;\r
\r
+import com.ibm.nosql.json.JSONUtil;\r
+import com.ibm.nosql.json.api.BasicDBList;\r
+import com.ibm.nosql.json.api.BasicDBObject;\r
+\r
import groovy.lang.GroovyClassLoader;\r
import user.commons.configuration.IConfiguration;\r
-import user.jobengine.osgi.server.ComponentBinder;\r
+import user.commons.nosql.NoSQLUtils;\r
import user.jobengine.server.ast.Encoder;\r
import user.jobengine.server.ast.JobTemplate;\r
import user.jobengine.server.ast.Parser;\r
+import user.jobengine.server.scheduler.ScheduledJob;\r
import user.jobengine.server.steps.IJobStep;\r
\r
-public class JobEngineConfiguration {\r
+public class JobEngineConfiguration implements IJobEngineConfiguration {\r
private static final Logger logger = LogManager.getLogger();\r
public static final String CONF_MEDIACUBE = "configuration/mediacube.json";\r
public static final String CONF_MAESTRO = "configuration/maestro.json";\r
public static final String DIR_CLASSES = "jobs/classes";\r
public static final String DIR_TEMPLATES = "jobs/templates";\r
\r
+ private static IConfiguration systemConfig;\r
private final Map<String, IJobStepExecutor> executors = new LinkedHashMap<String, IJobStepExecutor>();\r
private final Map<String, IProgram> programs = new LinkedHashMap<String, IProgram>();\r
+ private final Map<String, ScheduledJob> schedules = new LinkedHashMap<String, ScheduledJob>();\r
private final List<JobTemplate> templates = Collections.synchronizedList(new ArrayList<JobTemplate>());\r
+\r
private URLClassLoader stepsClassLoader = null;\r
\r
+ public synchronized void bindSystemConfiguration(Object service) {\r
+ if (service instanceof IConfiguration) {\r
+ systemConfig = (IConfiguration) service;\r
+ logger.info("IConfiguration service binded");\r
+ }\r
+ }\r
+\r
@SuppressWarnings("unchecked")\r
+ @Override\r
public IJobStep createJobStep(String stepUnitName) throws Exception {\r
IJobStep result = null;\r
\r
boolean isGroovyClass = stepUnitName.toLowerCase().endsWith(".java") || stepUnitName.toLowerCase().endsWith(".groovy");\r
\r
if (stepsClassLoader == null) {\r
- String stepClassesDir = ComponentBinder.getSystemConfig().getConfig(DIR_CLASSES);\r
+ String stepClassesDir = systemConfig.getConfig(DIR_CLASSES);\r
Path path = Paths.get(stepClassesDir);\r
URL[] urls = { path.toUri().toURL() };\r
stepsClassLoader = URLClassLoader.newInstance(urls, getClass().getClassLoader());\r
\r
if (isGroovyClass) {\r
try (GroovyClassLoader gcl = new GroovyClassLoader(stepsClassLoader)) {\r
- String stepsDir = ComponentBinder.getSystemConfig().getConfig(DIR_STEPS);\r
+ String stepsDir = systemConfig.getConfig(DIR_STEPS);\r
stepClass = (Class<IJobStep>) parseClassHierarchy(gcl, stepsDir, stepUnitName);\r
} catch (Exception e) {\r
throw e;\r
return result;\r
}\r
\r
+ private ScheduledJob createScheduledJob(BasicDBObject jobJSON) {\r
+ ScheduledJob sj = new ScheduledJob();\r
+ String name = jobJSON.getString("name");\r
+ String template = jobJSON.getString("template");\r
+ sj.setJobId(String.format("%s.%s", template, name));\r
+ if (StringUtils.isEmpty(name))\r
+ sj.setJobName(template);\r
+ else\r
+ sj.setJobName(name);\r
+\r
+ sj.setTemplateName(template);\r
+ sj.setCronExpressions(NoSQLUtils.asString(jobJSON, "cronexpression"));\r
+ sj.setExecuteOnStartup(NoSQLUtils.asBool(jobJSON, "executeimmediate"));\r
+ sj.setActive(NoSQLUtils.asBool(jobJSON, "active"));\r
+\r
+ BasicDBList jobParams = (BasicDBList) jobJSON.get("parameters");\r
+ if (jobParams != null) {\r
+ for (int p = 0; p < jobParams.size(); p++) {\r
+ BasicDBObject jsParam = (BasicDBObject) jobParams.get(p);\r
+ String n = jsParam.getString("name");\r
+ Object v = jsParam.get("value");\r
+ if (v instanceof BasicDBList) {\r
+ ArrayList<?> list = (BasicDBList) v;\r
+ sj.setJobParameter(n, list);\r
+ } else {\r
+ if (v == null)\r
+ continue;\r
+\r
+ String sv = String.valueOf(v);\r
+\r
+ if ("null".equals(v))\r
+ continue;\r
+\r
+ Class<?> clazz = null;\r
+ try {\r
+ clazz = Class.forName(jsParam.getString("type"));\r
+ Object value = clazz.getConstructor(new Class[] { String.class }).newInstance(sv);\r
+ sj.setJobParameter(n, value);\r
+ } catch (Exception e) {\r
+ logger.error(e);\r
+ }\r
+ }\r
+\r
+ }\r
+ }\r
+ return sj;\r
+ }\r
+\r
+ @Override\r
public Map<String, IJobStepExecutor> getExecutors() {\r
return executors;\r
}\r
\r
+ @Override\r
public Map<String, IProgram> getPrograms() {\r
return programs;\r
}\r
\r
+ @Override\r
+ public Map<String, ScheduledJob> getSchedules() {\r
+ return schedules;\r
+ }\r
+\r
+ @Override\r
public List<JobTemplate> getTemplates() {\r
return templates;\r
}\r
\r
+ @Override\r
+ public void load(IJobEngine jobEngine) throws Exception {\r
+ loadTemplates();\r
+ loadExecutors(jobEngine);\r
+ loadSchedules();\r
+ }\r
+\r
+ @Override\r
public void loadExecutors(IJobEngine jobEngine) {\r
InputStream stream = null;\r
try {\r
- IConfiguration systemConfig = ComponentBinder.getSystemConfig();\r
String configFilePath = systemConfig.getConfig(CONF_EXECUTORS);\r
logger.info("Loading executors configuration {}", configFilePath);\r
stream = Files.newInputStream(Paths.get(configFilePath));\r
}\r
}\r
\r
+ @Override\r
+ public void loadSchedules() throws Exception {\r
+ getSchedules().clear();\r
+ String configFilePath = systemConfig.getConfig(JobEngineConfiguration.CONF_SCHEDULES);\r
+ logger.info("Loading scheduler configuration file {}", configFilePath);\r
+ String jsonConfig = new String(Files.readAllBytes(Paths.get(configFilePath)));\r
+ BasicDBObject dbo = (BasicDBObject) JSONUtil.jsonToDbObject(jsonConfig);\r
+ BasicDBList scheduleJobs = (BasicDBList) dbo.get("joblist");\r
+ for (int c = 0; c < scheduleJobs.size(); c++) {\r
+ BasicDBObject sjob = (BasicDBObject) scheduleJobs.get(c);\r
+ ScheduledJob sj = null;\r
+ try {\r
+ sj = createScheduledJob(sjob);\r
+ } catch (Exception e) {\r
+ logger.error("Error creating job: {}", sjob);\r
+ }\r
+ //skip inactive jobs\r
+ if (sj.isActive())\r
+ getSchedules().put(sj.getTemplateName(), sj);\r
+\r
+ }\r
+ }\r
+\r
private void loadTemplate(Path config) {\r
InputStream stream = null;\r
try {\r
public void loadTemplates() {\r
programs.clear();\r
try {\r
- IConfiguration systemConfig = ComponentBinder.getSystemConfig();\r
Path templatesPath = Paths.get(systemConfig.getConfig(DIR_TEMPLATES));\r
if (!templatesPath.toFile().isDirectory())\r
throw new FileNotFoundException(templatesPath + " is not directory!");\r
result = cl.parseClass(path.toFile());\r
return result;\r
}\r
-\r
}\r
package user.jobengine.server.scheduler;\r
\r
-import java.nio.file.Files;\r
-import java.nio.file.Paths;\r
-import java.util.ArrayList;\r
import java.util.Arrays;\r
import java.util.List;\r
+import java.util.Map;\r
import java.util.UUID;\r
\r
import org.apache.commons.lang.StringUtils;\r
import org.quartz.impl.StdSchedulerFactory;\r
import org.quartz.impl.matchers.KeyMatcher;\r
\r
-import com.ibm.nosql.json.JSONUtil;\r
import com.ibm.nosql.json.api.BasicDBList;\r
-import com.ibm.nosql.json.api.BasicDBObject;\r
\r
-import user.commons.configuration.IConfiguration;\r
-import user.commons.nosql.NoSQLUtils;\r
-import user.jobengine.osgi.server.ComponentBinder;\r
import user.jobengine.server.IJobEngine;\r
-import user.jobengine.server.JobEngineConfiguration;\r
+import user.jobengine.server.IJobEngineConfiguration;\r
\r
/**\r
* Jobok schedulalt inditasa.\r
private static final Logger logger = LogManager.getLogger();\r
private static final String CRON_EXPRESSIONS_DELIMITER = ";";\r
\r
- public static <T> T parseObjectFromString(String s, Class<T> clazz) throws Exception {\r
- return clazz.getConstructor(new Class[] { String.class }).newInstance(s);\r
- }\r
-\r
private Scheduler scheduler = null;\r
private IJobEngine jobEngine = null;\r
private BasicDBList scheduleJobs;\r
this.jobEngine = jobEngine;\r
}\r
\r
- public ScheduledJob createScheduledJob(BasicDBObject jobJSON) {\r
- ScheduledJob sj = new ScheduledJob();\r
- // sj.setJobId(String.valueOf(++schedulerJobId));\r
- String name = jobJSON.getString("name");\r
- String template = jobJSON.getString("template");\r
- sj.setJobId(String.format("%s.%s", template, name));\r
- if (StringUtils.isEmpty(name))\r
- sj.setJobName(template);\r
- else\r
- sj.setJobName(name);\r
-\r
- sj.setTemplateName(template);\r
- sj.setCronExpressions(NoSQLUtils.asString(jobJSON, "cronexpression"));\r
- sj.setExecuteOnStartup(NoSQLUtils.asBool(jobJSON, "executeimmediate"));\r
- sj.setActive(NoSQLUtils.asBool(jobJSON, "active"));\r
-\r
- BasicDBList jobParams = (BasicDBList) jobJSON.get("parameters");\r
- if (jobParams != null) {\r
- for (int p = 0; p < jobParams.size(); p++) {\r
- BasicDBObject jsParam = (BasicDBObject) jobParams.get(p);\r
- String n = jsParam.getString("name");\r
- Object v = jsParam.get("value");\r
- if (v instanceof BasicDBList) {\r
- ArrayList<?> list = (BasicDBList) v;\r
- sj.setJobParameter(n, list);\r
- } else {\r
- if (v == null)\r
- continue;\r
-\r
- String sv = String.valueOf(v);\r
-\r
- if ("null".equals(v))\r
- continue;\r
-\r
- Class<?> clazz = null;\r
- try {\r
- clazz = Class.forName(jsParam.getString("type"));\r
- Object value = parseObjectFromString(sv, clazz);\r
- sj.setJobParameter(n, value);\r
- } catch (Exception e) {\r
- logger.error(e);\r
- }\r
- }\r
-\r
- }\r
- }\r
-\r
- return sj;\r
- }\r
-\r
/**\r
* Regisztralt jobok azonnali futtatasa.\r
*\r
}\r
}\r
\r
- /**\r
- * Elore konfiguralt, idozitett (serice) jobok inditasa\r
- *\r
- * @throws Exception\r
- */\r
- private void loadStartupJobsFromConfig() throws Exception {\r
- IConfiguration systemConfig = ComponentBinder.getSystemConfig();\r
- String configFilePath = systemConfig.getConfig(JobEngineConfiguration.CONF_SCHEDULES);\r
- logger.info("Loading scheduler configuration file {}", configFilePath);\r
- String jsonConfig = new String(Files.readAllBytes(Paths.get(configFilePath)));\r
- BasicDBObject dbo = (BasicDBObject) JSONUtil.jsonToDbObject(jsonConfig);\r
- scheduleJobs = (BasicDBList) dbo.get("joblist");\r
- for (int c = 0; c < scheduleJobs.size(); c++) {\r
- BasicDBObject sjob = (BasicDBObject) scheduleJobs.get(c);\r
- ScheduledJob sj = null;\r
- try {\r
- sj = createScheduledJob(sjob);\r
- } catch (Exception e) {\r
- logger.error("Error creating job: {}", sjob);\r
- }\r
- //skip inactive jobs\r
- if (!sj.isActive())\r
- continue;\r
- schedule(sj);\r
- }\r
- }\r
-\r
public void register(ScheduledJob job) throws SchedulerException {\r
String jobID = job.getJobId();\r
JobDataMap jobDataMap = new JobDataMap();\r
public void startup() throws Exception {\r
scheduler = StdSchedulerFactory.getDefaultScheduler();\r
scheduler.start();\r
- loadStartupJobsFromConfig();\r
+ IJobEngineConfiguration jobEngineConfiguration = jobEngine.getJobEngineConfiguration();\r
+ Map<String, ScheduledJob> schedules = jobEngineConfiguration.getSchedules();\r
+ for (ScheduledJob job : schedules.values())\r
+ schedule(job);\r
}\r
\r
public void unregisterJob(ScheduledJob job) throws SchedulerException {\r
public void execute() {\r
if (selectedJob == null)\r
return;\r
- SchedulerService scheduler = jobEngine.getScheduler();\r
- ScheduledJob scheduledJob = scheduler.createScheduledJob(selectedJob);\r
+\r
+ ScheduledJob scheduledJob = jobEngine.getScheduledJob("");\r
scheduledJob.setJobEngine(jobEngine);\r
try {\r
scheduledJob.doManualJob();\r