Index: src/main/java/org/springframework/batch/execution/step/ItemOrientedStep.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/ItemOrientedStep.java (revision 10655) +++ src/main/java/org/springframework/batch/execution/step/ItemOrientedStep.java (working copy) @@ -20,7 +20,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.domain.BatchStatus; -import org.springframework.batch.core.domain.ItemSkipPolicy; import org.springframework.batch.core.domain.JobInstance; import org.springframework.batch.core.domain.JobInterruptedException; import org.springframework.batch.core.domain.StepContribution; @@ -30,15 +29,12 @@ import org.springframework.batch.core.runtime.ExitStatusExceptionClassifier; import org.springframework.batch.core.tasklet.Tasklet; import org.springframework.batch.execution.listener.CompositeStepListener; -import org.springframework.batch.execution.step.support.NeverSkipItemSkipPolicy; import org.springframework.batch.execution.step.support.SimpleExitStatusExceptionClassifier; import org.springframework.batch.execution.step.support.StepInterruptionPolicy; import org.springframework.batch.execution.step.support.ThreadStepInterruptionPolicy; -import org.springframework.batch.io.Skippable; import org.springframework.batch.io.exception.InfrastructureException; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemRecoverer; import org.springframework.batch.item.ItemStream; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.exception.CommitFailedException; @@ -48,10 +44,6 @@ import org.springframework.batch.repeat.RepeatContext; import org.springframework.batch.repeat.RepeatOperations; import org.springframework.batch.repeat.support.RepeatTemplate; -import org.springframework.batch.retry.RetryOperations; -import org.springframework.batch.retry.RetryPolicy; -import org.springframework.batch.retry.callback.ItemReaderRetryCallback; -import org.springframework.batch.retry.support.RetryTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -89,10 +81,6 @@ // default to checking current thread for interruption. private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); - private RetryOperations retryOperations = new RetryTemplate(); - - private ItemReaderRetryCallback retryCallback; - private CompositeItemStream stream = new CompositeItemStream(); private CompositeStepListener listener = new CompositeStepListener(); @@ -101,12 +89,8 @@ private PlatformTransactionManager transactionManager; - private ItemReader itemReader; + private ItemProcessor itemProcessor; - private ItemWriter itemWriter; - - private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy(); - /** * @param name */ @@ -131,29 +115,16 @@ public void setTransactionManager(PlatformTransactionManager transactionManager) { this.transactionManager = transactionManager; } - + /** - * @param itemReader the itemReader to set + * Public setter for the {@link ItemProcessor}. + * @param itemProcessor the {@link ItemProcessor} to set */ - public void setItemReader(ItemReader itemReader) { - this.itemReader = itemReader; + public void setItemProcessor(ItemProcessor itemProcessor) { + this.itemProcessor = itemProcessor; } /** - * @param itemWriter the itemWriter to set - */ - public void setItemWriter(ItemWriter itemWriter) { - this.itemWriter = itemWriter; - } - - /** - * @param itemSkipPolicy - */ - public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) { - this.itemSkipPolicy = itemSkipPolicy; - } - - /** * Register each of the streams for callbacks at the appropriate time in the * step. The {@link ItemReader} and {@link ItemWriter} are automatically * registered, but it doesn't hurt to also register them here. Injected @@ -227,22 +198,6 @@ } /** - * Public setter for the {@link RetryOperations}. - * @param retryOperations the {@link RetryOperations} to set - */ - public void setRetryOperations(RetryOperations retryOperations) { - this.retryOperations = retryOperations; - } - - /** - * Public setter for the ItemReaderRetryCallback. TODO: get rid of this. - * @param retryCallback the retryCallback to set - */ - public void setRetryCallback(ItemReaderRetryCallback retryCallback) { - this.retryCallback = retryCallback; - } - - /** * Setter for the {@link StepInterruptionPolicy}. The policy is used to * check whether an external request has been made to interrupt the job * execution. @@ -288,10 +243,6 @@ ExitStatus status = ExitStatus.FAILED; final ExceptionHolder fatalException = new ExceptionHolder(); - // This could go in applyConfiguration(), but some unit tests do not - // call that - possiblyRegisterStreams(); - try { stepExecution.setStartTime(new Date(System.currentTimeMillis())); @@ -330,7 +281,7 @@ try { - itemReader.mark(); + itemProcessor.mark(); result = processChunk(contribution); contribution.incrementCommitCount(); @@ -363,8 +314,8 @@ } try { - itemReader.mark(); - itemWriter.flush(); + itemProcessor.mark(); + itemProcessor.flush(); transactionManager.commit(transaction); } catch (Exception e) { @@ -390,8 +341,8 @@ } try { - itemReader.reset(); - itemWriter.clear(); + itemProcessor.reset(); + itemProcessor.clear(); transactionManager.rollback(transaction); } catch (Exception e) { @@ -489,25 +440,6 @@ } /** - * Register the item reader and writer as listeners and streams. If they are - * manually registered anyway, it shouldn't matter. - */ - private void possiblyRegisterStreams() { - if (itemReader instanceof ItemStream) { - stream.register((ItemStream) itemReader); - } - if (itemReader instanceof StepListener) { - listener.register((StepListener) itemReader); - } - if (itemWriter instanceof ItemStream) { - stream.register((ItemStream) itemWriter); - } - if (itemWriter instanceof StepListener) { - listener.register((StepListener) itemWriter); - } - } - - /** * Execute a bunch of identical business logic operations all within a * transaction. The transaction is programmatically started and stopped * outside this method, so subclasses that override do not need to create a @@ -517,7 +449,7 @@ * business logic. * @return true if there is more data to process. */ - ExitStatus processChunk(final StepContribution contribution) { + protected ExitStatus processChunk(final StepContribution contribution) { ExitStatus result = chunkOperations.iterate(new RepeatCallback() { public ExitStatus doInIteration(final RepeatContext context) throws Exception { if (contribution.isTerminateOnly()) { @@ -525,7 +457,7 @@ } // check for interruption before each item as well interruptionPolicy.checkInterrupted(context); - ExitStatus exitStatus = doProcessing(contribution); + ExitStatus exitStatus = itemProcessor.process(contribution); contribution.incrementTaskCount(); // check for interruption after each item as well interruptionPolicy.checkInterrupted(context); @@ -536,92 +468,6 @@ } /** - * Execute the business logic, delegating to the given {@link Tasklet}. - * Subclasses could extend the behaviour as long as they always return the - * value of this method call in their superclass.
- * - * If there is an exception and the {@link Tasklet} implements - * {@link Skippable} then the skip method is called. - * - * @param tasklet the unit of business logic to execute - * @param contribution the current step - * @return boolean if there is more processing to do - * @throws Exception if there is an error - */ - private ExitStatus doProcessing(StepContribution contribution) throws Exception { - ExitStatus exitStatus = ExitStatus.CONTINUABLE; - - try { - - exitStatus = execute(); - - } - catch (Exception e) { - if (retryCallback == null && itemSkipPolicy.shouldSkip(e, contribution.getSkipCount())) { - contribution.incrementSkipCount(); - skip(); - } - else { - // Rethrow so that outer transaction is rolled back properly - throw e; - } - } - - return exitStatus; - } - - /** - * Read from the {@link ItemReader} and process (if not null) with the - * {@link ItemWriter}. The call to {@link ItemWriter} is wrapped in a - * stateful retry, if a {@link RetryPolicy} is provided. The - * {@link ItemRecoverer} is used (if provided) in the case of an exception - * to apply alternate processing to the item. If the stateful retry is in - * place then the recovery will happen in the next transaction - * automatically, otherwise it might be necessary for clients to make the - * recover method transactional with appropriate propagation behaviour - * (probably REQUIRES_NEW because the call will happen in the context of a - * transaction that is about to rollback). - * - * @see org.springframework.batch.core.tasklet.Tasklet#execute() - */ - private ExitStatus execute() throws Exception { - - if (retryCallback == null) { - Object item = itemReader.read(); - if (item == null) { - return ExitStatus.FINISHED; - } - itemWriter.write(item); - return ExitStatus.CONTINUABLE; - } - - return new ExitStatus(retryOperations.execute(retryCallback) != null); - - } - - /** - * Mark the current item as skipped if possible. If there is a retry policy - * in action there is no need to take any action now because it will be - * covered by the retry in the next transaction. Otherwise if the reader and / - * or writer are {@link Skippable} then delegate to them in that order. - * - * @see org.springframework.batch.io.Skippable#skip() - */ - private void skip() { - if (retryCallback != null) { - // No need to skip because the recoverer will take any action - // necessary. - return; - } - if (this.itemReader instanceof Skippable) { - ((Skippable) this.itemReader).skip(); - } - if (this.itemWriter instanceof Skippable) { - ((Skippable) this.itemWriter).skip(); - } - } - - /** * Convenience method to update the status in all relevant places. * * @param stepInstance the current step Index: src/main/java/org/springframework/batch/execution/step/ItemProcessor.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/ItemProcessor.java (revision 0) +++ src/main/java/org/springframework/batch/execution/step/ItemProcessor.java (revision 0) @@ -0,0 +1,46 @@ +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.execution.step; + +import org.springframework.batch.core.domain.StepContribution; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.ExitStatus; + +/** + * Strategy for processing a single item in an item-oriented step. Extends + * {@link ItemReader} and {@link ItemWriter} because part of the contract of the + * processor is that it should delegate calls to those interfaces. + * + * @author Dave Syer + * + */ +public interface ItemProcessor extends ItemReader, ItemWriter { + + /** + * Given the current context in the form of a step contribution, do whatever + * is necessary to process this unit inside a chunk. Implementations obtain + * the item and return {@link ExitStatus#FINISHED} if it is null. If it is + * not null process the item and return {@link ExitStatus#CONTINUABLE}. On + * failure throws an exception. + * + * @param contribution the current step context + * @return an {@link ExitStatus} indicating whether processing is + * continuable. + */ + ExitStatus process(StepContribution contribution) throws Exception; + +} Index: src/main/java/org/springframework/batch/execution/step/support/AbstractStepFactoryBean.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/AbstractStepFactoryBean.java (revision 10668) +++ src/main/java/org/springframework/batch/execution/step/support/AbstractStepFactoryBean.java (working copy) @@ -1,173 +1,172 @@ -/* - * Copyright 2006-2007 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.execution.step.support; - -import org.springframework.batch.core.domain.Step; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.execution.step.ItemOrientedStep; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; -import org.springframework.beans.factory.BeanNameAware; -import org.springframework.beans.factory.config.AbstractFactoryBean; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.util.Assert; - -/** - * Base class for factory beans for {@link ItemOrientedStep}. Ensures that all - * the mandatory properties are set, and provides basic support for the - * {@link Step} interface responsibilities like start limit. - * - * @author Dave Syer - * - */ -public abstract class AbstractStepFactoryBean extends AbstractFactoryBean implements BeanNameAware { - - private String name; - - private int startLimit = Integer.MAX_VALUE; - - private boolean allowStartIfComplete; - - private ItemReader itemReader; - - private ItemWriter itemWriter; - - private PlatformTransactionManager transactionManager; - - private JobRepository jobRepository; - - /** - * - */ - public AbstractStepFactoryBean() { - super(); - } - - /** - * Set the bean name property, which will become the name of the - * {@link Step} when it is created. - * - * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) - */ - public void setBeanName(String name) { - this.name = name; - } - - /** - * Public getter for the String. - * @return the name - */ - public String getName() { - return name; - } - - /** - * Public setter for the startLimit. - * - * @param startLimit the startLimit to set - */ - public void setStartLimit(int startLimit) { - this.startLimit = startLimit; - } - - /** - * Public setter for the shouldAllowStartIfComplete. - * - * @param allowStartIfComplete the shouldAllowStartIfComplete to set - */ - public void setAllowStartIfComplete(boolean allowStartIfComplete) { - this.allowStartIfComplete = allowStartIfComplete; - } - - /** - * @param itemReader the itemReader to set - */ - public void setItemReader(ItemReader itemReader) { - this.itemReader = itemReader; - } - - /** - * @param itemWriter the itemWriter to set - */ - public void setItemWriter(ItemWriter itemWriter) { - this.itemWriter = itemWriter; - } - - /** - * Protected getter for the {@link ItemReader} for subclasses to use. - * @return the itemReader - */ - protected ItemReader getItemReader() { - return itemReader; - } - - /** - * Protected getter for the {@link ItemWriter} for subclasses to use - * @return the itemWriter - */ - protected ItemWriter getItemWriter() { - return itemWriter; - } - - /** - * Public setter for {@link JobRepository}. - * - * @param jobRepository is a mandatory dependence (no default). - */ - public void setJobRepository(JobRepository jobRepository) { - this.jobRepository = jobRepository; - } - - /** - * Public setter for the {@link PlatformTransactionManager}. - * - * @param transactionManager the transaction manager to set - */ - public void setTransactionManager(PlatformTransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - - protected Object createInstance() throws Exception { - ItemOrientedStep step = new ItemOrientedStep(getName()); - applyConfiguration(step); - return step; - } - - /** - * @param step - * - */ - protected void applyConfiguration(ItemOrientedStep step) { - - Assert.notNull(getItemReader(), "ItemReader must be provided"); - Assert.notNull(getItemWriter(), "ItemWriter must be provided"); - Assert.notNull(jobRepository, "JobRepository must be provided"); - Assert.notNull(transactionManager, "TransactionManager must be provided"); - - step.setItemReader(getItemReader()); - step.setItemWriter(getItemWriter()); - step.setTransactionManager(transactionManager); - step.setJobRepository(jobRepository); - step.setStartLimit(startLimit); - step.setAllowStartIfComplete(allowStartIfComplete); - - } - - public Class getObjectType() { - return Step.class; - } - +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.execution.step.support; + +import org.springframework.batch.core.domain.Step; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.execution.step.ItemOrientedStep; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.config.AbstractFactoryBean; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.util.Assert; + +/** + * Base class for factory beans for {@link ItemOrientedStep}. Ensures that all + * the mandatory properties are set, and provides basic support for the + * {@link Step} interface responsibilities like start limit. + * + * @author Dave Syer + * + */ +public abstract class AbstractStepFactoryBean extends AbstractFactoryBean implements BeanNameAware { + + private String name; + + private int startLimit = Integer.MAX_VALUE; + + private boolean allowStartIfComplete; + + private ItemReader itemReader; + + private ItemWriter itemWriter; + + private PlatformTransactionManager transactionManager; + + private JobRepository jobRepository; + + /** + * + */ + public AbstractStepFactoryBean() { + super(); + } + + /** + * Set the bean name property, which will become the name of the + * {@link Step} when it is created. + * + * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) + */ + public void setBeanName(String name) { + this.name = name; + } + + /** + * Public getter for the String. + * @return the name + */ + public String getName() { + return name; + } + + /** + * Public setter for the startLimit. + * + * @param startLimit the startLimit to set + */ + public void setStartLimit(int startLimit) { + this.startLimit = startLimit; + } + + /** + * Public setter for the shouldAllowStartIfComplete. + * + * @param allowStartIfComplete the shouldAllowStartIfComplete to set + */ + public void setAllowStartIfComplete(boolean allowStartIfComplete) { + this.allowStartIfComplete = allowStartIfComplete; + } + + /** + * @param itemReader the itemReader to set + */ + public void setItemReader(ItemReader itemReader) { + this.itemReader = itemReader; + } + + /** + * @param itemWriter the itemWriter to set + */ + public void setItemWriter(ItemWriter itemWriter) { + this.itemWriter = itemWriter; + } + + /** + * Protected getter for the {@link ItemReader} for subclasses to use. + * @return the itemReader + */ + protected ItemReader getItemReader() { + return itemReader; + } + + /** + * Protected getter for the {@link ItemWriter} for subclasses to use + * @return the itemWriter + */ + protected ItemWriter getItemWriter() { + return itemWriter; + } + + /** + * Public setter for {@link JobRepository}. + * + * @param jobRepository is a mandatory dependence (no default). + */ + public void setJobRepository(JobRepository jobRepository) { + this.jobRepository = jobRepository; + } + + /** + * Public setter for the {@link PlatformTransactionManager}. + * + * @param transactionManager the transaction manager to set + */ + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + protected Object createInstance() throws Exception { + ItemOrientedStep step = new ItemOrientedStep(getName()); + applyConfiguration(step); + return step; + } + + /** + * @param step + * + */ + protected void applyConfiguration(ItemOrientedStep step) { + + Assert.notNull(getItemReader(), "ItemReader must be provided"); + Assert.notNull(getItemWriter(), "ItemWriter must be provided"); + Assert.notNull(jobRepository, "JobRepository must be provided"); + Assert.notNull(transactionManager, "TransactionManager must be provided"); + + step.setItemProcessor(new SimpleItemProcessor(itemReader, itemWriter)); + step.setTransactionManager(transactionManager); + step.setJobRepository(jobRepository); + step.setStartLimit(startLimit); + step.setAllowStartIfComplete(allowStartIfComplete); + + } + + public Class getObjectType() { + return Step.class; + } + } \ No newline at end of file Index: src/main/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBean.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBean.java (revision 10668) +++ src/main/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBean.java (working copy) @@ -1,151 +1,156 @@ -/* - * Copyright 2006-2007 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.batch.execution.step.support; - -import org.springframework.batch.core.domain.BatchListener; -import org.springframework.batch.core.domain.Step; -import org.springframework.batch.core.domain.StepListener; -import org.springframework.batch.execution.step.ItemOrientedStep; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemStream; -import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler; -import org.springframework.batch.repeat.support.RepeatTemplate; -import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; -import org.springframework.core.task.TaskExecutor; - -/** - * Adds listeners to {@link SimpleStepFactoryBean}. - * - * @author Dave Syer - * - */ -public class DefaultStepFactoryBean extends SimpleStepFactoryBean { - - private boolean alwaysSkip = false; - - private BatchListener[] listeners = new BatchListener[0]; - - private ListenerMulticaster listener = new ListenerMulticaster(); - - private TaskExecutor taskExecutor; - - /** - * Public setter for a flag that determines skip policy. If this flag is - * true then an exception in chunk processing will cause the item to be - * skipped and no exceptions propagated. If it is false then all exceptions - * will be propagated from the chunk and cause the step to abort. - * - * @param alwaysSkip the value to set. Default is false. - */ - public void setAlwaysSkip(boolean alwaysSkip) { - this.alwaysSkip = alwaysSkip; - } - - /** - * The listeners to inject into the {@link Step}. Any instance of - * {@link BatchListener} can be used, and will then receive callbacks at the - * appropriate stage in the step. - * - * @param listeners an array of listeners - */ - public void setListeners(BatchListener[] listeners) { - this.listeners = listeners; - } - - /** - * Public setter for the {@link TaskExecutor}. If this is set, then it will - * be used to execute the chunk processing inside the {@link Step}. - * - * @param taskExecutor the taskExecutor to set - */ - public void setTaskExecutor(TaskExecutor taskExecutor) { - this.taskExecutor = taskExecutor; - } - - /** - * @param step - * - */ - protected void applyConfiguration(ItemOrientedStep step) { - - super.applyConfiguration(step); - for (int i = 0; i < listeners.length; i++) { - BatchListener listener = listeners[i]; - if (listener instanceof StepListener) { - step.registerStepListener((StepListener) listener); - } - else { - this.listener.register(listener); - } - } - - ItemReader itemReader = getItemReader(); - ItemWriter itemWriter = getItemWriter(); - - // Since we are going to wrap these things with listener callbacks we - // need to register them here because the step will not know we did - // that. - if (itemReader instanceof ItemStream) { - step.registerStream((ItemStream) itemReader); - } - if (itemReader instanceof StepListener) { - step.registerStepListener((StepListener) itemReader); - } - if (itemWriter instanceof ItemStream) { - step.registerStream((ItemStream) itemWriter); - } - if (itemWriter instanceof StepListener) { - step.registerStepListener((StepListener) itemWriter); - } - - BatchListenerFactoryHelper helper = new BatchListenerFactoryHelper(); - - StepListener[] stepListeners = helper.getStepListeners(listeners); - itemReader = helper.getItemReader(itemReader, listeners); - itemWriter = helper.getItemWriter(itemWriter, listeners); - RepeatTemplate stepOperations = new RepeatTemplate(); - stepOperations = (RepeatTemplate) helper.getStepOperations(stepOperations, listeners); - - // In case they are used by subclasses: - setItemReader(itemReader); - setItemWriter(itemWriter); - - step.setStepListeners(stepListeners); - step.setItemReader(itemReader); - step.setItemWriter(itemWriter); - - if (taskExecutor != null) { - TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); - repeatTemplate.setTaskExecutor(taskExecutor); - stepOperations = repeatTemplate; - } - - if (alwaysSkip) { - // If we always skip (not the default) then we are prepared to - // absorb all exceptions at the step level because the failed items - // will never re-appear after a rollback. - step.setItemSkipPolicy(new AlwaysSkipItemSkipPolicy()); - stepOperations.setExceptionHandler(new SimpleLimitExceptionHandler(Integer.MAX_VALUE)); - step.setStepOperations(stepOperations); - } - else { - // This is the default in ItemOrientedStep anyway... - step.setItemSkipPolicy(new NeverSkipItemSkipPolicy()); - } - - } -} +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.execution.step.support; + +import org.springframework.batch.core.domain.BatchListener; +import org.springframework.batch.core.domain.Step; +import org.springframework.batch.core.domain.StepListener; +import org.springframework.batch.execution.step.ItemOrientedStep; +import org.springframework.batch.execution.step.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler; +import org.springframework.batch.repeat.support.RepeatTemplate; +import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; +import org.springframework.core.task.TaskExecutor; + +/** + * Adds listeners to {@link SimpleStepFactoryBean}. + * + * @author Dave Syer + * + */ +public class DefaultStepFactoryBean extends SimpleStepFactoryBean { + + private boolean alwaysSkip = false; + + private BatchListener[] listeners = new BatchListener[0]; + + private ListenerMulticaster listener = new ListenerMulticaster(); + + private TaskExecutor taskExecutor; + + private ItemProcessor itemProcessor; + + /** + * Protected getter for the {@link ItemProcessor}. + * @return the itemProcessor + */ + protected ItemProcessor getItemProcessor() { + return itemProcessor; + } + + /** + * Protected setter for the {@link ItemProcessor}. + * @param itemProcessor the {@link ItemProcessor} to set + */ + protected void setItemProcessor(ItemProcessor itemProcessor) { + this.itemProcessor = itemProcessor; + } + + /** + * Public setter for a flag that determines skip policy. If this flag is + * true then an exception in chunk processing will cause the item to be + * skipped and no exceptions propagated. If it is false then all exceptions + * will be propagated from the chunk and cause the step to abort. + * + * @param alwaysSkip the value to set. Default is false. + */ + public void setAlwaysSkip(boolean alwaysSkip) { + this.alwaysSkip = alwaysSkip; + } + + /** + * The listeners to inject into the {@link Step}. Any instance of + * {@link BatchListener} can be used, and will then receive callbacks at the + * appropriate stage in the step. + * + * @param listeners an array of listeners + */ + public void setListeners(BatchListener[] listeners) { + this.listeners = listeners; + } + + /** + * Public setter for the {@link TaskExecutor}. If this is set, then it will + * be used to execute the chunk processing inside the {@link Step}. + * + * @param taskExecutor the taskExecutor to set + */ + public void setTaskExecutor(TaskExecutor taskExecutor) { + this.taskExecutor = taskExecutor; + } + + /** + * @param step + * + */ + protected void applyConfiguration(ItemOrientedStep step) { + + super.applyConfiguration(step); + for (int i = 0; i < listeners.length; i++) { + BatchListener listener = listeners[i]; + if (listener instanceof StepListener) { + step.registerStepListener((StepListener) listener); + } + else { + this.listener.register(listener); + } + } + + ItemReader itemReader = getItemReader(); + ItemWriter itemWriter = getItemWriter(); + + BatchListenerFactoryHelper helper = new BatchListenerFactoryHelper(); + + StepListener[] stepListeners = helper.getStepListeners(listeners); + itemReader = helper.getItemReader(itemReader, listeners); + itemWriter = helper.getItemWriter(itemWriter, listeners); + RepeatTemplate stepOperations = new RepeatTemplate(); + stepOperations = (RepeatTemplate) helper.getStepOperations(stepOperations, listeners); + + // In case they are used by subclasses: + setItemReader(itemReader); + setItemWriter(itemWriter); + + step.setStepListeners(stepListeners); + + if (taskExecutor != null) { + TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); + repeatTemplate.setTaskExecutor(taskExecutor); + stepOperations = repeatTemplate; + } + + KitchenSinkItemProcessor itemProcessor = new KitchenSinkItemProcessor(itemReader, itemWriter); + + if (alwaysSkip) { + // If we always skip (not the default) then we are prepared to + // absorb all exceptions at the step level because the failed items + // will never re-appear after a rollback. + itemProcessor.setItemSkipPolicy(new AlwaysSkipItemSkipPolicy()); + stepOperations.setExceptionHandler(new SimpleLimitExceptionHandler(Integer.MAX_VALUE)); + step.setStepOperations(stepOperations); + } + else { + // This is the default in ItemOrientedStep anyway... + itemProcessor.setItemSkipPolicy(new NeverSkipItemSkipPolicy()); + } + + setItemProcessor(itemProcessor); + step.setItemProcessor(itemProcessor); + + } +} Index: src/main/java/org/springframework/batch/execution/step/support/KitchenSinkItemProcessor.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/KitchenSinkItemProcessor.java (revision 0) +++ src/main/java/org/springframework/batch/execution/step/support/KitchenSinkItemProcessor.java (revision 0) @@ -0,0 +1,146 @@ +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.execution.step.support; + +import org.springframework.batch.core.domain.ItemSkipPolicy; +import org.springframework.batch.core.domain.StepContribution; +import org.springframework.batch.io.Skippable; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemRecoverer; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.ExitStatus; +import org.springframework.batch.retry.RetryCallback; +import org.springframework.batch.retry.RetryOperations; +import org.springframework.batch.retry.callback.ItemReaderRetryCallback; +import org.springframework.batch.retry.support.RetryTemplate; + +/** + * @author Dave Syer + * + */ +public class KitchenSinkItemProcessor extends SimpleItemProcessor { + + private RetryOperations retryOperations = new RetryTemplate(); + + private ItemReaderRetryCallback retryCallback; + + private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy(); + + /** + * @param itemReader + * @param itemWriter + */ + public KitchenSinkItemProcessor(ItemReader itemReader, ItemWriter itemWriter) { + super(itemReader, itemWriter); + } + + /** + * @param itemSkipPolicy + */ + public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) { + this.itemSkipPolicy = itemSkipPolicy; + } + + /** + * Public setter for the {@link RetryOperations}. + * @param retryOperations the {@link RetryOperations} to set + */ + public void setRetryOperations(RetryOperations retryOperations) { + this.retryOperations = retryOperations; + } + + /** + * Public setter for the ItemReaderRetryCallback. TODO: get rid of this. + * @param retryCallback the retryCallback to set + */ + public void setRetryCallback(ItemReaderRetryCallback retryCallback) { + this.retryCallback = retryCallback; + } + + /** + * Execute the business logic, delegating to the reader and writer. + * Subclasses could extend the behaviour as long as they always return the + * value of this method call in their superclass.
+ * + * Read from the {@link ItemReader} and process (if not null) with the + * {@link ItemWriter}. If a {@link RetryCallback} is provided, then the + * call to {@link ItemWriter} is wrapped in a stateful retry. In that case + * the {@link ItemRecoverer} is used (if provided) in the case of an + * exception to apply alternate processing to the item. If the stateful + * retry is in place then the recovery will happen in the next transaction + * automatically, otherwise it might be necessary for clients to make the + * recover method transactional with appropriate propagation behaviour + * (probably REQUIRES_NEW because the call will happen in the context of a + * transaction that is about to rollback).
+ * + * If there is an exception and the reader or writer implements + * {@link Skippable} then the skip method is called. + * + * @param contribution the current step + * @return {@link ExitStatus#CONTINUABLE} if there is more processing to do + * @throws Exception if there is an error + */ + public ExitStatus process(StepContribution contribution) throws Exception { + ExitStatus exitStatus = ExitStatus.CONTINUABLE; + + if (retryCallback != null) { + return new ExitStatus(retryOperations.execute(retryCallback) != null); + } + + try { + + exitStatus = super.process(contribution); + + } + catch (Exception e) { + + if (retryCallback == null && itemSkipPolicy.shouldSkip(e, contribution.getSkipCount())) { + contribution.incrementSkipCount(); + skip(); + } + else { + // Rethrow so that outer transaction is rolled back properly + throw e; + } + + } + + return exitStatus; + } + + /** + * Mark the current item as skipped if possible. If there is a retry policy + * in action there is no need to take any action now because it will be + * covered by the retry in the next transaction. Otherwise if the reader and / + * or writer are {@link Skippable} then delegate to them in that order. + * + * @see org.springframework.batch.io.Skippable#skip() + */ + private void skip() { + if (retryCallback != null) { + // No need to skip because the recoverer will take any action + // necessary. + return; + } + if (getItemReader() instanceof Skippable) { + ((Skippable) getItemReader()).skip(); + } + if (getItemWriter() instanceof Skippable) { + ((Skippable) getItemWriter()).skip(); + } + } + +} Index: src/main/java/org/springframework/batch/execution/step/support/RepeatOperationsStepFactoryBean.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/RepeatOperationsStepFactoryBean.java (revision 10668) +++ src/main/java/org/springframework/batch/execution/step/support/RepeatOperationsStepFactoryBean.java (working copy) @@ -130,8 +130,7 @@ setItemWriter(itemWriter); step.setStepListeners(stepListeners); - step.setItemReader(itemReader); - step.setItemWriter(itemWriter); + step.setItemProcessor(new SimpleItemProcessor(itemReader, itemWriter)); step.setChunkOperations(chunkOperations); step.setStepOperations(stepOperations); Index: src/main/java/org/springframework/batch/execution/step/support/SimpleItemProcessor.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/SimpleItemProcessor.java (revision 0) +++ src/main/java/org/springframework/batch/execution/step/support/SimpleItemProcessor.java (revision 0) @@ -0,0 +1,133 @@ +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.execution.step.support; + +import org.springframework.batch.core.domain.StepContribution; +import org.springframework.batch.execution.step.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.exception.ClearFailedException; +import org.springframework.batch.item.exception.FlushFailedException; +import org.springframework.batch.item.exception.MarkFailedException; +import org.springframework.batch.item.exception.ResetFailedException; +import org.springframework.batch.repeat.ExitStatus; + +/** + * Simplest possible implementation of {@link ItemProcessor} with no skipping or + * recovering. Just delegates all calls to the provided {@link ItemReader} and + * {@link ItemWriter}. + * + * @author Dave Syer + * + */ +public class SimpleItemProcessor implements ItemProcessor { + + private ItemReader itemReader; + + private ItemWriter itemWriter; + + /** + * @param itemReader + * @param itemWriter + */ + public SimpleItemProcessor(ItemReader itemReader, ItemWriter itemWriter) { + super(); + this.itemReader = itemReader; + this.itemWriter = itemWriter; + } + + /** + * Public getter for the ItemReader. + * @return the itemReader + */ + public ItemReader getItemReader() { + return itemReader; + } + + /** + * Public getter for the ItemWriter. + * @return the itemWriter + */ + public ItemWriter getItemWriter() { + return itemWriter; + } + + /** + * Read from the {@link ItemReader} and process (if not null) with the + * {@link ItemWriter}. + * + * @see org.springframework.batch.execution.step.ItemProcessor#process(org.springframework.batch.core.domain.StepContribution) + */ + public ExitStatus process(StepContribution contribution) throws Exception { + Object item = read(); + if (item == null) { + return ExitStatus.FINISHED; + } + write(item); + return ExitStatus.CONTINUABLE; + } + + /** + * @throws MarkFailedException + * @see org.springframework.batch.item.ItemReader#mark() + */ + public void mark() throws MarkFailedException { + itemReader.mark(); + } + + /** + * @return + * @throws Exception + * @see org.springframework.batch.item.ItemReader#read() + */ + public Object read() throws Exception { + return itemReader.read(); + } + + /** + * @throws ResetFailedException + * @see org.springframework.batch.item.ItemReader#reset() + */ + public void reset() throws ResetFailedException { + itemReader.reset(); + } + + /** + * @throws ClearFailedException + * @see org.springframework.batch.item.ItemWriter#clear() + */ + public void clear() throws ClearFailedException { + itemWriter.clear(); + } + + /** + * @throws FlushFailedException + * @see org.springframework.batch.item.ItemWriter#flush() + */ + public void flush() throws FlushFailedException { + itemWriter.flush(); + } + + /** + * @param item + * @throws Exception + * @see org.springframework.batch.item.ItemWriter#write(java.lang.Object) + */ + public void write(Object item) throws Exception { + itemWriter.write(item); + } + +} Index: src/main/java/org/springframework/batch/execution/step/support/SimpleStepFactoryBean.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/SimpleStepFactoryBean.java (revision 10653) +++ src/main/java/org/springframework/batch/execution/step/support/SimpleStepFactoryBean.java (working copy) @@ -16,8 +16,11 @@ package org.springframework.batch.execution.step.support; import org.springframework.batch.core.domain.Step; +import org.springframework.batch.core.domain.StepListener; import org.springframework.batch.execution.step.ItemOrientedStep; +import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemStream; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; import org.springframework.batch.repeat.support.RepeatTemplate; @@ -59,6 +62,22 @@ super.applyConfiguration(step); + ItemReader itemReader = getItemReader(); + ItemWriter itemWriter = getItemWriter(); + + if (itemReader instanceof ItemStream) { + step.registerStream((ItemStream) itemReader); + } + if (itemReader instanceof StepListener) { + step.registerStepListener((StepListener) itemReader); + } + if (itemWriter instanceof ItemStream) { + step.registerStream((ItemStream) itemWriter); + } + if (itemWriter instanceof StepListener) { + step.registerStepListener((StepListener) itemWriter); + } + step.setStreams(streams); if (commitInterval > 0) { Index: src/main/java/org/springframework/batch/execution/step/support/StatefulRetryStepFactoryBean.java =================================================================== --- src/main/java/org/springframework/batch/execution/step/support/StatefulRetryStepFactoryBean.java (revision 10668) +++ src/main/java/org/springframework/batch/execution/step/support/StatefulRetryStepFactoryBean.java (working copy) @@ -85,10 +85,11 @@ ItemReaderRetryCallback retryCallback = new ItemReaderRetryCallback(getItemReader(), getKeyGenerator(), getItemWriter()); ItemReaderRetryPolicy itemProviderRetryPolicy = new ItemReaderRetryPolicy(retryPolicy); - RetryTemplate template = new RetryTemplate(); - template.setRetryPolicy(itemProviderRetryPolicy); - step.setRetryOperations(template); - step.setRetryCallback(retryCallback); + RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setRetryPolicy(itemProviderRetryPolicy); + KitchenSinkItemProcessor itemProcessor = (KitchenSinkItemProcessor) getItemProcessor(); + itemProcessor.setRetryOperations(retryTemplate); + itemProcessor.setRetryCallback(retryCallback); } } Index: src/test/java/org/springframework/batch/execution/step/ItemOrientedStepTests.java =================================================================== --- src/test/java/org/springframework/batch/execution/step/ItemOrientedStepTests.java (revision 10654) +++ src/test/java/org/springframework/batch/execution/step/ItemOrientedStepTests.java (working copy) @@ -37,6 +37,7 @@ import org.springframework.batch.execution.repository.dao.MapJobInstanceDao; import org.springframework.batch.execution.repository.dao.MapStepExecutionDao; import org.springframework.batch.execution.step.support.JobRepositorySupport; +import org.springframework.batch.execution.step.support.SimpleItemProcessor; import org.springframework.batch.execution.step.support.StepInterruptionPolicy; import org.springframework.batch.io.exception.InfrastructureException; import org.springframework.batch.item.ExecutionContext; @@ -85,8 +86,7 @@ private AbstractStep getStep(String[] strings) throws Exception { ItemOrientedStep step = new ItemOrientedStep("stepName"); - step.setItemWriter(processor); - step.setItemReader(getReader(strings)); + step.setItemProcessor(new SimpleItemProcessor(getReader(strings), processor)); step.setJobRepository(new JobRepositorySupport()); step.setTransactionManager(transactionManager); return step; @@ -168,7 +168,7 @@ }; - itemOrientedStep.setItemReader(itemReader); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor)); JobExecution jobExecutionContext = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); @@ -198,7 +198,7 @@ }; - itemOrientedStep.setItemReader(itemReader); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor)); JobExecution jobExecutionContext = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); @@ -217,7 +217,8 @@ */ public void testNonRestartedJob() throws Exception { MockRestartableItemReader tasklet = new MockRestartableItemReader(); - itemOrientedStep.setItemReader(tasklet); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(tasklet, processor)); + itemOrientedStep.registerStream(tasklet); JobExecution jobExecutionContext = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); @@ -287,7 +288,7 @@ */ public void testNoSaveExecutionAttributesRestartableJob() { MockRestartableItemReader tasklet = new MockRestartableItemReader(); - itemOrientedStep.setItemReader(tasklet); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(tasklet, processor)); JobExecution jobExecutionContext = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); @@ -307,11 +308,11 @@ * Restartable. */ public void testRestartJobOnNonRestartableTasklet() throws Exception { - itemOrientedStep.setItemReader(new AbstractItemReader() { + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(new AbstractItemReader() { public Object read() throws Exception { return "foo"; } - }); + }, processor)); JobExecution jobExecution = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution); @@ -319,7 +320,7 @@ } public void testStreamManager() throws Exception { - itemOrientedStep.setItemReader(new MockRestartableItemReader() { + MockRestartableItemReader reader = new MockRestartableItemReader() { public Object read() throws Exception { return "foo"; } @@ -327,7 +328,9 @@ // TODO Auto-generated method stub executionContext.putString("foo", "bar"); } - }); + }; + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(reader, processor)); + itemOrientedStep.registerStream(reader); JobExecution jobExecution = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution); @@ -395,11 +398,11 @@ return null; } }); - itemOrientedStep.setItemReader(new MockRestartableItemReader() { + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(new MockRestartableItemReader() { public Object read() throws Exception { throw new RuntimeException("FOO"); } - }); + }, processor)); JobExecution jobExecution = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution); try { @@ -421,7 +424,7 @@ executionContext.putString("foo", "bar"); } }; - itemOrientedStep.setItemReader(reader); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(reader, processor)); itemOrientedStep.setStreams(new ItemStream[] {reader}); JobExecution jobExecution = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecution); @@ -461,7 +464,7 @@ }; - itemOrientedStep.setItemReader(itemReader); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor)); JobExecution jobExecutionContext = new JobExecution(jobInstance); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); @@ -489,7 +492,7 @@ throw new RuntimeException("Foo"); } }; - itemOrientedStep.setItemReader(itemReader); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor)); JobExecution jobExecutionContext = jobInstance.createJobExecution(); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); @@ -516,7 +519,7 @@ throw new RuntimeException("Foo"); } }; - itemOrientedStep.setItemReader(itemReader); + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(itemReader, processor)); itemOrientedStep.setTransactionManager(new ResourcelessTransactionManager() { protected void doRollback(DefaultTransactionStatus status) throws TransactionException { // Simulate failure on rollback when stream resets @@ -605,13 +608,15 @@ public void testStatusForCloseFailedException() throws Exception { - itemOrientedStep.setItemReader(new MockRestartableItemReader() { + MockRestartableItemReader reader = new MockRestartableItemReader() { public void close(ExecutionContext executionContext) throws StreamException { super.close(executionContext); // Simulate failure on rollback when stream resets throw new RuntimeException("Bar"); } - }); + }; + itemOrientedStep.setItemProcessor(new SimpleItemProcessor(reader, processor)); + itemOrientedStep.registerStream(reader); JobExecution jobExecutionContext = jobInstance.createJobExecution(); StepExecution stepExecution = new StepExecution(itemOrientedStep, jobExecutionContext); Index: src/test/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBeanTests.java =================================================================== --- src/test/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBeanTests.java (revision 10654) +++ src/test/java/org/springframework/batch/execution/step/support/DefaultStepFactoryBeanTests.java (working copy) @@ -177,13 +177,14 @@ // TODO: test recovery and stateful retry public void testExceptionTerminates() throws Exception { - ItemOrientedStep step = (ItemOrientedStep) getStep(new String[] { "foo", "bar", "spam" }).getObject(); - step.setName("exceptionStep"); - step.setItemWriter(new AbstractItemWriter() { + DefaultStepFactoryBean factory = getStep(new String[] { "foo", "bar", "spam" }); + factory.setBeanName("exceptionStep"); + factory.setItemWriter(new AbstractItemWriter() { public void write(Object data) throws Exception { throw new RuntimeException("Foo"); } }); + ItemOrientedStep step = (ItemOrientedStep) factory.getObject(); job.setSteps(Collections.singletonList(step)); JobExecution jobExecution = repository.createJobExecution(job, new JobParameters()); Index: src/test/java/org/springframework/batch/execution/step/support/StepExecutorInterruptionTests.java =================================================================== --- src/test/java/org/springframework/batch/execution/step/support/StepExecutorInterruptionTests.java (revision 10649) +++ src/test/java/org/springframework/batch/execution/step/support/StepExecutorInterruptionTests.java (working copy) @@ -1,133 +1,136 @@ -/* - * Copyright 2006-2007 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.batch.execution.step.support; - -import junit.framework.TestCase; - -import org.springframework.batch.core.domain.BatchStatus; -import org.springframework.batch.core.domain.JobExecution; -import org.springframework.batch.core.domain.JobInstance; -import org.springframework.batch.core.domain.JobInterruptedException; -import org.springframework.batch.core.domain.JobParameters; -import org.springframework.batch.core.domain.StepExecution; -import org.springframework.batch.core.repository.JobRepository; -import org.springframework.batch.execution.job.JobSupport; -import org.springframework.batch.execution.repository.SimpleJobRepository; -import org.springframework.batch.execution.repository.dao.JobExecutionDao; -import org.springframework.batch.execution.repository.dao.JobInstanceDao; -import org.springframework.batch.execution.repository.dao.MapJobExecutionDao; -import org.springframework.batch.execution.repository.dao.MapJobInstanceDao; -import org.springframework.batch.execution.repository.dao.MapStepExecutionDao; -import org.springframework.batch.execution.repository.dao.StepExecutionDao; -import org.springframework.batch.execution.step.ItemOrientedStep; -import org.springframework.batch.item.reader.AbstractItemReader; -import org.springframework.batch.item.reader.ItemReaderAdapter; -import org.springframework.batch.item.writer.AbstractItemWriter; -import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; -import org.springframework.batch.repeat.support.RepeatTemplate; -import org.springframework.batch.support.transaction.ResourcelessTransactionManager; - -public class StepExecutorInterruptionTests extends TestCase { - - private JobRepository jobRepository; - - private JobInstanceDao jobInstanceDao = new MapJobInstanceDao(); - - private JobExecutionDao jobExecutionDao = new MapJobExecutionDao(); - - private StepExecutionDao stepExecutionDao = new MapStepExecutionDao(); - - private ItemOrientedStep step; - - public void setUp() throws Exception { - MapJobInstanceDao.clear(); - MapJobExecutionDao.clear(); - MapStepExecutionDao.clear(); - - jobRepository = new SimpleJobRepository(jobInstanceDao, jobExecutionDao, stepExecutionDao); - - JobSupport jobConfiguration = new JobSupport(); - step = new ItemOrientedStep("interruptedStep"); - jobConfiguration.addStep(step); - jobConfiguration.setBeanName("testJob"); - jobRepository.createJobExecution(jobConfiguration, new JobParameters()); - step.setJobRepository(jobRepository); - step.setTransactionManager(new ResourcelessTransactionManager()); - step.setItemReader(new ItemReaderAdapter()); - step.setItemWriter(new AbstractItemWriter(){ - public void write(Object item) throws Exception { - }}); - } - - - public void testInterruptChunk() throws Exception { - - JobExecution jobExecutionContext = new JobExecution(new JobInstance(new Long(0L), new JobParameters(), new JobSupport("testJob"))); - final StepExecution stepExecution = new StepExecution(step, jobExecutionContext); - step.setItemReader(new AbstractItemReader() { - public Object read() throws Exception { - // do something non-trivial (and not Thread.sleep()) - double foo = 1; - for (int i = 2; i < 250; i++) { - foo = foo * i; - } - - if(foo != 1){ - return new Double(foo); - } - else{ - return null; - } - } - }); - - Thread processingThread = new Thread() { - public void run() { - try { - step.execute(stepExecution); - } - catch (JobInterruptedException e) { - // do nothing... - } - } - }; - - processingThread.start(); - - Thread.sleep(100); - - processingThread.interrupt(); - - int count = 0; - while (processingThread.isAlive() && count < 1000) { - Thread.sleep(20); - count++; - } - - assertFalse(processingThread.isAlive()); - assertEquals(BatchStatus.STOPPED, stepExecution.getStatus()); - } - - public void testInterruptStep() throws Exception { - RepeatTemplate template = new RepeatTemplate(); - // N.B, If we don't set the completion policy it might run forever - template.setCompletionPolicy(new SimpleCompletionPolicy(2)); - step.setChunkOperations(template); - testInterruptChunk(); - } - -} +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.execution.step.support; + +import junit.framework.TestCase; + +import org.springframework.batch.core.domain.BatchStatus; +import org.springframework.batch.core.domain.JobExecution; +import org.springframework.batch.core.domain.JobInstance; +import org.springframework.batch.core.domain.JobInterruptedException; +import org.springframework.batch.core.domain.JobParameters; +import org.springframework.batch.core.domain.StepExecution; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.execution.job.JobSupport; +import org.springframework.batch.execution.repository.SimpleJobRepository; +import org.springframework.batch.execution.repository.dao.JobExecutionDao; +import org.springframework.batch.execution.repository.dao.JobInstanceDao; +import org.springframework.batch.execution.repository.dao.MapJobExecutionDao; +import org.springframework.batch.execution.repository.dao.MapJobInstanceDao; +import org.springframework.batch.execution.repository.dao.MapStepExecutionDao; +import org.springframework.batch.execution.repository.dao.StepExecutionDao; +import org.springframework.batch.execution.step.ItemOrientedStep; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.reader.AbstractItemReader; +import org.springframework.batch.item.reader.ItemReaderAdapter; +import org.springframework.batch.item.writer.AbstractItemWriter; +import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; +import org.springframework.batch.repeat.support.RepeatTemplate; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; + +public class StepExecutorInterruptionTests extends TestCase { + + private JobRepository jobRepository; + + private JobInstanceDao jobInstanceDao = new MapJobInstanceDao(); + + private JobExecutionDao jobExecutionDao = new MapJobExecutionDao(); + + private StepExecutionDao stepExecutionDao = new MapStepExecutionDao(); + + private ItemOrientedStep step; + + private ItemWriter writer; + + public void setUp() throws Exception { + MapJobInstanceDao.clear(); + MapJobExecutionDao.clear(); + MapStepExecutionDao.clear(); + + jobRepository = new SimpleJobRepository(jobInstanceDao, jobExecutionDao, stepExecutionDao); + + JobSupport jobConfiguration = new JobSupport(); + step = new ItemOrientedStep("interruptedStep"); + jobConfiguration.addStep(step); + jobConfiguration.setBeanName("testJob"); + jobRepository.createJobExecution(jobConfiguration, new JobParameters()); + step.setJobRepository(jobRepository); + step.setTransactionManager(new ResourcelessTransactionManager()); + writer = new AbstractItemWriter(){ + public void write(Object item) throws Exception { + }}; + step.setItemProcessor(new SimpleItemProcessor(new ItemReaderAdapter(), writer)); + } + + + public void testInterruptChunk() throws Exception { + + JobExecution jobExecutionContext = new JobExecution(new JobInstance(new Long(0L), new JobParameters(), new JobSupport("testJob"))); + final StepExecution stepExecution = new StepExecution(step, jobExecutionContext); + step.setItemProcessor(new SimpleItemProcessor(new AbstractItemReader() { + public Object read() throws Exception { + // do something non-trivial (and not Thread.sleep()) + double foo = 1; + for (int i = 2; i < 250; i++) { + foo = foo * i; + } + + if(foo != 1){ + return new Double(foo); + } + else{ + return null; + } + } + }, writer)); + + Thread processingThread = new Thread() { + public void run() { + try { + step.execute(stepExecution); + } + catch (JobInterruptedException e) { + // do nothing... + } + } + }; + + processingThread.start(); + + Thread.sleep(100); + + processingThread.interrupt(); + + int count = 0; + while (processingThread.isAlive() && count < 1000) { + Thread.sleep(20); + count++; + } + + assertFalse(processingThread.isAlive()); + assertEquals(BatchStatus.STOPPED, stepExecution.getStatus()); + } + + public void testInterruptStep() throws Exception { + RepeatTemplate template = new RepeatTemplate(); + // N.B, If we don't set the completion policy it might run forever + template.setCompletionPolicy(new SimpleCompletionPolicy(2)); + step.setChunkOperations(template); + testInterruptChunk(); + } + +}