Spring-Batch + Spring-Integration

Toutes les applications un tant soit peu conséquentes nécessitent la mise en œuvre de traitements batchs. Ces traitements peuvent même dans certains contextes revêtir un aspect très critique.

Toute mise en œuvre de traitements batchs amène des problématiques d’exploitation classiques : planification des traitements, suivi des exécutions pour vérifier que les traitements ne tombent pas en erreur ou durent trop longtemps, gestion des incidents (relance des traitements en échec …).

Les grandes sociétés sont généralement assez largement outillées en ordonnanceurs sophistiqués (type Dollar-U ou Control-M), en outils de supervision divers et variés, et de plateformes techniques procurant différentes formes de haute disponibilité.

Maintenant imaginons que vous n’ayez pas la chance d’être dans une société pourvue d’un portefeuille épais, et que vous deviez ordonnancer un ensemble de batchs avec de la répartition de charge et de la reprise sur erreur : la solution est loin d’être évidente tant les problématiques sont variées et potentiellement source de forte complexité.

Cet article vous propose une solution basée sur des briques open source : Spring-Batch et Spring-Intégration sont deux projets leadés par SpringSource l’éditeur bien connu qui se cache derrière le non moins fameux framework Spring.  De notre point de vue, ce sont deux outils à considérer absolument pour résoudre ce type de problématique.

Ils apportent des solutions clés en mains, désignées avec les meilleurs patterns, pour toute une série de problématiques récurrentes dans leur scope respectif, à savoir les traitements batchs d’une part et l’intégration d’applications au sein d’un SI d’autre part. Bien entendu, ils s’appuient sur le cœur de Spring (IOC, AOP).

Dans notre exemple, nous visons à répondre aux problématiques de scalabilité, load-balancing et fail-over avec reprise sur erreur. Nous espérons ainsi, outre apporter une solution clé en main, susciter votre intérêt pour ces produits qui offrent encore bien plus de perspectives et qui évoluent régulièrement.

Vision générale de la solution

Le cahier des charges est le suivant :

  1. Chaque partenaire dépose ses fichiers dans un répertoire distinct.
  2. Pour un partenaire donné, les fichiers doivent êtres traités de façon séquentielle dans l’ordre de leur arrivée.
  3. Vous avez deux serveurs sur lesquels vous devez répartir la charge des traitements batch.
  4. Il vous faut limiter le nombre de batchs tournant en parallèle sur ces serveurs, afin de rester maître des ressources machine (RAM, CPU et réseau) utilisées par ces batchs.
  5. Enfin, en cas de panne d’un des deux serveurs, la charge doit automatiquement rebasculer sur le serveur encore actif en attendant que l’autre redevienne opérationnel.

La solution au problème pourrait être de la forme :

Déploiement logique de la solution

Ce schéma représente le déploiement logique de cette solution. Le déploiement physique (serveurs physiques ou virtuels, redondance des machines, répartition des machines dans différentes salles serveur, …) n’est pas abordé ici.

Dans le schéma ci-dessus, les deux serveurs mis à disposition pour les traitements batch sont représentés comme étant les nœuds esclave 1 et 2.

Ces deux serveurs sont pilotés par un nœud maître. En soit, ce nœud maître représente un « single point of failure ». Tout comme un serveur HTTP Apache qui ferait du load-balancing sur plusieurs serveurs Tomcat. La supervision de ce nœud est donc critique et les traitements effectués sur ce dernier doivent rester le plus simple possible afin d’éviter au maximum l’instabilité.

Architecture logicielle du noeud maître

L’architecture du nœud maître pourrait être la suivante :

Architecture applicative du nœud maître

A première vue, cette architecture semble complexe. En réalité, c’est un assemblage de composants Spring-Integration effectué grâce à un fichier de configuration. La seule partie spécifique étant le développement des transformers. Cette architecture repose donc sur des composants éprouvés.

De plus, en dehors du serveur NAS et des deux nœuds esclaves, ce nœud n’est connecté à aucun autre élément extérieur.

Enfin, les seules données échangées étant des données de type message, ce nœud demande donc peu de ressources machine.

