
Key: BATCH-220
Type: Task
Status: Resolved
Resolution: Fixed
Priority: Major
Assignee: Dave Syer
Reporter: Dave Syer
Votes: 0
Watchers: 3

Spring Batch

Chunk-oriented approach to processing

Created: 29/Nov/07 07:28 AM   Updated: 29/Aug/08 10:24 AM
Component/s: Core
Affects Version/s: 1.0-m3
Fix Version/s: 2.0.0.M2
Security Level: Public (Public Issues)

Time Tracking:
Original Estimate: 22d
Remaining Estimate: 11.25d
Time Spent: 10.17d

File Attachments: 1. Zip Archive pretend-to-contend.zip (11 kB)



Description
This is mainly an internal change inside (a new) StepExecutor. The concept of a Chunk needs to be elevated to first-class status: a chunk is executed, takes care of itself (retry, skip, etc.), and returns a result message encapsulating the information the client needs to decide whether to abort. The client can then easily handle multiple concurrently executing chunks, as long as we are careful with restart data.
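To make the description concrete, here is a minimal Java sketch, assuming a chunk can be modelled as a list of items plus a per-item writer callback. Chunk, ChunkResult and runAll are hypothetical names, not Spring Batch API. Each chunk applies its own skip rule and returns a result message; the client runs chunks on a thread pool and stops at the first result that asks it to abort.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical sketch: Chunk, ChunkResult and runAll are illustrative names,
// not Spring Batch API.
final class ChunkDemo {

    /** The message a chunk hands back so the client can decide whether to abort. */
    record ChunkResult(int written, int skipped, boolean fatal) {}

    /** A chunk owns its items and applies its own skip rule when executed. */
    record Chunk<T>(List<T> items) {
        ChunkResult execute(Consumer<T> writer, int skipLimit) {
            int written = 0;
            int skipped = 0;
            for (T item : items) {
                try {
                    writer.accept(item);
                    written++;
                } catch (RuntimeException e) {
                    skipped++;
                    if (skipped > skipLimit) {
                        return new ChunkResult(written, skipped, true); // too many skips: client should abort
                    }
                }
            }
            return new ChunkResult(written, skipped, false);
        }
    }

    /** Client side: run chunks concurrently, inspect results in order, stop at the first fatal one. */
    static <T> List<ChunkResult> runAll(List<Chunk<T>> chunks, Consumer<T> writer, int skipLimit, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<ChunkResult>> futures = new ArrayList<>();
            for (Chunk<T> chunk : chunks) {
                futures.add(pool.submit(() -> chunk.execute(writer, skipLimit)));
            }
            List<ChunkResult> results = new ArrayList<>();
            for (Future<ChunkResult> future : futures) {
                ChunkResult result = future.get();
                results.add(result);
                if (result.fatal()) {
                    break; // abort: queued chunks are dropped in the finally block
                }
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow(); // attempts to interrupt running chunks and drops queued ones
        }
    }
}
```

Restart data is deliberately left out: with chunks finishing out of order, the client would have to track which chunks have committed before recording a restart point, which is the care the description asks for.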

Comments
Lucas Ward added a comment - 10/Feb/08 09:45 PM
I have added the reading portion of chunking with my last commit; however, I haven't modified the SimpleStepExecutor yet, since the 'Writing' portion would also need to be completed.

I modified the StepExecution to include a skipCount and a retryCount, but they aren't being persisted yet. One could argue whether they belong on the execution or in the executionAttributes, but it's hard to argue that they shouldn't be persisted, since they are extremely useful metadata about the step. Adding these two fields allowed me to modify the ReadFailurePolicy to simply use a StepExecution. This makes even more sense because a skipLimit isn't specific to reading or writing; it's global for the step, so it's natural for the StepExecution domain object to hold it.

The one piece I'm missing is how exception handling should be applied. When reading, you would likely want to log the skipped item immediately, but with writing that doesn't quite make sense. The ChunkResult in the sandbox models this fairly nicely, and it made me wonder whether a similar approach could be used for reading. The problem is that writing out a chunk has a defined end, whereas reading doesn't. You can't aggregate the skipped items and then return, since you *could* be reading for a long time. This means the ReadFailurePolicy is still necessary. The skipped items could still be put into a list and returned, leaving a single exception handler, in the Step, to log them.
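A minimal sketch of a skip limit that lives on the step rather than on the reader, assuming a step-level counter like the skipCount described above. StepCounts and LimitCheckingReadFailurePolicy are hypothetical stand-ins, not the real StepExecution or ReadFailurePolicy types, and the shouldSkip signature is an assumption.

```java
// Hypothetical sketch: StepCounts stands in for the skipCount field added to
// StepExecution (it is not the real class), and shouldSkip is an assumed
// signature for a ReadFailurePolicy that reads a step-wide limit.
final class SkipDemo {

    /** Step-wide counter shared by reading and writing. */
    static final class StepCounts {
        private int skipCount;

        int getSkipCount() { return skipCount; }

        void incrementSkipCount() { skipCount++; }
    }

    /** Consults the step's skip count, so read and write skips share one limit. */
    static final class LimitCheckingReadFailurePolicy {
        private final int skipLimit;

        LimitCheckingReadFailurePolicy(int skipLimit) { this.skipLimit = skipLimit; }

        /** Returns true if the failed read may be skipped, recording the skip on the step. */
        boolean shouldSkip(Exception failure, StepCounts step) {
            // A real policy would also inspect the exception type; this one only checks the limit.
            if (step.getSkipCount() >= skipLimit) {
                return false; // the step as a whole has used up its skips
            }
            step.incrementSkipCount();
            return true;
        }
    }
}
```

Because the counter is shared, a skip recorded while writing reduces the skips left for reading, which is the "global for the step" behavior argued for above.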

A note on naming: I know we've always discussed 'ChunkReader' or 'ChunkProvider', but I named the interface 'Chunker'. We've used that name before at Accenture, and I've found precedent elsewhere for similar usage: http://developers.sun.com/webtier/reference/techart/chunked_req.html (in that document it's 'Chunked' and 'Unchunked'). The basic thinking is that a 'Chunker' takes a stream of items and 'chunks' them, rather than 'reading chunks', i.e. 'the thing that chunks items is a chunker'. It's fairly easy to rename, so I thought I would throw that out there.
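The Chunker idea (take a stream of items and group them) can be sketched as follows. Only the name Chunker comes from the comment; the chunk() signature and the IteratorChunker class, which uses a java.util.Iterator in place of an ItemReader, are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: only the name 'Chunker' comes from the discussion;
// this signature is an assumption.
interface Chunker<T> {
    /** Returns the next chunk of at most the configured size, or an empty list when input is exhausted. */
    List<T> chunk();
}

/** Chunks items pulled one at a time from an Iterator standing in for an ItemReader. */
final class IteratorChunker<T> implements Chunker<T> {
    private final Iterator<T> items;
    private final int chunkSize;

    IteratorChunker(Iterator<T> items, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.items = items;
        this.chunkSize = chunkSize;
    }

    @Override
    public List<T> chunk() {
        List<T> chunk = new ArrayList<>(chunkSize);
        while (chunk.size() < chunkSize && items.hasNext()) {
            chunk.add(items.next()); // the chunker 'chunks' a stream of single items
        }
        return chunk;
    }
}
```

The same instance is reused for every chunk, so the caller just keeps calling chunk() until it gets an empty list.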

Lucas Ward added a comment - 12/Feb/08 12:19 AM
I added the 'Dechunker' as the opposite of 'Chunker'. I assumed that it shouldn't manage transactions, and thus it can only make basic decisions about whether an output error should be skipped, leaving the 'higher power' to decide whether the chunk should be retried by handing it back to be 'dechunked' again. A quick note on naming: apparently there's more precedent for 'chunker' and 'dechunker' than I thought: http://onlinedictionary.datasegment.com/word/dechunker.
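A sketch of the Dechunker division of labour described above, assuming a per-item writer callback and a predicate that says which exceptions are skippable (both hypothetical, as is this Dechunker class). It never touches transactions: skippable failures are collected, and anything else is rethrown so the caller that owns the transaction can roll back and retry the chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of a 'Dechunker': it writes one chunk, skips failures the
// predicate allows, and rethrows anything else so the caller (which owns the
// transaction) can decide whether to retry the whole chunk.
final class Dechunker<T> {
    private final Consumer<T> writer;
    private final Predicate<RuntimeException> skippable;

    Dechunker(Consumer<T> writer, Predicate<RuntimeException> skippable) {
        this.writer = writer;
        this.skippable = skippable;
    }

    /** Writes every item in the chunk and returns the items that were skipped. */
    List<T> dechunk(List<T> chunk) {
        List<T> skipped = new ArrayList<>();
        for (T item : chunk) {
            try {
                writer.accept(item);
            } catch (RuntimeException e) {
                if (!skippable.test(e)) {
                    throw e; // not ours to handle: the 'higher power' rolls back or retries
                }
                skipped.add(item);
            }
        }
        return skipped;
    }
}
```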

I lost a little bit of steam trying to create a step executor that could work with the chunker and dechunker, since it's a pretty large change. I'll hopefully be able to commit it tomorrow.

Lucas Ward added a comment - 12/Feb/08 11:28 PM
I have added a new class, "ChunkedStep", that uses the chunker and dechunker. It's still a bit rough, but it's functioning, as can be seen in the unit tests. I'm also finding a couple of bugs in the SimpleStepExecutor I pulled from (e.g. BATCH-357). The whole 'execution attributes' change still seems like it needs work.

Overall, I'm fairly happy with how things have worked out. It took a lot less effort than I expected to get it working for basic use cases, and, as mentioned above, I had more trouble with bugs in the existing code than with the 'chunk processing' portion. Even retry worked extremely well: as one of the test cases shows, if the ItemWriter throws an exception, the chunk is retried three times (the default behavior of the RetryTemplate) before bombing out.
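The retry behavior in that test case amounts to rewriting the same in-memory chunk after a failure. A plain-Java sketch of the idea follows; ChunkRetry is a hypothetical helper, not the RetryTemplate API, and the attempt count is passed in rather than taken from any framework default.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of retrying a whole chunk write with a plain loop; this is
// not the RetryTemplate API. Here maxAttempts counts the first call as well.
final class ChunkRetry {

    /** Writes the chunk, rewriting the whole chunk on failure; returns the attempt that succeeded. */
    static <T> int writeWithRetry(List<T> chunk, Consumer<List<T>> chunkWriter, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                chunkWriter.accept(chunk); // in a real step each attempt runs in its own transaction
                return attempt;
            } catch (RuntimeException e) {
                last = e; // the transaction would roll back; the chunk itself is still in memory
            }
        }
        throw last; // 'bombing out' after the final attempt
    }
}
```

Because the chunk stays in memory, nothing is re-read between attempts, which is what makes retrying at the chunk level cheap.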

The last big hurdle is how to apply the skip policies and exception handling, but that shouldn't take too long.

Lucas Ward added a comment - 13/Feb/08 11:53 PM
I added quite a few more refinements to the chunking approach. Both the chunker and dechunker now return *Result objects that contain the necessary information, including any exceptions encountered. This allows the step to log exceptions via an exception logger, which is purposely separate from the repeat template's exception handler, because the semantics of the two are very different. I also modified the chunker/dechunker interfaces so that a new instance doesn't have to be created for each chunk. This has made for some interesting issues when configuring the step, all of which should be alleviated by a namespace, since the ChunkedStep could then accept a chunker/dechunker without needing to worry about an ItemReader/ItemWriter.
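A sketch of failures travelling back inside a result object, assuming hypothetical ChunkerResult, ItemSource and ExceptionLogger types (none of these are the actual classes in the commit). The chunker records read exceptions instead of throwing them, and the step passes them to a logger that is separate from whatever handler decides to abort.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: ChunkerResult, ItemSource and ExceptionLogger are
// illustrative names, not Spring Batch types.
final class ResultDemo {

    record ChunkerResult<T>(List<T> items, List<Exception> exceptions) {}

    /** Returns null when exhausted; may throw on a bad record. */
    interface ItemSource<T> {
        T read();
    }

    /** Kept separate from any repeat-template exception handler. */
    interface ExceptionLogger {
        void log(Exception e);
    }

    /** Reads up to size items, recording read failures instead of throwing them. */
    static <T> ChunkerResult<T> chunk(ItemSource<T> source, int size, int skipLimit) {
        List<T> items = new ArrayList<>();
        List<Exception> exceptions = new ArrayList<>();
        while (items.size() < size) {
            T item;
            try {
                item = source.read();
            } catch (RuntimeException e) {
                exceptions.add(e);
                if (exceptions.size() > skipLimit) {
                    break; // hand back what we have; the step decides whether to abort
                }
                continue;
            }
            if (item == null) {
                break; // input exhausted
            }
            items.add(item);
        }
        return new ChunkerResult<>(items, exceptions);
    }

    /** Step side: log every reported exception, then return the items to write. */
    static <T> List<T> handle(ChunkerResult<T> result, ExceptionLogger logger) {
        result.exceptions().forEach(logger::log); // logging only; no abort decision here
        return result.items();
    }
}
```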

Wayne Lund added a comment - 25/Feb/08 06:48 PM
I'm adding the chain of discussion we've had about chunking recently, in the order that the emails occurred.

======== gkick w/wxlund commentary =======

Very helpful recap of a long conversation with Spring Source yesterday.
See below for my comments. I'm concerned that we be able to support a
concurrent model as well so discussing what we have to do is helpful.

I was talking with Lucas today about the processing model for
spring-batch and we got around to chunk processing and transaction
boundaries. I was hoping that I could get your feedback on some of it.

Just to quickly make sure that we're all on the same page I'll recap my
assumptions:

If we were to consider a batch job to be reading and writing (assume
that processing is a noop or rolled up into writing) then we really have
two options for the transaction boundary. Either:
read(); tx{write();}
or
tx{read(); write();}
Let's also assume that since there is no requirement that allows a
transaction to be suspended in one thread and resumed in another (some
implementations can do it, some can't), any tx{} block is one thread.

<Correct. The discussion centered around these two options for txn
boundaries/>
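The two boundary options can be sketched in Java. This is a toy, assuming a hypothetical inTransaction helper that only records begin/commit/rollback instead of talking to a real transaction manager; it just makes the ordering of the two options visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch contrasting read(); tx{write();} with tx{read(); write();}.
// inTransaction only records begin/commit/rollback; it stands in for a real
// transaction manager.
final class BoundaryDemo {
    final List<String> log = new ArrayList<>();

    <T> T inTransaction(Supplier<T> work) {
        log.add("begin");
        try {
            T result = work.get();
            log.add("commit");
            return result;
        } catch (RuntimeException e) {
            log.add("rollback");
            throw e;
        }
    }

    String read() {
        log.add("read");
        return "item";
    }

    void write(String item) {
        log.add("write " + item);
    }

    /** read(); tx{write();}: the read sits outside, so it could run on a different thread. */
    void readOutsideTx() {
        String item = read();
        inTransaction(() -> {
            write(item);
            return null;
        });
    }

    /** tx{read(); write();}: both in one transaction, and therefore on one thread. */
    void readInsideTx() {
        inTransaction(() -> {
            write(read());
            return null;
        });
    }
}
```

In the first form nothing forces read() onto the thread that owns the transaction, which is the property the rest of the thread builds on.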

So, as usual, we want the transaction to wrap the smallest amount of
code possible for our operation. Both for the typical "we want to have
the transaction open for as short a time as possible" reason and the
fact that any code that is outside the transaction can be executed in a
separate thread. So the question Lucas and I were trying to sift
through was whether or not it is necessary to have the read as part of
the transaction.

<Hadn't really thought about the separate thread aspect, but so far we're
on the same page with one caveat. Because batch is also about throughput,
we actually compromise on the "short a time as possible" goal to support
much larger amounts of data in a commit's working set. This improves
throughput on almost any transactional resource.>


As I see it, the two arguments for including the read() in the
transaction would be to be able to (1) roll back the read when you error
on write (i.e. sync reading and writing to be nice and consistent) and
to be able to (2) persist statistics and restart data for your reader
with integrity.

<That's a good summary of where we were at. The other item was the
processedFlag/>

While (1) makes logical sense at first glance (when I can't process a
record, start the whole process over for that record), I don't think
that it holds up. Rolling back to the read would only be beneficial if
a record had been deleted or modified in the meantime; otherwise you're
just reading and converting the same objects multiple times. Without the
rollback, you'd simply keep the object(s) from the first read in memory
and never fetch them again.

<ok. I'll buy that for now. Although I'm not sure how skip fits in yet
but let's assume a skip never hits the write boundary at all./>

So, let's say that the order of events is read1, fail, change, read2,
write. If you were to roll back, the result of read1 would be different
than the result of read2, but if the read were outside the tx, you would
still have the result of read1. Is this a travesty? If the write fails
over and over again, those items will either be skipped or the job will
fail. Then the job starts over and picks up the new data anyway. It
doesn't seem like this extra little bit of robustness is worth the cost
(getting to the cost in a second...).

<I did not follow your scenario. I'm not sure how you define read1,
fail, change (a change to what?). Also, what exactly would be a travesty? I can
think of certain scenarios where read1 being different after a rollback
would be alarming. Maybe not a travesty -:)

T1 [ read1, (fails validate), read2, (pass validate), write, read3,
(pass validate), fails write, rollback]

But in the end, as long as we are 100% certain that we know what was
read, skipped, processed and written, I'm basically in
agreement.

>
>

Lucas also mentioned process indicators as a possible reason to wrap the
read. I think that the same concept applies. When you try to write to
the record, which is the same as the input record, you're essentially
going to do an update where the fields of the record you're updating
match the fields of the record you read. If there's no match, it fails.
Fail the job or skip. Try again with the new data.

<It's the same as the input record, but one you no longer have the
transactional context for. What I mean is that if I were in the same txn,
there would be no new database operation to find the record and then update
the flag, because it's already in my txn context. However, if it's done in
the write transaction, there is an implicit read of at least the index page
before the processFlag is updated. My point is that it's not exactly the
same semantics as doing it within the same txn.>
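The "update only if it still matches what was read" check from the process-indicator discussion can be sketched with an in-memory table. This assumes a ConcurrentHashMap in place of a database: ConcurrentHashMap.replace(key, expectedValue, newValue) is atomic and succeeds only if the current value equals the expected one, standing in for an UPDATE whose WHERE clause repeats the fields that were read. ProcessIndicatorDemo and Row are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an in-memory 'table' stands in for the database, and
// ProcessIndicatorDemo/Row are illustrative names. replace(key, expected, updated)
// succeeds only if the row still equals what was read, like an UPDATE whose
// WHERE clause repeats the fields that were read.
final class ProcessIndicatorDemo {

    /** A row with its data and a processed flag; records compare by value. */
    record Row(String data, boolean processed) {}

    final ConcurrentHashMap<Long, Row> table = new ConcurrentHashMap<>();

    /** Sets the processed flag only if the row is unchanged since it was read. */
    boolean markProcessed(long id, Row asRead) {
        return table.replace(id, asRead, new Row(asRead.data(), true));
    }
}
```

A false return is the "no match" case: fail the job or skip, as Greg suggests. Wayne's point still applies: against a real database this is an extra lookup inside the write transaction rather than an update to a row that is already in the reading transaction's context.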


As for writing the restart and statistics data for the reader, there
doesn't seem to be any reason that that can't be passed from the reader
through the transaction boundary. So, say I have a chunk. The chunk
can know where it came from. "I am the chunk from line x to line y in z
flat file" or "I am the chunk from key x to key y of my driving query"
or whatever. Then, this just gets persisted when the chunk processes.
When a chunk writes, the reader and writer data gets persisted. In the
event of failure, the job didn't record that the chunk's records were
written so it reads them again. That seems to be an easy solution.

<I don't have a problem with this as long as the 100% rule above is
observed/>
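A sketch of a chunk that "knows where it came from", assuming a flat file addressed by line numbers. RestartDemo and Chunk are hypothetical names, and the two fields updated together stand in for the items and the restart data committed in the same write transaction. A failed write commits neither, so a restart re-reads only from the failed chunk onward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: RestartDemo and Chunk are illustrative names. A chunk
// records the line range it came from; output and committedLine are updated
// together, standing in for items and restart data written in one transaction.
final class RestartDemo {

    record Chunk(int fromLine, int toLine, List<String> items) {}

    final List<String> output = new ArrayList<>();
    int committedLine = 0; // restart data: lines before this index are done

    /** Splits the lines from start to the end of the file into chunks of at most size lines. */
    static List<Chunk> chunksFrom(List<String> lines, int start, int size) {
        List<Chunk> chunks = new ArrayList<>();
        for (int from = start; from < lines.size(); from += size) {
            int to = Math.min(from + size, lines.size());
            chunks.add(new Chunk(from, to, lines.subList(from, to)));
        }
        return chunks;
    }

    /** Runs from the last committed line; failAt simulates a write failure on that line (-1 for none). */
    void run(List<String> lines, int size, int failAt) {
        for (Chunk chunk : chunksFrom(lines, committedLine, size)) {
            if (failAt >= chunk.fromLine() && failAt < chunk.toLine()) {
                // The write 'transaction' rolls back: neither items nor restart data are kept.
                throw new IllegalStateException("write failed for lines " + chunk.fromLine() + "-" + chunk.toLine());
            }
            output.addAll(chunk.items());
            committedLine = chunk.toLine(); // committed together with the items
        }
    }
}
```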

Now for the cost. I can't go 30 seconds at Walgreens or on the message
boards or wherever without somebody asking me about performance. If
processing is confined to a single thread because the whole bulk of
processing has to be in a single transaction, performance is severely
limited. There's always partitioning, but that's obviously not a viable
solution for all jobs so there needs to be something else. I'm here
with an environment with machines with 16 processors per machine and
there's no way that batch jobs running in a single thread is ideal.

So, why not both? Mostly maintenance costs. I keep telling Lucas that
if it were up to me, the difference between iterative processing and
concurrent processing would be a thread pool of size 1. IMHO, not
having to maintain an iterative solution and a concurrent solution would
really help the long-term evolution. The codebase has already gotten
pretty huge anyway and it's a lot for a handful of guys to support as it
is.

<After another long discussion with Lucas and Scott, we agreed that both
make sense. To continue: we'll let chunking move forward with different
txnl semantics, but I'm still unconvinced that it won't produce
unpredicted errors in client code because the txnl semantics are
different. But it's certainly worth exploring/>
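The "thread pool of size 1" argument can be illustrated with one code path whose only knob is the pool size. PoolDemo.processChunks is a hypothetical helper built on java.util.concurrent.Executors; it is a sketch of the idea, not a proposed Step implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: the iterative and concurrent cases share one code path,
// and only the pool size differs. PoolDemo is an illustrative name.
final class PoolDemo {

    static <T, R> List<R> processChunks(List<List<T>> chunks, Function<List<T>, R> processor, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (List<T> chunk : chunks) {
                futures.add(pool.submit(() -> processor.apply(chunk)));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // submission order, whatever the pool size
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With threads = 1 the chunks run one after another on a single worker thread; with more threads they overlap, and the results still come back in submission order.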

While that turned out to be much longer than I intended it to be,
hopefully it will provide the grounds for some decent conversation. I
know my java pretty well, but I'm certainly no expert on batch
processing scenarios. What have I missed?

<We did have a good discussion over this email and it was very helpful.
Like I've said, my response is more a parental "because" than
solid evidence, but I'm not comfortable changing txn semantics. I'm
much more comfortable leaving both options in and seeing whether
people find it a viable alternative, realizing they didn't need their read
chunks within the same transaction/>

===== lucas =====
My only 2 cents to add here is about 'not having a parallel option'.
Technically, we always have had one. If you look at the parallelJob, you can
see what I mean. We've had a client in Argentina use this approach with
success. However, this is very dependent upon readers and writers that
can handle the concurrency. Our other approach has been to 'write' the
chunks to a queue, allowing us to scale with the number of processors
we could attach on the other end. This is still possible with an
'iterative' solution by aggregating items in the writer and then writing
the chunk on flush() (another among many reasons to have a specific
flush).

Having said that, I do realize it's a bit hokey. I still believe in the
chunked approach, and I think the semantics are much better. If we're
going to write a chunk out to a queue, I'd rather have a full
chunk-style approach than a 'hack'. BUT, it still works either
way.
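The queue-based workaround Lucas describes (aggregate in the writer, send the chunk on flush()) might look like this. QueueingWriter is hypothetical; its write/flush methods echo the discussion, but this is not the Spring Batch ItemWriter interface, and a java.util.concurrent.BlockingQueue stands in for whatever queue the remote processors would consume.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the 'iterative' workaround: write() only buffers items,
// and flush() (called at the commit point) sends the whole chunk as one message.
// This is not the Spring Batch ItemWriter interface.
final class QueueingWriter<T> {
    private final BlockingQueue<List<T>> queue;
    private List<T> buffer = new ArrayList<>();

    QueueingWriter(BlockingQueue<List<T>> queue) {
        this.queue = queue;
    }

    void write(T item) {
        buffer.add(item); // nothing leaves the process until flush()
    }

    /** Sends the buffered chunk as one message and starts a new buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        if (!queue.offer(List.copyOf(buffer))) {
            throw new IllegalStateException("queue full");
        }
        buffer = new ArrayList<>();
    }
}
```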

It does sound like we're all in agreement that the chunk processing
should live to fight another day. I guess we can take the Torvalds
approach and toss them both out there and see what people end up using.
If some people are adamant about using read and write in one

===== gkick ======

Lucas & Wayne,

I'm about to completely trash the "parallel job," but Lucas and I have
already talked it through and we agree, so take no offense. :-) The
concept behind the parallel job is that read and write have to be in the
same thread, so you can just take that entire operation and put it into
an executor service. While that's good in theory, as Lucas mentioned
you need thread-safe readers and writers. This is very, very difficult.
Not only do you have to synchronize on the actual reading, skipped items
need to be read and updated concurrently and mark/reset needs to be
handled per thread (thread local is probably the best option). None of
the readers or writers in spring-batch handle this properly now and it
is a pretty big burden to place on developers implementing their own.
If the parallel job ever had to roll back, there would be a resounding
'kaboom' shortly thereafter.

The easy solution is to thread-scope readers and writers. A single
reader dispatches items to a group of processing threads, and each of
those threads has its own writer. This works well for databases
because they are meant to be written to concurrently. In the case of a
file, writing to separate files and adding a step to aggregate them solves
the problem without having to create a thread-safe writer.
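A sketch of thread-scoped writers, assuming items come from a single Iterable and that an unsynchronized list per thread can stand in for a per-thread writer (for example one file per thread). ThreadScopedWriters is a hypothetical name. The per-thread outputs are returned separately, leaving the merge to a later aggregation step as suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: one reader thread dispatches items to a pool, and each
// pool thread appends to its own unsynchronized 'writer' list.
final class ThreadScopedWriters {

    static <T> List<List<T>> process(Iterable<T> reader, int threads) {
        List<List<T>> allOutputs = new CopyOnWriteArrayList<>(); // one entry per worker thread
        ThreadLocal<List<T>> writer = ThreadLocal.withInitial(() -> {
            List<T> output = new ArrayList<>();
            allOutputs.add(output);
            return output;
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (T item : reader) { // single reader: only this thread touches the input
                pending.add(pool.submit(() -> writer.get().add(item))); // no lock: each list has one owner
            }
            for (Future<?> f : pending) {
                f.get(); // also makes each worker's writes visible to this thread
            }
            return allOutputs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```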

As for the cost/benefit, I mocked up a fake little batch processor.
(code attached) What it does is mock out contended resources (cpu, io,
etc.) at a scale that obscures the impact of the actual machine.
Basically, I just created a bunch of narcoleptic threads that sleep for
much, much longer than java takes to process. So, a cpu operation takes
10ms and a network operation takes 1000ms. My fake resources can also
be shared. So a single core cpu resource can be used by one thread
while a quad core cpu can be used by 4. This is a gross approximation,
but it more or less mocks out the scenario.
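A guess at the shape of such a fake resource, not the attached code: a java.util.concurrent.Semaphore caps how many threads can use it at once (1 for a single-core cpu, 4 for a quad core), and each use just sleeps for its cost. FakeResource is a hypothetical name; the peak-concurrency counter exists only so the limit can be checked.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a contended 'fake resource' (not the attached code):
// a Semaphore caps concurrent users, and each use sleeps for costMillis.
final class FakeResource {
    private final Semaphore slots;
    private final long costMillis;
    private final AtomicInteger inUse = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    FakeResource(int capacity, long costMillis) {
        this.slots = new Semaphore(capacity, true); // fair: waiting threads are served in order
        this.costMillis = costMillis;
    }

    /** Blocks until a slot is free, then 'works' for costMillis. */
    void use() throws InterruptedException {
        slots.acquire();
        try {
            peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
            Thread.sleep(costMillis);
        } finally {
            inUse.decrementAndGet();
            slots.release();
        }
    }

    int peakConcurrency() {
        return peak.get();
    }
}
```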

For my first run, I just mocked up the approximation of a data load of 10
items. Reading takes 100ms (one thread can read from the file),
processing takes 10ms (dual core) and writing takes 1000ms (10
simultaneous db connections). With the iterative processing, everything
happens serially. So, the results are about 10 items * (100ms + 10ms +
1000ms) = 11100ms.
Iterative
Starting...
Finished in 11104ms

Then I moved the transaction boundary so that read and write no longer
needed to be in the same thread. Now I can submit the writing task to
an executor service.
Concurrent w/ 2 threads
Starting...
Finished in 5255ms
Concurrent w/ 3 threads
Starting...
Finished in 4143ms
Concurrent w/ 10 threads
Starting...
Finished in 2017ms
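The reported times line up with a simple scheduling model. The sketch below assumes, as a guess about the attached code rather than something taken from it, that one thread reads items serially at 100ms each and submits each item to a fixed pool of n threads, where a task processes (10ms) and then writes (1000ms). A task starts once its item has been read and a pool thread is free. The dual-core and 10-connection limits never bind in this model, because tasks start at least 100ms apart and there are never more than 10 of them. PipelineModel is a hypothetical name.

```java
// Hypothetical back-of-the-envelope model of the runs above; the task structure
// (process + write on the pool, serial reads on one thread) is an assumption.
final class PipelineModel {
    static final long READ_MS = 100;
    static final long PROCESS_MS = 10;
    static final long WRITE_MS = 1000;

    /** Iterative: read, process and write each item in turn on one thread. */
    static long iterative(int items) {
        return items * (READ_MS + PROCESS_MS + WRITE_MS);
    }

    /** Concurrent: serial reads feed a pool of 'threads' workers that process and write. */
    static long concurrent(int items, int threads) {
        long[] freeAt = new long[threads]; // when each pool thread next becomes idle
        long finish = 0;
        for (int i = 1; i <= items; i++) {
            long readyAt = i * READ_MS; // item i has been read by this time
            int w = 0;
            for (int k = 1; k < threads; k++) {
                if (freeAt[k] < freeAt[w]) {
                    w = k; // the earliest-free thread takes the next queued task
                }
            }
            long start = Math.max(readyAt, freeAt[w]);
            freeAt[w] = start + PROCESS_MS + WRITE_MS;
            finish = Math.max(finish, freeAt[w]);
        }
        return finish;
    }
}
```

The model gives 11100, 5250, 4140 and 2010 ms, within a few milliseconds of the measured 11104, 5255, 4143 and 2017 ms, which suggests the gains come from overlapping the 1000ms writes while the single reader keeps the pool fed.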

I do want to mention that, as Wayne pointed out, I have entirely ignored
multiple items in a single commit. The same concept should apply if you
just pretend that you are dealing with 10 chunks instead of 10 items.

Feel free to download it, play with the numbers and run it. Feel free
to criticize the model as well. I'm sure that the code written at 7 am
on the train isn't bulletproof... :-)

In any event, despite my numbers having little to do with reality, and
this being the equivalent of saying that you can build a house because
you made a model out of legos, it seems far too appealing not to pursue.
And actually, as Lucas and I have talked about this more and more,
having the two options for transaction semantics seems like a good idea.
I would even say that to a developer whether something is
'item-oriented' or 'chunk-oriented' is of little consequence. The
differentiating factor is really the txn and naming it as such might be
a good idea.


======= dkaminsky ====

Do you guys think there's any value in creating a Step implementation
specifically for parallelizing chunk processing (with and/or without
using a JMS queue)?


====== lucas =====

I've mentioned the concept to Dave before.

Really, the 'task executor' approach to the RepeatTemplate is the
only valuable part of the whole thing. I'm highly tempted to copy the whole
ItemOrientedStep into something called 'ParallelStep' and remove the
repeat templates from the ItemOriented one completely.


====== gkick =====
Just thinking that the chunked step strategy seems extra-useful for
threaded processing, and it seems counter-intuitive not to be able to
control multithreading each of these chunks at the step level.


====== wxlund ===========
Just ran your example and looked through the code. Good examples, but
not sure it changes the tone of our conversation. Also, your speeds are
really slow, as our database updates were taking .5 ms on AIX -:) but I
get your point and nobody's arguing your points. I'm also looking at the
StagingItem{Reader, Writer, Processor} that Dave provided.

=========== gkick ========
No, it certainly doesn't change anything, just a nice, illustrative
example of the impact that moving that transaction boundary can have.
Plus, it was more fun to write than jira issues... :-) And yes, I
would hope that the speeds are slow. (If your database operations take
a second, you have bigger problems...) The numbers were purposefully
inflated because realistic numbers would be obscured by the actual
processing time of the fake resources.

I agree that leaving the iterative solution in the codebase is probably
best for developers. I entirely disagree with the concept of running a
lot of these types of steps concurrently and sharing streams. [snipped
some non-relevant discussion around StagingItem]. The iterative
solution is fine, but not for concurrent processing.



========= wxlund ==========
Dave, Ben,

Last Thursday we made the decision to pull back from our determination
to include chunking in the spring-batch-1.0. However, in our Accenture
status meeting on Friday there was a long debate over the merits of at
least leaving it in as another option. I'm going to include a fairly
long email on the topic with a couple of snippets to shorten the thread.
The point is that Scott and I, with Lucas and others, were feeling if
the coding and testing are complete then we could see no strong reason
not to include and see how it falls out in usage.

The thread is largely Greg Kick but he wasn't the only one on an
internal Accenture call to express dismay at taking chunking out of the
picture.

Summary of the thread:

* People were questioning whether there was a valid argument for
supplying an alternative transaction boundary. If it is a new batch job
and the use case warrants it is there really a problem with the
transaction boundary being only around the write half of the transaction?
Quite frankly, I couldn't come up with a good argument.

* There's a feeling that the chunking is an easier programming model
when viewed through the eye of concurrency, (e.g. easier to submit
processing to workers).

* There was pushback on our original arguments of processFlag updates
needing to be in the same transaction.

* There was pushback on the statistics needing to be in the same
transaction as the reads.

* There are a few other minor internal politics about m5 being the
"chunk" release, which we can manage, but after the discussions on Friday
we no longer feel we have as strong a reason for removing it as we
originally did; that reason was based solely on caution.

* Moving the transaction boundary does not affect restart/statistics,
since they can be stored during the write transaction as well.

With that in mind, given that the chunking is already in place, do you
see any strong reason for not including it in the release? I'm
personally fine with it staying in the code base even though I was one
of the most outspoken to remove it in our last call.

I'm also happy to add this to a jira issue but was questioning whether
this was internal laundry before publicly showing the discussion.

Wayne




=============== dsyer ========

Wayne,

This was timely as I was in the middle of moving the chunks out to a
branch. All of the points below are really good, and I don't disagree
with any of the analysis. I just think that there are too many
questions still open, and we will be diluting our efforts if we have to
maintain both approaches in 1.0. There are still quite a few things
not working in the item-oriented (or iteration) approach, and if we have
to fix them twice it will cost us dearly in our deadlines. The same
goes, in spades, for bugs and maintenance of 1.0 after the release. In
my opinion the chunking discussions should continue, but we should focus
all our efforts on just getting a release out now.



Lucas Ward added a comment - 12/May/08 10:25 AM
If no one disagrees, I think this issue should probably be moved to 2.0. There's no way to implement it without adding some new interfaces, and it was my understanding that we couldn't do that in 1.1?

Dave Syer added a comment - 13/May/08 07:53 PM
There are definitely ways to do it without new interfaces on top of the item-oriented step. I have been working on some code that uses Spring Integration to process chunks, and I think it should go out as a sample in 1.1.