+org.slf4j.simpleLogger.defaultLogLevel=debug\r
jobengine.jobsteps.root=../user.jobengine.executors/bin\r
jobengine.jobsteps.config=../user.jobengine.executors/config/config-worker.xml\r
jobengine.jobtemplates.root=../user.jobengine.executors/jobtemplates\r
+jobengine.jobsteps.groovy.root=../user.jobengine.executors/src/user/jobengine/server/steps\r
\r
jetty.home=../-configuration/jetty\r
jetty.etc.config.urls=etc/user-jetty.xml,etc/user-jetty-ssl.xml,etc/user-jetty-ssl-context.xml,etc/user-jetty-http.xml,etc/user-jetty-https.xml\r
\r
-log4j.configurationFile=../-configuration/log4j2.xml\r
+log4j.configurationFile=../-configuration/log4j2-test.xml\r
\r
jobengine.db.url=jdbc:db2://10.228.198.1:50000/mediaarc:retrieveMessagesFromServerOnGetMessage=true;\r
jobengine.db.user=db2admin\r
jobengine.nosql.db.user=db2admin\r
jobengine.nosql.db.password=password\r
\r
-jobengine.master.server=http://localhost:8888\r
+#jobengine.master.server=http://localhost:8888\r
javax.ws.rs.ext.RuntimeDelegate=org.jboss.resteasy.spi.ResteasyProviderFactory
\ No newline at end of file
package hu.user.mediacube.executors.tests;\r
\r
+import java.text.SimpleDateFormat;\r
+import java.time.Duration;\r
+import java.time.Instant;\r
+import java.util.Iterator;\r
import java.util.List;\r
\r
import org.apache.commons.io.FilenameUtils;\r
\r
import user.commons.RemoteFile;\r
import user.commons.StoreUri;\r
+import user.commons.nexio.api.Clip;\r
+import user.commons.nexio.api.Controller;\r
+import user.commons.nexio.api.Mediabase;\r
import user.commons.remotestore.FtpDirectoryLister;\r
import user.commons.remotestore.IDirectoryLister;\r
import user.commons.remotestore.RemoteStoreProtocol;\r
public class MediaBaseTest {\r
\r
@Test\r
- public void listMediaBase() throws Exception {\r
+ public void listMediaBaseFTP() throws Exception {\r
+ Instant start = Instant.now();\r
StoreUri nexioUri = new StoreUri();\r
nexioUri.setProtocol(RemoteStoreProtocol.FTP);\r
nexioUri.setUri("10.10.1.55");\r
nexioUri.setPortNumber(2098);\r
nexioUri.setUserName("ftp");\r
nexioUri.setPassword("ftp");\r
+ int i = 0;\r
try {\r
FTPClient ftp = ((FtpDirectoryLister) nexioUri.getLister()).connect();\r
IDirectoryLister lister = nexioUri.getLister();\r
List<RemoteFile> list = lister.list();\r
for (RemoteFile rf : list) {\r
+ if (i > 9)\r
+ break;\r
if (rf.getIsFolder())\r
continue;\r
String baseName = FilenameUtils.getBaseName(rf.getName());\r
} catch (Exception ie) {\r
System.err.println(ie.getMessage());\r
}\r
+ i++;\r
+\r
}\r
\r
} catch (Exception e) {\r
} finally {\r
nexioUri.cleanUp();\r
}\r
+ Instant end = Instant.now();\r
+ System.out.println(Duration.between(start, end));\r
\r
}\r
\r
+ @Test\r
+ public void listMediaBaseNEXIO() throws Exception {\r
+ Instant start = Instant.now();\r
+\r
+ Controller controller = new Controller("10.10.1.55");\r
+\r
+ controller.connect();\r
+ Mediabase mediabase = controller.getMediabase();\r
+ int i = 100;\r
+ try {\r
+ SimpleDateFormat df = new SimpleDateFormat("yyy-MM-dd HH:mm:ss");\r
+ Iterator<Clip> clips = mediabase.getClips();\r
+ while (clips.hasNext() && i > 0) {\r
+ Clip clip = clips.next();\r
+ System.out.println(clip.getId() + " " + clip.getXid().get() + " " + df.format(clip.getModifiedTimestamp().getTime()));\r
+ i--;\r
+ }\r
+ } catch (Exception e) {\r
+ System.err.println(e.getMessage());\r
+ } finally {\r
+ controller.disconnect();\r
+ }\r
+ Instant end = Instant.now();\r
+ System.out.println(Duration.between(start, end));\r
+\r
+ }\r
}\r
<?xml version="1.0" encoding="UTF-8"?>\r
<executors>\r
<executor className="user.jobengine.server.steps.CancelableStep" maxConcurrent="1" isRemote="true" />\r
+ <executor className="FakeStep.java" maxConcurrent="1" />\r
</executors>
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>\r
+<jobtemplate multiInstance="true" name="Fake">\r
+<declarations>\r
+ <parameters>\r
+ <parameter name="itemID" type="java.lang.Long"/>\r
+ </parameters>\r
+ <parameters>\r
+ <parameter name="iter" type="java.lang.Iterable"/>\r
+ </parameters>\r
+ <variables>\r
+ <variable name="resultID" type="java.lang.Long"/>\r
+ <variable name="resultID1" type="java.lang.Long"/>\r
+ </variables>\r
+</declarations>\r
+<commands>\r
+ <calljobstep type="FakeStep.java" weight="5" forEach="iter">\r
+ <inputs>\r
+ <input>\r
+ <parameter name="itemID" />\r
+ </input>\r
+ <input>\r
+ <parameter name="iter" />\r
+ </input>\r
+ </inputs>\r
+ <outputs>\r
+ <output>\r
+ <variable name="resultID" />\r
+ </output>\r
+ <output>\r
+ <variable name="resultID1" />\r
+ </output>\r
+ </outputs>\r
+ </calljobstep>\r
+ <calljobstep type="MergeStep.java" weight="1">\r
+ <inputs>\r
+ <input>\r
+ <variable name="resultID" />\r
+ </input>\r
+ <input>\r
+ <variable name="resultID1" />\r
+ </input>\r
+ </inputs>\r
+ </calljobstep>\r
+</commands>\r
+</jobtemplate>
\ No newline at end of file
<parameters>\r
<parameter name="itemID" type="java.lang.Long"/>\r
</parameters>\r
- <parameters>\r
- <parameter name="iter" type="java.lang.Iterable"/>\r
- </parameters>\r
<variables>\r
<variable name="resultID" type="java.lang.Long"/>\r
<variable name="resultID1" type="java.lang.Long"/>\r
</variables>\r
</declarations>\r
<commands>\r
- <calljobstep type="FakeStep.java" weight="5" forEach="iter">\r
+ <calljobstep type="FakeStep.java" weight="1">\r
<inputs>\r
<input>\r
<parameter name="itemID" />\r
</input>\r
- <input>\r
- <parameter name="iter" />\r
- </input>\r
</inputs>\r
<outputs>\r
<output>\r
</output>\r
</outputs>\r
</calljobstep>\r
- <calljobstep type="MergeStep.java" weight="1">\r
- <inputs>\r
- <input>\r
- <variable name="resultID" />\r
- </input>\r
- <input>\r
- <variable name="resultID1" />\r
- </input>\r
- </inputs>\r
- </calljobstep>\r
</commands>\r
</jobtemplate>
\ No newline at end of file
public Object[] execute() throws Exception {\r
try {\r
for (int i = 0; i < count; i++) {\r
- if (getJobRuntime().isWaitingCancel() || getJobRuntime().isWaitingSuspend())\r
+ if (getJobRuntime().isWaitingCancel())\r
break;\r
Thread.sleep(1000);\r
setProgress((i + 1) * count);\r
\r
@Override\r
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {\r
+\r
+ //A .-al kezdodo mappakat kihagyjuk\r
+ if (dir.getFileName().toString().startsWith("."))\r
+ return FileVisitResult.SKIP_SUBTREE;\r
+\r
return FileVisitResult.CONTINUE;\r
}\r
\r
\r
setProgress(currentCount[0] * 100 / allCount[0]);\r
\r
- if (STATUSFOLDER.equals(filePath.getParent().getFileName()))\r
- logger.info("Skipping {}", filePath);\r
-\r
logger.info("Checking {}", filePath);\r
List<Path> killDateFiles = getKillDateFiles(filePath);\r
if (killDateFiles == null || killDateFiles.size() == 0) {\r
--- /dev/null
+package user.jobengine.server.steps;\r
+\r
+import java.util.Arrays;\r
+\r
+import org.apache.logging.log4j.LogManager;\r
+import org.apache.logging.log4j.Logger;\r
+\r
+public class FakeSpawnStep extends JobStep {\r
+ private static final Logger logger = LogManager.getLogger();\r
+ private int count = 10;\r
+\r
+ @StepEntry\r
+ public Object[] execute(long itemID, Iterable<?> iter) throws Exception {\r
+\r
+ if (getJobRuntime().getSpawnOrder() == 0) {\r
+ count = 5;\r
+ }\r
+ logger.info(getMarker(), "Starting params: {}, {}, spawnOrder {}", itemID, iter, getJobRuntime().getSpawnOrder());\r
+\r
+ try {\r
+ int step = 100 / count;\r
+ for (int i = 0; i < count; i++) {\r
+ if (!canContinue())\r
+ break;\r
+ setProgress((i + 1) * step);\r
+ for (int j = 0; j < 100; j++) {\r
+ Thread.sleep(10);\r
+ }\r
+ logger.info("Progress {}", getJobRuntime().getProgress());\r
+ }\r
+\r
+ } catch (Exception e) {\r
+ logger.error(getMarker(), e.getMessage());\r
+ throw e;\r
+ }\r
+ Object[] result = Arrays.asList(10, 20).toArray();\r
+ logger.info("Returning {}, {}", result[0], result[1]);\r
+ return result;\r
+ }\r
+\r
+}\r
private int count = 10;\r
\r
@StepEntry\r
- public Object[] execute(long itemID, Iterable<?> iter) throws Exception {\r
+ public Object[] execute(long itemID) throws Exception {\r
\r
- if (getJobRuntime().getSpawnOrder() == 0) {\r
- count = 5;\r
- }\r
- logger.info(getMarker(), "Starting params: {}, {}, spawnOrder {}", itemID, iter, getJobRuntime().getSpawnOrder());\r
+ logger.info(getMarker(), "Starting params: {}");\r
\r
try {\r
int step = 100 / count;\r
if (!canContinue())\r
break;\r
setProgress((i + 1) * step);\r
- for (int j = 0; j < 100; j++) {\r
- Thread.sleep(10);\r
+ for (int j = 0; j < 10; j++) {\r
+ Thread.sleep(1);\r
}\r
- logger.info("Progress {}", getJobRuntime().getProgress());\r
+ //logger.info("{} Progress {}, p{}", getJobRuntime().getId(), getJobRuntime().getProgress(), getJobRuntime().getPriority());\r
}\r
\r
} catch (Exception e) {\r
throw e;\r
}\r
Object[] result = Arrays.asList(10, 20).toArray();\r
- logger.info("Returning {}, {}", result[0], result[1]);\r
+ //logger.info("Returning {}, {}", result[0], result[1]);\r
return result;\r
}\r
\r
if (targetLength > sourceLength) {\r
throw new Exception("Hiba! A fájl túl nagy lett.");\r
}\r
+\r
+ if (getJobRuntime().isWaitingCancel()) {\r
+ break;\r
+ }\r
}\r
\r
targetLength = targetFile.length();\r
controller = new Controller(storeUri.getRootPath(), storeUri.getPortNumber());\r
controller.connect();\r
Mediabase mediabase = controller.getMediabase();\r
- //SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss");\r
Iterator<Clip> clips = mediabase.getClips();\r
int count = limit;\r
while (clips.hasNext()) {\r
+\r
+ if (getJobRuntime().isWaitingCancel())\r
+ break;\r
+\r
Clip clip = clips.next();\r
- //String id = clip.getId().get();\r
String title = clip.getXid().get();\r
Timestamp modified = Timestamp.from(clip.getModifiedTimestamp().toInstant());\r
Timestamp created = Timestamp.from(clip.getRecordDateTimestamp().toInstant());\r
\r
String getTemplate();\r
\r
- void IncrementPriority();\r
+ void incrementPriority();\r
\r
void NotifyUpdate();\r
\r
* 1. szempont prioritas 2. szempont azonos prioritasnal a rogzites datuma\r
*/\r
@Override\r
- public int compareTo(IJob job0) {\r
- int ret = (this.priority - job0.getPriority());\r
- if ((ret == 0) && (this.submitted != null) && (job0.getSubmitted() != null)) {\r
- ret = (int) (job0.getSubmitted().getTime() - this.submitted.getTime());\r
- }\r
- return ret;\r
+ public int compareTo(IJob job) {\r
+ int res = 0;\r
+ if (getPriority() == job.getPriority())\r
+ res = getSubmitted().getTime() < job.getSubmitted().getTime() ? -1 : 1;\r
+ else\r
+ res = (getPriority() > job.getPriority() ? -1 : 1);\r
+ return res;\r
}\r
\r
@Override\r
}\r
\r
@Override\r
- public void IncrementPriority() {\r
+ public void incrementPriority() {\r
priority++;\r
}\r
\r
\r
@Override\r
public void setPriority(int priority) {\r
- this.priority = priority;\r
+ if (this.priority != priority) {\r
+ this.priority = priority;\r
+ }\r
}\r
\r
@Override\r
--- /dev/null
+package user.commons.cluster;\r
+\r
+import java.sql.Timestamp;\r
+\r
+import user.commons.JobStatus;\r
+\r
+public class ClusteredJob {\r
+\r
+ private long id;\r
+ private String name;\r
+ private String description;\r
+ private String template;\r
+ private Object[] inputs;\r
+ private Timestamp submitted;\r
+ private JobStatus status;\r
+ private int progress;\r
+\r
+ public String getDescription() {\r
+ return description;\r
+ }\r
+\r
+ public long getId() {\r
+ return id;\r
+ }\r
+\r
+ public Object[] getInputs() {\r
+ return inputs;\r
+ }\r
+\r
+ public String getName() {\r
+ return name;\r
+ }\r
+\r
+ public int getProgress() {\r
+ return progress;\r
+ }\r
+\r
+ public JobStatus getStatus() {\r
+ return status;\r
+ }\r
+\r
+ public Timestamp getSubmitted() {\r
+ return submitted;\r
+ }\r
+\r
+ public String getTemplate() {\r
+ return template;\r
+ }\r
+\r
+ public void setDescription(String description) {\r
+ this.description = description;\r
+ }\r
+\r
+ public void setId(long id) {\r
+ this.id = id;\r
+ }\r
+\r
+ public void setInputs(Object[] inputs) {\r
+ this.inputs = inputs;\r
+ }\r
+\r
+ public void setName(String name) {\r
+ this.name = name;\r
+ }\r
+\r
+ public void setProgress(int progress) {\r
+ this.progress = progress;\r
+ }\r
+\r
+ public void setStatus(JobStatus status) {\r
+ this.status = status;\r
+ }\r
+\r
+ public void setSubmitted(Timestamp submitted) {\r
+ this.submitted = submitted;\r
+ }\r
+\r
+ public void setTemplate(String template) {\r
+ this.template = template;\r
+ }\r
+\r
+}\r
\r
if (decoder != null && decoder.getCodecType() == MediaDescriptor.Type.MEDIA_VIDEO) {\r
videoStreamId = i;\r
- frames = stream.getDuration();\r
+ frames = stream.getNumFrames();\r
break;\r
}\r
}\r
}\r
\r
public enum SignalType {\r
- CREATE(0), UPDATE(1), DELETE(2);\r
+ CREATE(0), UPDATE(1), DELETE(2), EXECUTE(3);\r
\r
private final long value;\r
\r
<classpathentry exported="true" kind="lib" path="WEB-INF/lib/zul.jar"/>\r
<classpathentry exported="true" kind="lib" path="WEB-INF/lib/zuti.jar"/>\r
<classpathentry exported="true" kind="lib" path="WEB-INF/lib/zweb.jar"/>\r
+ <classpathentry kind="lib" path="/-dependencies/target/repository/plugins/org.apache.logging.log4j.core_2.8.2.jar"/>\r
<classpathentry kind="output" path="bin"/>\r
</classpath>\r
-version=2.5.2\r
+version=2.6.0\r
footer=2016-2020 © Copyright User Rendszerház Kft.\r
\r
login_info=Információ\r
--- /dev/null
+package user.jobengine.server;\r
+\r
+import user.commons.cluster.ClusteredJob;\r
+\r
+public class ClusteredJobRuntime extends JobRuntime {\r
+\r
+ public ClusteredJobRuntime(ClusteredJob job, IJobEngine jobEngine, IJobStatusChangedListener listener) {\r
+ super(job, jobEngine, listener);\r
+ }\r
+}\r
import java.util.Map;
import user.commons.Job;
+import user.commons.cluster.ClusteredJob;
import user.jobengine.db.IItemManager;
import user.jobengine.server.messagequeue.IUserMessageQueues;
import user.jobengine.server.messages.IJobMessage;
import user.jobengine.server.scheduler.SchedulerService;
public interface IJobEngine {
+ static final int QUEUE_POLL_INTERVAL_MS = 1;
void addJobChangedEventListener(IJobChangedListener listener);
void addToRunQueue(IJobRuntime jobRuntime);
+ void applyPriorityChange(IJobRuntime jobRuntime);
+
void bindItemManagerService(IItemManager service);
boolean deleteProgram(String fileName);
void executeSendMessageToUserInstruction(IJobRuntime jobRuntime);
+ void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime);
+
void fireJobChangedEvent(JobChangedEvent event);
Map<String, IJobStepExecutor> getExecutors();
IJobRuntime getJobById(long jobId);
- IJobRuntime getJobForRemote(String className) throws Exception;
-
Map<Long, IJobRuntime> getJobs();
IProgram getProgram(String name);
Map<String, IProgram> getPrograms();
- IJobRuntime getRemoteJob(String className);
+ JobEngineRemote getRemoteEngine();
ScheduledJob getScheduledJob(String template);
void removeJobChangedEventListener(IJobChangedListener listener);
+ void removeSpanwChild(IJobRuntime jobRuntime);
+
void removeSuspended();
- void rePrioritization(IJobRuntime jobRuntime);
+ ClusteredJob requestJob(String className) throws Exception;
void sendMessage(IJobMessage jobMessage);
void startup();
+ void storeJob(IJobRuntime runtime);
+
IJobRuntime submit(IJobRuntime parent, IJobStatusChangedListener statusListener, String template, String name, Map<String, Object> parameters)
throws JobEngineException;
import user.commons.IEntityPersister;\r
import user.commons.IJob;\r
import user.commons.JobStatus;\r
+import user.jobengine.server.instructions.CallJobStepInstruction;\r
import user.jobengine.server.instructions.IInstruction;\r
\r
public interface IJobRuntime extends IJob {\r
+\r
void addChild(JobRuntime c);\r
\r
void addEventListener(IJobStatusChangedListener listener);\r
\r
+ void addSpawnChild(IJobRuntime runtime);\r
+\r
void addVariable(String name, Class<?> type);\r
\r
void arrangeStack();\r
\r
void cancelForkPrepare() throws InterruptedException;\r
\r
+ boolean canContinueExecution();\r
+\r
void checkStackParameter() throws RuntimeException, IllegalArgumentException;\r
\r
+ IJobRuntime createCopy();\r
+\r
void decrementInstructionPointer();\r
\r
public void done();\r
\r
void forkWaitComplete() throws InterruptedException;\r
\r
+ CallJobStepInstruction getCurrentCallJobStepInstruction();\r
+\r
IInstruction getCurrentInstruction();\r
\r
+ String getCurrentStep();\r
+\r
Marker getFinishMarker();\r
\r
int getIp();\r
\r
+ IJobEngine getJobEngine();\r
+\r
Marker getMarker();\r
\r
IInstruction getNextInstruction() throws NoSuchElementException;\r
\r
JobStatus getSavedStatus();\r
\r
+ int getSpawnOrder();\r
+\r
Stack<Object> getStack();\r
\r
Object getVariable(String name);\r
\r
Map<String, Object> getVariables();\r
\r
+ int getWeight();\r
+\r
boolean hasNextInstruction();\r
\r
void incrementProgress(int progress);\r
\r
boolean isService();\r
\r
+ boolean isWaitFinish();\r
+\r
boolean isWaitingCancel();\r
\r
boolean isWaitingExecutor();\r
\r
void removeEventListener(IJobStatusChangedListener listener);\r
\r
+ void removeSpanwChild(long id);\r
+\r
void reset();\r
\r
void restoreStack();\r
\r
void saveStatus();\r
\r
+ void setCurrentStep(String currentStep);\r
+\r
@Override\r
void setParameters(Map<String, Object> parameters);\r
\r
void setService(boolean isService);\r
\r
+ void setSpawnOrder(int spawnOrder);\r
+\r
void setVariable(String name, Object value);\r
\r
void swapStack();\r
import java.util.concurrent.PriorityBlockingQueue;
+import user.commons.cluster.ClusteredJob;
import user.jobengine.server.steps.IJobStep;
/**
- * Folyamat l�p�s v�grehajt� interface.
+ * Folyamat lepes vegrehajto interface.
*/
public interface IJobStepExecutor {
static final String PROCESSING_LOCALLY = "Processing locally";
PriorityBlockingQueue<IJobRuntime> getQueue();
/**
- * V�grehajt� l�p�s implement�ci�j�nak lek�rdez�se.
+ * Vegrehajte lepes implementaciojanak lekerdezese.
*
- * @return L�p�s implement�ci�.
+ * @return Lepes implementacio.
*/
Class<IJobStep> getStepClass();
+ String getStepUnitName();
+
boolean isRemoteEnabled();
void revoke(IJobRuntime jobRuntime);
*/
void startup(IJobEngine jobEngine) throws Exception;
- IJobRuntime steelJob() throws InterruptedException;
+ ClusteredJob steelJob() throws InterruptedException;
/**
* Folyamat elhelyez�se a v�grehajt� v�rakoz�si sor�ba.
* @param job
* Folyamat.
*/
- void submit(IJobRuntime job);
+ void submit(IJobRuntime... job);
void waitShutdown();
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.ws.rs.core.Response;
-
+import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.jboss.resteasy.client.jaxrs.ResteasyClient;
-import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
-import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
+import com.ibm.nosql.json.api.BasicDBList;
import com.ibm.nosql.json.api.BasicDBObject;
import user.commons.Job;
import user.commons.JobStatus;
import user.commons.RemoteFile;
import user.commons.StoreUri;
+import user.commons.cluster.ClusteredJob;
import user.commons.nosql.NoSQLUtils;
import user.commons.remotestore.DirectoryUtils;
import user.commons.remotestore.RemoteStoreProtocol;
import user.jobengine.server.ast.Encoder;
import user.jobengine.server.ast.JobTemplate;
import user.jobengine.server.ast.Parser;
+import user.jobengine.server.instructions.CallJobStepInstruction;
import user.jobengine.server.instructions.IInstruction;
import user.jobengine.server.messagequeue.IUserMessage;
import user.jobengine.server.messagequeue.IUserMessageQueues;
import user.jobengine.server.messages.UserReplyMessage;
import user.jobengine.server.scheduler.ScheduledJob;
import user.jobengine.server.scheduler.SchedulerService;
-import user.jobengine.server.steps.IJobStep;
import user.tsm.client.TSMClient;
import user.tsm.client.TSMException;
while (!shutdown) {
try {
- IJobMessage message = messageQueue.poll(50, TimeUnit.MILLISECONDS);
- if (message != null) {
+ IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ if (message != null)
message.process(JobEngine.this);
- }
+
} catch (InterruptedException e) {
shutdown = true;
}
}
+ //a leallitas utan az osszes fuggo uzenet vegrehajtasa
while (!messageQueue.isEmpty()) {
try {
- IJobMessage message = messageQueue.poll(50, TimeUnit.MILLISECONDS);
- if (message != null) {
+ IJobMessage message = messageQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ if (message != null)
message.process(JobEngine.this);
- }
} catch (InterruptedException e) {
shutdown = true;
}
while (!shutdown) {
try {
- IJobRuntime jobRuntime = runQueue.poll(10, TimeUnit.MILLISECONDS);
+ Thread.sleep(QUEUE_POLL_INTERVAL_MS);
+ //IJobRuntime jobRuntime = runQueue.poll(QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ IJobRuntime jobRuntime = runQueue.poll();
if (jobRuntime != null) {
- while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) {
+ logger.debug("Processing {}", jobRuntime.getId());
+ //varakozo esetben vegrehajtjuk a kovetkezo utasitast
+ if (jobRuntime.hasNextInstruction() && jobRuntime.isWaitFinish()) {
ir = jobRuntime.getNextInstruction();
ir.execute(JobEngine.this, jobRuntime);
+ } else {
+ //normal esetben elfutunk a kovetkezo job step-ig, vagy vegig
+ while (jobRuntime.hasNextInstruction() && jobRuntime.isRunable()) {
+ ir = jobRuntime.getNextInstruction();
+ ir.execute(JobEngine.this, jobRuntime);
+ }
}
+
if (!jobRuntime.hasNextInstruction() && jobRuntime.isRunable())
jobCleanup(jobRuntime);
+
}
} catch (Exception e) {
logger.error("Critical VM error!", e);
private List<IJobChangedListener> jobChangedListenerList = new CopyOnWriteArrayList<>();
private Map<String, LocalDate> remoteWorkers;
private String masterServerAddress = System.getProperty("jobengine.master.server", "");
+ private final JobEngineRemote remoteEngine;
/**
* A MAM motor példányosítása. Ennek során létrejönnek a várakozási sorok, az ütemező szál, az üzenet kezelő szál.
remoteWorkers = new ConcurrentHashMap<>();
//logger.info("JobEngine created");
+ if (isWorker())
+ remoteEngine = createRemoteEngine();
+ else
+ remoteEngine = null;
}
public void activate() {
@Override
public void addStepExecutor(IJobStepExecutor executor) {
- Class<IJobStep> stepClass = executor.getStepClass();
- String canonicalName = stepClass.getCanonicalName();
- if (!executors.containsKey(canonicalName)) {
- executors.put(canonicalName, executor);
- logger.debug("Executor registered: " + stepClass);
+ //Class<IJobStep> stepClass = executor.getStepClass();
+ String unitName = executor.getStepUnitName();
+ if (!executors.containsKey(unitName)) {
+ logger.info("Executor registered {}", unitName);
+ executors.put(unitName, executor);
} else
- logger.debug("Executor already registered: " + stepClass);
+ logger.debug("Executor already registered {}", unitName);
}
if (typeName == null)
throw new Exception(jobRuntime.toString() + " illegal execution state detected: executor name is null.");
String executorName = String.valueOf(typeName);
- if (!executors.containsKey(executorName)) {
+ if (!executors.containsKey(executorName))
throw new Exception(jobRuntime.toString() + " executor is unavailable: " + executorName);
- }
- executors.get(executorName).submit(jobRuntime);
+
+ //a lista mindig letezik, de sima futtataskor csak 1 elemu, az aktualis van benne
+ //ha van parhuzamos futtatas, akkor letrejonnek benne a step szellemek is, es meghivodik a fork
+ List<IJobRuntime> jobs = spawnJobs(jobRuntime, executorName);
+ executors.get(executorName).submit(jobs.toArray(new IJobRuntime[] {}));
+ jobs.forEach(r -> fireJobChangedEvent(new JobChangedEvent(r, SignalType.EXECUTE)));
+
} catch (Exception e) {
logger.catching(e);
suspendWaitExecutorJob(e, jobRuntime);
}
}
+ /**
+ * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
+ * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba.
+ *
+ * @param jobRuntime
+ */
+ @Override
+ public void applyPriorityChange(IJobRuntime jobRuntime) {
+ logger.info("rePrioritization start for {}", jobRuntime.getId());
+ // synchronized(this.runQueue){
+
+ //job main queue reorder
+
+ if (this.runQueue.contains(jobRuntime)) {
+ logger.info("runQueue");
+ this.runQueue.remove(jobRuntime);
+ try {
+ this.runQueue.put(jobRuntime);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ //JobStepExecutor reorder
+ if (this.executors != null) {
+ for (IJobStepExecutor exec : executors.values()) {
+ if (exec.containsRuntime(jobRuntime)) {
+ logger.info("executor");
+ exec.changePriority(jobRuntime);
+ }
+ }
+ }
+
+ // } logger.info("rePrioritization end");
+ }
+
@Override
public synchronized void bindItemManagerService(IItemManager service) {
setItemManager(service);
return new ConcurrentHashMap<Long, IJobRuntime>();
}
+ protected JobEngineRemote createRemoteEngine() {
+ return new JobEngineRemote(masterServerAddress);
+ }
+
protected IStatusMachine createStatusMachine() {
return new StatusMachine(this);
}
public void executeAssignVariableInstruction(IJobRuntime jobRuntime) {
Object value = jobRuntime.popFromStack();
String name = (String) jobRuntime.popFromStack();
+
+ //a gyerek job-ok visszateresi erteket a spawn sorszammal kiegeszitve mentjuk
+ //TODO el kell tudni erni a kovetkezo lepesekbol ezeket az ertekeket
+ if (jobRuntime.getParentJobId() > 0) {
+ IJobRuntime parentRuntime = getJobById(jobRuntime.getParentJobId());
+ parentRuntime.setVariable(name + jobRuntime.getSpawnOrder(), value);
+ }
+
jobRuntime.setVariable(name, value);
}
+ /***
+ * Fuggetlen (beagyazott) alfolyamat letrehozasa
+ */
@Override
public void executeCallConcurrentJobStepInstruction(IJobRuntime jobRuntime, IProgram subProgram) {
JobRuntime c = new JobRuntime(this, jobRuntime, subProgram);
getUserMessageQueues().addMessage(jobRuntime, catalogName, messageNumber, true, inputs);
}
+ @Override
+ public void executeWaitForSpawnJobsInstruction(IJobRuntime jobRuntime) {
+ //logger.info("Processing {} {}", jobRuntime.getId(), jobRuntime.canContinueExecution());
+
+ if (jobRuntime.canContinueExecution()) {
+ jobRuntime.setStatus(JobStatus.RUNABLE);
+ } else {
+ if (!JobStatus.WAIT_FINISH.equals(jobRuntime.getStatus())) {
+ jobRuntime.setStatus(JobStatus.WAIT_FINISH);
+ }
+ jobRuntime.decrementInstructionPointer();
+ }
+ addToRunQueue(jobRuntime);
+ }
+
@Override
public void fireJobChangedEvent(JobChangedEvent event) {
for (IJobChangedListener listener : jobChangedListenerList) {
return submittedJobs.get(jobId);
}
- @Override
- public IJobRuntime getJobForRemote(String className) throws Exception {
- if (!executors.containsKey(className))
- throw new Exception("Unregistered executor request: " + className);
-
- IJobStepExecutor executor = executors.get(className);
- if (!executor.isRemoteEnabled())
- throw new Exception("Job is not registered for remote workers: " + className);
- return executor.steelJob();
- }
-
@Override
public Map<Long, IJobRuntime> getJobs() {
return submittedJobs;
}
@Override
- public IJobRuntime getRemoteJob(String className) {
- IJobRuntime result = null;
- ResteasyClient client = new ResteasyClientBuilder().build();
- ResteasyWebTarget target = client.target(masterServerAddress).path("/services/rest/cluster/getjob").queryParam("className", className);
- Response response = null;
- try {
- response = target.request().get();
- if (response.getEntity() instanceof IJobRuntime)
- result = (IJobRuntime) response.getEntity();
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- logger.debug("Response status: {}", response.getStatus());
- if (response != null)
- response.close();
- }
- return result;
+ public JobEngineRemote getRemoteEngine() {
+ return remoteEngine;
}
@Override
}
private void jobCleanup(IJobRuntime jobRuntime) {
+ logger.info("Cleanup {}", jobRuntime.getId());
statusMachine.processAction(JobAction.FINISH, jobRuntime);
}
+ @Deprecated
@Override
public void keepAliveWorker(String remoteAddr) {
remoteWorkers.put(remoteAddr, LocalDate.now());
throw new Exception("File not exists: " + filePath);
logger.info("Loading template: " + filePath);
stream = new FileInputStream(filePath);
+
Parser parser = new Parser(stream);
- Encoder encoder = new Encoder();
JobTemplate template = parser.parse();
template.validate();
template.setFileName(fileName);
+ Encoder encoder = new Encoder();
IProgram program = (IProgram) encoder.visitJobTemplate(template, null);
if (programs.containsKey(fileName))
try {
String filePath = templateRoot + name;
logger.info("Loading template: " + name);
+ // System.out.println(name);
stream = new FileInputStream(filePath);
Parser parser = new Parser(stream);
Encoder encoder = new Encoder();
public void processJobStepCompletedMessage(IJobMessage message) {
// TODO cancel nem megy, valszeg itt van gubasz
IJobRuntime jobRuntime = getJobById(message.getJobId());
+
+ if (jobRuntime.getParentJobId() > 0)
+ removeSpanwChild(jobRuntime);
+
+ JobStepCompletedMessage m = (JobStepCompletedMessage) message;
+ //kesz vagyunk, jelezni
+ if (isWorker()) {
+ statusMachine.processAction(JobAction.DONE, jobRuntime);
+ return;
+ }
+
+ //a cancel hamarabb megjott?
+ //ha remote akkot tuti
if (jobRuntime == null) {
- //a cancel hamarabb megjott?
+
}
- JobStepCompletedMessage m = (JobStepCompletedMessage) message;
- putOutputsToStack(jobRuntime, m.getOutputs());
+ Object[] outputs = m.getOutputs();
+ putOutputsToStack(jobRuntime, outputs);
+
statusMachine.processAction(JobAction.DONE, jobRuntime);
}
}
}
+ @Override
+ public void removeSpanwChild(IJobRuntime jobRuntime) {
+ IJobRuntime parent = getJobById(jobRuntime.getParentJobId());
+ if (parent == null)
+ return;
+
+ parent.removeSpanwChild(jobRuntime.getId());
+
+ }
+
@Override
public void removeSuspended() {
List<Long> removeId = new ArrayList<>();
submittedJobs.remove(id);
}
- /**
- * Job prioritasa megvaltozott. Job main queue-ban es a JobStepExecutorok soraiban is ujra kell rendezni a jobokat. Az ujrarendezes a put-nal is megy
- * automatikusan, mivel eltero prioritasu jobot dobhatnak be vagy default prioritasut a korabban atrendezett listaba.
- *
- * @param jobRuntime
- */
@Override
- public void rePrioritization(IJobRuntime jobRuntime) {
- // synchronized(this.runQueue){
-
- //1. JobStepExecutor reorder
- if (this.executors != null) {
- for (IJobStepExecutor exec : executors.values()) {
- if (exec.containsRuntime(jobRuntime)) {
- exec.changePriority(jobRuntime);
- }
- }
- }
+ public ClusteredJob requestJob(String className) throws Exception {
+ if (!executors.containsKey(className))
+ throw new Exception("Unregistered executor request: " + className);
- //2. job main queue reorder
- if (this.runQueue.contains(jobRuntime)) {
- this.runQueue.remove(jobRuntime);
- try {
- this.runQueue.put(jobRuntime);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ IJobStepExecutor executor = executors.get(className);
+ if (!executor.isRemoteEnabled())
+ throw new Exception("Job is not registered for remote workers: " + className);
- // }
+ ClusteredJob job = executor.steelJob();
+ return job;
}
@Override
if (executors == null)
return;
for (IJobStepExecutor executor : executors.values()) {
- logger.trace("Notify executor {}", executor.getStepClass());
+ logger.trace("Notify executor {}", executor.getStepUnitName());
executor.shutdown();
}
for (IJobStepExecutor executor : executors.values()) {
- logger.info("Stopping executor {}", executor.getStepClass());
+ logger.info("Stopping executor {}", executor.getStepUnitName());
executor.waitShutdown();
}
}
+ private List<IJobRuntime> spawnJobs(IJobRuntime jobRuntime, String executorName) throws InterruptedException {
+ List<IJobRuntime> result = new ArrayList<>();
+
+ CallJobStepInstruction currentInstruction = jobRuntime.getCurrentCallJobStepInstruction();
+ if (currentInstruction != null) {
+ String forEach = currentInstruction.getForEach();
+ if (StringUtils.isNotBlank(forEach)) {
+ Object parameter = jobRuntime.getParameter(forEach);
+ if (parameter == null)
+ parameter = jobRuntime.getVariable(forEach);
+
+ //a sima array helyett ezt jobb hasznalni
+ if (parameter != null && parameter instanceof BasicDBList) {
+
+ BasicDBList iter = (BasicDBList) parameter;
+ for (int i = 1; i < iter.size(); i++) {
+ IJobRuntime jobRuntimeCopy = new JobRuntime(jobRuntime);
+ jobRuntimeCopy.setSpawnOrder(i);
+ jobRuntimeCopy.add();
+
+ jobRuntime.addSpawnChild(jobRuntimeCopy);
+
+ storeJob(jobRuntimeCopy);
+ result.add(jobRuntimeCopy);
+ }
+ }
+ }
+
+ }
+
+ result.add(jobRuntime);
+ return result;
+ }
+
@Override
public void startup() {
try {
}
+ @Override
+ public void storeJob(IJobRuntime runtime) {
+ submittedJobs.put(runtime.getId(), runtime);
+ logger.debug("+++ {} stored in VM ", runtime);
+ }
+
private void submit(IJobRuntime runtime) {
runtime.setSubmitted(new Timestamp(System.currentTimeMillis()));
runtime.add();
--- /dev/null
+package user.jobengine.server;\r
+\r
+import javax.ws.rs.client.Client;\r
+import javax.ws.rs.client.Entity;\r
+import javax.ws.rs.client.WebTarget;\r
+import javax.ws.rs.core.MediaType;\r
+import javax.ws.rs.core.Response;\r
+import javax.ws.rs.core.Response.Status;\r
+\r
+import org.apache.logging.log4j.LogManager;\r
+import org.apache.logging.log4j.Logger;\r
+import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;\r
+\r
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;\r
+\r
+import user.commons.cluster.ClusteredJob;\r
+import user.commons.rest.ServiceObjectMapper;\r
+\r
+public class JobEngineRemote {\r
+ private static final Logger logger = LogManager.getLogger();\r
+ private final Client client;\r
+ private final WebTarget root;\r
+\r
+ public JobEngineRemote(String masterServerAddress) {\r
+ //config\r
+ //https://www.programcreek.com/java-api-examples/?class=org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder&method=register\r
+ //trace\r
+ //https://docs.jboss.org/resteasy/docs/4.0.0.Final/userguide/html/Tracing_Feature.html\r
+\r
+ JacksonJaxbJsonProvider jaxbProvider = new JacksonJaxbJsonProvider(ServiceObjectMapper.getMapper(), JacksonJaxbJsonProvider.DEFAULT_ANNOTATIONS);\r
+ ResteasyClientBuilder builder = new ResteasyClientBuilder();\r
+ builder.register(jaxbProvider);\r
+ client = builder.build();\r
+ root = client.target(masterServerAddress);\r
+ }\r
+\r
+ public ClusteredJob getRemoteJob(String className) {\r
+ ClusteredJob result = null;\r
+ WebTarget target = root.path("/services/rest/cluster/getjob").queryParam("className", className);\r
+ Response response = null;\r
+ try {\r
+ response = target.request().get();\r
+ if (Status.OK.getStatusCode() != response.getStatus() && response.getEntity() instanceof ClusteredJob)\r
+ result = (ClusteredJob) response.getEntity();\r
+ } catch (Exception e) {\r
+ logger.error(e.getMessage());\r
+ } finally {\r
+ logger.debug("Response status: {}", response.getStatus());\r
+ if (response != null)\r
+ response.close();\r
+ }\r
+ return result;\r
+ }\r
+\r
+ public void reportJobStatus(ClusteredJob job) {\r
+ WebTarget target = root.path("/services/rest/cluster/notifyjob");\r
+ Response response = null;\r
+ try {\r
+ //response = target.request().post(Entity.entity(mapper.writeValueAsString(job), MediaType.APPLICATION_JSON));\r
+ response = target.request().post(Entity.entity(job, MediaType.APPLICATION_JSON));\r
+ if (Status.OK.getStatusCode() != response.getStatus())\r
+ throw new Exception("Unexpected reponse occured");\r
+ } catch (Exception e) {\r
+ logger.error(e.getMessage());\r
+ } finally {\r
+ logger.debug("Response status: {}", response.getStatus());\r
+ if (response != null)\r
+ response.close();\r
+ }\r
+ }\r
+\r
+}\r
import user.commons.JobStatus;
import user.commons.MediaCubeFinishMarker;
import user.commons.MediaCubeMarker;
+import user.commons.cluster.ClusteredJob;
import user.jobengine.db.ItemManagerData.SignalType;
import user.jobengine.db.JobParameters;
import user.jobengine.server.instructions.CallJobStepInstruction;
private Stack<Object> stack = null;
private Stack<Object> savedStack = null;
private int ip;
- private final EventListenerList listeners;
+ private final EventListenerList listeners = new EventListenerList();
private final Map<String, Object> variables;
private Map<String, Object> parameters;
private JobStatus savedStatus;
private final IJobEngine jobEngine;
private double currentProgress;
- private int runtimeWeight;
- private List<JobRuntime> children;
+ private int weight;
private MediaCubeMarker sessionMarker;
private MediaCubeMarker finishMarker;
private boolean isService;
private Semaphore forkSempahore;
- private IJobChangedListener jobChangedListener;
+ private IJobChangedListener spawnJobListener;
+ private String currentStep;
+ //Fuggetlen (beagyazott) alfolyamatok
+ private List<JobRuntime> children;
+ //Parhuzamosan futtatot komplett job-ok, amiket bevarunk, ha a megfelelo step hivja + a forEach parhuzamositas
private List<Long> childrenIDs;
+ private int spawnOrder;
public JobRuntime() {
- this.listeners = new EventListenerList();
variables = null;
jobEngine = null;
program = null;
status = JobStatus.RUNABLE;
}
+ public JobRuntime(ClusteredJob job, IJobEngine jobEngine, IJobStatusChangedListener listener) {
+ this.jobEngine = jobEngine;
+ this.persister = jobEngine.getItemManager();
+ variables = null;
+ program = null;
+ name = job.getName();
+ description = job.getDescription();
+ submitted = job.getSubmitted();
+ template = job.getTemplate();
+ status = JobStatus.RUNABLE;
+ //CREATE notifikacio miatt
+ setId(job.getId());
+ addEventListener(listener);
+ }
+
public JobRuntime(IJob job) {
variables = null;
jobEngine = null;
program = null;
- listeners = null;
id = job.getId();
description = job.getDescription();
name = job.getName();
public JobRuntime(IJobEngine jobEngine, IJobRuntime runtime, IProgram program) {
this.program = program;
this.jobEngine = jobEngine;
- this.listeners = new EventListenerList();
this.submitted = new Timestamp(System.currentTimeMillis());
this.stack = new Stack<Object>();
this.status = JobStatus.RUNABLE;
if (program == null)
throw new NullPointerException("program");
this.jobEngine = jobEngine;
- this.listeners = new EventListenerList();
this.program = program;
this.stack = new Stack<Object>();
this.variables = new HashMap<String, Object>();
throw new NullPointerException("program");
this.jobEngine = jobEngine;
this.ip = 0;
- this.listeners = new EventListenerList();
this.program = program;
this.stack = new Stack<Object>();
this.variables = new HashMap<String, Object>();
fromJob(job);
}
+ /***
+ * Parhuzamosan es blokkoltan futtatando lepesek letrehozasa
+ *
+ * @param runtime
+ */
+ @SuppressWarnings("unchecked")
+ public JobRuntime(IJobRuntime runtime) {
+ this.program = new Program(runtime.getProgram());
+ this.jobEngine = runtime.getJobEngine();
+ this.submitted = runtime.getSubmitted();
+ this.stack = (Stack<Object>) runtime.getStack().clone();
+ this.status = runtime.getStatus();
+ this.ip = runtime.getIp();
+ this.variables = new HashMap<>(runtime.getVariables());
+ this.parameters = new HashMap<>(runtime.getParameters());
+ this.persister = runtime.getPersister();
+ this.template = runtime.getTemplate();
+ this.weight = runtime.getWeight();
+ this.name = runtime.getName();
+ this.owner = runtime.getOwner();
+ program.removeAfter(getIp());
+ //logger.info("Program {}", program);
+ }
+
@Override
public void add() {
IJob job = toJob();
listeners.add(IJobStatusChangedListener.class, listener);
}
+ @Override
+ public void addSpawnChild(IJobRuntime runtime) {
+ if (childrenIDs == null)
+ childrenIDs = Collections.synchronizedList(new ArrayList<>());
+ childrenIDs.add(runtime.getId());
+ runtime.setParentJobId(id);
+ }
+
@Override
public void addVariable(String name, Class<?> type) {
if (variables.containsKey(name))
forkSempahore.release();
}
+ @Override
+ public boolean canContinueExecution() {
+ return childrenIDs == null || childrenIDs.size() == 0;
+ }
+
@Override
public void checkStackParameter() throws RuntimeException, IllegalArgumentException {
Class<?> requiredType = (Class<?>) popFromStack();
}
+ @Override
+ public IJobRuntime createCopy() {
+ return null;
+ }
+
@Override
public void decrementInstructionPointer() {
if (this.ip == 0)
public boolean forkPrepare() throws InterruptedException {
boolean result = false;
forkSempahore = new Semaphore(1);
- if (jobChangedListener == null) {
- logger.info("Preparing fork");
+ if (spawnJobListener == null) {
+ logger.info("Preparing spawn");
childrenIDs = Collections.synchronizedList(new ArrayList<>());
- jobChangedListener = event -> {
+ spawnJobListener = event -> {
IJobRuntime child = event.getJob();
if (event.getSignalType().equals(SignalType.CREATE)) {
if (child.getParentJobId() == getId()) {
if (childrenIDs.size() == 0)
forkSempahore.release();
}
- if (!jobEngine.isRunning())
+ if (!getJobEngine().isRunning())
forkSempahore.release();
//A gyerek(ek) el sem indultak, pl. nem letezik a template
};
logger.info("Adding job changed listener");
- jobEngine.addJobChangedEventListener(jobChangedListener);
+ getJobEngine().addJobChangedEventListener(spawnJobListener);
result = true;
forkSempahore.acquire();
} else {
@Override
public void forkWaitComplete() throws InterruptedException {
- logger.info("Waiting for semaphore" + forkSempahore);
+ //atlagos mukodes
+ if (forkSempahore == null)
+ return;
+ logger.info("Waiting for semaphore {}", forkSempahore);
forkSempahore.acquire();
logger.info("Removing job changed listener");
- if (jobEngine.isRunning()) {
+ if (getJobEngine().isRunning()) {
logger.info("Removing job changed listener");
- jobEngine.removeJobChangedEventListener(jobChangedListener);
- jobChangedListener = null;
+ getJobEngine().removeJobChangedEventListener(spawnJobListener);
+ spawnJobListener = null;
childrenIDs = null;
} else {
logger.info("Instruction pointer repositioned");
parametersFromByteArray();
}
+ @Override
+ public CallJobStepInstruction getCurrentCallJobStepInstruction() {
+ CallJobStepInstruction result = null;
+
+ int i = getIp();
+ if (i < program.getInstructionsCount()) {
+ while (true) {
+ IInstruction instruction = program.get(i);
+ if (instruction instanceof CallJobStepInstruction) {
+ result = (CallJobStepInstruction) instruction;
+ break;
+ }
+
+ i--;
+ if (i < 0)
+ break;
+ }
+
+ }
+
+ return result;
+ }
+
@Override
public IInstruction getCurrentInstruction() {
- return program.get(getIp());
+ IInstruction result = null;
+ if (getIp() < program.getInstructionsCount())
+ result = program.get(getIp());
+ return result;
+ }
+
+ @Override
+ public String getCurrentStep() {
+ return currentStep;
}
@Override
return ip;
}
+ @Override
+ public IJobEngine getJobEngine() {
+ return jobEngine;
+ }
+
/***
* Log session marker. A teljes folyamat osszes naplobejegyzese osszegyujtheto a segitsegevel. MediaCubeMarker tipusu, folyamatonkent uj peldany jon letre.
*/
return savedStatus;
}
+ @Override
+ public int getSpawnOrder() {
+ return spawnOrder;
+ }
+
@Override
public Stack<Object> getStack() {
return stack;
return variables;
}
+ @Override
+ public int getWeight() {
+ return weight;
+ }
+
@Override
public boolean hasNextInstruction() {
boolean result = false;
- if (program.getInstructionsCount() > 0)
+ if (program != null && program.getInstructionsCount() > 0)
result = (this.ip == (program.getInstructionsCount())) ? false : true;
return result;
}
@Override
public void incrementProgress(int progress) {
+ //remote ghost
+ if (program == null) {
+ setProgress(progress);
+ NotifyUpdate();
+ return;
+ }
+
List<IInstruction> instructions = program.getInstructions();
IInstruction currentInstruction = program.get(ip - 1);
- if (runtimeWeight == 0) {
+ if (getWeight() == 0) {
for (IInstruction instruction : instructions)
if (instruction.getClass().equals(CallJobStepInstruction.class)) {
int weight = instruction.getWeight();
- runtimeWeight += weight;
+ this.weight = getWeight() + weight;
}
}
currentProgress = 0;
if (instruction.getClass().equals(CallJobStepInstruction.class)) {
if (instruction == currentInstruction)
break;
- currentProgress += (double) instruction.getWeight() * 100 / runtimeWeight;
+ currentProgress += (double) instruction.getWeight() * 100 / getWeight();
}
- double currentDelta = (double) currentWeight * progress / runtimeWeight;
+ double currentDelta = (double) currentWeight * progress / getWeight();
currentProgress = Math.ceil(currentProgress + currentDelta);
if (currentProgress - getProgress() > 4 || currentProgress == 100) {
@Override
public boolean isRunable() {
- return (status == JobStatus.RUNABLE) ? true : false;
+ return JobStatus.RUNABLE.equals(status);
}
private boolean isRuntimeAssignable(Class<?> fromType, Class<?> toType) {
return isService;
}
+ @Override
+ public boolean isWaitFinish() {
+ return JobStatus.WAIT_FINISH.equals(status);
+ }
+
@Override
public boolean isWaitingCancel() {
return getStatus() == JobStatus.WAIT_CANCEL;
listeners.remove(IJobStatusChangedListener.class, listener);
}
+ @Override
+ public void removeSpanwChild(long id) {
+ childrenIDs.remove(id);
+ }
+
@Override
public void reset() {
currentProgress = 0;
savedStatus = status;
}
+ @Override
+ public void setCurrentStep(String currentStep) {
+ this.currentStep = currentStep;
+ }
+
@Override
public void setDescription(String description) {
super.setDescription(description);
}
+ /*
+ private final EventListenerList listeners = new EventListenerList();
+ private List<JobRuntime> children;
+ private MediaCubeMarker sessionMarker;
+ private MediaCubeMarker finishMarker;
+ private boolean isService;
+ private Semaphore forkSempahore;
+ private IJobChangedListener jobChangedListener;
+ private List<Long> childrenIDs;
+
+ * */
+
@Override
public void setId(long id) {
super.setId(id);
this.isService = isService;
}
+ @Override
+ public void setSpawnOrder(int spawnOrder) {
+ this.spawnOrder = spawnOrder;
+ }
+
@Override
public void setStatus(JobStatus status) {
if (this.status != status) {
@Override
public void setVariable(String name, Object value) {
- /*
- //castnal elszall
- Class<?> type = null;
- try {
- type = (Class<?>) getVariable(name);
- } catch (ClassCastException e) {
- throw new IllegalStateException("multiple set");
- }
- if (value != null && !type.equals(value.getClass()))
- throw new IllegalArgumentException("name " + name + " value " + value);
- */
variables.put(name, value);
}
private void signal(SignalType signalType) {
- if (jobEngine != null)
- jobEngine.fireJobChangedEvent(new JobChangedEvent(this, signalType));
+ if (getJobEngine() != null)
+ getJobEngine().fireJobChangedEvent(new JobChangedEvent(this, signalType));
}
@Override
job.setParentJobId(getParentJobId());
return job;
}
+
}
package user.jobengine.server;
import java.net.URLClassLoader;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.wiring.BundleWiring;
+import groovy.lang.GroovyClassLoader;
import user.commons.JobStatus;
+import user.commons.cluster.ClusteredJob;
import user.jobengine.server.messages.JobStepCompletedMessage;
import user.jobengine.server.messages.JobStepSkippedMessage;
import user.jobengine.server.steps.IJobStep;
public class JobStepExecutor implements IJobStepExecutor {
+
private class Worker extends Thread {
- private static final int WAIT_FOR_REMOTE = 30000;
+ private static final int WAIT_FOR_REMOTE = 3000;
private volatile boolean shutdown = false;
@Override
IJobStep step = null;
while (true) {
try {
- if (jobEngine.isWorker()) {
- //a worker is csak azokat akarja vegrehajtani
- if (isRemote) {
- Object o = jobEngine.getRemoteJob(className);
+ Thread.sleep(IJobEngine.QUEUE_POLL_INTERVAL_MS);
- if (o == null) {
- Thread.sleep(1000);
- continue;
- }
- jobRuntime = (IJobRuntime) o;
+ if (jobEngine.isWorker() && isRemote) {
+ //a worker is csak azokat akarja vegrehajtani
+ ClusteredJob job = jobEngine.getRemoteEngine().getRemoteJob(getStepUnitName());
- jobRuntime.setDescription(PROCESSING_REMOTLY);
- }
- } else {
- //SessionUtil.getMediaCubeConfig().getJobQueuePollInterval()
- jobRuntime = queue.poll(10, TimeUnit.MILLISECONDS);
- if (jobRuntime == null && shutdown) {
- logger.trace("Shutting down");
- break;
- }
- if (jobRuntime == null)
- continue;
+ //TODO remote-ba jelezni, hogy nem sikerult
if (shutdown) {
logger.trace("{} skipping by shutdown", jobRuntime);
jobEngine.sendMessage(new JobStepSkippedMessage(jobRuntime.getId()));
- continue;
+ break;
}
+ if (job != null) {
+ //TODO set job accepted = PROCESSING_REMOTLY + if error then feedback?
+ //jobRuntime.setDescription(PROCESSING_REMOTLY);
+ jobRuntime = new ClusteredJobRuntime(job, jobEngine, e -> {
+ IJobRuntime runtime = (IJobRuntime) e.getSource();
+ job.setStatus(runtime.getStatus());
+ job.setProgress(runtime.getProgress());
+ jobEngine.getRemoteEngine().reportJobStatus(job);
+ });
+ jobEngine.storeJob(jobRuntime);
+
+ Object[] inputs = job.getInputs();
+ runStepObject(jobRuntime, inputs);
+ }
+ }
+
+ jobRuntime = queue.poll();
+
+ if (shutdown) {
+ logger.trace("Shutting down");
+ break;
+ }
+
+ if (jobRuntime != null) {
long submitted = jobRuntime.getSubmitted().getTime();
long current = System.currentTimeMillis();
boolean timeout = current - submitted > WAIT_FOR_REMOTE;
+
+ //ha remote, de nem jelentkezik senki, akkor helyi vegrehajtas
if (isRemote) {
if (timeout) {
logger.info("Remote JobStep timed out, processing locally.");
} else {
- //logger.info("JobStep is remote, waiting for remote processor");
- if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription()))
- jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR);
+ // if (!WAIT_REMOTE_PROCESSOR.equals(jobRuntime.getDescription()))
+ // jobRuntime.setDescription(WAIT_REMOTE_PROCESSOR);
queue.put(jobRuntime);
- //skip local processor
continue;
}
}
- jobRuntime.setDescription(PROCESSING_LOCALLY);
+
+ logger.info("Executing locally {}", jobRuntime.getId());
+ //jobRuntime.setDescription(PROCESSING_LOCALLY);
+ Object[] inputs = jobEngine.getInputsFromStack(jobRuntime);
+ runStepObject(jobRuntime, inputs);
+ }
+
+ if (shutdown) {
+ logger.trace("Shutting down");
+ break;
}
- //processing locally
- Object[] inputs = jobEngine.getInputsFromStack(jobRuntime);
- Object[] outputs = null;
- jobRuntime.setStatus(JobStatus.EXECUTING);
- jobRuntime.NotifyUpdate();
- step = createStepObject();
- if (step == null)
- throw new Exception("Step object is null");
- logger.debug("{} executing", jobRuntime);
- jobRuntime.IncrementPriority();
- outputs = step.run(jobEngine, jobRuntime, inputs);
- jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs));
} catch (Throwable e) {
logger.error("Error in {}", jobRuntime);
Throwable t = e.getCause() == null ? e : e.getCause();
}
}
+ private void runStepObject(IJobRuntime jobRuntime, Object[] inputs) throws Throwable {
+ IJobStep step = createStepObject();
+ if (step == null)
+ throw new Exception("Step object is null");
+ jobRuntime.setStatus(JobStatus.EXECUTING);
+ jobRuntime.NotifyUpdate();
+ logger.debug("{} executing", jobRuntime);
+ jobRuntime.incrementPriority();
+ Object[] outputs = step.run(jobEngine, jobRuntime, inputs);
+
+ //TODO itt lekezelni a remote notification-t
+ jobEngine.sendMessage(new JobStepCompletedMessage(jobRuntime.getId(), outputs));
+ }
+
public void shutdown() {
this.shutdown = true;
}
private CountDownLatch barrier;
private Class<IJobStep> stepClass;
private int maxConcurrent;
- private String className;
+ private String stepUnitName;
private boolean isRemote;
public JobStepExecutor() {
@Override
public void changePriority(IJobRuntime runtime) {
- if (queue != null && (runtime != null)) {
- if (queue.remove(runtime)) {
+ if (queue != null && runtime != null) {
+ if (queue.remove(runtime))
queue.put(runtime);
- }
}
}
@Override
@SuppressWarnings("unchecked")
- public void create(String className, int maxConcurrent, boolean isRemote) throws JobEngineException {
- this.className = className;
+ public void create(String stepUnitName, int maxConcurrent, boolean isRemote) throws JobEngineException {
+ this.stepUnitName = stepUnitName;
this.isRemote = isRemote;
- logger = LogManager.getLogger(getClass().getSimpleName() + ":" + className);
- logger.debug("Creating executor {}, instances {}", className, maxConcurrent);
- if (StringUtils.isEmpty(className))
- throw new JobEngineException("Step class name can't be null.");
-
- try {
- URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader());
- stepClass = (Class<IJobStep>) loader.loadClass(className);
- } catch (ClassNotFoundException e) {
- logger.catching(e);
- throw new JobEngineException("System can't load JobStep implementation: " + className);
+ logger = LogManager.getLogger(getClass().getSimpleName() + ":" + stepUnitName);
+ logger.debug("Creating executor {}, instances {}", stepUnitName, maxConcurrent);
+ if (StringUtils.isEmpty(stepUnitName))
+ throw new JobEngineException("Step unit name can't be null.");
+
+ if (!isGroovyStep()) {
+ //a groovy-nak nem kell
+ try {
+ URLClassLoader loader = URLClassLoader.newInstance(DynamicClassLocator.makeURLs(), getClass().getClassLoader());
+ stepClass = (Class<IJobStep>) loader.loadClass(stepUnitName);
+ } catch (ClassNotFoundException e1) {
+ logger.catching(e1);
+ throw new JobEngineException("System can't load JobStep implementation: " + stepUnitName);
+ }
}
queue = new PriorityBlockingQueue<IJobRuntime>();
}
}
- protected IJobStep createStepObject() throws InstantiationException, IllegalAccessException {
- return stepClass.newInstance();
+ protected IJobStep createStepObject() throws Exception {
+ IJobStep result = null;
+
+ if (isGroovyStep()) {
+ GroovyClassLoader gcl = new GroovyClassLoader();
+ Class myClass = gcl.parseClass(Paths.get(System.getProperty("jobengine.jobsteps.groovy.root", ""), stepUnitName).toFile());
+ result = (IJobStep) myClass.newInstance();
+ } else
+ result = stepClass.newInstance();
+ return result;
}
@Override
return maxConcurrent;
}
- private ClassLoader getParentClassLoader() {
- ClassLoader parentClassLoader = getClass().getClassLoader();
- Bundle bundle = FrameworkUtil.getBundle(getClass());
- if (bundle != null) {
- BundleContext bundleContext = bundle.getBundleContext();
- if (bundleContext != null) {
- BundleWiring bundleWiring = bundle.adapt(BundleWiring.class);
- parentClassLoader = bundleWiring.getClassLoader();
- }
- }
- return parentClassLoader;
- }
-
@Override
public PriorityBlockingQueue<IJobRuntime> getQueue() {
return this.queue;
}
- // @Override
- // public void synchronize() {
- // if (priorityQueue.size() > 0 && workers.size() > queue.size()) {
- // IJobRuntime jobRuntime = priorityQueue.poll();
- // try {
- // queue.put(jobRuntime);
- // } catch (InterruptedException e) {
- // e.printStackTrace();
- // }
- // }
- // }
-
- @SuppressWarnings("unchecked")
@Override
public Class<IJobStep> getStepClass() {
- //TODO miért hozunk létre mindíg újat
- // if (stepClass != null) {
- // DynamicClassLoader loader = new DynamicClassLoader(getClass().getClassLoader());
- // stepClass = (Class<IJobStep>) loader.loadClass(stepClass.getCanonicalName());
- // }
return stepClass;
}
+ @Override
+ public String getStepUnitName() {
+ return stepUnitName;
+ }
+
+ private boolean isGroovyStep() {
+ return stepUnitName.toLowerCase().endsWith(".java");
+ }
+
@Override
public boolean isRemoteEnabled() {
return isRemote;
@Override
public void revoke(IJobRuntime jobRuntime) {
- //synchronized(queue){
queue.remove(jobRuntime);
- //}
}
@Override
}
@Override
- public IJobRuntime steelJob() throws InterruptedException {
- return queue.poll(5, TimeUnit.MILLISECONDS);
+ public ClusteredJob steelJob() throws InterruptedException {
+ ClusteredJob result = null;
+ IJobRuntime jobRuntime = queue.poll(IJobEngine.QUEUE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+ if (jobRuntime != null) {
+ result = new ClusteredJob();
+ result.setId(jobRuntime.getId());
+ result.setName(jobRuntime.getName());
+ result.setDescription(jobRuntime.getDescription());
+ result.setTemplate(jobRuntime.getTemplate());
+ Object[] inputs = jobEngine.getInputsFromStack(jobRuntime);
+ result.setInputs(inputs);
+ }
+
+ return result;
}
@Override
- public void submit(IJobRuntime jobRuntime) {
- queue.put(jobRuntime);
- jobRuntime.setDescription(stepClass.getSimpleName());
- //logger.info("Executor got ! {}", jobRuntime);
+ public void submit(IJobRuntime... jobRuntime) {
+ for (IJobRuntime r : jobRuntime) {
+ logger.info("Adding job {} to executor queue", r.getId());
+ r.setCurrentStep(stepUnitName);
+ queue.put(r);
+ }
}
@Override
for (Worker w : workers)
w.waitShutdown();
}
-
}
StatusMachineAction machineAction = new StatusMachineAction(jobAction, jobRuntime.getStatus());\r
if (actions.containsKey(machineAction)) {\r
IJobStatusAction action = actions.get(machineAction);\r
- logger.info("{} status change {} -> {}", jobRuntime, jobRuntime.getStatus(), jobAction);\r
+ logger.debug("{} changes {} -> {}", jobRuntime.getId(), jobRuntime.getStatus(), jobAction);\r
action.processAction(jobEngine, jobRuntime);\r
} else {\r
logger.warn("No status processor registered for {} -> {}", jobAction, jobRuntime.getStatus());\r
@Command\r
public void cancelJobs() {\r
if (this.jobList.getSelection() != null) {\r
- for (IJobRuntime job : this.jobList.getSelection()) {\r
- jobEngine.sendMessage(new CancelRequest(job.getId()));\r
+ for (IJobRuntime jobRuntime : this.jobList.getSelection()) {\r
+ jobEngine.sendMessage(new CancelRequest(jobRuntime.getId()));\r
}\r
}\r
}\r
@Command\r
public void changeJobsPriority() {\r
if (this.jobList.getSelection() != null) {\r
- for (IJobRuntime job : this.jobList.getSelection()) {\r
- job.setPriority(newPriority);\r
- jobEngine.rePrioritization(job);\r
+ for (IJobRuntime jobRuntime : this.jobList.getSelection()) {\r
+ jobRuntime.setPriority(newPriority);\r
+ jobEngine.applyPriorityChange(jobRuntime);\r
}\r
}\r
}\r
\r
IJobEngine jobEngine = JobEngine.getInstance();\r
ScheduledJob scheduledJob = jobEngine.getScheduledJob(JOBTEMPLATE);\r
+ if (scheduledJob == null)\r
+ throw new Exception("A sablon nem található: " + JOBTEMPLATE);\r
+\r
Map<String, Object> parameters = scheduledJob.getJobParameters();\r
parameters.put(HOUSEID, houseId);\r
parameters.put(RECIPIENT, email);\r
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
+import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Stack;
import org.junit.BeforeClass;
import org.junit.Test;
import user.commons.JobStatus;
+import user.commons.cluster.ClusteredJob;
import user.jobengine.db.IItemManager;
import user.jobengine.db.ItemManager;
import user.jobengine.db.ItemManagerData.SignalType;
import user.jobengine.server.IJobStatusChangedListener;
import user.jobengine.server.JobEngine;
import user.jobengine.server.JobEngineException;
+import user.jobengine.server.JobEngineRemote;
import user.jobengine.server.JobStatusChangedEvent;
import user.jobengine.server.ThreadSynchronizer;
* @throws Exception
*/
@Test
- public void remote() throws Exception {
+ public void remote_worker() throws Exception {
+ //ez barmi lehet
+ System.setProperty("jobengine.master.server", "http://localhost:8888");
final ThreadSynchronizer sync = new ThreadSynchronizer();
- final IJobEngine jobEngine = new JobEngine();
- jobEngine.startup();
- jobEngine.bindItemManagerService(manager);
- /*
- Map<String, Object> parameters = new HashMap<>();
- parameters.put("itemID", 100);
- IJobRuntime runtime = jobEngine.submit(null, e -> {
- if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus()))
- sync.suspend();
- }, "fake.xml", "Fake", parameters);
- jobEngine.addJobChangedEventListener(e -> {
- if (e.getSignalType().equals(SignalType.UPDATE)) {
- System.out.println(e.getJob().getProgress());
+
+ ClusteredJob job = new ClusteredJob();
+ job.setId(1);
+ job.setName("Teszt");
+ job.setSubmitted(new Timestamp(System.currentTimeMillis()));
+ job.setTemplate("teszt.xml");
+ //itemId
+ job.setInputs(new Object[] { 100 });
+
+ final Stack<ClusteredJob> jobs = new Stack<>();
+ jobs.push(job);
+ final IJobEngine jobEngine = new JobEngine() {
+
+ @Override
+ protected JobEngineRemote createRemoteEngine() {
+ return new JobEngineRemote("") {
+ @Override
+ public ClusteredJob getRemoteJob(String className) {
+ ClusteredJob job = null;
+ if (!jobs.isEmpty())
+ job = jobs.pop();
+ return job;
+ }
+
+ @Override
+ public void reportJobStatus(ClusteredJob job) {
+ System.out.println("Report:" + job.getStatus());
}
- });
- */
+ };
+ }
+
+ };
+ jobEngine.bindItemManagerService(manager);
+ jobEngine.addJobChangedEventListener(e -> {
+ // if (e.getSignalType().equals(SignalType.UPDATE)) {
+ // System.out.println(e.getJob().getProgress());
+ // }
+
+ if (SignalType.CREATE.equals(e.getSignalType()))
+ System.out.println(e.getSignalType());
+ if (SignalType.UPDATE.equals(e.getSignalType())) {
+ JobStatus status = e.getJob().getStatus();
+ System.out.println(status);
+ if (JobStatus.FINISHED.equals(status) || JobStatus.SUSPENDED.equals(status))
+ sync.suspend();
+ }
+ });
+ jobEngine.startup();
sync.waitSuspend();
sync.resume();
jobEngine.shutdown();
//assertEquals(JobStatus.FINISHED, runtime.getStatus());
}
+ @Test
+ public void reportJobStatus() throws Exception {
+ JobEngineRemote sut = new JobEngineRemote("http://localhost:8888");
+ ClusteredJob job = new ClusteredJob();
+ job.setId(1);
+ sut.reportJobStatus(job);
+ }
+
/***
* NEXIO adatok szinkronizalo folyamat futtatasa
*
--- /dev/null
+package user.jobengine.server.IT;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import user.commons.JobStatus;
+import user.jobengine.db.IItemManager;
+import user.jobengine.db.ItemManager;
+import user.jobengine.db.ItemManagerData.SignalType;
+import user.jobengine.server.IJobEngine;
+import user.jobengine.server.IJobRuntime;
+import user.jobengine.server.JobEngine;
+
+public class ProrityChangeTests {
+ private static IItemManager manager;
+ private static IJobEngine jobEngine;
+ private static Map<String, Object> jobParams = new HashMap<>();
+ private static int JOB_COUNT = 10;
+
+ @BeforeClass
+ public static void initialize() throws Exception {
+ //Kornyezeti valtozok betoltese
+ Properties properties = new Properties();
+ URL srcLocation = ProrityChangeTests.class.getProtectionDomain().getCodeSource().getLocation();
+ URL location = new URL(srcLocation, "../../-configuration/mediacube-dev-user.properties");
+ properties.load(new FileInputStream(location.toURI().getPath().toString()));
+ System.getProperties().putAll(properties);
+
+ manager = new ItemManager();
+ manager.connect();
+
+ jobEngine = new JobEngine();
+ jobEngine.startup();
+ jobEngine.bindItemManagerService(manager);
+
+ jobParams.put("itemID", 100);
+ }
+
+ @AfterClass
+ public static void terminate() throws Exception {
+ jobEngine.shutdown();
+ manager.disconnect();
+ }
+
+ @Test
+ public void testAfterExecutorSubmitCompleted() throws Exception {
+ CountDownLatch startLatch = new CountDownLatch(JOB_COUNT);
+ CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+ List<IJobRuntime> runtimes = new ArrayList<>();
+ List<IJobRuntime> results = new ArrayList<>();
+
+ jobEngine.addJobChangedEventListener(e -> {
+ if (e.getSignalType().equals(SignalType.EXECUTE))
+ startLatch.countDown();
+ });
+
+ for (int i = 0; i < JOB_COUNT; i++) {
+ IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+ if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+ results.add((IJobRuntime) e.getSource());
+ finishLatch.countDown();
+ }
+ }, "fake.xml", "Fake", jobParams);
+ runtimes.add(jobRuntime);
+ }
+
+ startLatch.await();
+
+ IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+ highPriorityJob.incrementPriority();
+ jobEngine.applyPriorityChange(highPriorityJob);
+
+ finishLatch.await();
+
+ for (int i = 0; i < JOB_COUNT; i++)
+ assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+ assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+
+ }
+
+ @Test
+ public void testAfterSubmitCompleted() throws Exception {
+ CountDownLatch startLatch = new CountDownLatch(JOB_COUNT);
+ CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+ List<IJobRuntime> runtimes = new ArrayList<>();
+ List<IJobRuntime> results = new ArrayList<>();
+
+ jobEngine.addJobChangedEventListener(e -> {
+ if (e.getSignalType().equals(SignalType.CREATE))
+ startLatch.countDown();
+ });
+
+ for (int i = 0; i < JOB_COUNT; i++) {
+ IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+ if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+ results.add((IJobRuntime) e.getSource());
+ finishLatch.countDown();
+ }
+ }, "fake.xml", "Fake", jobParams);
+ runtimes.add(jobRuntime);
+ }
+
+ startLatch.await();
+
+ IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+ highPriorityJob.incrementPriority();
+ jobEngine.applyPriorityChange(highPriorityJob);
+
+ finishLatch.await();
+
+ for (int i = 0; i < JOB_COUNT; i++)
+ assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+ assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+
+ }
+
+ @Test
+ public void testUnderSubmit() throws Exception {
+ CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+ List<IJobRuntime> runtimes = new ArrayList<>();
+ List<IJobRuntime> results = new ArrayList<>();
+
+ for (int i = 0; i < JOB_COUNT; i++) {
+ IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+ if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+ results.add((IJobRuntime) e.getSource());
+ finishLatch.countDown();
+ }
+ }, "fake.xml", "Fake", jobParams);
+ runtimes.add(jobRuntime);
+ }
+
+ IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+ highPriorityJob.incrementPriority();
+ jobEngine.applyPriorityChange(highPriorityJob);
+
+ finishLatch.await();
+
+ for (int i = 0; i < JOB_COUNT; i++)
+ assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+ assertFalse(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+ }
+
+ @Test
+ public void testUnderSubmitEqualPriority() throws Exception {
+ CountDownLatch finishLatch = new CountDownLatch(JOB_COUNT);
+ List<IJobRuntime> runtimes = new ArrayList<>();
+ List<IJobRuntime> results = new ArrayList<>();
+
+ for (int i = 0; i < JOB_COUNT; i++) {
+ IJobRuntime jobRuntime = jobEngine.submit(null, e -> {
+ if (JobStatus.FINISHED.equals(e.getStatus()) || JobStatus.SUSPENDED.equals(e.getStatus())) {
+ results.add((IJobRuntime) e.getSource());
+ finishLatch.countDown();
+ }
+ }, "fake.xml", "Fake", jobParams);
+ runtimes.add(jobRuntime);
+ }
+
+ IJobRuntime highPriorityJob = runtimes.get(runtimes.size() - 1);
+
+ finishLatch.await();
+
+ for (int i = 0; i < JOB_COUNT; i++)
+ assertEquals(JobStatus.FINISHED, runtimes.get(i).getStatus());
+
+ assertTrue(runtimes.indexOf(highPriorityJob) == results.indexOf(highPriorityJob));
+ }
+
+}
import user.commons.IJob;
public class PriorityEntryTest {
- // private PriorityEntry<Long> firstEntry;
- // private PriorityEntry<Long> secondEntry;
- private JobRuntime firstEntry;
- private JobRuntime secondEntry;
- private JobRuntime thirdEntry;
- private BlockingQueue<IJobRuntime> queue;
-
private class JobComparator implements Comparator<IJob> {
@Override
}
+ // private PriorityEntry<Long> firstEntry;
+ // private PriorityEntry<Long> secondEntry;
+ private JobRuntime firstEntry;
+ private JobRuntime secondEntry;
+ private JobRuntime thirdEntry;
+
+ private BlockingQueue<IJobRuntime> queue;
+
@Before
public void setup() {
firstEntry = new JobRuntime();
}
@Test
- public void testPriorityEntry_IncrementPriority() throws Exception {
+ public void testPriorityEntry_incrementPriority() throws Exception {
+ // Fixture
+ queue.add(firstEntry);
+ queue.add(secondEntry);
+ queue.add(thirdEntry);
+
+ queue.remove(secondEntry);
+ secondEntry.incrementPriority();
+ queue.add(secondEntry);
+
+ // Exercise
+ IJobRuntime current = queue.poll();
+
+ // Verify
+ assertEquals(secondEntry, current);
+ }
+
+ @Test
+ public void testPriorityEntry_incrementPriority_async_poll() throws Exception {
// Fixture
queue.add(firstEntry);
queue.add(secondEntry);
queue.add(thirdEntry);
+ secondEntry.incrementPriority();
queue.remove(secondEntry);
- secondEntry.IncrementPriority();
queue.add(secondEntry);
// Exercise
- IJobRuntime current = queue.take();
+ IJobRuntime current = queue.poll();
// Verify
assertEquals(secondEntry, current);
\r
import javax.servlet.http.HttpServletRequest;\r
import javax.ws.rs.GET;\r
+import javax.ws.rs.POST;\r
import javax.ws.rs.Path;\r
import javax.ws.rs.Produces;\r
import javax.ws.rs.QueryParam;\r
import org.apache.logging.log4j.LogManager;\r
import org.apache.logging.log4j.Logger;\r
\r
+import user.commons.cluster.ClusteredJob;\r
import user.jobengine.osgi.rest.ComponentBinder;\r
import user.jobengine.server.IJobEngine;\r
\r
public Response getJob(@QueryParam("className") String className) {\r
Response result = null;\r
try {\r
- //IJobRuntime job = jobEngine.getJobForRemote(className);\r
- ClusteredJob j = new ClusteredJob();\r
- j.setId(100);\r
- j.setName("Jobname");\r
- result = Response.ok().entity(j).build();\r
+ ClusteredJob job = jobEngine.requestJob(className);\r
+ if (job == null)\r
+ result = Response.noContent().build();\r
+ else\r
+ result = Response.ok().entity(job).build();\r
} catch (Exception e) {\r
result = Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();\r
}\r
}\r
return result;\r
}\r
+\r
+ @POST\r
+ @Path("/notifyjob")\r
+ @Produces({ MediaType.APPLICATION_JSON })\r
+ public Response notifyJob(ClusteredJob job) {\r
+ Response result = null;\r
+ try {\r
+ result = Response.ok().build();\r
+ } catch (Exception e) {\r
+ result = Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();\r
+ }\r
+ return result;\r
+ }\r
}\r
+++ /dev/null
-package user.jobengine.osgi.mediacube;\r
-\r
-public class ClusteredJob {\r
- private long id;\r
- private String name;\r
-\r
- public long getId() {\r
- return id;\r
- }\r
-\r
- public String getName() {\r
- return name;\r
- }\r
-\r
- public void setId(long id) {\r
- this.id = id;\r
- }\r
-\r
- public void setName(String name) {\r
- this.name = name;\r
- }\r
-\r
-}\r