Une configuration Spring de ce nœud pourrait être :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:util=http://www.springframework.org/schema/util
	xmlns:int-http="http://www.springframework.org/schema/integration/http"
	xmlns:int-file="http://www.springframework.org/schema/integration/file"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http.xsd
		http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

	<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:input.properties</value>
			</list>
		</property>
	</bean>

	<!-- Premier point d'entrée -->
	<int-file:inbound-channel-adapter id="firstAdapter" auto-startup="true" filename-regex="${polling.filename.regex1}"
		directory="${polling.in.directory1}" channel="firstChannel">
		<int:poller default="true" cron="0/1 * * * * *" />
	</int-file:inbound-channel-adapter>

	<int:channel id="firstChannel" />

	<int:transformer input-channel="firstChannel" output-channel="firstRendezvousChannel">
		<bean class="eu.danin.integration.MessageTransfomer">
			<property name="jobName" value="firstJob" />
		</bean>
	</int:transformer>
	<bean id="firstRendezvousChannel" class="eu.danin.integration.RendezvousChannel">
		<constructor-arg value="true" />
	</bean>

	<int:bridge input-channel="firstRendezvousChannel" 	output-channel="mainRequestChannel">
		<int:poller max-messages-per-poll="1" fixed-rate="1000" />
	</int:bridge>

	<!-- Second point d'entrée -->
	<int-file:inbound-channel-adapter id="secondAdapter" auto-startup="true" filename-regex="${polling.filename.regex2}"
		directory="${polling.in.directory2}" channel="secondChannel">
		<int:poller default="true" cron="0/1 * * * * *" />
	</int-file:inbound-channel-adapter>

	<int:channel id="secondChannel" />

	<int:transformer input-channel="secondChannel" output-channel="secondRendezvousChannel">
		<bean class="eu.danin.integration.MessageTransfomer">
			<property name="jobName" value="secondJob" />
		</bean>
	</int:transformer>

	<bean id="secondRendezvousChannel" class="eu.danin.integration.RendezvousChannel">
		<constructor-arg value="true" />
	</bean>

	<int:bridge input-channel="secondRendezvousChannel"
		output-channel="mainRequestChannel">
		<int:poller max-messages-per-poll="1" fixed-rate="1000" />
	</int:bridge>

	<!-- File d'attente commune avant load-balancing -->
	<int:channel id="mainRequestChannel" />

	<!-- Un ordre doit être défini entre les deux gateways HTTP, afin d'activer le load-balancing -->
	<int-http:outbound-gateway request-channel="mainRequestChannel" 	message-converters="messageConverters"
		request-factory="httpRequestFactory" url="http://localhost:8080/springIntegration-firstSlave/receiveGateway"
		http-method="POST" expected-response-type="eu.danin.batch.JobLaunchingResponse" order="1" />
	<int-http:outbound-gateway request-channel="mainRequestChannel" 	message-converters="messageConverters"
		request-factory="httpRequestFactory" url="http://localhost:8080/springIntegration-secondSlave/receiveGateway"
		http-method="POST" expected-response-type="eu.danin.batch.JobLaunchingResponse" order="2" />

	<!-- Il est nécessaire de définir un time-out sur les gateways pour que
		le noeud maître repère les noeuds esclaves "Zombie" -->
	<bean id="httpRequestFactory" class="org.springframework.http.client.SimpleClientHttpRequestFactory">
		<!-- time-out sur l'acquisition d'une connexion -->
		<property name="connectTimeout" value="1000" />
		<!-- time-out sur le temps de traitement du message par le noeud esclave -->
		<property name="readTimeout" value="20000" />
	</bean>

	<!-- Par défaut, le serializingHttpMessageConverter n'est pas associé aux gateway HTTP.
		Il est donc nécessaire de l'associer à ces dernières pour faire transiter des objets serializable
		entre le noeud maître et les noeuds esclaves. Les exceptions faisant partie de ces objets serializable. -->
	<util:list id="messageConverters">
		<bean class="org.springframework.integration.http.converter.SerializingHttpMessageConverter" />
	</util:list>
</beans>

Configuration Spring du nœud maître

Comme vous pouvez le constater au niveau du fichier de configuration, j’ai fait quelques customisations.

Notamment, le rendez-vous channel :
org.springframework.integration.channel.RendezvousChannel ne permet pas de faire du FIFO. Je l’ai donc fait évoluer :

package eu.danin.integration;

import java.util.concurrent.SynchronousQueue;
import org.springframework.integration.Message;
import org.springframework.integration.channel.QueueChannel;

/**
 * {@link org.springframework.integration.channel.RendezvousChannel} with FIFO option.
 *
 * @author Eric DANIN
 */
public class RendezvousChannel extends QueueChannel {

