Batch JMS processing

Run
How to run the sample
The source code for this sample can be found in the javaee7-samples GitHub repository. The first thing we need to do is to get the source by downloading the repository and then go into the samples folder:
git clone git://github.com/javaee-samples/javaee7-samples.git
cd javaee7-samples/jms/jms-batch/
Now we are ready to start testing. You can run all the tests in this sample by executing:
mvn test
Or you can run individual tests by executing one of the following:
mvn test -Dtest=JmsItemReaderTest

ItemReader reading from durable subscription

JmsItemReaderTest

This test demonstrates programmatical creation of durable consumer, and reading its subscribed messages in a batch job in form of an ItemReader.

@Named()
public class JmsItemReader extends AbstractItemReader {

    public JmsItemReader();
    @Resource(lookup = Resources.CONNECTION_FACTORY)
    ConnectionFactory factory;
    private JMSContext jms;
    @Resource(lookup = Resources.TOPIC)
    Topic topic;
    private JMSConsumer subscription;

    @Override()
    public void open(Serializable checkpoint) throws Exception;

    @Override()
    public Object readItem() throws Exception;

    @Override()
    public void close() throws Exception;
}

The items are then fed into the writer, that performs the aggregation and stores the result into a @Singleton EJB.

@Named()
public class SummingItemWriter extends AbstractItemWriter {

    public SummingItemWriter();
    @Inject()
    ResultCollector collector;
    private int numItems;
    private int sum;

    @Override()
    public void open(Serializable checkpoint) throws Exception;

    @Override()
    public void writeItems(List<Object> objects) throws Exception;

    @Override()
    public void close() throws Exception;

    private int computeSum(List<Object> objects);
}

Upon deployment a topic and connection factory for durable subscription are created:

@JMSDestinationDefinition(name = Resources.TOPIC, resourceAdapter = "jmsra", interfaceName = "javax.jms.Topic", destinationName = "batch.topic", description = "Batch processing topic")
@JMSConnectionFactoryDefinition(name = Resources.CONNECTION_FACTORY, resourceAdapter = "jmsra", clientId = "batchJob", description = "Connection factory with clientId of the durable subscription")
public class Resources {

    public Resources();
    public static final String SUBSCRIPTION = "BatchJob";
    public static final String TOPIC = "java:app/batch/topic";
    public static final String CONNECTION_FACTORY = "java:app/batch/factory";
}

Then the subscription itself is created by means of @Singleton @Startup EJB SubscriptionCreator.

@PostConstruct
void createSubscription() {
    try (JMSContext jms = factory.createContext()) { (1)
        JMSConsumer consumer = jms.createDurableConsumer(topic, Resources.SUBSCRIPTION); (2)
        consumer.close();
    }
}
  1. This is factory with clientId specified

  2. creates durable subscription on the topic

The job itself computes sum and count of random numbers that are send on the topic. Note that at time of sending there is no active consumer listening on the topic.

@Deployment
public static WebArchive deployment() {
    return ShrinkWrap.create(WebArchive.class)
            .addAsWebInfResource(EmptyAsset.INSTANCE, ArchivePaths.create("beans.xml"))
            .addClass(BatchTestHelper.class)
            .addPackage(JmsItemReader.class.getPackage())
            .addAsResource("META-INF/batch-jobs/jms-job.xml");
}

In this test case we verify that the subscription is really created upon deployment and thus messages are waiting for the job even before the first run of it.

The subscription is not deleted even after the application is undeployed, because the physical topic and its subscription in the message broker still exist, even after the application scoped managed objects are deleted.

Following method is used to generate the payload:

private int sendMessages(int count) {
    int sum = 0;
    Random r = new Random();
    try (JMSContext jms = factory.createContext(Session.AUTO_ACKNOWLEDGE)) {
        JMSProducer producer = jms.createProducer();
        for (int i=0; i< count; i++) {
            int payload = r.nextInt();
            producer.send(topic, payload);
            sum += payload;
        }
    }
    return sum;
}

So we send 10 random numbers, and verify that summing integers works exactly the same way on both ends. Or that the job really picked up all the numbers submitted for the computation.

@InSequence(1)
@Test
public void worksAfterDeployment() throws InterruptedException {
    int sum = sendMessages(10);
    runJob();
    assertEquals(10, collector.getLastItemCount());
    assertEquals(sum, collector.getLastSum());
    assertEquals(1, collector.getNumberOfJobs());
}

To verify that the durable subscription really collects messages we do few more runs.

@InSequence(2)
@Test
public void worksInMultipleRuns() throws InterruptedException {
    int sum = sendMessages(14);
    runJob();
    assertEquals(14, collector.getLastItemCount());
    assertEquals(sum, collector.getLastSum());
    assertEquals(2, collector.getNumberOfJobs());
    sum = sendMessages(8); (1)
    sum += sendMessages(4);
    runJob();
    assertEquals(12, collector.getLastItemCount());
    assertEquals(sum, collector.getLastSum());
    assertEquals(3, collector.getNumberOfJobs());
}
  1. Sending messages from separate connections makes no difference

Share the Knowledge

Find this sample useful? Share on

There's a lot more about JavaEE to cover. If you're ready to learn more, check out the other available samples.

Help Improve

Find a bug in the sample? Something missing? You can fix it by editing the source, making the correction and sending a pull request. Or report the problem to the issue tracker

Recent Changelog

  • Oct 05, 2014: #252 fixed arquillian configuration for the jobs by John D. Ament
  • Jul 15, 2014: Removed header license. the licensing is now referenced in the license file in the root of the project by Roberto Cortez
  • Mar 20, 2014: Update arquillian.xml cr1 references to final by Aslak Knutsen
  • Feb 02, 2014: Fix include expression in test case by Aslak Knutsen
  • Jan 17, 2014: Jms batch sample by pdudits
How to help improve this sample
The source code for this sample can be found in the javaee7-samples GitHub repository. The first thing you need to do is to get the source by downloading the repository and then go into the samples folder:
git clone git://github.com/javaee-samples/javaee7-samples.git
cd javaee7-samples/jms/jms-batch/

Do the changes as you see fit and send a pull request!

Good Luck!