Principe général

UIMA AS est développé en remplacement du Collection Processing Manager (CPM) dans le but d'offrir de meilleures capacités de flexibilités et montée en charge. UIMA AS ne remplace nullement UIMA, et les composants développés pour UIMA peuvent aussi bien fonctionner dans une chaîne UIMA AS. AS profite du système d'Aggregate pour organiser les composants en chaînes de traitement, tirant parti au passage des capacités des flow controllers.

L'approche CPM (UIMA classique)

Lorsqu'une chaîne de traitement UIMA est déployée par le CPM, la configuration suivante se met en place :

  • le CPM instancie autant de processing pipelines (PP, combinaison séquentielle de composants) que nécessaire, chaque PP contenant une seule et unique instance de chaque composant ;
  • le Collection Reader (CR) est instancié dans un seul thread (pas de parallélisation possible) ;
  • les CAS produits par le CR sont stockés dans une queue tampon avant d'être distribués vers les différents PP qui ne peuvent qu'en traiter un à la fois : un CAS en sortie d'un composant est directement passé en entrée du composant suivant.

Ce fonctionnement est décrit par le schéma suivant tiré de la documentation d'Apache UIMA si ce n'est que les cas consumers n'existent plus en tant qu'entités particulières mais sont des composants comme les autres (et donc instanciés dans les PP) :

cpe-detail.png

L'approche UIMA AS

Les chaînes dans UIMA AS sont déployées sous forme de services auxquels peuvent se connecter des clients. Les clients envoient des requêtes au service puis attendent le résultat du traitement. L'interface entre les clients et les services se fait au travers d'un système de messagerie asynchrone :

  • les requêtes des clients sont stockées dans une queue tampon en entrée du service similaire à la queue du PP pour le CPM ;
  • les résultats des traitements sont renvoyées dans une queue tampon propre à chaque client.

Ce fonctionnement est décrit par le schéma ci-dessous tiré de la documentation d'Apache UIMA AS. Il permet d'instancier une seule fois une chaîne de traitement et de lui soumettre un flux continu de données à traiter en provenance potentiellement de plusieurs sources.

uima-as-arch.png

Le système de messagerie asynchrone prend en charge la distribution des requêtes entre différentes instances potentielles du service permettant ainsi une montée en charge à chaud. Il suffit de déployer une nouvelle instance du service et lui indiquer de se connecter à la queue tampon d'entrée pour augmenter les capacités de traitement. Le système de messagerie utilisé par défaut, ActiveMQ, ayant la capacité de communiquer au travers du réseau, ledit service peut tout à fait être déployé sur une machine distante.

Une autre force de UIMA AS est que ce système de queue tampons et de routage des CAS au travers d'un système de messagerie asynchrone peut être mis en place au sein même du service entre les différents composants. Pour cette première prise en main évitons tout de même de compliquer les choses...

Mise en place de l'environnement

La mise en place d'une chaîne de traitement UIMA AS comparable à un CPE nécessite trois éléments : le service UIMA AS en charge du traitement, le client qui soumet les données et récupère les résultats du traitement et le système de messagerie coordonne la communication entre ces deux premiers.

La mise en place d'un service UIMA AS nécessite, outre le développement des composants, un descripteur de déploiement (deployment descriptor). Afin de simplifier son écriture dans la prochaine section de ce billet, il est préférable d'installer les plugins Eclipse dédiés.

Un certain nombre de dépendances sont nécessaires pour la compilation et l'exécution d'un service et d'un client UIMA AS, a minima :

Finalement, il est nécessaire d'installer un système de messagerie de type JMS. Le plus simple est d'utiliser celui proposé par défaut : Apache ActiveMQ. Une version est distribuée dans l'archive UIMA AS. Toutefois, si vous êtes sous un environnement Linux, il sera plus aisé et pérenne du paquetage de votre distribution.

Ainsi pour Debian Wheezy, installez le paquet :

sudo apt-get install activemq

puis activez l'instance par défaut :

cd /etc/activemq/instances-enabled/
sudo ln -s ../instances-available/main/

et lancez enfin le système :

sudo /etc/init.d/activemq start

Création et déploiement d'un service

Descripteur de déploiement