	/**
	 * Constructor.
	 */
	public RendezvousChannel() {
		this(false);
	}

	/**
	 * Constructor.
	 *
	 * @param aFair
	 *            if true, waiting threads contend in FIFO order for access;
	 *            otherwise the order is unspecified.
	 */
	public RendezvousChannel(boolean aFair) {
		super(new SynchronousQueue<Message<?>>(aFair));
	}
}

Rendez-vous channel customisé

Et voici une implémentation générique des transformers :

package eu.danin.integration;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.integration.Message;
import org.springframework.integration.annotation.Transformer;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import eu.danin.batch.JobLaunchingRequest;

/**
 * Message transformer.
 *
 * @author Eric DANIN
 *
 * @param <P> linked payload type
 */
public class MessageTransfomer<P> implements InitializingBean {

	/**
	 * Job name.
	 */
	private String jobName;

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void afterPropertiesSet() {
		Assert.isTrue(StringUtils.hasText(this.jobName), "Job name is required.");
	}

	/**
	 * Message transforming method.
	 *
	 * @param aMessage
	 *            input message
	 * @return parameters
	 */
	@Transformer
	public JobLaunchingRequest<P> transform(final Message<P> aMessage) {
		final JobLaunchingRequest<P> request = new JobLaunchingRequest<P>();
		request.setJobName(this.jobName);
		request.setLinkedPayload(aMessage.getPayload());
		return request;
	}

	@Required
	public void setJobName(final String aJobName) {
		this.jobName = aJobName;
	}
}

Implémentation des transformers

Avec le contenu du message après transformation  :

package eu.danin.batch;

import java.io.Serializable;

/**
 * Job launching request.
 *
 * @author Eric DANIN
 *
 * @param <P>
 *            linked payload type
 */
public class JobLaunchingRequest<P> implements Serializable {

	/**
	 * Serial ID.
	 */
	private static final long serialVersionUID = 1L;

	/**
	 * Job name.
	 */
	private String jobName;

	/**
	 * Linked payload.
	 */
	private P linkedPayload;

	public String getJobName() {
		return this.jobName;
	}

	public void setJobName(final String aJobName) {
		this.jobName = aJobName;
	}

	public P getLinkedPayload() {
		return this.linkedPayload;
	}

	public void setLinkedPayload(final P aLinkedPayload) {
		this.linkedPayload = aLinkedPayload;
	}
}

Contenu des messages après transformation

Architecture logicielle des nœuds esclaves

L’architecture des nœuds esclaves pourrait être la suivante :

Architecture applicative des nœuds esclaves

Cette architecture, plus basique, est elle aussi intégralement constituée de composants Spring-Integration et Spring-Batch « out-of-the-box ».

Une configuration Spring d’un nœud esclave pourrait être :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-http="http://www.springframework.org/schema/integration/http"
	xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task">

	<import resource="springBatch-config.xml" />

	<!-- reply-timeout ="-1" -> la gateway ne renvoie une réponse qu'à la fin du traitement batch -->
	<!-- convert-exceptions="true" -> les erreurs sont remontées au noeud maître, afin d'être traitées à partir de ce dernier -->
	<int-http:inbound-gateway request-channel="requestChannel"
		convert-exceptions="true" message-converters="messageConverters"
		reply-timeout="-1" name="/receiveGateway" supported-methods="POST" />

	<!-- Par défaut, le serializingHttpMessageConverter n'est pas associé aux gateway HTTP.
		Il est donc nécessaire de l'associer à ces dernières pour faire transiter des objets serializable
		entre le noeud maître et les noeuds esclaves. Les exceptions faisant partie de ces objets serializable. -->
	<util:list id="messageConverters">
		<bean class="org.springframework.integration.http.converter.SerializingHttpMessageConverter" />
	</util:list>

	<int:channel id="requestChannel">
		<int:queue />
	</int:channel>

	<!-- L'error channel est redéfini afin de ne pas tenir compte des exceptions du type RejectedExecutionException -->
	<int:channel id="errorChannel" />

	<int:outbound-channel-adapter channel="errorChannel" ref="errorLogger" method="handle" />

	<bean id="errorLogger" class="eu.danin.integration.ErrorLogger" />

	<int:service-activator input-channel="requestChannel">
		<int:poller max-messages-per-poll="1" task-executor="executor" fixed-delay="1000" />
		<bean class="eu.danin.batch.JobLaunchingFileMessageHandler">
			<property name="slaveId" value="1" />
			<property name="jobLauncher" ref="jobLauncher" />
		</bean>
	</int:service-activator>

	<!-- Cette configuration permet de limiter l'exécution à 2 traitements batchs en parallèle -->
	<task:executor id="executor" pool-size="1-2" 	queue-capacity="1" />
