Index: src/main/java/org/springframework/batch/execution/step/ItemOrientedStep.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/ItemOrientedStep.java (revision 10655)
+++ src/main/java/org/springframework/batch/execution/step/ItemOrientedStep.java (working copy)
@@ -20,7 +20,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.domain.BatchStatus;
-import org.springframework.batch.core.domain.ItemSkipPolicy;
import org.springframework.batch.core.domain.JobInstance;
import org.springframework.batch.core.domain.JobInterruptedException;
import org.springframework.batch.core.domain.StepContribution;
@@ -30,15 +29,12 @@
import org.springframework.batch.core.runtime.ExitStatusExceptionClassifier;
import org.springframework.batch.core.tasklet.Tasklet;
import org.springframework.batch.execution.listener.CompositeStepListener;
-import org.springframework.batch.execution.step.support.NeverSkipItemSkipPolicy;
import org.springframework.batch.execution.step.support.SimpleExitStatusExceptionClassifier;
import org.springframework.batch.execution.step.support.StepInterruptionPolicy;
import org.springframework.batch.execution.step.support.ThreadStepInterruptionPolicy;
-import org.springframework.batch.io.Skippable;
import org.springframework.batch.io.exception.InfrastructureException;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
-import org.springframework.batch.item.ItemRecoverer;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.exception.CommitFailedException;
@@ -48,10 +44,6 @@
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.support.RepeatTemplate;
-import org.springframework.batch.retry.RetryOperations;
-import org.springframework.batch.retry.RetryPolicy;
-import org.springframework.batch.retry.callback.ItemReaderRetryCallback;
-import org.springframework.batch.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
@@ -89,10 +81,6 @@
// default to checking current thread for interruption.
private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
- private RetryOperations retryOperations = new RetryTemplate();
-
- private ItemReaderRetryCallback retryCallback;
-
private CompositeItemStream stream = new CompositeItemStream();
private CompositeStepListener listener = new CompositeStepListener();
@@ -101,12 +89,8 @@
private PlatformTransactionManager transactionManager;
- private ItemReader itemReader;
+ private ItemProcessor itemProcessor;
- private ItemWriter itemWriter;
-
- private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy();
-
/**
* @param name
*/
@@ -131,29 +115,16 @@
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
-
+
/**
- * @param itemReader the itemReader to set
+ * Public setter for the {@link ItemProcessor}.
+ * @param itemProcessor the {@link ItemProcessor} to set
*/
- public void setItemReader(ItemReader itemReader) {
- this.itemReader = itemReader;
+ public void setItemProcessor(ItemProcessor itemProcessor) {
+ this.itemProcessor = itemProcessor;
}
/**
- * @param itemWriter the itemWriter to set
- */
- public void setItemWriter(ItemWriter itemWriter) {
- this.itemWriter = itemWriter;
- }
-
- /**
- * @param itemSkipPolicy
- */
- public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) {
- this.itemSkipPolicy = itemSkipPolicy;
- }
-
- /**
* Register each of the streams for callbacks at the appropriate time in the
* step. The {@link ItemReader} and {@link ItemWriter} are automatically
* registered, but it doesn't hurt to also register them here. Injected
@@ -227,22 +198,6 @@
}
/**
- * Public setter for the {@link RetryOperations}.
- * @param retryOperations the {@link RetryOperations} to set
- */
- public void setRetryOperations(RetryOperations retryOperations) {
- this.retryOperations = retryOperations;
- }
-
- /**
- * Public setter for the ItemReaderRetryCallback. TODO: get rid of this.
- * @param retryCallback the retryCallback to set
- */
- public void setRetryCallback(ItemReaderRetryCallback retryCallback) {
- this.retryCallback = retryCallback;
- }
-
- /**
* Setter for the {@link StepInterruptionPolicy}. The policy is used to
* check whether an external request has been made to interrupt the job
* execution.
@@ -288,10 +243,6 @@
ExitStatus status = ExitStatus.FAILED;
final ExceptionHolder fatalException = new ExceptionHolder();
- // This could go in applyConfiguration(), but some unit tests do not
- // call that
- possiblyRegisterStreams();
-
try {
stepExecution.setStartTime(new Date(System.currentTimeMillis()));
@@ -330,7 +281,7 @@
try {
- itemReader.mark();
+ itemProcessor.mark();
result = processChunk(contribution);
contribution.incrementCommitCount();
@@ -363,8 +314,8 @@
}
try {
- itemReader.mark();
- itemWriter.flush();
+ itemProcessor.mark();
+ itemProcessor.flush();
transactionManager.commit(transaction);
}
catch (Exception e) {
@@ -390,8 +341,8 @@
}
try {
- itemReader.reset();
- itemWriter.clear();
+ itemProcessor.reset();
+ itemProcessor.clear();
transactionManager.rollback(transaction);
}
catch (Exception e) {
@@ -489,25 +440,6 @@
}
/**
- * Register the item reader and writer as listeners and streams. If they are
- * manually registered anyway, it shouldn't matter.
- */
- private void possiblyRegisterStreams() {
- if (itemReader instanceof ItemStream) {
- stream.register((ItemStream) itemReader);
- }
- if (itemReader instanceof StepListener) {
- listener.register((StepListener) itemReader);
- }
- if (itemWriter instanceof ItemStream) {
- stream.register((ItemStream) itemWriter);
- }
- if (itemWriter instanceof StepListener) {
- listener.register((StepListener) itemWriter);
- }
- }
-
- /**
* Execute a bunch of identical business logic operations all within a
* transaction. The transaction is programmatically started and stopped
* outside this method, so subclasses that override do not need to create a
@@ -517,7 +449,7 @@
* business logic.
* @return true if there is more data to process.
*/
- ExitStatus processChunk(final StepContribution contribution) {
+ protected ExitStatus processChunk(final StepContribution contribution) {
ExitStatus result = chunkOperations.iterate(new RepeatCallback() {
public ExitStatus doInIteration(final RepeatContext context) throws Exception {
if (contribution.isTerminateOnly()) {
@@ -525,7 +457,7 @@
}
// check for interruption before each item as well
interruptionPolicy.checkInterrupted(context);
- ExitStatus exitStatus = doProcessing(contribution);
+ ExitStatus exitStatus = itemProcessor.process(contribution);
contribution.incrementTaskCount();
// check for interruption after each item as well
interruptionPolicy.checkInterrupted(context);
@@ -536,92 +468,6 @@
}
/**
- * Execute the business logic, delegating to the given {@link Tasklet}.
- * Subclasses could extend the behaviour as long as they always return the
- * value of this method call in their superclass.
- *
- * If there is an exception and the {@link Tasklet} implements
- * {@link Skippable} then the skip method is called.
- *
- * @param tasklet the unit of business logic to execute
- * @param contribution the current step
- * @return boolean if there is more processing to do
- * @throws Exception if there is an error
- */
- private ExitStatus doProcessing(StepContribution contribution) throws Exception {
- ExitStatus exitStatus = ExitStatus.CONTINUABLE;
-
- try {
-
- exitStatus = execute();
-
- }
- catch (Exception e) {
- if (retryCallback == null && itemSkipPolicy.shouldSkip(e, contribution.getSkipCount())) {
- contribution.incrementSkipCount();
- skip();
- }
- else {
- // Rethrow so that outer transaction is rolled back properly
- throw e;
- }
- }
-
- return exitStatus;
- }
-
- /**
- * Read from the {@link ItemReader} and process (if not null) with the
- * {@link ItemWriter}. The call to {@link ItemWriter} is wrapped in a
- * stateful retry, if a {@link RetryPolicy} is provided. The
- * {@link ItemRecoverer} is used (if provided) in the case of an exception
- * to apply alternate processing to the item. If the stateful retry is in
- * place then the recovery will happen in the next transaction
- * automatically, otherwise it might be necessary for clients to make the
- * recover method transactional with appropriate propagation behaviour
- * (probably REQUIRES_NEW because the call will happen in the context of a
- * transaction that is about to rollback).
- *
- * @see org.springframework.batch.core.tasklet.Tasklet#execute()
- */
- private ExitStatus execute() throws Exception {
-
- if (retryCallback == null) {
- Object item = itemReader.read();
- if (item == null) {
- return ExitStatus.FINISHED;
- }
- itemWriter.write(item);
- return ExitStatus.CONTINUABLE;
- }
-
- return new ExitStatus(retryOperations.execute(retryCallback) != null);
-
- }
-
- /**
- * Mark the current item as skipped if possible. If there is a retry policy
- * in action there is no need to take any action now because it will be
- * covered by the retry in the next transaction. Otherwise if the reader and /
- * or writer are {@link Skippable} then delegate to them in that order.
- *
- * @see org.springframework.batch.io.Skippable#skip()
- */
- private void skip() {
- if (retryCallback != null) {
- // No need to skip because the recoverer will take any action
- // necessary.
- return;
- }
- if (this.itemReader instanceof Skippable) {
- ((Skippable) this.itemReader).skip();
- }
- if (this.itemWriter instanceof Skippable) {
- ((Skippable) this.itemWriter).skip();
- }
- }
-
- /**
* Convenience method to update the status in all relevant places.
*
* @param stepInstance the current step
Index: src/main/java/org/springframework/batch/execution/step/ItemProcessor.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/ItemProcessor.java (revision 0)
+++ src/main/java/org/springframework/batch/execution/step/ItemProcessor.java (revision 0)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2006-2007 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.execution.step;
+
+import org.springframework.batch.core.domain.StepContribution;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.repeat.ExitStatus;
+
+/**
+ * Strategy for processing a single item in an item-oriented step. Extends
+ * {@link ItemReader} and {@link ItemWriter} because part of the contract of the
+ * processor is that it should delegate calls to those interfaces.
+ *
+ * @author Dave Syer
+ *
+ */
+public interface ItemProcessor extends ItemReader, ItemWriter {
+
+ /**
+ * Given the current context in the form of a step contribution, do whatever
+ * is necessary to process this unit inside a chunk. Implementations obtain
+ * the item and return {@link ExitStatus#FINISHED} if it is null. If it is
+ * not null process the item and return {@link ExitStatus#CONTINUABLE}. On
+ * failure throws an exception.
+ *
+ * @param contribution the current step context
+ * @return an {@link ExitStatus} indicating whether processing is
+ * continuable.
+ */
+ ExitStatus process(StepContribution contribution) throws Exception;
+
+}
Index: src/main/java/org/springframework/batch/execution/step/support/AbstractStepFactoryBean.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/AbstractStepFactoryBean.java (revision 10668)
+++ src/main/java/org/springframework/batch/execution/step/support/AbstractStepFactoryBean.java (working copy)
@@ -1,173 +1,172 @@
-/*
- * Copyright 2006-2007 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.batch.execution.step.support;
-
-import org.springframework.batch.core.domain.Step;
-import org.springframework.batch.core.repository.JobRepository;
-import org.springframework.batch.execution.step.ItemOrientedStep;
-import org.springframework.batch.item.ItemReader;
-import org.springframework.batch.item.ItemWriter;
-import org.springframework.beans.factory.BeanNameAware;
-import org.springframework.beans.factory.config.AbstractFactoryBean;
-import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.util.Assert;
-
-/**
- * Base class for factory beans for {@link ItemOrientedStep}. Ensures that all
- * the mandatory properties are set, and provides basic support for the
- * {@link Step} interface responsibilities like start limit.
- *
- * @author Dave Syer
- *
- */
-public abstract class AbstractStepFactoryBean extends AbstractFactoryBean implements BeanNameAware {
-
- private String name;
-
- private int startLimit = Integer.MAX_VALUE;
-
- private boolean allowStartIfComplete;
-
- private ItemReader itemReader;
-
- private ItemWriter itemWriter;
-
- private PlatformTransactionManager transactionManager;
-
- private JobRepository jobRepository;
-
- /**
- *
- */
- public AbstractStepFactoryBean() {
- super();
- }
-
- /**
- * Set the bean name property, which will become the name of the
- * {@link Step} when it is created.
- *
- * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
- */
- public void setBeanName(String name) {
- this.name = name;
- }
-
- /**
- * Public getter for the String.
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Public setter for the startLimit.
- *
- * @param startLimit the startLimit to set
- */
- public void setStartLimit(int startLimit) {
- this.startLimit = startLimit;
- }
-
- /**
- * Public setter for the shouldAllowStartIfComplete.
- *
- * @param allowStartIfComplete the shouldAllowStartIfComplete to set
- */
- public void setAllowStartIfComplete(boolean allowStartIfComplete) {
- this.allowStartIfComplete = allowStartIfComplete;
- }
-
- /**
- * @param itemReader the itemReader to set
- */
- public void setItemReader(ItemReader itemReader) {
- this.itemReader = itemReader;
- }
-
- /**
- * @param itemWriter the itemWriter to set
- */
- public void setItemWriter(ItemWriter itemWriter) {
- this.itemWriter = itemWriter;
- }
-
- /**
- * Protected getter for the {@link ItemReader} for subclasses to use.
- * @return the itemReader
- */
- protected ItemReader getItemReader() {
- return itemReader;
- }
-
- /**
- * Protected getter for the {@link ItemWriter} for subclasses to use
- * @return the itemWriter
- */
- protected ItemWriter getItemWriter() {
- return itemWriter;
- }
-
- /**
- * Public setter for {@link JobRepository}.
- *
- * @param jobRepository is a mandatory dependence (no default).
- */
- public void setJobRepository(JobRepository jobRepository) {
- this.jobRepository = jobRepository;
- }
-
- /**
- * Public setter for the {@link PlatformTransactionManager}.
- *
- * @param transactionManager the transaction manager to set
- */
- public void setTransactionManager(PlatformTransactionManager transactionManager) {
- this.transactionManager = transactionManager;
- }
-
- protected Object createInstance() throws Exception {
- ItemOrientedStep step = new ItemOrientedStep(getName());
- applyConfiguration(step);
- return step;
- }
-
- /**
- * @param step
- *
- */
- protected void applyConfiguration(ItemOrientedStep step) {
-
- Assert.notNull(getItemReader(), "ItemReader must be provided");
- Assert.notNull(getItemWriter(), "ItemWriter must be provided");
- Assert.notNull(jobRepository, "JobRepository must be provided");
- Assert.notNull(transactionManager, "TransactionManager must be provided");
-
- step.setItemReader(getItemReader());
- step.setItemWriter(getItemWriter());
- step.setTransactionManager(transactionManager);
- step.setJobRepository(jobRepository);
- step.setStartLimit(startLimit);
- step.setAllowStartIfComplete(allowStartIfComplete);
-
- }
-
- public Class getObjectType() {
- return Step.class;
- }
-
+/*
+ * Copyright 2006-2007 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.execution.step.support;
+
+import org.springframework.batch.core.domain.Step;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.execution.step.ItemOrientedStep;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.beans.factory.config.AbstractFactoryBean;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.util.Assert;
+
+/**
+ * Base class for factory beans for {@link ItemOrientedStep}. Ensures that all
+ * the mandatory properties are set, and provides basic support for the
+ * {@link Step} interface responsibilities like start limit.
+ *
+ * @author Dave Syer
+ *
+ */
+public abstract class AbstractStepFactoryBean extends AbstractFactoryBean implements BeanNameAware {
+
+ private String name;
+
+ private int startLimit = Integer.MAX_VALUE;
+
+ private boolean allowStartIfComplete;
+
+ private ItemReader itemReader;
+
+ private ItemWriter itemWriter;
+
+ private PlatformTransactionManager transactionManager;
+
+ private JobRepository jobRepository;
+
+ /**
+ *
+ */
+ public AbstractStepFactoryBean() {
+ super();
+ }
+
+ /**
+ * Set the bean name property, which will become the name of the
+ * {@link Step} when it is created.
+ *
+ * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
+ */
+ public void setBeanName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Public getter for the String.
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Public setter for the startLimit.
+ *
+ * @param startLimit the startLimit to set
+ */
+ public void setStartLimit(int startLimit) {
+ this.startLimit = startLimit;
+ }
+
+ /**
+ * Public setter for the shouldAllowStartIfComplete.
+ *
+ * @param allowStartIfComplete the shouldAllowStartIfComplete to set
+ */
+ public void setAllowStartIfComplete(boolean allowStartIfComplete) {
+ this.allowStartIfComplete = allowStartIfComplete;
+ }
+
+ /**
+ * @param itemReader the itemReader to set
+ */
+ public void setItemReader(ItemReader itemReader) {
+ this.itemReader = itemReader;
+ }
+
+ /**
+ * @param itemWriter the itemWriter to set
+ */
+ public void setItemWriter(ItemWriter itemWriter) {
+ this.itemWriter = itemWriter;
+ }
+
+ /**
+ * Protected getter for the {@link ItemReader} for subclasses to use.
+ * @return the itemReader
+ */
+ protected ItemReader getItemReader() {
+ return itemReader;
+ }
+
+ /**
+ * Protected getter for the {@link ItemWriter} for subclasses to use
+ * @return the itemWriter
+ */
+ protected ItemWriter getItemWriter() {
+ return itemWriter;
+ }
+
+ /**
+ * Public setter for {@link JobRepository}.
+ *
+ * @param jobRepository is a mandatory dependence (no default).
+ */
+ public void setJobRepository(JobRepository jobRepository) {
+ this.jobRepository = jobRepository;
+ }
+
+ /**
+ * Public setter for the {@link PlatformTransactionManager}.
+ *
+ * @param transactionManager the transaction manager to set
+ */
+ public void setTransactionManager(PlatformTransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ protected Object createInstance() throws Exception {
+ ItemOrientedStep step = new ItemOrientedStep(getName());
+ applyConfiguration(step);
+ return step;
+ }
+
+ /**
+ * @param step
+ *
+ */
+ protected void applyConfiguration(ItemOrientedStep step) {
+
+ Assert.notNull(getItemReader(), "ItemReader must be provided");
+ Assert.notNull(getItemWriter(), "ItemWriter must be provided");
+ Assert.notNull(jobRepository, "JobRepository must be provided");
+ Assert.notNull(transactionManager, "TransactionManager must be provided");
+
+ step.setItemProcessor(new SimpleItemProcessor(itemReader, itemWriter));
+ step.setTransactionManager(transactionManager);
+ step.setJobRepository(jobRepository);
+ step.setStartLimit(startLimit);
+ step.setAllowStartIfComplete(allowStartIfComplete);
+
+ }
+
+ public Class getObjectType() {
+ return Step.class;
+ }
+
}
\ No newline at end of file
Index: src/main/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBean.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBean.java (revision 10668)
+++ src/main/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBean.java (working copy)
@@ -1,151 +1,156 @@
-/*
- * Copyright 2006-2007 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.batch.execution.step.support;
-
-import org.springframework.batch.core.domain.BatchListener;
-import org.springframework.batch.core.domain.Step;
-import org.springframework.batch.core.domain.StepListener;
-import org.springframework.batch.execution.step.ItemOrientedStep;
-import org.springframework.batch.item.ItemReader;
-import org.springframework.batch.item.ItemStream;
-import org.springframework.batch.item.ItemWriter;
-import org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler;
-import org.springframework.batch.repeat.support.RepeatTemplate;
-import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
-import org.springframework.core.task.TaskExecutor;
-
-/**
- * Adds listeners to {@link SimpleStepFactoryBean}.
- *
- * @author Dave Syer
- *
- */
-public class DefaultStepFactoryBean extends SimpleStepFactoryBean {
-
- private boolean alwaysSkip = false;
-
- private BatchListener[] listeners = new BatchListener[0];
-
- private ListenerMulticaster listener = new ListenerMulticaster();
-
- private TaskExecutor taskExecutor;
-
- /**
- * Public setter for a flag that determines skip policy. If this flag is
- * true then an exception in chunk processing will cause the item to be
- * skipped and no exceptions propagated. If it is false then all exceptions
- * will be propagated from the chunk and cause the step to abort.
- *
- * @param alwaysSkip the value to set. Default is false.
- */
- public void setAlwaysSkip(boolean alwaysSkip) {
- this.alwaysSkip = alwaysSkip;
- }
-
- /**
- * The listeners to inject into the {@link Step}. Any instance of
- * {@link BatchListener} can be used, and will then receive callbacks at the
- * appropriate stage in the step.
- *
- * @param listeners an array of listeners
- */
- public void setListeners(BatchListener[] listeners) {
- this.listeners = listeners;
- }
-
- /**
- * Public setter for the {@link TaskExecutor}. If this is set, then it will
- * be used to execute the chunk processing inside the {@link Step}.
- *
- * @param taskExecutor the taskExecutor to set
- */
- public void setTaskExecutor(TaskExecutor taskExecutor) {
- this.taskExecutor = taskExecutor;
- }
-
- /**
- * @param step
- *
- */
- protected void applyConfiguration(ItemOrientedStep step) {
-
- super.applyConfiguration(step);
- for (int i = 0; i < listeners.length; i++) {
- BatchListener listener = listeners[i];
- if (listener instanceof StepListener) {
- step.registerStepListener((StepListener) listener);
- }
- else {
- this.listener.register(listener);
- }
- }
-
- ItemReader itemReader = getItemReader();
- ItemWriter itemWriter = getItemWriter();
-
- // Since we are going to wrap these things with listener callbacks we
- // need to register them here because the step will not know we did
- // that.
- if (itemReader instanceof ItemStream) {
- step.registerStream((ItemStream) itemReader);
- }
- if (itemReader instanceof StepListener) {
- step.registerStepListener((StepListener) itemReader);
- }
- if (itemWriter instanceof ItemStream) {
- step.registerStream((ItemStream) itemWriter);
- }
- if (itemWriter instanceof StepListener) {
- step.registerStepListener((StepListener) itemWriter);
- }
-
- BatchListenerFactoryHelper helper = new BatchListenerFactoryHelper();
-
- StepListener[] stepListeners = helper.getStepListeners(listeners);
- itemReader = helper.getItemReader(itemReader, listeners);
- itemWriter = helper.getItemWriter(itemWriter, listeners);
- RepeatTemplate stepOperations = new RepeatTemplate();
- stepOperations = (RepeatTemplate) helper.getStepOperations(stepOperations, listeners);
-
- // In case they are used by subclasses:
- setItemReader(itemReader);
- setItemWriter(itemWriter);
-
- step.setStepListeners(stepListeners);
- step.setItemReader(itemReader);
- step.setItemWriter(itemWriter);
-
- if (taskExecutor != null) {
- TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
- repeatTemplate.setTaskExecutor(taskExecutor);
- stepOperations = repeatTemplate;
- }
-
- if (alwaysSkip) {
- // If we always skip (not the default) then we are prepared to
- // absorb all exceptions at the step level because the failed items
- // will never re-appear after a rollback.
- step.setItemSkipPolicy(new AlwaysSkipItemSkipPolicy());
- stepOperations.setExceptionHandler(new SimpleLimitExceptionHandler(Integer.MAX_VALUE));
- step.setStepOperations(stepOperations);
- }
- else {
- // This is the default in ItemOrientedStep anyway...
- step.setItemSkipPolicy(new NeverSkipItemSkipPolicy());
- }
-
- }
-}
+/*
+ * Copyright 2006-2007 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.execution.step.support;
+
+import org.springframework.batch.core.domain.BatchListener;
+import org.springframework.batch.core.domain.Step;
+import org.springframework.batch.core.domain.StepListener;
+import org.springframework.batch.execution.step.ItemOrientedStep;
+import org.springframework.batch.execution.step.ItemProcessor;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler;
+import org.springframework.batch.repeat.support.RepeatTemplate;
+import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
+import org.springframework.core.task.TaskExecutor;
+
+/**
+ * Adds listeners to {@link SimpleStepFactoryBean}.
+ *
+ * @author Dave Syer
+ *
+ */
+public class DefaultStepFactoryBean extends SimpleStepFactoryBean {
+
+ private boolean alwaysSkip = false;
+
+ private BatchListener[] listeners = new BatchListener[0];
+
+ private ListenerMulticaster listener = new ListenerMulticaster();
+
+ private TaskExecutor taskExecutor;
+
+ private ItemProcessor itemProcessor;
+
+ /**
+ * Protected getter for the {@link ItemProcessor}.
+ * @return the itemProcessor
+ */
+ protected ItemProcessor getItemProcessor() {
+ return itemProcessor;
+ }
+
+ /**
+ * Protected setter for the {@link ItemProcessor}.
+ * @param itemProcessor the {@link ItemProcessor} to set
+ */
+ protected void setItemProcessor(ItemProcessor itemProcessor) {
+ this.itemProcessor = itemProcessor;
+ }
+
+ /**
+ * Public setter for a flag that determines skip policy. If this flag is
+ * true then an exception in chunk processing will cause the item to be
+ * skipped and no exceptions propagated. If it is false then all exceptions
+ * will be propagated from the chunk and cause the step to abort.
+ *
+ * @param alwaysSkip the value to set. Default is false.
+ */
+ public void setAlwaysSkip(boolean alwaysSkip) {
+ this.alwaysSkip = alwaysSkip;
+ }
+
+ /**
+ * The listeners to inject into the {@link Step}. Any instance of
+ * {@link BatchListener} can be used, and will then receive callbacks at the
+ * appropriate stage in the step.
+ *
+ * @param listeners an array of listeners
+ */
+ public void setListeners(BatchListener[] listeners) {
+ this.listeners = listeners;
+ }
+
+ /**
+ * Public setter for the {@link TaskExecutor}. If this is set, then it will
+ * be used to execute the chunk processing inside the {@link Step}.
+ *
+ * @param taskExecutor the taskExecutor to set
+ */
+ public void setTaskExecutor(TaskExecutor taskExecutor) {
+ this.taskExecutor = taskExecutor;
+ }
+
+ /**
+ * @param step
+ *
+ */
+ protected void applyConfiguration(ItemOrientedStep step) {
+
+ super.applyConfiguration(step);
+ for (int i = 0; i < listeners.length; i++) {
+ BatchListener listener = listeners[i];
+ if (listener instanceof StepListener) {
+ step.registerStepListener((StepListener) listener);
+ }
+ else {
+ this.listener.register(listener);
+ }
+ }
+
+ ItemReader itemReader = getItemReader();
+ ItemWriter itemWriter = getItemWriter();
+
+ BatchListenerFactoryHelper helper = new BatchListenerFactoryHelper();
+
+ StepListener[] stepListeners = helper.getStepListeners(listeners);
+ itemReader = helper.getItemReader(itemReader, listeners);
+ itemWriter = helper.getItemWriter(itemWriter, listeners);
+ RepeatTemplate stepOperations = new RepeatTemplate();
+ stepOperations = (RepeatTemplate) helper.getStepOperations(stepOperations, listeners);
+
+ // In case they are used by subclasses:
+ setItemReader(itemReader);
+ setItemWriter(itemWriter);
+
+ step.setStepListeners(stepListeners);
+
+ if (taskExecutor != null) {
+ TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
+ repeatTemplate.setTaskExecutor(taskExecutor);
+ stepOperations = repeatTemplate;
+ }
+
+ KitchenSinkItemProcessor itemProcessor = new KitchenSinkItemProcessor(itemReader, itemWriter);
+
+ if (alwaysSkip) {
+ // If we always skip (not the default) then we are prepared to
+ // absorb all exceptions at the step level because the failed items
+ // will never re-appear after a rollback.
+ itemProcessor.setItemSkipPolicy(new AlwaysSkipItemSkipPolicy());
+ stepOperations.setExceptionHandler(new SimpleLimitExceptionHandler(Integer.MAX_VALUE));
+ step.setStepOperations(stepOperations);
+ }
+ else {
+ // This is the default in ItemOrientedStep anyway...
+ itemProcessor.setItemSkipPolicy(new NeverSkipItemSkipPolicy());
+ }
+
+ setItemProcessor(itemProcessor);
+ step.setItemProcessor(itemProcessor);
+
+ }
+}
Index: src/main/java/org/springframework/batch/execution/step/support/KitchenSinkItemProcessor.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/KitchenSinkItemProcessor.java (revision 0)
+++ src/main/java/org/springframework/batch/execution/step/support/KitchenSinkItemProcessor.java (revision 0)
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2006-2007 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.execution.step.support;
+
+import org.springframework.batch.core.domain.ItemSkipPolicy;
+import org.springframework.batch.core.domain.StepContribution;
+import org.springframework.batch.io.Skippable;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemRecoverer;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.repeat.ExitStatus;
+import org.springframework.batch.retry.RetryCallback;
+import org.springframework.batch.retry.RetryOperations;
+import org.springframework.batch.retry.callback.ItemReaderRetryCallback;
+import org.springframework.batch.retry.support.RetryTemplate;
+
+/**
+ * @author Dave Syer
+ *
+ */
+public class KitchenSinkItemProcessor extends SimpleItemProcessor {
+
+ private RetryOperations retryOperations = new RetryTemplate();
+
+ private ItemReaderRetryCallback retryCallback;
+
+ private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy();
+
+ /**
+ * @param itemReader
+ * @param itemWriter
+ */
+ public KitchenSinkItemProcessor(ItemReader itemReader, ItemWriter itemWriter) {
+ super(itemReader, itemWriter);
+ }
+
+ /**
+ * @param itemSkipPolicy
+ */
+ public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) {
+ this.itemSkipPolicy = itemSkipPolicy;
+ }
+
+ /**
+ * Public setter for the {@link RetryOperations}.
+ * @param retryOperations the {@link RetryOperations} to set
+ */
+ public void setRetryOperations(RetryOperations retryOperations) {
+ this.retryOperations = retryOperations;
+ }
+
+ /**
+ * Public setter for the ItemReaderRetryCallback. TODO: get rid of this.
+ * @param retryCallback the retryCallback to set
+ */
+ public void setRetryCallback(ItemReaderRetryCallback retryCallback) {
+ this.retryCallback = retryCallback;
+ }
+
+ /**
+ * Execute the business logic, delegating to the reader and writer.
+ * Subclasses could extend the behaviour as long as they always return the
+ * value of this method call in their superclass.
+ *
+ * Read from the {@link ItemReader} and process (if not null) with the
+ * {@link ItemWriter}. If a {@link RetryCallback} is provided, then the
+ * call to {@link ItemWriter} is wrapped in a stateful retry. In that case
+ * the {@link ItemRecoverer} is used (if provided) in the case of an
+ * exception to apply alternate processing to the item. If the stateful
+ * retry is in place then the recovery will happen in the next transaction
+ * automatically, otherwise it might be necessary for clients to make the
+ * recover method transactional with appropriate propagation behaviour
+ * (probably REQUIRES_NEW because the call will happen in the context of a
+ * transaction that is about to rollback).
+ *
+ * If there is an exception and the reader or writer implements
+ * {@link Skippable} then the skip method is called.
+ *
+ * @param contribution the current step
+ * @return {@link ExitStatus#CONTINUABLE} if there is more processing to do
+ * @throws Exception if there is an error
+ */
+ public ExitStatus process(StepContribution contribution) throws Exception {
+ ExitStatus exitStatus = ExitStatus.CONTINUABLE;
+
+ if (retryCallback != null) {
+ return new ExitStatus(retryOperations.execute(retryCallback) != null);
+ }
+
+ try {
+
+ exitStatus = super.process(contribution);
+
+ }
+ catch (Exception e) {
+
+ if (retryCallback == null && itemSkipPolicy.shouldSkip(e, contribution.getSkipCount())) {
+ contribution.incrementSkipCount();
+ skip();
+ }
+ else {
+ // Rethrow so that outer transaction is rolled back properly
+ throw e;
+ }
+
+ }
+
+ return exitStatus;
+ }
+
+ /**
+ * Mark the current item as skipped if possible. If there is a retry policy
+ * in action there is no need to take any action now because it will be
+ * covered by the retry in the next transaction. Otherwise if the reader and /
+ * or writer are {@link Skippable} then delegate to them in that order.
+ *
+ * @see org.springframework.batch.io.Skippable#skip()
+ */
+ private void skip() {
+ if (retryCallback != null) {
+ // No need to skip because the recoverer will take any action
+ // necessary.
+ return;
+ }
+ if (getItemReader() instanceof Skippable) {
+ ((Skippable) getItemReader()).skip();
+ }
+ if (getItemWriter() instanceof Skippable) {
+ ((Skippable) getItemWriter()).skip();
+ }
+ }
+
+}
Index: src/main/java/org/springframework/batch/execution/step/support/RepeatOperationsStepFactoryBean.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/RepeatOperationsStepFactoryBean.java (revision 10668)
+++ src/main/java/org/springframework/batch/execution/step/support/RepeatOperationsStepFactoryBean.java (working copy)
@@ -130,8 +130,7 @@
setItemWriter(itemWriter);
step.setStepListeners(stepListeners);
- step.setItemReader(itemReader);
- step.setItemWriter(itemWriter);
+ step.setItemProcessor(new SimpleItemProcessor(itemReader, itemWriter));
step.setChunkOperations(chunkOperations);
step.setStepOperations(stepOperations);
Index: src/main/java/org/springframework/batch/execution/step/support/SimpleItemProcessor.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/SimpleItemProcessor.java (revision 0)
+++ src/main/java/org/springframework/batch/execution/step/support/SimpleItemProcessor.java (revision 0)
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2006-2007 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.execution.step.support;
+
+import org.springframework.batch.core.domain.StepContribution;
+import org.springframework.batch.execution.step.ItemProcessor;
+import org.springframework.batch.item.ItemReader;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.item.exception.ClearFailedException;
+import org.springframework.batch.item.exception.FlushFailedException;
+import org.springframework.batch.item.exception.MarkFailedException;
+import org.springframework.batch.item.exception.ResetFailedException;
+import org.springframework.batch.repeat.ExitStatus;
+
+/**
+ * Simplest possible implementation of {@link ItemProcessor} with no skipping or
+ * recovering. Just delegates all calls to the provided {@link ItemReader} and
+ * {@link ItemWriter}.
+ *
+ * @author Dave Syer
+ *
+ */
+public class SimpleItemProcessor implements ItemProcessor {
+
+ private ItemReader itemReader;
+
+ private ItemWriter itemWriter;
+
+ /**
+ * @param itemReader
+ * @param itemWriter
+ */
+ public SimpleItemProcessor(ItemReader itemReader, ItemWriter itemWriter) {
+ super();
+ this.itemReader = itemReader;
+ this.itemWriter = itemWriter;
+ }
+
+ /**
+ * Public getter for the ItemReader.
+ * @return the itemReader
+ */
+ public ItemReader getItemReader() {
+ return itemReader;
+ }
+
+ /**
+ * Public getter for the ItemWriter.
+ * @return the itemWriter
+ */
+ public ItemWriter getItemWriter() {
+ return itemWriter;
+ }
+
+ /**
+ * Read from the {@link ItemReader} and process (if not null) with the
+ * {@link ItemWriter}.
+ *
+ * @see org.springframework.batch.execution.step.ItemProcessor#process(org.springframework.batch.core.domain.StepContribution)
+ */
+ public ExitStatus process(StepContribution contribution) throws Exception {
+ Object item = read();
+ if (item == null) {
+ return ExitStatus.FINISHED;
+ }
+ write(item);
+ return ExitStatus.CONTINUABLE;
+ }
+
+ /**
+ * @throws MarkFailedException
+ * @see org.springframework.batch.item.ItemReader#mark()
+ */
+ public void mark() throws MarkFailedException {
+ itemReader.mark();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ * @see org.springframework.batch.item.ItemReader#read()
+ */
+ public Object read() throws Exception {
+ return itemReader.read();
+ }
+
+ /**
+ * @throws ResetFailedException
+ * @see org.springframework.batch.item.ItemReader#reset()
+ */
+ public void reset() throws ResetFailedException {
+ itemReader.reset();
+ }
+
+ /**
+ * @throws ClearFailedException
+ * @see org.springframework.batch.item.ItemWriter#clear()
+ */
+ public void clear() throws ClearFailedException {
+ itemWriter.clear();
+ }
+
+ /**
+ * @throws FlushFailedException
+ * @see org.springframework.batch.item.ItemWriter#flush()
+ */
+ public void flush() throws FlushFailedException {
+ itemWriter.flush();
+ }
+
+ /**
+ * @param item
+ * @throws Exception
+ * @see org.springframework.batch.item.ItemWriter#write(java.lang.Object)
+ */
+ public void write(Object item) throws Exception {
+ itemWriter.write(item);
+ }
+
+}
Index: src/main/java/org/springframework/batch/execution/step/support/SimpleStepFactoryBean.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/SimpleStepFactoryBean.java (revision 10653)
+++ src/main/java/org/springframework/batch/execution/step/support/SimpleStepFactoryBean.java (working copy)
@@ -16,8 +16,11 @@
package org.springframework.batch.execution.step.support;
import org.springframework.batch.core.domain.Step;
+import org.springframework.batch.core.domain.StepListener;
import org.springframework.batch.execution.step.ItemOrientedStep;
+import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
+import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.batch.repeat.support.RepeatTemplate;
@@ -59,6 +62,22 @@
super.applyConfiguration(step);
+ ItemReader itemReader = getItemReader();
+ ItemWriter itemWriter = getItemWriter();
+
+ if (itemReader instanceof ItemStream) {
+ step.registerStream((ItemStream) itemReader);
+ }
+ if (itemReader instanceof StepListener) {
+ step.registerStepListener((StepListener) itemReader);
+ }
+ if (itemWriter instanceof ItemStream) {
+ step.registerStream((ItemStream) itemWriter);
+ }
+ if (itemWriter instanceof StepListener) {
+ step.registerStepListener((StepListener) itemWriter);
+ }
+
step.setStreams(streams);
if (commitInterval > 0) {
Index: src/main/java/org/springframework/batch/execution/step/support/StatefulRetryStepFactoryBean.java
===================================================================
--- src/main/java/org/springframework/batch/execution/step/support/StatefulRetryStepFactoryBean.java (revision 10668)
+++ src/main/java/org/springframework/batch/execution/step/support/StatefulRetryStepFactoryBean.java (working copy)
@@ -85,10 +85,11 @@
ItemReaderRetryCallback retryCallback = new ItemReaderRetryCallback(getItemReader(), getKeyGenerator(),
getItemWriter());
ItemReaderRetryPolicy itemProviderRetryPolicy = new ItemReaderRetryPolicy(retryPolicy);
- RetryTemplate template = new RetryTemplate();
- template.setRetryPolicy(itemProviderRetryPolicy);
- step.setRetryOperations(template);
- step.setRetryCallback(retryCallback);
+ RetryTemplate retryTemplate = new RetryTemplate();
+ retryTemplate.setRetryPolicy(itemProviderRetryPolicy);
+ KitchenSinkItemProcessor itemProcessor = (KitchenSinkItemProcessor) getItemProcessor();
+ itemProcessor.setRetryOperations(retryTemplate);
+ itemProcessor.setRetryCallback(retryCallback);
}
}
Index: src/test/java/org/springframework/batch/execution/step/ItemOrientedStepTests.java
===================================================================
--- src/test/java/org/springframework/batch/execution/step/ItemOrientedStepTests.java (revision 10654)
+++ src/test/java/org/springframework/batch/execution/step/ItemOrientedStepTests.java (working copy)
@@ -37,6 +37,7 @@
import org.springframework.batch.execution.repository.dao.MapJobInstanceDao;
import org.springframework.batch.execution.repository.dao.MapStepExecutionDao;
import org.springframework.batch.execution.step.support.JobRepositorySupport;
+import org.springframework.batch.execution.step.support.SimpleItemProcessor;
import org.springframework.batch.execution.step.support.StepInterruptionPolicy;
import org.springframework.batch.io.exception.InfrastructureException;
import org.springframework.batch.item.ExecutionContext;
@@ -85,8 +86,7 @@
private AbstractStep getStep(String[] strings) throws Exception {
ItemOrientedStep step = new ItemOrientedStep("stepName");
- step.setItemWriter(processor);
- step.setItemReader(getReader(strings));
+ step.setItemProcessor(new SimpleItemProcessor(getReader(strings), processor));
step.setJobRepository(new JobRepositorySupport());
step.setTransactionManager(transactionManager);
return step;
@@ -168,7 +168,7 @@
};
- itemOrientedStep.setItemReader(itemReader);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor));
JobExecution jobExecutionContext = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
@@ -198,7 +198,7 @@
};
- itemOrientedStep.setItemReader(itemReader);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor));
JobExecution jobExecutionContext = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
@@ -217,7 +217,8 @@
*/
public void testNonRestartedJob() throws Exception {
MockRestartableItemReader tasklet = new MockRestartableItemReader();
- itemOrientedStep.setItemReader(tasklet);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(tasklet, processor));
+ itemOrientedStep.registerStream(tasklet);
JobExecution jobExecutionContext = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
@@ -287,7 +288,7 @@
*/
public void testNoSaveExecutionAttributesRestartableJob() {
MockRestartableItemReader tasklet = new MockRestartableItemReader();
- itemOrientedStep.setItemReader(tasklet);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(tasklet, processor));
JobExecution jobExecutionContext = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
@@ -307,11 +308,11 @@
* Restartable.
*/
public void testRestartJobOnNonRestartableTasklet() throws Exception {
- itemOrientedStep.setItemReader(new AbstractItemReader() {
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(new AbstractItemReader() {
public Object read() throws Exception {
return "foo";
}
- });
+ }, processor));
JobExecution jobExecution = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution);
@@ -319,7 +320,7 @@
}
public void testStreamManager() throws Exception {
- itemOrientedStep.setItemReader(new MockRestartableItemReader() {
+ MockRestartableItemReader reader = new MockRestartableItemReader() {
public Object read() throws Exception {
return "foo";
}
@@ -327,7 +328,9 @@
// TODO Auto-generated method stub
executionContext.putString("foo", "bar");
}
- });
+ };
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(reader, processor));
+ itemOrientedStep.registerStream(reader);
JobExecution jobExecution = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution);
@@ -395,11 +398,11 @@
return null;
}
});
- itemOrientedStep.setItemReader(new MockRestartableItemReader() {
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(new MockRestartableItemReader() {
public Object read() throws Exception {
throw new RuntimeException("FOO");
}
- });
+ }, processor));
JobExecution jobExecution = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution);
try {
@@ -421,7 +424,7 @@
executionContext.putString("foo", "bar");
}
};
- itemOrientedStep.setItemReader(reader);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(reader, processor));
itemOrientedStep.setStreams(new ItemStream[] {reader});
JobExecution jobExecution = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution);
@@ -461,7 +464,7 @@
};
- itemOrientedStep.setItemReader(itemReader);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor));
JobExecution jobExecutionContext = new JobExecution(jobInstance);
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
@@ -489,7 +492,7 @@
throw new RuntimeException("Foo");
}
};
- itemOrientedStep.setItemReader(itemReader);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor));
JobExecution jobExecutionContext = jobInstance.createJobExecution();
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
@@ -516,7 +519,7 @@
throw new RuntimeException("Foo");
}
};
- itemOrientedStep.setItemReader(itemReader);
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor));
itemOrientedStep.setTransactionManager(new ResourcelessTransactionManager() {
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
// Simulate failure on rollback when stream resets
@@ -605,13 +608,15 @@
public void testStatusForCloseFailedException() throws Exception {
- itemOrientedStep.setItemReader(new MockRestartableItemReader() {
+ MockRestartableItemReader reader = new MockRestartableItemReader() {
public void close(ExecutionContext executionContext) throws StreamException {
super.close(executionContext);
// Simulate failure on rollback when stream resets
throw new RuntimeException("Bar");
}
- });
+ };
+ itemOrientedStep.setItemProcessor(new SimpleItemProcessor(reader, processor));
+ itemOrientedStep.registerStream(reader);
JobExecution jobExecutionContext = jobInstance.createJobExecution();
StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext);
Index: src/test/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBeanTests.java
===================================================================
--- src/test/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBeanTests.java (revision 10654)
+++ src/test/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBeanTests.java (working copy)
@@ -177,13 +177,14 @@
// TODO: test recovery and stateful retry
public void testExceptionTerminates() throws Exception {
- ItemOrientedStep step = (ItemOrientedStep) getStep(new String[] { "foo", "bar", "spam" }).getObject();
- step.setName("exceptionStep");
- step.setItemWriter(new AbstractItemWriter() {
+ DefaultStepFactoryBean factory = getStep(new String[] { "foo", "bar", "spam" });
+ factory.setBeanName("exceptionStep");
+ factory.setItemWriter(new AbstractItemWriter() {
public void write(Object data) throws Exception {
throw new RuntimeException("Foo");
}
});
+ ItemOrientedStep step = (ItemOrientedStep) factory.getObject();
job.setSteps(Collections.singletonList(step));
JobExecution jobExecution = repository.createJobExecution(job, new JobParameters());
Index: src/test/java/org/springframework/batch/execution/step/support/StepExecutorInterruptionTests.java
===================================================================
--- src/test/java/org/springframework/batch/execution/step/support/StepExecutorInterruptionTests.java (revision 10649)
+++ src/test/java/org/springframework/batch/execution/step/support/StepExecutorInterruptionTests.java (working copy)
@@ -1,133 +1,136 @@
-/*
- * Copyright 2006-2007 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.batch.execution.step.support;
-
-import junit.framework.TestCase;
-
-import org.springframework.batch.core.domain.BatchStatus;
-import org.springframework.batch.core.domain.JobExecution;
-import org.springframework.batch.core.domain.JobInstance;
-import org.springframework.batch.core.domain.JobInterruptedException;
-import org.springframework.batch.core.domain.JobParameters;
-import org.springframework.batch.core.domain.StepExecution;
-import org.springframework.batch.core.repository.JobRepository;
-import org.springframework.batch.execution.job.JobSupport;
-import org.springframework.batch.execution.repository.SimpleJobRepository;
-import org.springframework.batch.execution.repository.dao.JobExecutionDao;
-import org.springframework.batch.execution.repository.dao.JobInstanceDao;
-import org.springframework.batch.execution.repository.dao.MapJobExecutionDao;
-import org.springframework.batch.execution.repository.dao.MapJobInstanceDao;
-import org.springframework.batch.execution.repository.dao.MapStepExecutionDao;
-import org.springframework.batch.execution.repository.dao.StepExecutionDao;
-import org.springframework.batch.execution.step.ItemOrientedStep;
-import org.springframework.batch.item.reader.AbstractItemReader;
-import org.springframework.batch.item.reader.ItemReaderAdapter;
-import org.springframework.batch.item.writer.AbstractItemWriter;
-import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
-import org.springframework.batch.repeat.support.RepeatTemplate;
-import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
-
-public class StepExecutorInterruptionTests extends TestCase {
-
- private JobRepository jobRepository;
-
- private JobInstanceDao jobInstanceDao = new MapJobInstanceDao();
-
- private JobExecutionDao jobExecutionDao = new MapJobExecutionDao();
-
- private StepExecutionDao stepExecutionDao = new MapStepExecutionDao();
-
- private ItemOrientedStep step;
-
- public void setUp() throws Exception {
- MapJobInstanceDao.clear();
- MapJobExecutionDao.clear();
- MapStepExecutionDao.clear();
-
- jobRepository = new SimpleJobRepository(jobInstanceDao, jobExecutionDao, stepExecutionDao);
-
- JobSupport jobConfiguration = new JobSupport();
- step = new ItemOrientedStep("interruptedStep");
- jobConfiguration.addStep(step);
- jobConfiguration.setBeanName("testJob");
- jobRepository.createJobExecution(jobConfiguration, new JobParameters());
- step.setJobRepository(jobRepository);
- step.setTransactionManager(new ResourcelessTransactionManager());
- step.setItemReader(new ItemReaderAdapter());
- step.setItemWriter(new AbstractItemWriter(){
- public void write(Object item) throws Exception {
- }});
- }
-
-
- public void testInterruptChunk() throws Exception {
-
- JobExecution jobExecutionContext = new JobExecution(new JobInstance(new Long(0L), new JobParameters(), new JobSupport("testJob")));
- final StepExecution stepExecution = new StepExecution(step, jobExecutionContext);
- step.setItemReader(new AbstractItemReader() {
- public Object read() throws Exception {
- // do something non-trivial (and not Thread.sleep())
- double foo = 1;
- for (int i = 2; i < 250; i++) {
- foo = foo * i;
- }
-
- if(foo != 1){
- return new Double(foo);
- }
- else{
- return null;
- }
- }
- });
-
- Thread processingThread = new Thread() {
- public void run() {
- try {
- step.execute(stepExecution);
- }
- catch (JobInterruptedException e) {
- // do nothing...
- }
- }
- };
-
- processingThread.start();
-
- Thread.sleep(100);
-
- processingThread.interrupt();
-
- int count = 0;
- while (processingThread.isAlive() && count < 1000) {
- Thread.sleep(20);
- count++;
- }
-
- assertFalse(processingThread.isAlive());
- assertEquals(BatchStatus.STOPPED, stepExecution.getStatus());
- }
-
- public void testInterruptStep() throws Exception {
- RepeatTemplate template = new RepeatTemplate();
- // N.B, If we don't set the completion policy it might run forever
- template.setCompletionPolicy(new SimpleCompletionPolicy(2));
- step.setChunkOperations(template);
- testInterruptChunk();
- }
-
-}
+/*
+ * Copyright 2006-2007 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.execution.step.support;
+
+import junit.framework.TestCase;
+
+import org.springframework.batch.core.domain.BatchStatus;
+import org.springframework.batch.core.domain.JobExecution;
+import org.springframework.batch.core.domain.JobInstance;
+import org.springframework.batch.core.domain.JobInterruptedException;
+import org.springframework.batch.core.domain.JobParameters;
+import org.springframework.batch.core.domain.StepExecution;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.execution.job.JobSupport;
+import org.springframework.batch.execution.repository.SimpleJobRepository;
+import org.springframework.batch.execution.repository.dao.JobExecutionDao;
+import org.springframework.batch.execution.repository.dao.JobInstanceDao;
+import org.springframework.batch.execution.repository.dao.MapJobExecutionDao;
+import org.springframework.batch.execution.repository.dao.MapJobInstanceDao;
+import org.springframework.batch.execution.repository.dao.MapStepExecutionDao;
+import org.springframework.batch.execution.repository.dao.StepExecutionDao;
+import org.springframework.batch.execution.step.ItemOrientedStep;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.item.reader.AbstractItemReader;
+import org.springframework.batch.item.reader.ItemReaderAdapter;
+import org.springframework.batch.item.writer.AbstractItemWriter;
+import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
+import org.springframework.batch.repeat.support.RepeatTemplate;
+import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
+
+public class StepExecutorInterruptionTests extends TestCase {
+
+ private JobRepository jobRepository;
+
+ private JobInstanceDao jobInstanceDao = new MapJobInstanceDao();
+
+ private JobExecutionDao jobExecutionDao = new MapJobExecutionDao();
+
+ private StepExecutionDao stepExecutionDao = new MapStepExecutionDao();
+
+ private ItemOrientedStep step;
+
+ private ItemWriter writer;
+
+ public void setUp() throws Exception {
+ MapJobInstanceDao.clear();
+ MapJobExecutionDao.clear();
+ MapStepExecutionDao.clear();
+
+ jobRepository = new SimpleJobRepository(jobInstanceDao, jobExecutionDao, stepExecutionDao);
+
+ JobSupport jobConfiguration = new JobSupport();
+ step = new ItemOrientedStep("interruptedStep");
+ jobConfiguration.addStep(step);
+ jobConfiguration.setBeanName("testJob");
+ jobRepository.createJobExecution(jobConfiguration, new JobParameters());
+ step.setJobRepository(jobRepository);
+ step.setTransactionManager(new ResourcelessTransactionManager());
+ writer = new AbstractItemWriter(){
+ public void write(Object item) throws Exception {
+ }};
+ step.setItemProcessor(new SimpleItemProcessor(new ItemReaderAdapter(), writer));
+ }
+
+
+ public void testInterruptChunk() throws Exception {
+
+ JobExecution jobExecutionContext = new JobExecution(new JobInstance(new Long(0L), new JobParameters(), new JobSupport("testJob")));
+ final StepExecution stepExecution = new StepExecution(step, jobExecutionContext);
+ step.setItemProcessor(new SimpleItemProcessor(new AbstractItemReader() {
+ public Object read() throws Exception {
+ // do something non-trivial (and not Thread.sleep())
+ double foo = 1;
+ for (int i = 2; i < 250; i++) {
+ foo = foo * i;
+ }
+
+ if(foo != 1){
+ return new Double(foo);
+ }
+ else{
+ return null;
+ }
+ }
+ }, writer));
+
+ Thread processingThread = new Thread() {
+ public void run() {
+ try {
+ step.execute(stepExecution);
+ }
+ catch (JobInterruptedException e) {
+ // do nothing...
+ }
+ }
+ };
+
+ processingThread.start();
+
+ Thread.sleep(100);
+
+ processingThread.interrupt();
+
+ int count = 0;
+ while (processingThread.isAlive() && count < 1000) {
+ Thread.sleep(20);
+ count++;
+ }
+
+ assertFalse(processingThread.isAlive());
+ assertEquals(BatchStatus.STOPPED, stepExecution.getStatus());
+ }
+
+ public void testInterruptStep() throws Exception {
+ RepeatTemplate template = new RepeatTemplate();
+ // N.B, If we don't set the completion policy it might run forever
+ template.setCompletionPolicy(new SimpleCompletionPolicy(2));
+ step.setChunkOperations(template);
+ testInterruptChunk();
+ }
+
+}