root/head/ambra/webapp/src/main/java/org/topazproject/ambra/search2/service/ArticleIndexingServiceImpl.java @ 8253

Revision 8253, 14.4 KB (checked in by dragisak, 7 months ago)

Remove duplicate copyright comment header.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id HeadURL Revision
Line 
1/*
2 * $HeadURL$
3 * $Id$
4 *
5 * Copyright (c) 2006-2010 by Topaz, Inc.
6 * http://topazproject.org
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 *     http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21package org.topazproject.ambra.search2.service;
22
23import org.apache.camel.Handler;
24import org.apache.commons.configuration.Configuration;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27import org.springframework.beans.factory.annotation.Required;
28import org.springframework.transaction.annotation.Transactional;
29import org.topazproject.ambra.ApplicationException;
30import org.topazproject.ambra.admin.service.OnCrossPubListener;
31import org.topazproject.ambra.admin.service.OnDeleteListener;
32import org.topazproject.ambra.admin.service.OnPublishListener;
33import org.topazproject.ambra.article.service.ArticleDocumentService;
34import org.topazproject.ambra.models.Article;
35import org.topazproject.ambra.models.Representation;
36import org.topazproject.ambra.queue.MessageSender;
37import org.topazproject.ambra.queue.Routes;
38import org.topazproject.otm.Blob;
39import org.topazproject.otm.Query;
40import org.topazproject.otm.Session;
41import org.topazproject.otm.SessionFactory;
42import org.topazproject.otm.Transaction;
43import org.topazproject.otm.query.Results;
44import org.w3c.dom.Document;
45import org.w3c.dom.Element;
46
47import java.io.Serializable;
48import java.net.URI;
49import java.util.HashMap;
50import java.util.HashSet;
51import java.util.Map;
52import java.util.Set;
53import java.util.SortedMap;
54import java.util.TreeMap;
55
56/**
57 * Service class that handles article search indexing. It is plugged in as OnPublishListener into
58 * DocumentManagementService.
59 *
60 * @author Dragisa Krsmanovic
61 */
62public class ArticleIndexingServiceImpl implements OnPublishListener, OnDeleteListener, OnCrossPubListener, ArticleIndexingService {
63
64  private static final Logger log = LoggerFactory.getLogger(ArticleIndexingServiceImpl.class);
65
66  protected static final int VERY_LONG_TX_TIMEOUT = 7200; // 2hrs
67
68  private ArticleDocumentService articleDocumentService;
69  private MessageSender messageSender;
70  private SessionFactory sessionFactory;
71  private Session otmSession;
72  private String indexingQueue;
73  private String deleteQueue;
74
75  @Required
76  public void setArticleDocumentService(ArticleDocumentService articleDocumentService) {
77    this.articleDocumentService = articleDocumentService;
78  }
79
80  @Required
81  public void setMessageSender(MessageSender messageSender) {
82    this.messageSender = messageSender;
83  }
84
85  @Required
86  public void setAmbraConfiguration(Configuration ambraConfiguration) {
87    indexingQueue = ambraConfiguration.getString("ambra.services.search.articleIndexingQueue", null);
88    log.info("Article indexing queue set to " + indexingQueue);
89    deleteQueue = ambraConfiguration.getString("ambra.services.search.articleDeleteQueue", null);
90    log.info("Article delete queue set to " + deleteQueue);
91  }
92
93  public void setSessionFactory(SessionFactory sessionFactory) {
94    this.sessionFactory = sessionFactory;
95  }
96
97  @Required
98  public void setOtmSession(Session otmSession) {
99    this.otmSession = otmSession;
100  }
101
102  /**
103   * Method that is fired on article publish operation.
104   * <p/>
105   * Message is sent to an asynchronous, SEDA queue and from there it's sent to plos-queue. That
106   * way we ensure that publish operation will succeed even if ActiveMQ is down.
107   *
108   * @see Routes
109   * @param articleId ID of the published article
110   * @throws Exception if message send fails
111   */
112  @Transactional(readOnly = true)
113  public void articlePublished(String articleId) throws Exception {
114    if (indexingQueue != null) {
115      log.info("Indexing published article " + articleId);
116      indexOneArticle(articleId);
117    } else {
118      log.warn("Article indexing queue not set. Article " + articleId + " will not be indexed.");
119    }
120  }
121
122  /**
123   * Method that is fired on article delete operation.
124   * <p/>
125   * Message is sent to an asynchronous, SEDA queue and from there it's sent to plos-queue. That
126   * way we ensure that delete operation will succeed even if ActiveMQ is down.
127   *
128   * @see Routes
129   * @param articleId ID of the deleted article
130   * @throws Exception if message send fails
131   */
132  public void articleDeleted(String articleId) throws Exception {
133    if (deleteQueue != null) {
134      log.info("Deleting article " + articleId + " from search index.");
135      messageSender.sendMessage(Routes.SEARCH_DELETE, articleId);
136    } else {
137      log.warn("Article index delete queue not set. Article " + articleId + " will not be deleted from search index.");
138    }
139  }
140
141  /**
142   * Method that is fired on article cross publish operation.
143   * <p/>
144   * Message is sent to an asynchronous, SEDA queue and from there it's sent to plos-queue. That
145   * way we ensure that cross publish operation will succeed even if ActiveMQ is down.
146   *
147   * @see Routes
148   * @param articleId ID of the cross published article
149   * @throws Exception if message send fails
150   */
151  @Transactional(readOnly = true)
152  public void articleCrossPublished(String articleId) throws Exception {
153    if (indexingQueue != null) {
154      log.info("Indexing cross published article " + articleId);
155      indexArticle(articleId);
156    } else {
157      log.warn("Article indexing queue not set. Article " + articleId + " will not be re-indexed.");
158    }
159  }
160
161  public void startIndexingAllArticles() throws Exception {
162     // Message content is unimportant here
163    messageSender.sendMessage(Routes.SEARCH_INDEXALL, "start");
164  }
165
166  /**
167   * Index one article. Disables filters so can be applied in any journal context.
168   *
169   * @param articleId Article ID
170   * @throws Exception If operation fails
171   */
172  @Transactional(readOnly = true)
173  public void indexArticle(String articleId) throws Exception {
174
175    if (indexingQueue == null) {
176      throw new ApplicationException("Article indexing queue not set. Article " + articleId + " will not be re-indexed.");
177    }
178
179    Set<String> filters = disableFilters(otmSession);
180
181    Document doc;
182    try {
183      doc = articleDocumentService.getFullDocument(articleId);
184    } finally {
185      enableFilters(filters, otmSession);
186    }
187
188    if (doc == null) {
189      log.error("Search indexing failed for " + articleId + ". Returned document is NULL.");
190      return;
191    }
192
193    messageSender.sendMessage(indexingQueue, doc);
194  }
195
196  /**
197   * Same as indexArticle() except that it doesn't disable filters.
198   *
199   * @param articleId Article ID
200   * @throws Exception If operation fails
201   */
202  private void indexOneArticle(String articleId) throws Exception {
203
204    Document doc = articleDocumentService.getFullDocument(articleId);
205
206    if (doc == null) {
207      log.error("Search indexing failed for " + articleId + ". Returned document is NULL.");
208      return;
209    }
210
211    messageSender.sendMessage(Routes.SEARCH_INDEX, doc);
212  }
213
214  /**
215   * Send all articles for re-indexing.
216   * <p/>
217   * Queries to fetch all articles and to get all cross-published articles are separated to
218   * speed up the process.
219   * <p/>
220   * This is Apache Camel handler. It is invoked asynchronously after user submits a message to SEDA
221   * queue.
222   *
223   * @return Email message body
224   * @throws Exception
225   * @see org.topazproject.ambra.queue.Routes
226   */
227  @Handler
228  public String indexAllArticles() throws Exception {
229
230
231    if (indexingQueue != null) {
232
233      Session session = null;
234      Transaction transaction = null;
235      try {
236        session = sessionFactory.openSession();
237        transaction = session.beginTransaction(true, VERY_LONG_TX_TIMEOUT);
238
239        Set<String> filters = disableFilters(session);
240
241        try {
242          long timestamp = System.currentTimeMillis();
243          int count = indexAll(session, articleDocumentService, messageSender, indexingQueue);
244          String message = "Finished indexing " + count + " articles in " + (System.currentTimeMillis() - timestamp) / 1000l + " sec";
245          log.info(message);
246          return message;
247
248        } finally {
249          enableFilters(filters, session);
250        }
251
252      } finally {
253        try {
254          if (transaction != null) {
255            transaction.rollback();
256          }
257        } catch (Throwable t) {
258          log.warn("Error in rollback", t);
259        }
260        try {
261          if (session != null) {
262            session.close();
263          }
264        } catch (Throwable t) {
265          log.warn("Error closing session", t);
266        }
267      }
268    } else {
269      throw new ApplicationException("Indexing queue not defined");
270    }
271  }
272
273  /**
274   * Keep this method static to ensure that it's not using instance fields.
275   *
276   * @param session OTM session
277   * @param articleDocumentService ArticleDocumentService
278   * @param messageSender MessageSender
279   * @param indexingQueue IndexingQueue
280   * @return Number of articles indexed
281   * @throws Exception If operation fails
282   */
283  private static int indexAll(
284      Session session,
285      ArticleDocumentService articleDocumentService,
286      MessageSender messageSender,
287      String indexingQueue) throws Exception {
288
289    int totalIndexed = 0;
290
291    // Sorted map to make sure they are in the same order for each test run
292    SortedMap<URI, Set<String>> allArticles = new TreeMap<URI, Set<String>>();
293    Map<URI, Blob> articleXmls = new HashMap<URI, Blob>();
294    loadAllArticles(session, allArticles, articleXmls);
295    Map<String, JournalFields> journals = getJournals(session);
296    loadAllCrossPublishedArticles(session, allArticles);
297
298    for (Map.Entry<URI, Set<String>> articleEntry : allArticles.entrySet()) {
299
300      Document doc = articleDocumentService.getDocument(articleXmls.get(articleEntry.getKey()));
301
302      Element additionalInfoElement = doc.createElementNS(ArticleDocumentService.XML_NAMESPACE, "ambra");
303
304      Element journalsElement = doc.createElementNS(ArticleDocumentService.XML_NAMESPACE, "journals");
305
306      for (String eIssn : articleEntry.getValue()) {
307        JournalFields jrnlFields = journals.get(eIssn);
308        journalsElement.appendChild(
309            createJournalElement(doc, eIssn, jrnlFields.getKey(), jrnlFields.getName()));
310      }
311      additionalInfoElement.appendChild(journalsElement);
312      doc.getDocumentElement().appendChild(additionalInfoElement);
313
314      messageSender.sendMessage(indexingQueue, doc);
315      totalIndexed++;
316    }
317
318    return totalIndexed;
319  }
320
321  private static Element createJournalElement(Document doc, String eIssn, String key, String name) {
322    Element journalElement = doc.createElementNS(ArticleDocumentService.XML_NAMESPACE, "journal");
323
324    Element eIssnNode = doc.createElementNS(ArticleDocumentService.XML_NAMESPACE, "eIssn");
325    eIssnNode.appendChild(doc.createTextNode(eIssn));
326    journalElement.appendChild(eIssnNode);
327
328    Element keyNode = doc.createElementNS(ArticleDocumentService.XML_NAMESPACE, "key");
329    keyNode.appendChild(doc.createTextNode(key));
330    journalElement.appendChild(keyNode);
331
332    Element nameNode = doc.createElementNS(ArticleDocumentService.XML_NAMESPACE, "name");
333    nameNode.appendChild(doc.createTextNode(name));
334    journalElement.appendChild(nameNode);
335    return journalElement;
336  }
337
338  private static void loadAllCrossPublishedArticles(Session session, SortedMap<URI, Set<String>> allArticles) {
339    Query query = session.createQuery("select j.eIssn eIssn, j.simpleCollection articleId from Journal j;");
340    Results results = query.execute();
341
342    while (results.next()) {
343      Set<String> articleJournals = allArticles.get(results.getURI(1));
344      if (articleJournals != null) {
345        articleJournals.add(results.getString(0));
346      }
347    }
348  }
349
350  private static void loadAllArticles(Session session, SortedMap<URI, Set<String>> allArticles, Map<URI, Blob> articleXmls) {
351    Query q = session.createQuery(
352        "select a.id articleId, a.eIssn eIssn, rep " +
353            "from Article a " +
354            "where a.state = '" + Article.STATE_ACTIVE + "'^^<xsd:int> " +
355            "and rep := a.representations " +
356            "and rep.name = 'XML';");
357    Results r = q.execute();
358    while (r.next()) {
359      URI articleId = r.getURI(0);
360      Representation representation = (Representation) r.get(2);
361      if (representation.getBody() != null) {
362        Set<String> articleJournals = new HashSet<String>();
363        articleJournals.add(r.getString(1));
364        allArticles.put(articleId, articleJournals);
365        articleXmls.put(articleId, representation.getBody());
366      } else {
367        log.warn(articleId.toString() + " has null representation blob for XML.");
368      }
369    }
370  }
371
372  /**
373   * Create map of all journals by eIssn.
374   *
375   * @param session OTM Session
376   * @return Map where key is eIssn and value is object containing key and name.
377   */
378  private static Map<String, JournalFields> getJournals(Session session) {
379    Map<String, JournalFields> journals = new HashMap<String, JournalFields>();
380    Query journalsQuery = session.createQuery("select j.eIssn eIssn, j.key key, j.dublinCore.title name from Journal j;");
381    Results journalsResults = journalsQuery.execute();
382
383    while (journalsResults.next()) {
384      journals.put(journalsResults.getString(0),
385                   new JournalFields(journalsResults.getString(1), journalsResults.getString(2)));
386    }
387    return journals;
388  }
389
390  private static Set<String> disableFilters(Session session) {
391    Set<String> filters = session.listFilters();
392    for (String filter : filters) {
393      session.disableFilter(filter);
394    }
395    return filters;
396  }
397
398  private static void enableFilters(Set<String> filters, Session session) {
399    for (String filter : filters) {
400      session.enableFilter(filter);
401    }
402  }
403
404  /**
405   * Private class to hold journal details needed to form additional info XML.
406   */
407  private static class JournalFields implements Serializable {
408
409    private static final long serialVersionUID = 5576062239245504730L;
410   
411    private String key;
412    private String name;
413
414    private JournalFields(String key, String name) {
415      this.key = key;
416      this.name = name;
417    }
418
419    public String getKey() {
420      return key;
421    }
422
423    public String getName() {
424      return name;
425    }
426
427    @Override
428    public String toString() {
429      return "JournalFields{" +
430          "key='" + key + '\'' +
431          ", name='" + name + '\'' +
432          '}';
433    }
434  }
435
436}
Note: See TracBrowser for help on using the browser.