This project has retired. For details please refer to its Attic page.
DefaultRepositoryArchivaTaskScheduler xref
View Javadoc
1   package org.apache.archiva.scheduler.repository;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.archiva.common.ArchivaException;
23  import org.apache.archiva.configuration.ArchivaConfiguration;
24  import org.apache.archiva.configuration.ConfigurationEvent;
25  import org.apache.archiva.configuration.ConfigurationListener;
26  import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27  import org.apache.archiva.metadata.repository.MetadataRepository;
28  import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29  import org.apache.archiva.metadata.repository.RepositorySession;
30  import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31  import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
32  import org.apache.archiva.components.scheduler.CronExpressionValidator;
33  import org.apache.archiva.components.scheduler.Scheduler;
34  import org.apache.archiva.components.taskqueue.TaskQueue;
35  import org.apache.archiva.components.taskqueue.TaskQueueException;
36  import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37  import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38  import org.apache.commons.lang3.time.StopWatch;
39  import org.quartz.CronScheduleBuilder;
40  import org.quartz.CronTrigger;
41  import org.quartz.JobBuilder;
42  import org.quartz.JobDataMap;
43  import org.quartz.JobDetail;
44  import org.quartz.SchedulerException;
45  import org.quartz.TriggerBuilder;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  import org.springframework.stereotype.Service;
49  
50  import javax.annotation.PostConstruct;
51  import javax.annotation.PreDestroy;
52  import javax.inject.Inject;
53  import javax.inject.Named;
54  import java.util.ArrayList;
55  import java.util.HashSet;
56  import java.util.List;
57  import java.util.Set;
58  
59  /**
60   * Default implementation of a scheduling component for archiva.
61   */
62  @Service( "archivaTaskScheduler#repository" )
63  public class DefaultRepositoryArchivaTaskScheduler
64      implements RepositoryArchivaTaskScheduler, ConfigurationListener
65  {
66      private Logger log = LoggerFactory.getLogger( getClass() );
67  
68      @Inject
69      private Scheduler scheduler;
70  
71      @Inject
72      private CronExpressionValidator cronValidator;
73  
74      @Inject
75      @Named( value = "taskQueue#repository-scanning" )
76      private TaskQueue<RepositoryTask> repositoryScanningQueue;
77  
78      @Inject
79      private ArchivaConfiguration archivaConfiguration;
80  
81      @Inject
82      @Named( value = "repositoryStatisticsManager#default" )
83      private RepositoryStatisticsManager repositoryStatisticsManager;
84  
85      /**
86       * TODO: could have multiple implementations
87       */
88      @Inject
89      private RepositorySessionFactory repositorySessionFactory;
90  
91      private static final String REPOSITORY_SCAN_GROUP = "rg";
92  
93      private static final String REPOSITORY_JOB = "rj";
94  
95      private static final String REPOSITORY_JOB_TRIGGER = "rjt";
96  
97      static final String TASK_QUEUE = "TASK_QUEUE";
98  
99      static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100 
101     public static final String CRON_HOURLY = "0 0 * * * ?";
102 
103     private Set<String> jobs = new HashSet<>();
104 
105     private List<String> queuedRepos = new ArrayList<>();
106 
107     @PostConstruct
108     public void startup()
109         throws ArchivaException
110     {
111 
112         StopWatch stopWatch = new StopWatch();
113         stopWatch.start();
114 
115         archivaConfiguration.addListener( this );
116 
117         List<ManagedRepositoryConfiguration> repositories =
118             archivaConfiguration.getConfiguration().getManagedRepositories();
119 
120         RepositorySession repositorySession = null;
121         try
122         {
123             repositorySession = repositorySessionFactory.createSession();
124         }
125         catch ( MetadataRepositoryException e )
126         {
127             e.printStackTrace( );
128         }
129         try
130         {
131             MetadataRepository metadataRepository = repositorySession.getRepository();
132             for ( ManagedRepositoryConfiguration repoConfig : repositories )
133             {
134                 if ( repoConfig.isScanned() )
135                 {
136                     try
137                     {
138                         scheduleRepositoryJobs( repoConfig );
139                     }
140                     catch ( SchedulerException e )
141                     {
142                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
143                     }
144 
145                     try
146                     {
147                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
148                         {
149                             queueInitialRepoScan( repoConfig );
150                         }
151                     }
152                     catch ( MetadataRepositoryException e )
153                     {
154                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
155                                   e.getMessage(), e );
156                     }
157                 }
158             }
159         }
160         finally
161         {
162             repositorySession.close();
163         }
164 
165         stopWatch.stop();
166         log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
167     }
168 
169 
170     @PreDestroy
171     public void stop()
172         throws SchedulerException
173     {
174         for ( String job : jobs )
175         {
176             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
177         }
178         jobs.clear();
179         queuedRepos.clear();
180 
181     }
182 
183     @SuppressWarnings( "unchecked" )
184     @Override
185     public boolean isProcessingRepositoryTask( String repositoryId )
186     {
187         synchronized ( repositoryScanningQueue )
188         {
189             List<RepositoryTask> queue = null;
190 
191             try
192             {
193                 queue = repositoryScanningQueue.getQueueSnapshot();
194             }
195             catch ( TaskQueueException e )
196             {
197                 // not possible with plexus-taskqueue implementation, ignore
198             }
199 
200             for ( RepositoryTask queuedTask : queue )
201             {
202                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203                 {
204                     return true;
205                 }
206             }
207             return false;
208         }
209     }
210 
211     @Override
212     public boolean isProcessingRepositoryTask( RepositoryTask task )
213     {
214         synchronized ( repositoryScanningQueue )
215         {
216             List<RepositoryTask> queue = null;
217 
218             try
219             {
220                 queue = repositoryScanningQueue.getQueueSnapshot();
221             }
222             catch ( TaskQueueException e )
223             {
224                 // not possible with plexus-taskqueue implementation, ignore
225             }
226 
227             for ( RepositoryTask queuedTask : queue )
228             {
229                 if ( task.equals( queuedTask ) )
230                 {
231                     return true;
232                 }
233             }
234             return false;
235         }
236     }
237 
238     @Override
239     public void queueTask( RepositoryTask task )
240         throws TaskQueueException
241     {
242         synchronized ( repositoryScanningQueue )
243         {
244             if ( isProcessingRepositoryTask( task ) )
245             {
246                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
247             }
248             else
249             {
250                 // add check if the task is already queued if it is a file scan
251                 repositoryScanningQueue.put( task );
252             }
253         }
254     }
255 
256     @Override
257     public boolean unQueueTask( RepositoryTask task )
258         throws TaskQueueException
259     {
260         synchronized ( repositoryScanningQueue )
261         {
262             if ( !isProcessingRepositoryTask( task ) )
263             {
264                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
265                 return false;
266             }
267             else
268             {
269                 return repositoryScanningQueue.remove( task );
270             }
271         }
272     }
273 
274     @Override
275     public void configurationEvent( ConfigurationEvent event )
276     {
277         if ( event.getType() == ConfigurationEvent.SAVED )
278         {
279             for ( String job : jobs )
280             {
281                 try
282                 {
283                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
284                 }
285                 catch ( SchedulerException e )
286                 {
287                     log.error( "Error restarting the repository scanning job after property change." );
288                 }
289             }
290             jobs.clear();
291 
292             List<ManagedRepositoryConfiguration> repositories =
293                 archivaConfiguration.getConfiguration().getManagedRepositories();
294 
295             for ( ManagedRepositoryConfiguration repoConfig : repositories )
296             {
297                 if ( repoConfig.getRefreshCronExpression() != null )
298                 {
299                     try
300                     {
301                         scheduleRepositoryJobs( repoConfig );
302                     }
303                     catch ( SchedulerException e )
304                     {
305                         log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
306                     }
307                 }
308             }
309         }
310     }
311 
312     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
313                                          MetadataRepository metadataRepository )
314         throws MetadataRepositoryException
315     {
316         long start = System.currentTimeMillis();
317 
318         boolean res = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );
319 
320         long end = System.currentTimeMillis();
321 
322         log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
323 
324         return res;
325     }
326 
327     // MRM-848: Pre-configured repository initially appear to be empty
328     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
329     {
330         String repoId = repoConfig.getId();
331         RepositoryTaskpository/model/RepositoryTask.html#RepositoryTask">RepositoryTask task = new RepositoryTask();
332         task.setRepositoryId( repoId );
333 
334         if ( !queuedRepos.contains( repoId ) )
335         {
336             log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
337 
338             try
339             {
340                 queuedRepos.add( repoConfig.getId() );
341                 this.queueTask( task );
342             }
343             catch ( TaskQueueException e )
344             {
345                 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
346             }
347         }
348     }
349 
350     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
351         throws SchedulerException
352     {
353         if ( repoConfig.getRefreshCronExpression() == null )
354         {
355             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
356             return;
357         }
358 
359         if ( !repoConfig.isScanned() )
360         {
361             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
362             return;
363         }
364 
365         // get the cron string for these database scanning jobs
366         String cronString = repoConfig.getRefreshCronExpression();
367 
368         if ( !cronValidator.validate( cronString ) )
369         {
370             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
371                       repoConfig.getId() );
372             cronString = CRON_HOURLY;
373         }
374 
375         JobDataMap jobDataMap = new JobDataMap( );
376         jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
377         jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
378 
379         // setup the unprocessed artifact job
380         JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
381                                         .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
382                                         .setJobData( jobDataMap )
383                                         .build();
384 
385         try
386         {
387             CronTrigger trigger = TriggerBuilder.newTrigger()
388                     .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
389                     .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
390                     .build();
391 
392             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
393             scheduler.scheduleJob( repositoryJob, trigger );
394         }
395         catch ( RuntimeException e )
396         {
397             log.error(
398                 "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
399                 repoConfig.getId(), e.getMessage() );
400         }
401 
402     }
403 }