Mon problème

La chaîne de traitement que j'utilise pour ma thèse calcule des scores de similarité entre des documents. Afin de simplifier l'évaluation de mon système, je souhaite réunir tous ces scores de similarité dans un même fichier CSV.

La solution naïve qui fonctionne presque consiste à ce que le composant ouvre un fichier et y écrive au fur et à mesure. Les systèmes ne rechignent pas à ce que plusieurs processus écrivent en même temps dans le même fichier... Malheureusement ils ne s'assurent pas non plus d'ordonnancer les requêtes d'écriture au risque que chacun écrive sur la copie de son voisin !

Voici le résultat que j'attends :

 source-document00003.txt;suspicious-document00016.txt;not-derivative;0.0016992353
 source-document00003.txt;suspicious-document00040.txt;not-derivative;0.23280424
 source-document00267.txt;suspicious-document00055.txt;derivative;0.14148398

et voici ce que ça peut donner quand les différents processus décident d'écrire en même temps :

 source-document00003.txt;suspicious-docsource-document00003.txt;suspicious-source-document00267.txt;suspicious-document00055.txt;derivative;0.14148398
 ument00016.txt;not-derivative;0.0016992353
 document00040.txt;not-derivative;0.23280424

Le workflow d'UIMA

J'avais déjà rapidement présenté le workflow d'UIMA dans mon billet sur les ressources. Un petit résumé des épisodes précédents ne sera peut-être pas de trop.

Comme d'habitude, pour simplifier, on va considérer qu'on utilise uniquement le flow controler par défaut qui fait s'enchaîner séquentiellement les composants. Cela revient à considérer que la sortie d'un composant alimente directement l'entrée du composant suivant, à la manière des pipes dans le monde Unix.

Dans cette configuration, UIMA regroupe les chaînes de CAS processors (les composants qui manipulent les CAS) dans autant de processing pipelines que l'attribut processingUnitThreadCount du CPE le demande, comme l'illustre le schéma ci-dessous :

cpe-detail.png

l est nécessaire d'instancier les composants utilisés autant de fois qu'il y a de processing pipelines. Jusque-là, rien de bien nouveau en réalité. Mais ce que j'ai découvert récemment, douloureusement bien entendu, c'est que cette instanciation ne se faisait pas au sein des pipelines, mais au sein du thread principal du CPE. Soit la procédure suivante, pour chaque composant :

  1. Instanciation du composant dans le thread principal du CPE ;
  2. Appel de la méthode initialize() toujours dans le thread principal ;
  3. Création des threads des pipelines et rattachement des composants à ces derniers ;
  4. Appels successifs à process() au sein de ces threads ;

Ma solution

J'aurais certes pu faire le choix d'utiliser le système des ressources pour mon composant, comme je le préconise dans mon billet précédent. Toutefois, je n'ai pas eu le temps de manipuler le système des ressources et il est grand temps que j'en finisse avec ma thèse :) Je me suis donc tourné vers quelque chose de plus direct.

Mon choix s'est donc porté sur la création d'autant de fichiers qu'il y a d'instances de mon composant. Je suffixe tout simplement le nom du fichier par l'identifiant du thread courant. Cette méthode fonctionne suffisamment bien, mais nécessite bien entendu de concaténer l'ensemble des fichiers à la fin du traitement. De plus elle ne permet pas directement de garder une trace de l’ordonnancement des écritures ! Dans mon cas ce n'était pas un prérequis.

Pour la petite histoire, j'ai découvert que la méthode initialize() était appelée dans le thread principal car tous mes fichiers étaient suffixés par l'identifiant 1. J'ai donc simplement déplacé l'ouverture des fichiers dans la méthode process() en ajoutant un test pour s'assurer que cette initialisation n'avait pas déjà eu lieu.

Voici le code (synthétique) correspondant, toute l'astuce réside dans le Thread.currentThread().getId() :

public void process(JCas aJCas) throws AnalysisEngineProcessException {
	// Check the streams have been initialized
	if (theStream == null) {
		Long tid = Thread.currentThread().getId();
		theStream =new PrintStream( 
			new File( theExportSuffix + ".__" + tid), "utf-8");
	}
	// Select the right view
	JCas view = aJCas.getView(theViewName);
	...