</beans>

Configuration Spring d’un nœud esclave

Ici encore, je n’ai pas pu m’empêcher de faire quelques customisations :

Il existe dans Spring-Batch une bibliothèque : spring-batch-integration qui assure la jointure avec Spring-Integration. Cette dernière est suffisante pour répondre à la majorité des cas d’utilisation. Hélas, dans notre cas, celle-ci impose de déporter les transformers au niveau des nœuds esclaves et non plus au niveau du nœud maître.

Pour plus de souplesse, j’ai donc préféré implémenter les « job launching message handlers » comme suit :

package eu.danin.batch;

import org.springframework.batch.core.JobExecutionException;

/**
 * The job launch request handler.
 *
 * @author Eric DANIN
 *
 * @param <R>
 *            request type
 */
public interface JobLaunchRequestHandler<R> {

	/**
	 * Launching method.
	 *
	 * @param aRequest
	 *            request
	 * @return response
	 */
	JobLaunchingResponse launch(final R aRequest) throws JobExecutionException;
}

Définition d’un « job launching message handler » générique

package eu.danin.batch;

import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.util.Assert;

/**
 * The job launching message handler.
 *
 * @author Eric DANIN
 */
public abstract class AbstractJobLaunchingMessageHandler<P> implements JobLaunchRequestHandler<JobLaunchingRequest<P>>, InitializingBean, ApplicationContextAware {

	/**
	 * Slave identifier.
	 */
	private int slaveId;

	/**
	 * Job launcher.
	 */
	private JobLauncher jobLauncher;

