I added the 'Dechunker' as the opposite of 'Chunker'. I assumed that it shouldn't be managing transactions, so it can only make basic decisions about whether an output error should be skipped, leaving the 'higher power' to decide whether the chunk should be retried by handing it back to be 'de-chunked'. A quick note on the naming: apparently there's more precedent for 'chunker' and 'dechunker' than I thought: http://onlinedictionary.datasegment.com/word/dechunker.
I lost a little bit of steam trying to create a step executor that could work with the chunker and dechunker, since it's a pretty large change. I'll hopefully be able to commit it tomorrow.
I have added a new class, "ChunkedStep", that uses the chunker and dechunker. It's still a bit rough, but it's functioning, as can be seen in the unit tests. I'm also finding a couple of bugs in the SimpleStepExecutor I pulled from. (i.e.
Overall, I'm fairly happy with how things have worked out. It took a lot less effort than I thought to get it working for basic use cases and, as mentioned above, I had more issues with bugs in the existing code than I did with the 'chunk processing' portion. Even retry worked extremely well. If you look at one of the test cases, you can see that if the ItemWriter throws an exception, the chunk will be retried three times, since that's the default behavior of the RetryTemplate, before bombing out. The last big hurdle is how to apply the skip policies and exception handling, but that shouldn't take too long.
I added quite a few more refinements to the chunking approach. Both the chunker and dechunker now return *Result objects that contain the necessary information, including any exceptions encountered. This allows the step to log exceptions via an exception logger, which is purposely separate from the repeat template exception handler, since the semantics of the two are very different. I also modified the chunker/dechunker interfaces so that a new instance doesn't have to be created for each chunk. That has made for some interesting issues when configuring the step, all of which should be alleviated by a namespace, since the ChunkedStep could then accept a chunker/dechunker without needing to worry about an ItemReader/ItemWriter.
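To make the shape described above a little more concrete, here is a minimal sketch of a chunker/dechunker pair that return result objects, driven by a step that hands a failed chunk back to the dechunker. Every name here (ChunkingResult, DechunkingResult, ChunkedStepSketch, MAX_ATTEMPTS) is an assumption for illustration and not the sandbox code; the plain loop stands in for the RetryTemplate, and "three" is treated as three attempts in total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical result of reading one chunk: the items plus any read errors. */
final class ChunkingResult<T> {
    final List<T> items = new ArrayList<>();
    final List<Exception> exceptions = new ArrayList<>();
}

/** Hypothetical result of writing one chunk: the errors the dechunker chose to skip. */
final class DechunkingResult {
    final List<Exception> skipped = new ArrayList<>();
}

interface Chunker<T> {
    /** Collects up to size items; returns null once the input is exhausted. */
    ChunkingResult<T> chunk(int size);
}

interface Dechunker<T> {
    /** Writes the chunk; throws when the whole chunk should be handed back. */
    DechunkingResult dechunk(ChunkingResult<T> chunk) throws Exception;
}

final class ChunkedStepSketch {
    // Assumption: three attempts in total, a stand-in for the retry behavior described above.
    static final int MAX_ATTEMPTS = 3;

    /** Reads each chunk once, then offers the same chunk to the dechunker until it succeeds. */
    static <T> int run(Chunker<T> chunker, Dechunker<T> dechunker, int chunkSize) throws Exception {
        int chunksWritten = 0;
        ChunkingResult<T> chunk;
        while ((chunk = chunker.chunk(chunkSize)) != null) {
            for (int attempt = 1; ; attempt++) {
                try {
                    dechunker.dechunk(chunk);
                    chunksWritten++;
                    break;
                } catch (Exception e) {
                    if (attempt == MAX_ATTEMPTS) {
                        throw e; // give up after the last attempt
                    }
                }
            }
        }
        return chunksWritten;
    }

    /** Chunker over an in-memory iterator, for demonstration only. */
    static <T> Chunker<T> over(Iterator<T> source) {
        return size -> {
            if (!source.hasNext()) {
                return null;
            }
            ChunkingResult<T> result = new ChunkingResult<>();
            while (result.items.size() < size && source.hasNext()) {
                result.items.add(source.next());
            }
            return result;
        };
    }

    public static void main(String[] args) throws Exception {
        Iterator<String> lines = Arrays.asList("a", "b", "c").iterator();
        int chunks = run(over(lines), chunk -> {
            System.out.println("writing " + chunk.items);
            return new DechunkingResult();
        }, 2);
        System.out.println(chunks + " chunks written");
    }
}
```

Note that neither interface knows about transactions or retry; in this sketch that decision sits entirely in the step, which matches the division of responsibility described for the Dechunker.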
I'm adding the chain of discussion that we've had about chunking recently, in the order that the emails occurred.
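For orientation, the two transaction boundaries debated in the thread below, read(); tx{write();} and tx{read(); write();}, can be sketched as follows. This is only an illustration under stated assumptions: tx() is a hypothetical stand-in that treats an exception as a rollback, and the class and method names are invented for the example. It shows the one behavioral difference the emails argue about, namely that a rollback re-reads the chunk only when the read sits inside the transaction.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class TxBoundarySketch {

    /** Hypothetical stand-in for a real transaction: true means commit, false means rollback. */
    static boolean tx(Runnable work) {
        try {
            work.run();
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    /** read(); tx{write();} : the chunk is read once and stays in memory across retries. */
    static <T> void readOutsideTx(Supplier<List<T>> read, Consumer<List<T>> write, int maxAttempts) {
        List<T> chunk = read.get();
        for (int i = 0; i < maxAttempts; i++) {
            if (tx(() -> write.accept(chunk))) {
                return;
            }
        }
        throw new IllegalStateException("write failed after " + maxAttempts + " attempts");
    }

    /** tx{read(); write();} : a rollback also undoes the read, so every retry reads again. */
    static <T> void readInsideTx(Supplier<List<T>> read, Consumer<List<T>> write, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (tx(() -> write.accept(read.get()))) {
                return;
            }
        }
        throw new IllegalStateException("write failed after " + maxAttempts + " attempts");
    }
}
```

With a writer that fails once and then succeeds, readOutsideTx calls the reader a single time while readInsideTx calls it twice. Because only the tx() call has to stay on one thread, the first form is also the one that lets the read run in a different thread from the write.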
======== gkick w/wxlund commentary =======
Very helpful recap of a long conversation with Spring Source yesterday. See below for my comments. I'm concerned that we be able to support a concurrent model as well, so discussing what we have to do is helpful.
I was talking with Lucas today about the processing model for spring-batch and we got around to chunk processing and transaction boundaries. I was hoping that I could get your feedback on some of it. Just to quickly make sure that we're all on the same page, I'll recap my assumptions:
If we were to consider a batch job to be reading and writing (assume that processing is a noop or rolled up into writing), then we really have two options for the transaction boundary. Either:
read(); tx{write();}
or
tx{read(); write();}
Let's also assume that since there is no requirement that allows a transaction to be suspended in one thread and resumed in another (some implementations can do it, some can't), any tx{} block is one thread.
<Correct. The discussion centered around these two options for txn boundaries/>
So, as usual, we want the transaction to wrap the smallest amount of code possible for our operation, both for the typical "we want to have the transaction open for as short a time as possible" reason and because any code that is outside the transaction can be executed in a separate thread. So the question Lucas and I were trying to sift through was whether or not it is necessary to have the read as part of the transaction.
<Hadn't really thought about the separate thread aspect, but so far we're on the same page with one caveat. Because batch is also about throughput, we actually compromise on the "short a time as possible" to support much larger amounts of data in a commit's working set. This improves throughput on almost any transactional resource.>
As I see it, the two arguments for including the read() in the transaction would be to be able to (1) roll back the read when you error on write (i.e. sync reading and writing to be nice and consistent) and to be able to (2) persist statistics and restart data for your reader with integrity.
<That's a good summary of where we were at. The other item was the processedFlag/>
While (1) makes logical sense at first glance (when I can't process a record, start the whole process over for that record), I don't think that it holds up. The only way that rolling back to the read would be beneficial would be if a record were to be deleted or modified (otherwise you're just reading and converting the same objects multiple times). Otherwise, you'd just have the object(s) that you read on the first read in memory and wouldn't fetch them again.
<ok. I'll buy that for now. Although I'm not sure how skip fits in yet, let's assume a skip never hits the write boundary at all./>
So, let's say that the order of events is read1, fail, change, read2, write. If you were to roll back, the result of read1 would be different than the result of read2, but if the read were outside the tx, you would still have the result of read1. Is this a travesty? If the write fails over and over again, those items will either be skipped or the job will fail. Then the job starts over and picks up the new data anyway. It doesn't seem like this extra little bit of robustness is worth the cost (getting to the cost in a second...).
<I did not parse your scenario well. I do not know what you mean by read1, fail, change (what?). Also, what is a travesty? I can think of certain scenarios where read1 being different after a rollback would be alarming. Maybe not travesty -:) T1 [ read1, (fails validate), read2, (pass validate), write, read3, (pass validate), fails write, rollback] But in the end, if we agree that as long as we are 100% certain that we know what was read, skipped, processed and written, then I'm basically in agreement.>
Lucas also mentioned process indicators as a possible reason to wrap the read. I think that the same concept applies.
When you try to write to the record, which is the same as the input record, you're essentially going to do an update where the fields of the record you're updating match the fields of the record you read. If there's no match, it fails. Fail the job or skip. Try again with the new data.
<It's the same as the input record that you don't have transactional context for. What I mean is that if I were in the same txn there is no new database operation to find the record and then update the flag, because it's already in my txn context. However, if done on the write transaction there is an implicit read of at least the index page and then an update of the processFlag. My point is that it's not exactly the same semantics as when it was within the same txn.>
As for writing the restart and statistics data for the reader, there doesn't seem to be any reason that that can't be passed from the reader through the transaction boundary. So, say I have a chunk. The chunk can know where it came from. "I am the chunk from line x to line y in z flat file" or "I am the chunk from key x to key y of my driving query" or whatever. Then, this just gets persisted when the chunk processes. When a chunk writes, the reader and writer data gets persisted. In the event of failure, the job didn't record that the chunk's records were written, so it reads them again. That seems to be an easy solution.
<I don't have a problem with this as long as the 100% rule above is observed/>
Now for the cost. I can't go 30 seconds at Walgreens or on the message boards or wherever without somebody asking me about performance. If processing is confined to a single thread because the whole bulk of processing has to be in a single transaction, performance is severely limited. There's always partitioning, but that's obviously not a viable solution for all jobs, so there needs to be something else.
I'm here with an environment with machines with 16 processors per machine and there's no way that batch jobs running in a single thread is ideal.
So, why not both? Mostly maintenance costs. I keep telling Lucas that if it were up to me, the difference between iterative processing and concurrent processing would be a thread pool of size 1. IMHO, not having to maintain an iterative solution and a concurrent solution would really help the long-term evolution. The codebase has already gotten pretty huge anyway and it's a lot for a handful of guys to support as it is.
<After a long discussion again with Lucas and Scott we agreed that both makes sense. To continue: we'll let chunk move forward with different txnl semantics, but I'm still unconvinced that it won't produce unpredicted errors in client code because the txnl semantics are different. But it's certainly worth exploring/>
While that turned out to be much longer than I intended it to be, hopefully it will provide the grounds for some decent conversation. I know my Java pretty well, but I'm certainly no expert on batch processing scenarios. What have I missed?
<We did have a good discussion over this email and it was very helpful. Like I've said, my response is more like a parental "because" than solid evidence, but I'm not comfortable changing txn semantics. I'm much more comfortable leaving both options in and seeing if it isn't a viable alternative where people realize they didn't need their read chunks within the same transaction/>
===== lucas =====
My only 2 cents to add here is about 'not having a parallel option'. Technically, we always have. If you look at the parallelJob, you can see what I mean. We've had a client in Argentina use this approach with success. However, this is very dependent upon readers and writers that can handle the concurrency. Our other approach has been to 'write' the chunks to a queue, thus allowing us to grow by the amount of processors we could attach to it on the other end.
This is still possible with an 'iterative' solution by aggregating items in the writer and then writing the chunk on flush() (another among many reasons to have a specific flush). Having said that, I do realize it's a bit hokey. I still believe in the chunked approach, and I think the semantics are much better. If we're going to write a chunk out to a queue, I'd rather actually have a full chunk style approach rather than a 'hack'. BUT, it still works either way. It does sound like we're all in agreement that the chunk processing should live to fight another day. I guess we can take the Torvalds approach and toss them both out there and see what people end up using. If some people are adamant about using read and write in one
===== gkick ======
Lucas & Wayne,
I'm about to completely trash the "parallel job," but Lucas and I have already talked it through and we agree, so take no offense. :-)
The concept behind the parallel job is that read and write have to be in the same thread, so you can just take that entire operation and put it into an executor service. While that's good in theory, as Lucas mentioned you need thread-safe readers and writers. This is very, very difficult. Not only do you have to synchronize on the actual reading, skipped items need to be read and updated concurrently and mark/reset needs to be handled per thread (thread local is probably the best option). None of the readers or writers in spring-batch handle this properly now and it is a pretty big burden to place on developers implementing their own. If parallel job ever had to roll back there would be a resounding 'kaboom' shortly thereafter.
The easy solution is to thread-scope readers and writers. A single reader dispatches items to a group of processing threads and each of those threads has its own writer. This works well for databases because they are meant to be written to concurrently.
In the case of a file, writing to separate files with an additional step to aggregate solves the problem without having to create a thread-safe writer.
As for the cost/benefit, I mocked up a fake little batch processor. (code attached) What it does is mock out contended resources (cpu, io, etc.) at a scale that obscures the impact of the actual machine. Basically, I just created a bunch of narcoleptic threads that sleep for much, much longer than Java takes to process. So, a cpu operation takes 10ms and a network operation takes 1000ms. My fake resources can also be shared. So a single core cpu resource can be used by one thread while a quad core cpu can be used by 4. This is a gross approximation, but it more or less mocks out the scenario.
For my first run, I just mocked up the approximation of a data load of 10 items. Reading takes 100ms (one thread can read from the file), processing takes 10ms (dual core) and writing takes 1000ms (10 simultaneous db connections). With the iterative processing, everything happens serially. So, the results are about 10 items * (100ms + 10ms + 1000ms) = 11100ms.
Iterative Starting... Finished in 11104ms
Then I moved the transaction boundary so that read and write no longer needed to be in the same thread. Now I can submit the writing task to an executor service.
Concurrent w/ 2 threads Starting... Finished in 5255ms
Concurrent w/ 3 threads Starting... Finished in 4143ms
Concurrent w/ 10 threads Starting... Finished in 2017ms
I do want to mention that, as Wayne pointed out, I have entirely ignored multiple items in a single commit. The same concept should apply if you just pretend that you are dealing with 10 chunks instead of 10 items. Feel free to download it, play with the numbers and run it. Feel free to criticize the model as well. I'm sure that the code written at 7 am on the train isn't bulletproof... :-)
In any event, despite my numbers having little to do with reality, and this being the equivalent to saying that you can build a house because you made a model out of Legos, it seems far too appealing not to pursue. And actually, as Lucas and I have talked about this more and more, having the two options for transaction semantics seems like a good idea. I would even say that to a developer whether something is 'item-oriented' or 'chunk-oriented' is of little consequence. The differentiating factor is really the txn, and naming it as such might be a good idea.
======= dkaminsky ====
Do you guys think there's any value in creating a Step implementation specifically for parallelizing chunk processing (with and/or without using a JMS queue)?
====== lucas =====
I've mentioned the concept to Dave before. Really, the whole 'task executor' approach to the RepeatTemplate is the only value to the whole thing. I'm highly tempted to copy the whole ItemOrientedStep into something called 'ParallelStep' and remove the repeat templates from the ItemOriented one completely.
====== gkick =====
Just thinking that the chunked step strategy seems extra-useful for threaded processing, and it seems counter-intuitive to not be able to control multithreading each of these chunks at the step level.
====== wxlund ===========
Just ran your example and looked through the code. Good examples, but not sure it changes the tone of our conversation. Also, your speeds are really slow, as our database updates were taking .5 ms on AIX -:) but I get your point and nobody's arguing your points. I'm also looking at the StagingItem<Reader, Writer, Processor> that Dave provided.
=========== gkick ========
No, it certainly doesn't change anything, just a nice, illustrative example of the impact that moving that transaction boundary can have. Plus, it was more fun to write than jira issues... :-) And yes, I would hope that the speeds are slow.
(If your database operations take a second, you have bigger problems...) The numbers were purposefully inflated because realistic numbers would be obscured by the actual processing time of the fake resources. I agree that leaving the iterative solution in the codebase is probably best for developers. I entirely disagree with the concept of running a lot of these types of steps concurrently and sharing streams. [snipped some non-relevant discussion around StagingItem]. The iterative solution is fine, but not for concurrent processing.
========= wxlund ==========
Dave, Ben,
Last Thursday we made the decision to pull back from our determination to include chunking in the spring-batch-1.0. However, in our Accenture status meeting on Friday there was a long debate over the merits of at least leaving it in as another option. I'm going to include a fairly long email on the topic with a couple of snippets to shorten the thread. The point is that Scott and I, with Lucas and others, were feeling that if the coding and testing are complete then we could see no strong reason not to include it and see how it falls out in usage. The thread is largely Greg Kick, but he wasn't the only one on an internal Accenture call to express dismay at taking chunking out of the picture.
Summary of the thread:
* People were questioning whether there was a valid argument for supplying an alternative transaction boundary. If it is a new batch job and the use case warrants it, is there really a problem with the transaction boundary being only around the write 1/2 of the transaction? Quite frankly, I couldn't come up with a good argument.
* There's a feeling that the chunking is an easier programming model when viewed through the eye of concurrency (e.g. easier to submit processing to workers).
* There was pushback on our original arguments of processFlag updates needing to be in the same transaction.
* There was pushback on the statistics needing to be in the same transaction as the reads.
* There are a few other minor internal politics about m5 being the "chunk" release that we can manage, but we're not feeling with the discussions on Friday that we have as strong a reason for removing as we originally felt, which was solely based on caution.
* Moving the transaction boundary does not affect restart/statistics, since they can be stored during the write transaction as well.
With that in mind, barring that the chunking is already in place, do you see any strong reason for not including it in the release? I'm personally fine with it staying in the code base even though I was one of the most outspoken to remove it in our last call. I'm also happy to add this to a jira issue but was questioning whether this was internal laundry before publicly showing the discussion.
Wayne
=============== dsyer ========
Wayne,
This was timely as I was in the middle of moving the chunks out to a branch. All of the points below are really good, and I don't disagree with any of the analysis. I just think that there are too many questions still open, and we will be diluting our efforts if we have to maintain both approaches in 1.0. There are still quite a few things not working in the item-oriented (or iteration) approach, and if we have to fix them twice it will cost us dearly in our deadlines. The same goes, in spades, for bugs and maintenance of 1.0 after the release. In my opinion the chunking discussions should continue, but we should focus all our efforts on just getting a release out now.
If no one disagrees, I think this issue should probably be moved to 2.0. There's no way to implement it without adding some new interfaces, and it was my understanding that we couldn't do that in 1.1?
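The arithmetic behind the mock-up numbers in gkick's email above can be reproduced without sleeping threads. The sketch below is not the attached code; it is an idealized model under two assumptions that the email does not state: a single reader thread reads items back to back, and each item's processing plus writing is handed to the earliest-free worker. Contention for the fake cpu and db resources is ignored, and all names are hypothetical.

```java
import java.util.PriorityQueue;

final class BoundaryTimingModel {

    /** Everything happens serially in one thread: items * (read + process + write). */
    static long iterativeMillis(int items, long read, long process, long write) {
        return items * (read + process + write);
    }

    /**
     * Assumed model: one reader thread reads serially, then each item is
     * processed and written by whichever of the worker threads frees up first.
     */
    static long concurrentMillis(int items, int workers, long read, long process, long write) {
        PriorityQueue<Long> freeAt = new PriorityQueue<>();
        for (int w = 0; w < workers; w++) {
            freeAt.add(0L);
        }
        long finish = 0;
        for (int i = 1; i <= items; i++) {
            long ready = i * read;              // reader delivers item i at this time
            long earliestFree = freeAt.poll();  // next worker to become idle
            long end = Math.max(ready, earliestFree) + process + write;
            freeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        System.out.println(iterativeMillis(10, 100, 10, 1000));       // 11100
        System.out.println(concurrentMillis(10, 2, 100, 10, 1000));   // 5250
        System.out.println(concurrentMillis(10, 3, 100, 10, 1000));   // 4140
        System.out.println(concurrentMillis(10, 10, 100, 10, 1000));  // 2010
    }
}
```

Under these assumptions the model gives 5250ms, 4140ms and 2010ms for 2, 3 and 10 threads, within a few milliseconds of the reported 5255ms, 4143ms and 2017ms. That agreement suggests, but does not prove, that the attached code dispatched work the same way.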
I modified the StepExecution to include a skipCount and a retryCount, but they aren't being persisted yet. One could argue whether or not they should be on the execution or executionAttributes, but it's hard to argue that they shouldn't be persisted, as it's extremely useful metadata about the step. Adding these two pieces allowed me to modify the ReadFailurePolicy to simply use a StepExecution. This makes even more sense since a skipLimit isn't for reading or writing, but rather global for the step. It's fairly natural that the domain object of StepExecution would be holding this attribute. The one piece I'm missing is how ExceptionHandling should be applied. In the case of reading, you would likely want to log out the skipped item immediately, but with writing that doesn't quite make sense. The ChunkResult that's in the sandbox models this fairly nicely, and it made me wonder if a similar approach couldn't be used for reading. The problem though, is that writing out a chunk has a defined end, whereas reading doesn't. You can't aggregate the skipped items and then return, since you *could* be reading for a long time. This makes the ReadFailurePolicy still necessary. The skipped items could still be put into a list and returned, leaving one exception handler, in the Step, to log them out.
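As a rough illustration of why the skip limit sits naturally on the step execution, here is a minimal sketch in which read and write failures count against one shared skipCount. The class and method names (StepExecutionSketch, StepWideSkipPolicy, shouldSkip) are hypothetical and are not the actual StepExecution or ReadFailurePolicy API; persistence of the counters is left out entirely.

```java
/** Hypothetical stand-in for the step's domain object holding the counters. */
final class StepExecutionSketch {
    private int skipCount;
    private int retryCount;

    int getSkipCount() { return skipCount; }
    int getRetryCount() { return retryCount; }
    void incrementSkipCount() { skipCount++; }
    void incrementRetryCount() { retryCount++; }
}

/** Hypothetical policy: the limit is global for the step, not per reader or writer. */
final class StepWideSkipPolicy {
    private final int skipLimit;

    StepWideSkipPolicy(int skipLimit) {
        this.skipLimit = skipLimit;
    }

    /** Returns true and records the skip if the step-wide limit has not been reached. */
    boolean shouldSkip(Exception failure, StepExecutionSketch execution) {
        if (execution.getSkipCount() >= skipLimit) {
            return false; // limit reached: the caller should fail the step
        }
        execution.incrementSkipCount();
        return true;
    }
}
```

Because both the reading side and the writing side would consult the same StepExecutionSketch, a skip during reading reduces the budget left for writing, which is the "global for the step" behavior described above.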
A note on naming: I know we've always discussed 'ChunkReader' or 'ChunkProvider', but I added the interface as 'Chunker'. We've used it before at Accenture, and I've found precedent in other places for similar types of usage: http://developers.sun.com/webtier/reference/techart/chunked_req.html In that document it's 'Chunked' and 'Unchunked'. The basic thinking is that a 'Chunker' is taking a stream of items and 'chunking' them, rather than 'reading chunks', i.e. "the thing that chunks items is a chunker". It's fairly easy to rename, so I thought I would throw that out there.