This project has retired. For details please refer to its
Attic page.
DefaultRepositoryArchivaTaskScheduler xref
1 package org.apache.archiva.scheduler.repository;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
32 import org.apache.archiva.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.components.scheduler.Scheduler;
34 import org.apache.archiva.components.taskqueue.TaskQueue;
35 import org.apache.archiva.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang3.time.StopWatch;
39 import org.quartz.CronScheduleBuilder;
40 import org.quartz.CronTrigger;
41 import org.quartz.JobBuilder;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.quartz.TriggerBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Service;
49
50 import javax.annotation.PostConstruct;
51 import javax.annotation.PreDestroy;
52 import javax.inject.Inject;
53 import javax.inject.Named;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
57 import java.util.Set;
58
59
60
61
62 @Service( "archivaTaskScheduler#repository" )
63 public class DefaultRepositoryArchivaTaskScheduler
64 implements RepositoryArchivaTaskScheduler, ConfigurationListener
65 {
66 private Logger log = LoggerFactory.getLogger( getClass() );
67
68 @Inject
69 private Scheduler scheduler;
70
71 @Inject
72 private CronExpressionValidator cronValidator;
73
74 @Inject
75 @Named( value = "taskQueue#repository-scanning" )
76 private TaskQueue<RepositoryTask> repositoryScanningQueue;
77
78 @Inject
79 private ArchivaConfiguration archivaConfiguration;
80
81 @Inject
82 @Named( value = "repositoryStatisticsManager#default" )
83 private RepositoryStatisticsManager repositoryStatisticsManager;
84
85
86
87
88 @Inject
89 private RepositorySessionFactory repositorySessionFactory;
90
91 private static final String REPOSITORY_SCAN_GROUP = "rg";
92
93 private static final String REPOSITORY_JOB = "rj";
94
95 private static final String REPOSITORY_JOB_TRIGGER = "rjt";
96
97 static final String TASK_QUEUE = "TASK_QUEUE";
98
99 static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100
101 public static final String CRON_HOURLY = "0 0 * * * ?";
102
103 private Set<String> jobs = new HashSet<>();
104
105 private List<String> queuedRepos = new ArrayList<>();
106
107 @PostConstruct
108 public void startup()
109 throws ArchivaException
110 {
111
112 StopWatch stopWatch = new StopWatch();
113 stopWatch.start();
114
115 archivaConfiguration.addListener( this );
116
117 List<ManagedRepositoryConfiguration> repositories =
118 archivaConfiguration.getConfiguration().getManagedRepositories();
119
120 RepositorySession repositorySession = null;
121 try
122 {
123 repositorySession = repositorySessionFactory.createSession();
124 }
125 catch ( MetadataRepositoryException e )
126 {
127 e.printStackTrace( );
128 }
129 try
130 {
131 MetadataRepository metadataRepository = repositorySession.getRepository();
132 for ( ManagedRepositoryConfiguration repoConfig : repositories )
133 {
134 if ( repoConfig.isScanned() )
135 {
136 try
137 {
138 scheduleRepositoryJobs( repoConfig );
139 }
140 catch ( SchedulerException e )
141 {
142 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
143 }
144
145 try
146 {
147 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
148 {
149 queueInitialRepoScan( repoConfig );
150 }
151 }
152 catch ( MetadataRepositoryException e )
153 {
154 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
155 e.getMessage(), e );
156 }
157 }
158 }
159 }
160 finally
161 {
162 repositorySession.close();
163 }
164
165 stopWatch.stop();
166 log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
167 }
168
169
170 @PreDestroy
171 public void stop()
172 throws SchedulerException
173 {
174 for ( String job : jobs )
175 {
176 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
177 }
178 jobs.clear();
179 queuedRepos.clear();
180
181 }
182
183 @SuppressWarnings( "unchecked" )
184 @Override
185 public boolean isProcessingRepositoryTask( String repositoryId )
186 {
187 synchronized ( repositoryScanningQueue )
188 {
189 List<RepositoryTask> queue = null;
190
191 try
192 {
193 queue = repositoryScanningQueue.getQueueSnapshot();
194 }
195 catch ( TaskQueueException e )
196 {
197
198 }
199
200 for ( RepositoryTask queuedTask : queue )
201 {
202 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203 {
204 return true;
205 }
206 }
207 return false;
208 }
209 }
210
211 @Override
212 public boolean isProcessingRepositoryTask( RepositoryTask task )
213 {
214 synchronized ( repositoryScanningQueue )
215 {
216 List<RepositoryTask> queue = null;
217
218 try
219 {
220 queue = repositoryScanningQueue.getQueueSnapshot();
221 }
222 catch ( TaskQueueException e )
223 {
224
225 }
226
227 for ( RepositoryTask queuedTask : queue )
228 {
229 if ( task.equals( queuedTask ) )
230 {
231 return true;
232 }
233 }
234 return false;
235 }
236 }
237
238 @Override
239 public void queueTask( RepositoryTask task )
240 throws TaskQueueException
241 {
242 synchronized ( repositoryScanningQueue )
243 {
244 if ( isProcessingRepositoryTask( task ) )
245 {
246 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
247 }
248 else
249 {
250
251 repositoryScanningQueue.put( task );
252 }
253 }
254 }
255
256 @Override
257 public boolean unQueueTask( RepositoryTask task )
258 throws TaskQueueException
259 {
260 synchronized ( repositoryScanningQueue )
261 {
262 if ( !isProcessingRepositoryTask( task ) )
263 {
264 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
265 return false;
266 }
267 else
268 {
269 return repositoryScanningQueue.remove( task );
270 }
271 }
272 }
273
274 @Override
275 public void configurationEvent( ConfigurationEvent event )
276 {
277 if ( event.getType() == ConfigurationEvent.SAVED )
278 {
279 for ( String job : jobs )
280 {
281 try
282 {
283 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
284 }
285 catch ( SchedulerException e )
286 {
287 log.error( "Error restarting the repository scanning job after property change." );
288 }
289 }
290 jobs.clear();
291
292 List<ManagedRepositoryConfiguration> repositories =
293 archivaConfiguration.getConfiguration().getManagedRepositories();
294
295 for ( ManagedRepositoryConfiguration repoConfig : repositories )
296 {
297 if ( repoConfig.getRefreshCronExpression() != null )
298 {
299 try
300 {
301 scheduleRepositoryJobs( repoConfig );
302 }
303 catch ( SchedulerException e )
304 {
305 log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
306 }
307 }
308 }
309 }
310 }
311
312 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
313 MetadataRepository metadataRepository )
314 throws MetadataRepositoryException
315 {
316 long start = System.currentTimeMillis();
317
318 boolean res = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );
319
320 long end = System.currentTimeMillis();
321
322 log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
323
324 return res;
325 }
326
327
328 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
329 {
330 String repoId = repoConfig.getId();
331 RepositoryTaskpository/model/RepositoryTask.html#RepositoryTask">RepositoryTask task = new RepositoryTask();
332 task.setRepositoryId( repoId );
333
334 if ( !queuedRepos.contains( repoId ) )
335 {
336 log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
337
338 try
339 {
340 queuedRepos.add( repoConfig.getId() );
341 this.queueTask( task );
342 }
343 catch ( TaskQueueException e )
344 {
345 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
346 }
347 }
348 }
349
350 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
351 throws SchedulerException
352 {
353 if ( repoConfig.getRefreshCronExpression() == null )
354 {
355 log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
356 return;
357 }
358
359 if ( !repoConfig.isScanned() )
360 {
361 log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
362 return;
363 }
364
365
366 String cronString = repoConfig.getRefreshCronExpression();
367
368 if ( !cronValidator.validate( cronString ) )
369 {
370 log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString,
371 repoConfig.getId() );
372 cronString = CRON_HOURLY;
373 }
374
375 JobDataMap jobDataMap = new JobDataMap( );
376 jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
377 jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
378
379
380 JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
381 .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
382 .setJobData( jobDataMap )
383 .build();
384
385 try
386 {
387 CronTrigger trigger = TriggerBuilder.newTrigger()
388 .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
389 .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
390 .build();
391
392 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
393 scheduler.scheduleJob( repositoryJob, trigger );
394 }
395 catch ( RuntimeException e )
396 {
397 log.error(
398 "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
399 repoConfig.getId(), e.getMessage() );
400 }
401
402 }
403 }