This project has retired. For details please refer to its
Attic page.
DefaultRepositoryArchivaTaskScheduler xref
1 package org.apache.archiva.scheduler.repository;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
32 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.redback.components.scheduler.Scheduler;
34 import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35 import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang.time.StopWatch;
39 import org.quartz.CronScheduleBuilder;
40 import org.quartz.CronTrigger;
41 import org.quartz.JobBuilder;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.quartz.TriggerBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Service;
49
50 import javax.annotation.PostConstruct;
51 import javax.annotation.PreDestroy;
52 import javax.inject.Inject;
53 import javax.inject.Named;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
57 import java.util.Set;
58
59
60
61
62 @Service( "archivaTaskScheduler#repository" )
63 public class DefaultRepositoryArchivaTaskScheduler
64 implements RepositoryArchivaTaskScheduler, ConfigurationListener
65 {
66 private Logger log = LoggerFactory.getLogger( getClass() );
67
68 @Inject
69 private Scheduler scheduler;
70
71 @Inject
72 private CronExpressionValidator cronValidator;
73
74 @Inject
75 @Named( value = "taskQueue#repository-scanning" )
76 private TaskQueue repositoryScanningQueue;
77
78 @Inject
79 private ArchivaConfiguration archivaConfiguration;
80
81 @Inject
82 @Named( value = "repositoryStatisticsManager#default" )
83 private RepositoryStatisticsManager repositoryStatisticsManager;
84
85
86
87
88 @Inject
89 private RepositorySessionFactory repositorySessionFactory;
90
91 private static final String REPOSITORY_SCAN_GROUP = "rg";
92
93 private static final String REPOSITORY_JOB = "rj";
94
95 private static final String REPOSITORY_JOB_TRIGGER = "rjt";
96
97 static final String TASK_QUEUE = "TASK_QUEUE";
98
99 static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100
101 public static final String CRON_HOURLY = "0 0 * * * ?";
102
103 private Set<String> jobs = new HashSet<>();
104
105 private List<String> queuedRepos = new ArrayList<>();
106
107 @PostConstruct
108 public void startup()
109 throws ArchivaException
110 {
111
112 StopWatch stopWatch = new StopWatch();
113 stopWatch.start();
114
115 archivaConfiguration.addListener( this );
116
117 List<ManagedRepositoryConfiguration> repositories =
118 archivaConfiguration.getConfiguration().getManagedRepositories();
119
120 RepositorySession repositorySession = repositorySessionFactory.createSession();
121 try
122 {
123 MetadataRepository metadataRepository = repositorySession.getRepository();
124 for ( ManagedRepositoryConfiguration repoConfig : repositories )
125 {
126 if ( repoConfig.isScanned() )
127 {
128 try
129 {
130 scheduleRepositoryJobs( repoConfig );
131 }
132 catch ( SchedulerException e )
133 {
134 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
135 }
136
137 try
138 {
139 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
140 {
141 queueInitialRepoScan( repoConfig );
142 }
143 }
144 catch ( MetadataRepositoryException e )
145 {
146 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
147 e.getMessage(), e );
148 }
149 }
150 }
151 }
152 finally
153 {
154 repositorySession.close();
155 }
156
157 stopWatch.stop();
158 log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
159 }
160
161
162 @PreDestroy
163 public void stop()
164 throws SchedulerException
165 {
166 for ( String job : jobs )
167 {
168 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
169 }
170 jobs.clear();
171 queuedRepos.clear();
172
173 }
174
175 @SuppressWarnings( "unchecked" )
176 @Override
177 public boolean isProcessingRepositoryTask( String repositoryId )
178 {
179 synchronized ( repositoryScanningQueue )
180 {
181 List<RepositoryTask> queue = null;
182
183 try
184 {
185 queue = repositoryScanningQueue.getQueueSnapshot();
186 }
187 catch ( TaskQueueException e )
188 {
189
190 }
191
192 for ( RepositoryTask queuedTask : queue )
193 {
194 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
195 {
196 return true;
197 }
198 }
199 return false;
200 }
201 }
202
203 @Override
204 public boolean isProcessingRepositoryTask( RepositoryTask task )
205 {
206 synchronized ( repositoryScanningQueue )
207 {
208 List<RepositoryTask> queue = null;
209
210 try
211 {
212 queue = repositoryScanningQueue.getQueueSnapshot();
213 }
214 catch ( TaskQueueException e )
215 {
216
217 }
218
219 for ( RepositoryTask queuedTask : queue )
220 {
221 if ( task.equals( queuedTask ) )
222 {
223 return true;
224 }
225 }
226 return false;
227 }
228 }
229
230 @Override
231 public void queueTask( RepositoryTask task )
232 throws TaskQueueException
233 {
234 synchronized ( repositoryScanningQueue )
235 {
236 if ( isProcessingRepositoryTask( task ) )
237 {
238 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
239 }
240 else
241 {
242
243 repositoryScanningQueue.put( task );
244 }
245 }
246 }
247
248 @Override
249 public boolean unQueueTask( RepositoryTask task )
250 throws TaskQueueException
251 {
252 synchronized ( repositoryScanningQueue )
253 {
254 if ( !isProcessingRepositoryTask( task ) )
255 {
256 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
257 return false;
258 }
259 else
260 {
261 return repositoryScanningQueue.remove( task );
262 }
263 }
264 }
265
266 @Override
267 public void configurationEvent( ConfigurationEvent event )
268 {
269 if ( event.getType() == ConfigurationEvent.SAVED )
270 {
271 for ( String job : jobs )
272 {
273 try
274 {
275 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
276 }
277 catch ( SchedulerException e )
278 {
279 log.error( "Error restarting the repository scanning job after property change." );
280 }
281 }
282 jobs.clear();
283
284 List<ManagedRepositoryConfiguration> repositories =
285 archivaConfiguration.getConfiguration().getManagedRepositories();
286
287 for ( ManagedRepositoryConfiguration repoConfig : repositories )
288 {
289 if ( repoConfig.getRefreshCronExpression() != null )
290 {
291 try
292 {
293 scheduleRepositoryJobs( repoConfig );
294 }
295 catch ( SchedulerException e )
296 {
297 log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
298 }
299 }
300 }
301 }
302 }
303
304 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
305 MetadataRepository metadataRepository )
306 throws MetadataRepositoryException
307 {
308 long start = System.currentTimeMillis();
309
310 boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
311
312 long end = System.currentTimeMillis();
313
314 log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
315
316 return res;
317 }
318
319
320 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
321 {
322 String repoId = repoConfig.getId();
323 RepositoryTask task = new RepositoryTask();
324 task.setRepositoryId( repoId );
325
326 if ( !queuedRepos.contains( repoId ) )
327 {
328 log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
329
330 try
331 {
332 queuedRepos.add( repoConfig.getId() );
333 this.queueTask( task );
334 }
335 catch ( TaskQueueException e )
336 {
337 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
338 }
339 }
340 }
341
342 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
343 throws SchedulerException
344 {
345 if ( repoConfig.getRefreshCronExpression() == null )
346 {
347 log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
348 return;
349 }
350
351 if ( !repoConfig.isScanned() )
352 {
353 log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
354 return;
355 }
356
357
358 String cronString = repoConfig.getRefreshCronExpression();
359
360 if ( !cronValidator.validate( cronString ) )
361 {
362 log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString,
363 repoConfig.getId() );
364 cronString = CRON_HOURLY;
365 }
366
367 JobDataMap jobDataMap = new JobDataMap( );
368 jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
369 jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
370
371
372 JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
373 .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
374 .setJobData( jobDataMap )
375 .build();
376
377 try
378 {
379 CronTrigger trigger = TriggerBuilder.newTrigger()
380 .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
381 .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
382 .build();
383
384 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
385 scheduler.scheduleJob( repositoryJob, trigger );
386 }
387 catch ( RuntimeException e )
388 {
389 log.error(
390 "ParseException in repository scanning cron expression, disabling repository scanning for '': {}",
391 repoConfig.getId(), e.getMessage() );
392 }
393
394 }
395 }