Batch Chunk Partition Mapper

How to run the sample
The source code for this sample can be found in the javaee7-samples GitHub repository. The first thing we need to do is to get the source by cloning the repository and then going into the sample folder:
git clone git://github.com/javaee-samples/javaee7-samples.git
cd javaee7-samples/batch/chunk-mapper/
Now we are ready to start testing. You can run all the tests in this sample by executing:
mvn test
Or you can run an individual test by executing:
mvn test -Dtest=BatchChunkMapperTest

Chunk Processing - Read, Process, Write in Multiple Threads

BatchChunkMapperTest

The Batch specification provides a chunk-oriented processing style. This style encloses a set of read, process and write operations, performed via ItemReader, ItemProcessor and ItemWriter, in a transaction. Items are read one at a time, processed, and aggregated into a chunk. The transaction is then committed when the defined checkpoint policy is triggered.

Many batch processing problems can be solved with single-threaded, single-process jobs, but the Batch specification also allows a step to be executed as a partitioned step, meaning that the step can be parallelized across multiple threads. This is useful if you have some kind of bottleneck, or if you can considerably boost your batch processing performance by splitting up the work to be done.

You can define the number of partitions and the number of threads using a custom mapper. The custom mapper needs to implement PartitionMapper and create a new PartitionPlan that defines the behaviour of the partitions. Each partition is required to receive a set of unique parameters that tell it which data it should operate on.

Since each thread runs a separate copy of the step, chunking and checkpointing occur independently on each thread for chunk type steps.

<?xml version="1.0" encoding="UTF-8"?>
<job id="myJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="myStep" >
        <chunk item-count="3">
            <reader ref="myItemReader">
                <properties>
                    <property name="start" value="#{partitionPlan['start']}"  />
                    <property name="end" value="#{partitionPlan['end']}"  />
                </properties>
            </reader>
            <processor ref="myItemProcessor"/>
            <writer ref="myItemWriter"/>
        </chunk>
        <partition>
            <mapper ref="myMapper"/>
            <reducer ref="myReducer"/>
        </partition>
    </step>
</job>

The job is defined in the myJob.xml file: just a single step with a reader, a processor and a writer. The step also declares that it should be executed as a partitioned step, with a custom mapper (and a reducer):

@Named
public class MyMapper implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {
        // 2 partitions on 2 threads, with the ranges from the description below.
        PartitionPlanImpl plan = new PartitionPlanImpl();
        plan.setPartitions(2);
        plan.setThreads(2);
        Properties[] props = {new Properties(), new Properties()};
        props[0].setProperty("start", "1");  props[0].setProperty("end", "10");
        props[1].setProperty("start", "11"); props[1].setProperty("end", "20");
        plan.setPartitionProperties(props);
        return plan;
    }
}

The mapper defines 2 partitions and 2 threads. The properties of each partition define the data that is going to be read: the first partition starts at 1 and ends at 10, and the second starts at 11 and ends at 20. MyItemReader generates the data based on these properties.

@Named
public class MyItemReader extends AbstractItemReader {

    public static int totalReaders;
    private int readerId;
    private StringTokenizer tokens;

    @Inject
    @BatchProperty(name = "start")
    private String startProp;

    @Inject
    @BatchProperty(name = "end")
    private String endProp;

    @Inject
    private JobContext context;

    @Override
    public void open(Serializable checkpoint) {
        // Generate this partition's data from the injected start/end
        // properties (a reconstruction of the sample's logic).
        readerId = totalReaders++;
        StringBuilder ids = new StringBuilder();
        for (int i = Integer.parseInt(startProp); i <= Integer.parseInt(endProp); i++) {
            ids.append(i).append(",");
        }
        tokens = new StringTokenizer(ids.toString(), ",");
    }

    @Override
    public MyInputRecord readItem() {
        // Returning null signals that this partition has no more items.
        if (tokens.hasMoreTokens()) {
            return new MyInputRecord(Integer.valueOf(tokens.nextToken()));
        }
        return null;
    }
}
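
The job also references myItemProcessor, myItemWriter and the partition's myReducer, none of which are listed above. Based on the expected behaviour described below (only the odd elements are written), they might look roughly like the following sketch. The MyOutputRecord type and the getValue() accessor are assumptions made for illustration, and each class would live in its own source file:

import java.util.List;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.batch.api.chunk.ItemProcessor;
import javax.batch.api.partition.PartitionReducer;
import javax.inject.Named;

@Named
public class MyItemProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        MyInputRecord record = (MyInputRecord) item;
        // Returning null filters the item out of the chunk, so even values
        // never reach the writer (getValue() is a hypothetical accessor).
        return record.getValue() % 2 == 0 ? null : new MyOutputRecord(record.getValue());
    }
}

@Named
public class MyItemWriter extends AbstractItemWriter {

    @Override
    public void writeItems(List<Object> items) throws Exception {
        // Each call receives one committed chunk: up to item-count (3) items.
        System.out.println("Writing chunk: " + items);
    }
}

@Named
public class MyReducer implements PartitionReducer {

    // A PartitionReducer receives lifecycle callbacks around the whole
    // partitioned step; this no-op sketch only marks the callback points.
    @Override
    public void beginPartitionedStep() throws Exception { }

    @Override
    public void beforePartitionedStepCompletion() throws Exception { }

    @Override
    public void rollbackPartitionedStep() throws Exception { }

    @Override
    public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception { }
}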

We’re just going to deploy the application as a web archive. Note the inclusion of the following file:

/META-INF/batch-jobs/myJob.xml

The myJob.xml file is needed for running the batch job definition; the Batch runtime looks it up under META-INF/batch-jobs (packaged into WEB-INF/classes for a war).

@Deployment
public static WebArchive createDeployment() {
    WebArchive war = ShrinkWrap.create(WebArchive.class)
            .addClass(BatchTestHelper.class)
            .addPackage("org.javaee7.batch.sample.chunk.mapper")
            .addAsWebInfResource(EmptyAsset.INSTANCE, ArchivePaths.create("beans.xml"))
            .addAsResource("META-INF/batch-jobs/myJob.xml");
    System.out.println(war.toString(true));
    return war;
}

In the test, we’re just going to invoke the batch execution and wait for its completion. To validate the expected behaviour we query the Metric objects available in the step execution.
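
BatchTestHelper is a small utility class bundled with the samples repository. It is not listed here; a minimal sketch of the two methods the test relies on could look like this (the real helper may differ in detail):

import java.util.HashMap;
import java.util.Map;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.Metric;

public final class BatchTestHelper {

    // Poll the JobOperator until the job leaves the STARTING/STARTED states,
    // refreshing the JobExecution for the same execution id on every pass.
    public static JobExecution keepTestAlive(JobExecution jobExecution) throws InterruptedException {
        while (jobExecution.getBatchStatus() == BatchStatus.STARTING
                || jobExecution.getBatchStatus() == BatchStatus.STARTED) {
            Thread.sleep(100);
            jobExecution = BatchRuntime.getJobOperator().getJobExecution(jobExecution.getExecutionId());
        }
        return jobExecution;
    }

    // Index the step metrics by type, so individual counters are easy to assert on.
    public static Map<Metric.MetricType, Long> getMetricsMap(Metric[] metrics) {
        Map<Metric.MetricType, Long> metricsMap = new HashMap<>();
        for (Metric metric : metrics) {
            metricsMap.put(metric.getType(), metric.getValue());
        }
        return metricsMap;
    }
}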

The batch process itself reads and processes 20 elements, numbered 1 to 20, but writes only the odd ones. Elements 1 to 10 are processed in one partition and elements 11 to 20 in the other. A commit is performed after every 3 elements read, per partition.

@Test
public void testBatchChunkMapper() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("myJob", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);

    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);

    List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
    for (StepExecution stepExecution : stepExecutions) {
        if (stepExecution.getStepName().equals("myStep")) {
            Map<Metric.MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());

            assertEquals(20L, metricsMap.get(Metric.MetricType.READ_COUNT).longValue()); // (1)
            assertEquals(10L, metricsMap.get(Metric.MetricType.WRITE_COUNT).longValue()); // (2)
            // Commits per partition: the full chunks of item-count (3) elements, plus one
            // extra transaction for the remaining elements; times 2 partitions.
            long commitCount = (10L / 3 + (10 % 3 > 0 ? 1 : 0)) * 2;
            assertEquals(commitCount, metricsMap.get(Metric.MetricType.COMMIT_COUNT).longValue()); // (3)
        }
    }

    assertEquals(2L, MyItemReader.totalReaders); // (4)
    assertEquals(BatchStatus.COMPLETED, jobExecution.getBatchStatus()); // (5)
}
  1. The read count should be 20 elements. Check MyItemReader.

  2. The write count should be 10; only half of the elements read (the odd ones) are written.

  3. The commit count should be 8: a checkpoint occurs after every 3rd element read, giving 4 commits per partition (3 full chunks plus 1 for the remaining element), times 2 partitions.

  4. Make sure that all the partitions were created.

  5. Job should be completed.


There's a lot more of Java EE to cover. If you're ready to learn more, check out the other available samples.

Help Improve

Find a bug in the sample? Something missing? You can fix it by editing the source, making the correction and sending a pull request, or report the problem to the issue tracker.

Recent Changelog

  • Dec 14, 2014: Switch from polling on jobexecution (for job completion) to polling with joboperator and executionid by Scott Kurz
  • Jul 05, 2014: Removed header license for batch xml files by Roberto Cortez
  • Jun 22, 2014: Removed header license. the licensing is now referenced in the license file in the root of the project by Roberto Cortez
  • Jun 20, 2014: Added fqn to java ee api references to generate direct links to javadocs by radcortez
  • Jun 19, 2014: Documentation clarifications and typos by radcortez
  • Jun 18, 2014: Added documentation to chunk-mapper project by radcortez
  • Dec 31, 2013: Code style issues by Roberto Cortez
  • Dec 31, 2013: Removed servlets and jsp's by Roberto Cortez
  • Dec 28, 2013: Added test for chunk-mapper project by Roberto Cortez
  • Oct 18, 2013: Returning correct records, fixing https://github.com/arun-gupta/javaee7-samples/issues/47 by Arun Gupta
How to help improve this sample
The source code for this sample can be found in the javaee7-samples GitHub repository; getting it is covered in "How to run the sample" above.

Make the changes as you see fit and send a pull request!

Good Luck!