	/**
	 * Spring application context.
	 */
	private ApplicationContext applicationContext;

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void afterPropertiesSet() {
		Assert.notNull(this.jobLauncher, "Job launcher is required.");

		Assert.notNull(this.applicationContext,
			"Spring application context is required.");
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	@ServiceActivator
	public final JobLaunchingResponse launch(final JobLaunchingRequest<P> aRequest) throws
	JobExecutionException {
		final Job job = this.applicationContext.getBean(aRequest.getJobName(),
			Job.class);
		final JobLaunchingResponse response = new JobLaunchingResponse();
		final JobExecution execution = this.jobLauncher.run(job,
			new JobParameters(fillJobParameters(aRequest.getJobName(),
			aRequest.getLinkedPayload())));
		response.setStartTime(execution.getStartTime().getTime());
		response.setEndTime(execution.getEndTime().getTime());
		response.setJobName(execution.getJobInstance().getJobName());
		response.setSlaveId(this.slaveId);
		return response;
	}

	/**
	 * Spring-batch job parameters filling method.
	 *
	 * @param aJobName
	 *            job name
	 * @param aPayload
	 *            original payload
	 * @return job parameters
	 */
	protected abstract Map<String, JobParameter> fillJobParameters(final String aJobName,
	final P aPayload);

	@Required
	public final void setSlaveId(final int aSlaveId) {
		this.slaveId = aSlaveId;
	}

	@Required
	public final void setJobLauncher(final JobLauncher aJobLauncher) {
		this.jobLauncher = aJobLauncher;
	}

	protected final ApplicationContext getApplicationContext() {
		return this.applicationContext;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public final void setApplicationContext(final ApplicationContext aApplicationContext)
	{
		this.applicationContext = aApplicationContext;
	}
}

Implémentation d’un « job launching message handler » générique

package eu.danin.batch;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.JobParameter;
import eu.danin.utils.Constant;

/**
 * Spring-batch job launching from file message handler.
 *
 * @author Eric DANIN
 */
public class JobLaunchingFileMessageHandler extends AbstractJobLaunchingMessageHandler<File> {

	/**
	 * {@inheritDoc}
	 */
	@Override
	protected Map<String, JobParameter> fillJobParameters(final String aJobName,
	final File aPayload) {
		final Map<String, JobParameter> parameters = new HashMap<String,
			JobParameter>();
		parameters.put(Constant.FILE_NAME, new JobParameter(aPayload.getName()));
		return parameters;
	}
}

Implémentation d’un « job launching message handler » pour un contenu de type « File »

Avec la le contenu des messages réponse :

package eu.danin.batch;

import java.io.Serializable;

/**
 * Job launching response.
 *
 * @author Eric DANIN
 */
public class JobLaunchingResponse implements Serializable {

	/**
	 * Serial ID.
	 */
	private static final long serialVersionUID = 1L;

	/**
	 * Start time.
	 */
	private long startTime;

	/**
	 * End time.
	 */
	private long endTime;

	/**
	 * Slave identifier.
	 */
	private int slaveId;

	/**
	 * Job name.
	 */
	private String jobName;

	public long getStartTime() {
		return this.startTime;
	}

	public void setStartTime(final long aStartTime) {
		this.startTime = aStartTime;
	}

	public long getEndTime() {
		return this.endTime;
	}

	public void setEndTime(final long aEndTime) {
		this.endTime = aEndTime;
	}

	public int getSlaveId() {
		return this.slaveId;
	}

	public void setSlaveId(final int aSlaveId) {
		this.slaveId = aSlaveId;
	}

	public String getJobName() {
		return this.jobName;
	}

	public void setJobName(final String aJobName) {
		this.jobName = aJobName;
	}
}

Contenu des messages réponse

Et enfin, un logger customisé pour tracer les erreurs. Sachant que c’est l’endroit idéal pour connecter les sondes de votre console de supervision, si vous en avez une :

package eu.danin.integration;

import java.util.concurrent.RejectedExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.integration.Message;

/**
 * Error channel logger.
 */
public class ErrorLogger {

	/**
	 * Class logger.
	 */
	private static final Logger LOG = LoggerFactory.getLogger(ErrorLogger.class);

	/**
	 * Message handling method.
	 *
	 * @param aMessage
	 *            message to handle
	 */
	public void handle(Message<Throwable> aMessage) {
		final Throwable error = aMessage.getPayload();
		if (error instanceof TaskRejectedException && error.getCause() instanceof
			RejectedExecutionException) {
			// No log for task rejection
			return;
		}
		LOG.error("Error logger : ", error);
	}
}

Customisation du logger d’erreurs

Scalability, load-balancing, fail-over et reprise sur erreur

L’architecture globale de cette solution possède une haute scalabilité et permet d’adapter au mieux la configuration logicielle en fonction de la configuration machine.

Le load-balancing entre les nœuds esclaves est assuré par le nœud maître.

En cas de panne d’un des nœuds esclaves, le nœud maître perd la connexion avec le nœud esclave ou celle-ci tombe en time-out. Les tâches en cours de traitement sont alors automatiquement redirigées par le nœud maître vers l’autre nœud esclave.

Lors de la reprise des batchs en cours par le nœud restant :

  • Soit le batch s’est interrompu dans un état stable et dans ce cas, il reprendra là où il s’était arrêté.
  • Soit il s’est arrêté dans un état instable. Dans ce cas, une erreur est remontée au nœud maître. Cette dernière devra être remontée pour engager une intervention humaine afin de relancer le batch proprement.
  • Soit il est terminé. Dans ce cas, une erreur est remontée au nœud maître. Cette dernière ne nécessite aucune intervention humaine.

Enfin, pour la gestion du fail-over au niveau du nœud maître, il existe de multiples solutions. Mais je n’en ai pas trouvé une suffisamment simple pour mériter d’être exposée.

Conclusion

J’ai vu beaucoup d’articles sur Spring-Integration, mais très peu qui mettent en avant les fonctionnalités de load-balancing et fail-over intégrées au framework. J’espère avec cet exemple vous avoir montré la toute puissance de Spring-Integration : avec peu de code (moins de 400 lignes dans notre exemple), vous pouvez faire de grandes choses.

“May the force be with you”

6 commentaires
    • Avatar
      Eric Danin dit :

      Bonjour,
      Dans le cas présent, vu que les inputs sont des fichiers et vu la contrainte du traitement séquentiel par type de fichier, je suis resté sur une gestion du failover « old-school » sur le nœud maître avec un arrêt-relance manuel en cas de crash du nœud maître. Des alertes sont donc envoyées en cas d’engorgement anormal des queues ou de non réponse du processus associé au nœud maître.
      Avec d’autres types d’inputs, j’ai déjà mis en place une redondance en actif-passif du nœud maître.

      Répondre

votre commentaire

Se joindre à la discussion ?
Vous êtes libre de contribuer !

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Inscription newsletter

Ne manquez plus nos derniers articles !