Le déploiement d'un service UIMA AS nécessite deux descripteurs :

  • Un descripteur de composant qui définit l'aggregate qui sera déployé ;
  • Un descripteur de déploiement qui décrit comment l'aggregate sera déployé.

Pour l'exercice nous déploierons un composant de prétraitement : découpage en phrases, en mots et étiquetage des rôles grammaticaux. Nous utiliserons pour cela les composants addons distribués avec UIMA : le WhitespaceTokenizer et le HMMTagger. L'écriture de l'aggregate ne pose pas de difficulté particulière, il s'agit d'un composant UIMA classique sans aucune spécificité que nous nommerons POSTagger.xml.

Le descripteur de déploiement est une nouveauté de UIMA AS. Il est disponible dans l'assistant de création d'un nouveau fichier sous la catégorie UIMA aux côtés des autres descripteurs, si tant est que vous avez correctement installé les plugins UIMA AS. Nous le nommerons POSTagger-Service.xml.

Création d'un nouveau descripteur de déploiement UIMA AS

Le descripteur de déploiement se compose de deux onglets : un onglet de configuration générale (Overview) et un onglet de configuration du déploiement (Deployment Configurations). Il faut préciser dans l'onglet de configuration générale le nom de la queue de messagerie sur laquelle le service sera joignable (Name for input queue) et le nom de l'aggregate à déployer comme service (Top analysis engine descriptor). Ces deux champs sont mis en valeur dans la capture d'écran ci-dessous.

Complétion de la configuration générale du déploiement

Par défaut il n'est pas nécessaire d'intervenir sur la configuration du déploiement. Le service sera alors déployé tel qu'il l'aurait été sous forme d'un CPE, c-à-d sans contrôle particulier sur les composants qui composent l'aggregate. Pour l'exercice, nous allons le déployé comme un aggregate UIMA AS, c-à-d que chaque composant de l'aggregate est lui-même déployé comme un service connecté aux autres composants par le système de messageries et de queues.

Ce déploiement nécessite simplement de cocher la case Run as AS aggregate. Une fois coché, les différents composants qui constituent l'aggregate sont accessibles pour une configuration propre.

Configuration du déploiement comme un AS Aggregate

Pour simplifier nous considérerons qu'ils sont tous déployés en local et dans la même JVM. Nous allons simplement demander de déployer trois instances du HMMTagger étant donné que c'est le composant le plus coûteux. Il suffit pour ce faire de sélectionner le composant HMMTagger dans la liste puis de positionner le paramètre Number of replicated instances sur 3.

Configuration du nombre d'instances déployées pour chaque service

Écriture du client asynchrone

Une fois le descripteur de déploiement écrit, il ne reste plus qu'à écrire le code du client asynchrone qui va s'y connecter. La documentation d'UIMA AS décrit en détail un cas d'utilisation duquel ce billet s'inspire.

Tout d'abord, toujours dans le cadre de l'exercice, nous faisons le choix que le service soit déployé par l'application elle-même :

public static final String DD2SPRINGXSLTFILEPATH = 
	"/LIBS-APPS/UIMA/apache-uima-as-2.3.1/bin/dd2spring.xsl";
public static final String SAXONCLASSPATH = 
	"file:/LIBS-APPS/UIMA/apache-uima-as-2.3.1/saxon/saxon8.jar";
public static final String DEPLOYMENTDESCRIPTOR =
	"desc/POSTagger-Service.xml";
...
// Create Asynchronous Client API
BaseUIMAAsynchronousEngine_impl uimaAsEngine = 
		new BaseUIMAAsynchronousEngine_impl();
 
///////////////////////////////////////////////////// SERVICE DEPLOYING
System.out.println("Deploying service");
// Create a Map to hold required parameters
Map<String, Object> servCtxt = new HashMap<String, Object>();
servCtxt.put(UimaAsynchronousEngine.DD2SpringXsltFilePath, 
		DD2SPRINGXSLTFILEPATH);
servCtxt.put(UimaAsynchronousEngine.SaxonClasspath,
		SAXONCLASSPATH);
String serviceId = uimaAsEngine.deploy(DEPLOYMENTDESCRIPTOR, servCtxt);
System.out.println("\t...Service deployed !");

