This project has retired. For details please refer to its Attic page.
DefaultRepositoryArchivaTaskScheduler xref
View Javadoc
1   package org.apache.archiva.scheduler.repository;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.archiva.common.ArchivaException;
23  import org.apache.archiva.configuration.ArchivaConfiguration;
24  import org.apache.archiva.configuration.ConfigurationEvent;
25  import org.apache.archiva.configuration.ConfigurationListener;
26  import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27  import org.apache.archiva.metadata.repository.MetadataRepository;
28  import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29  import org.apache.archiva.metadata.repository.RepositorySession;
30  import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31  import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
32  import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33  import org.apache.archiva.redback.components.scheduler.Scheduler;
34  import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35  import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36  import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37  import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38  import org.apache.commons.lang.time.StopWatch;
39  import org.quartz.CronScheduleBuilder;
40  import org.quartz.CronTrigger;
41  import org.quartz.JobBuilder;
42  import org.quartz.JobDataMap;
43  import org.quartz.JobDetail;
44  import org.quartz.SchedulerException;
45  import org.quartz.TriggerBuilder;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  import org.springframework.stereotype.Service;
49  
50  import javax.annotation.PostConstruct;
51  import javax.annotation.PreDestroy;
52  import javax.inject.Inject;
53  import javax.inject.Named;
54  import java.util.ArrayList;
55  import java.util.HashSet;
56  import java.util.List;
57  import java.util.Set;
58  
59  /**
60   * Default implementation of a scheduling component for archiva.
61   */
62  @Service( "archivaTaskScheduler#repository" )
63  public class DefaultRepositoryArchivaTaskScheduler
64      implements RepositoryArchivaTaskScheduler, ConfigurationListener
65  {
66      private Logger log = LoggerFactory.getLogger( getClass() );
67  
68      @Inject
69      private Scheduler scheduler;
70  
71      @Inject
72      private CronExpressionValidator cronValidator;
73  
74      @Inject
75      @Named( value = "taskQueue#repository-scanning" )
76      private TaskQueue repositoryScanningQueue;
77  
78      @Inject
79      private ArchivaConfiguration archivaConfiguration;
80  
81      @Inject
82      @Named( value = "repositoryStatisticsManager#default" )
83      private RepositoryStatisticsManager repositoryStatisticsManager;
84  
85      /**
86       * TODO: could have multiple implementations
87       */
88      @Inject
89      private RepositorySessionFactory repositorySessionFactory;
90  
91      private static final String REPOSITORY_SCAN_GROUP = "rg";
92  
93      private static final String REPOSITORY_JOB = "rj";
94  
95      private static final String REPOSITORY_JOB_TRIGGER = "rjt";
96  
97      static final String TASK_QUEUE = "TASK_QUEUE";
98  
99      static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100 
101     public static final String CRON_HOURLY = "0 0 * * * ?";
102 
103     private Set<String> jobs = new HashSet<>();
104 
105     private List<String> queuedRepos = new ArrayList<>();
106 
107     @PostConstruct
108     public void startup()
109         throws ArchivaException
110     {
111 
112         StopWatch stopWatch = new StopWatch();
113         stopWatch.start();
114 
115         archivaConfiguration.addListener( this );
116 
117         List<ManagedRepositoryConfiguration> repositories =
118             archivaConfiguration.getConfiguration().getManagedRepositories();
119 
120         RepositorySession repositorySession = repositorySessionFactory.createSession();
121         try
122         {
123             MetadataRepository metadataRepository = repositorySession.getRepository();
124             for ( ManagedRepositoryConfiguration repoConfig : repositories )
125             {
126                 if ( repoConfig.isScanned() )
127                 {
128                     try
129                     {
130                         scheduleRepositoryJobs( repoConfig );
131                     }
132                     catch ( SchedulerException e )
133                     {
134                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
135                     }
136 
137                     try
138                     {
139                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
140                         {
141                             queueInitialRepoScan( repoConfig );
142                         }
143                     }
144                     catch ( MetadataRepositoryException e )
145                     {
146                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
147                                   e.getMessage(), e );
148                     }
149                 }
150             }
151         }
152         finally
153         {
154             repositorySession.close();
155         }
156 
157         stopWatch.stop();
158         log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
159     }
160 
161 
162     @PreDestroy
163     public void stop()
164         throws SchedulerException
165     {
166         for ( String job : jobs )
167         {
168             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
169         }
170         jobs.clear();
171         queuedRepos.clear();
172 
173     }
174 
175     @SuppressWarnings( "unchecked" )
176     @Override
177     public boolean isProcessingRepositoryTask( String repositoryId )
178     {
179         synchronized ( repositoryScanningQueue )
180         {
181             List<RepositoryTask> queue = null;
182 
183             try
184             {
185                 queue = repositoryScanningQueue.getQueueSnapshot();
186             }
187             catch ( TaskQueueException e )
188             {
189                 // not possible with plexus-taskqueue implementation, ignore
190             }
191 
192             for ( RepositoryTask queuedTask : queue )
193             {
194                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
195                 {
196                     return true;
197                 }
198             }
199             return false;
200         }
201     }
202 
203     @Override
204     public boolean isProcessingRepositoryTask( RepositoryTask task )
205     {
206         synchronized ( repositoryScanningQueue )
207         {
208             List<RepositoryTask> queue = null;
209 
210             try
211             {
212                 queue = repositoryScanningQueue.getQueueSnapshot();
213             }
214             catch ( TaskQueueException e )
215             {
216                 // not possible with plexus-taskqueue implementation, ignore
217             }
218 
219             for ( RepositoryTask queuedTask : queue )
220             {
221                 if ( task.equals( queuedTask ) )
222                 {
223                     return true;
224                 }
225             }
226             return false;
227         }
228     }
229 
230     @Override
231     public void queueTask( RepositoryTask task )
232         throws TaskQueueException
233     {
234         synchronized ( repositoryScanningQueue )
235         {
236             if ( isProcessingRepositoryTask( task ) )
237             {
238                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
239             }
240             else
241             {
242                 // add check if the task is already queued if it is a file scan
243                 repositoryScanningQueue.put( task );
244             }
245         }
246     }
247 
248     @Override
249     public boolean unQueueTask( RepositoryTask task )
250         throws TaskQueueException
251     {
252         synchronized ( repositoryScanningQueue )
253         {
254             if ( !isProcessingRepositoryTask( task ) )
255             {
256                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
257                 return false;
258             }
259             else
260             {
261                 return repositoryScanningQueue.remove( task );
262             }
263         }
264     }
265 
266     @Override
267     public void configurationEvent( ConfigurationEvent event )
268     {
269         if ( event.getType() == ConfigurationEvent.SAVED )
270         {
271             for ( String job : jobs )
272             {
273                 try
274                 {
275                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
276                 }
277                 catch ( SchedulerException e )
278                 {
279                     log.error( "Error restarting the repository scanning job after property change." );
280                 }
281             }
282             jobs.clear();
283 
284             List<ManagedRepositoryConfiguration> repositories =
285                 archivaConfiguration.getConfiguration().getManagedRepositories();
286 
287             for ( ManagedRepositoryConfiguration repoConfig : repositories )
288             {
289                 if ( repoConfig.getRefreshCronExpression() != null )
290                 {
291                     try
292                     {
293                         scheduleRepositoryJobs( repoConfig );
294                     }
295                     catch ( SchedulerException e )
296                     {
297                         log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
298                     }
299                 }
300             }
301         }
302     }
303 
304     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
305                                          MetadataRepository metadataRepository )
306         throws MetadataRepositoryException
307     {
308         long start = System.currentTimeMillis();
309 
310         boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
311 
312         long end = System.currentTimeMillis();
313 
314         log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
315 
316         return res;
317     }
318 
319     // MRM-848: Pre-configured repository initially appear to be empty
320     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
321     {
322         String repoId = repoConfig.getId();
323         RepositoryTask task = new RepositoryTask();
324         task.setRepositoryId( repoId );
325 
326         if ( !queuedRepos.contains( repoId ) )
327         {
328             log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
329 
330             try
331             {
332                 queuedRepos.add( repoConfig.getId() );
333                 this.queueTask( task );
334             }
335             catch ( TaskQueueException e )
336             {
337                 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
338             }
339         }
340     }
341 
342     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
343         throws SchedulerException
344     {
345         if ( repoConfig.getRefreshCronExpression() == null )
346         {
347             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
348             return;
349         }
350 
351         if ( !repoConfig.isScanned() )
352         {
353             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
354             return;
355         }
356 
357         // get the cron string for these database scanning jobs
358         String cronString = repoConfig.getRefreshCronExpression();
359 
360         if ( !cronValidator.validate( cronString ) )
361         {
362             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
363                       repoConfig.getId() );
364             cronString = CRON_HOURLY;
365         }
366 
367         JobDataMap jobDataMap = new JobDataMap( );
368         jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
369         jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
370 
371         // setup the unprocessed artifact job
372         JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
373                                         .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
374                                         .setJobData( jobDataMap )
375                                         .build();
376 
377         try
378         {
379             CronTrigger trigger = TriggerBuilder.newTrigger()
380                     .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
381                     .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
382                     .build();
383 
384             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
385             scheduler.scheduleJob( repositoryJob, trigger );
386         }
387         catch ( RuntimeException e )
388         {
389             log.error(
390                 "ParseException in repository scanning cron expression, disabling repository scanning for '': {}",
391                 repoConfig.getId(), e.getMessage() );
392         }
393 
394     }
395 }