Enterprise Integration Pattern With Spring

Enterprise Integration Pattern with Spring

Recently in one of my project I got a requirement to poll a directory and it's sub directories on a constant rate and process the files residing in it to drive some business information out of it. To implement the same we used enterprise integration pattern implementation of spring because of two reasons, firstly – we are already using spring as our backend framework and secondly – it enforce separation of concerns between business logic and integration logic in an intuitive way with well-defined boundaries to promote reusability and portability.

What is Spring Integration?

Spring Integration is an enterprise integration pattern implementation of spring which supports integration with external systems via declarative adapters and these adapters provide a higher-level of abstraction over Spring's support for remoting, messaging, and scheduling. It does not need a container or separate process space and can be invoked in existing program as it is just a JAR which can be dropped with WAR or standalone systems.

As I mentioned it works using adapters, we created InboundChannelAdapter as a spring bean which starts at time of application boot up and constantly polls a directory specified noticing the Scanner and Filter specified as follows:

@Bean
    @InboundChannelAdapter(value = "fileIn", autoStartup = "true", poller = @Poller(fixedDelay = "500"))
    public MessageSource<File> fileMessageSource() throws Exception {
        FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
        fileReadingMessageSource.setScanner(dirScanner());
        fileReadingMessageSource.setDirectory(new File(pollingDir));
        return fileReadingMessageSource;
    }

Scanner specified above is the strategy for scanning directories and we used WatchServiceDirectoryScanner implementation along with the composite filter as our requirement is to scan the directory and it's sub directories as well for a file ending with predefined pattern.

    @Bean
    public DirectoryScanner dirScanner() throws Exception {
        WatchServiceDirectoryScanner watchServiceDirectoryScanner = new WatchServiceDirectoryScanner(
                    "/Users/ArpitAggarwal/directory/");
        watchServiceDirectoryScanner.setFilter(compositeFilter());
        watchServiceDirectoryScanner.setAutoStartup(true);
        return watchServiceDirectoryScanner;
    }

In this post we will be polling for a file pattern ending with .csv in a /Users/ArpitAggarwal/directory/ directory, so we implemented SimplePatternFileListFilter provided by framework as below:

@Bean
    public CompositeFileListFilter<File> compositeFilter() throws Exception {
        return new CompositeFileListFilter<>(getFileListFilterList("*.csv"));
    }
    private List<FileListFilter<File>> getFileListFilterList(
            final String pattern) {
        List<FileListFilter<File>> fileListFilterList = new ArrayList<>();
        fileListFilterList.add(new SimplePatternFileListFilter(pattern));
        return fileListFilterList;
    }

By default framework keep in-memory track of files read from directory which doesn't suffice our need as we want to process the file only once even on server restart, which make us use FileSystemPersistentAcceptOnceFileListFilter implementation of framework which requires directory location to be specified to save information of files already processed on disk in form of properties file named metadata-store.properties as follows:

@Bean
    public FileSystemPersistentAcceptOnceFileListFilter persistentFilter() throws Exception {
        FileSystemPersistentAcceptOnceFileListFilter fileSystemPersistentAcceptOnceFileListFilter = new FileSystemPersistentAcceptOnceFileListFilter(
                metadataStore(), "");
        fileSystemPersistentAcceptOnceFileListFilter.setFlushOnUpdate(true);
        return fileSystemPersistentAcceptOnceFileListFilter;
    }

    public PropertiesPersistingMetadataStore metadataStore() throws Exception{
        PropertiesPersistingMetadataStore propertiesPersistingMetadataStore = new PropertiesPersistingMetadataStore();
        propertiesPersistingMetadataStore.setBaseDirectory("/Users/ArpitAggarwal/directory/");
        propertiesPersistingMetadataStore.afterPropertiesSet();
        return propertiesPersistingMetadataStore;
    }

Integrating FileSystemPersistentAcceptOnceFileListFilter as part of composite filter results in changing the FileListFilterList bean definition as follows:

private List<FileListFilter<File>> getFileListFilterList(
            final String pattern) {
        List<FileListFilter<File>> fileListFilterList = new ArrayList<>();
        fileListFilterList.add(new SimplePatternFileListFilter(pattern));
        fileListFilterList.add(persistentFilter());
        return fileListFilterList;
    }

That's all about basic configuration to look up a directory and it's sub directories for files with ending with .csv.

Next we need the action to be taken once file is read and to that framework provides @ServiceActivator to be specified with inputChannel over a method definition with file as an argument, as follows:

@Service
public class FileInServiceActivator {

    @ServiceActivator(inputChannel = "fileIn")
    public void run(File file) {
        String fileName = file.getAbsolutePath();
        System.out.println("File to be processed " + fileName);
    }
}

The complete source code is hosted on github.

Leave a Reply

Your email address will not be published. Required fields are marked *