Une fois le service déployé, nous allons configurer un client asynchrone pour s'y connecter et soumettre des requêtes. Il est important ici que :

  • le paramètre UimaAsynchronousEngine.ServerUri corresponde à l'adresse du gestionnaire de messagerie (broker) spécifié auparavant dans le descripteur de déploiement (nous avons laissé la valeur par défaut ${defaultBrokerURL}, nous verrons après comment la spécifier à l'exécution) ;
  • le paramètre UimaAsynchronousEngine.Endpoint doit correspondre au nom de la queue sur laquelle le service est joignable.
///////////////////////////////////////////////////// SERVICE EXECUTION
System.out.println("Preparing for execution");
// Create map to pass server URI and Endpoint parameters
Map<String, Object> appCtxt = new HashMap<String, Object>();
appCtxt.put(UimaAsynchronousEngine.ServerUri, "tcp://localhost:61616");
appCtxt.put(UimaAsynchronousEngine.Endpoint, "POSTaggerQueue");
appCtxt.put(UimaAsynchronousEngine.CasPoolSize, 2);
// Initialize
uimaAsEngine.initialize(appCtxt);
uimaAsEngine.addStatusCallbackListener(new MyStatusCallbackListener());
// Get an empty CAS from the Cas pool
CAS cas = uimaAsEngine.getCAS();
System.out.println("\t...CAS retrieved");
// Initialize it with input data
cas.setDocumentText("Un exemple de texte...");
cas.setDocumentLanguage("fr");
// Send CAS to service for processing
uimaAsEngine.sendCAS(cas);
System.out.println("\t...CAS sent");

Il est intéressant d'observer le fonctionnement du client :

  1. L'initialisation consiste, outre la configuration des différents paramètres précités, à définir l'instance StatusCallbackListener qui sera chargée de consommer les résultats du traitement par le service (toute l'asynchronicité du client réside dans ce fonctionnement) ;
  2. L'envoi d'une demande de traitement passe d'abord par la demande d'un CAS disponible par la méthode getCAS(). Une fois le CAS obtenu, il est initialisé comme le ferait un Collection Reader, puis soumis au système par la méthode sendCAS().

Il ne reste alors plus qu'à implémenter le StatusCallbackListener et plus particulièrement la méthode entityProcessComplete appelée lorsque le traitement d'un CAS est terminé :

static class MyStatusCallbackListener extends UimaAsBaseCallbackListener {
 
	@Override
	public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
		System.out.println("Entity process complete.");
		// Handle errors
		if ( (aStatus != null) && aStatus.isException() ) {
			List<Exception> exceptions = aStatus.getExceptions();
			for(Exception e: exceptions) {
				e.printStackTrace();
			}
			return;
		}
		// Process CAS
		try {
			System.out.println("Concepts identified:");
			JCas jcas = aCas.getJCas();
			FSIterator<Annotation> it = 
					jcas.getAnnotationIndex(T_Token.type).iterator();
			while( it.hasNext() ) {
				T_Token token = (T_Token) it.next();
				System.out.println("\t+ POS:" + token.getPos());
			}
		} catch (CASException e) {
			System.out.println("Problem getting a JCas !");
		}
	}
 
}

Exécution

Une fois tout ce travail réalisé, il ne reste plus qu'à exécuter le tout. Il est nécessaire d'ajouter toutes les dépendances nécessaires à l'exécution soit toutes les bibliothèques UIMA et UIMA AS ainsi que les nombreuses dépendances Spring.

Il est également nécessaire de spécifier, par le biais d'une variable de JVM l'adresse du gestionnaire de messagerie que nous avons positionné jusqu'alors à ${defaultBrokerURL}. Pour l'exercice, nous utilisons un broker ActiveMQ local. Le paramètre de la JVM est donc le suivant :

-DdefaultBrokerURL=tcp://localhost:61616

À partir d'ici le service devrait se déployer et le client asynchrone s'y connecter sans accroc. Si vous rencontrez des erreurs du type Connection refused c'est très probablement que votre système de messagerie ne tourne pas ou bien que les noms de queues précisés dans le descripteur et pour le client sont différents.