package org.apache.archiva.scheduler.repository;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import org.apache.archiva.common.ArchivaException;
import org.apache.archiva.configuration.ArchivaConfiguration;
import org.apache.archiva.configuration.ConfigurationEvent;
import org.apache.archiva.configuration.ConfigurationListener;
import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
import org.apache.archiva.metadata.repository.MetadataRepository;
import org.apache.archiva.metadata.repository.MetadataRepositoryException;
import org.apache.archiva.metadata.repository.RepositorySession;
import org.apache.archiva.metadata.repository.RepositorySessionFactory;
import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
import org.apache.archiva.components.scheduler.CronExpressionValidator;
import org.apache.archiva.components.scheduler.Scheduler;
import org.apache.archiva.components.taskqueue.TaskQueue;
import org.apache.archiva.components.taskqueue.TaskQueueException;
import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
import org.apache.archiva.scheduler.repository.model.RepositoryTask;
import org.apache.commons.lang3.time.StopWatch;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Default implementation of a scheduling component for archiva.
 *
 * <p>On startup it registers one cron-triggered {@link RepositoryTaskJob} per scanned managed
 * repository and queues an initial scan for repositories that have no statistics yet. When the
 * configuration is saved, all jobs are unscheduled and re-created from the current configuration.</p>
 */
@Service( "archivaTaskScheduler#repository" )
public class DefaultRepositoryArchivaTaskScheduler
    implements RepositoryArchivaTaskScheduler, ConfigurationListener
{
    private final Logger log = LoggerFactory.getLogger( getClass() );

    @Inject
    private Scheduler scheduler;

    @Inject
    private CronExpressionValidator cronValidator;

    @Inject
    @Named( value = "taskQueue#repository-scanning" )
    private TaskQueue<RepositoryTask> repositoryScanningQueue;

    @Inject
    private ArchivaConfiguration archivaConfiguration;

    @Inject
    @Named( value = "repositoryStatisticsManager#default" )
    private RepositoryStatisticsManager repositoryStatisticsManager;

    /**
     * TODO: could have multiple implementations
     */
    @Inject
    private RepositorySessionFactory repositorySessionFactory;

    private static final String REPOSITORY_SCAN_GROUP = "rg";

    private static final String REPOSITORY_JOB = "rj";

    private static final String REPOSITORY_JOB_TRIGGER = "rjt";

    static final String TASK_QUEUE = "TASK_QUEUE";

    static final String TASK_REPOSITORY = "TASK_REPOSITORY";

    /** Fallback cron expression used when a repository's configured expression fails validation. */
    public static final String CRON_HOURLY = "0 0 * * * ?";

    /** Names of the jobs handed to the scheduler by this component (format: {@code rj:<repositoryId>}). */
    private final Set<String> jobs = new HashSet<>();

    /** Ids of repositories for which an initial scan has been queued by this component. */
    private final List<String> queuedRepos = new ArrayList<>();

    /**
     * Registers this component as a configuration listener, schedules the scan jobs of all scanned
     * managed repositories and queues an initial scan for those without statistics.
     *
     * @throws ArchivaException if the repository session cannot be created or a job cannot be scheduled
     */
    @PostConstruct
    public void startup()
        throws ArchivaException
    {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        archivaConfiguration.addListener( this );

        List<ManagedRepositoryConfiguration> repositories =
            archivaConfiguration.getConfiguration().getManagedRepositories();

        RepositorySession repositorySession;
        try
        {
            repositorySession = repositorySessionFactory.createSession();
        }
        catch ( MetadataRepositoryException e )
        {
            // Without a session the code below cannot run; fail with the real cause instead of
            // continuing with a null session and dying on a NullPointerException.
            throw new ArchivaException( "Unable to create repository session: " + e.getMessage(), e );
        }

        try
        {
            MetadataRepository metadataRepository = repositorySession.getRepository();
            for ( ManagedRepositoryConfiguration repoConfig : repositories )
            {
                if ( repoConfig.isScanned() )
                {
                    try
                    {
                        scheduleRepositoryJobs( repoConfig );
                    }
                    catch ( SchedulerException e )
                    {
                        throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
                    }

                    try
                    {
                        if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
                        {
                            queueInitialRepoScan( repoConfig );
                        }
                    }
                    catch ( MetadataRepositoryException e )
                    {
                        log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
                                  e.getMessage(), e );
                    }
                }
            }
        }
        finally
        {
            repositorySession.close();
        }

        stopWatch.stop();
        log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
    }

    /**
     * Unschedules every job registered by this component and clears the internal bookkeeping.
     *
     * <p>All jobs are attempted even if one of them fails; the bookkeeping is always cleared.</p>
     *
     * @throws SchedulerException the first failure raised while unscheduling; later failures are
     *                            attached to it as suppressed exceptions
     */
    @PreDestroy
    public void stop()
        throws SchedulerException
    {
        SchedulerException firstFailure = null;
        for ( String job : jobs )
        {
            try
            {
                scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
            }
            catch ( SchedulerException e )
            {
                // Keep going so that one broken job does not leave the others scheduled.
                if ( firstFailure == null )
                {
                    firstFailure = e;
                }
                else
                {
                    firstFailure.addSuppressed( e );
                }
            }
        }
        jobs.clear();
        queuedRepos.clear();

        if ( firstFailure != null )
        {
            throw firstFailure;
        }
    }

    /**
     * Checks whether the scanning queue currently holds a task for the given repository.
     *
     * @param repositoryId the repository id to look for
     * @return {@code true} if a queued task has this repository id, {@code false} otherwise
     *         (including when the queue snapshot cannot be obtained)
     */
    @Override
    public boolean isProcessingRepositoryTask( String repositoryId )
    {
        synchronized ( repositoryScanningQueue )
        {
            for ( RepositoryTask queuedTask : snapshotQueue() )
            {
                if ( queuedTask.getRepositoryId().equals( repositoryId ) )
                {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Checks whether the scanning queue currently holds a task equal to the given one.
     *
     * @param task the task to look for
     * @return {@code true} if an equal task is queued, {@code false} otherwise
     *         (including when the queue snapshot cannot be obtained)
     */
    @Override
    public boolean isProcessingRepositoryTask( RepositoryTask task )
    {
        synchronized ( repositoryScanningQueue )
        {
            for ( RepositoryTask queuedTask : snapshotQueue() )
            {
                if ( task.equals( queuedTask ) )
                {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Returns the current content of the scanning queue, or an empty list if it cannot be read.
     */
    private List<RepositoryTask> snapshotQueue()
    {
        try
        {
            List<RepositoryTask> snapshot = repositoryScanningQueue.getQueueSnapshot();
            if ( snapshot != null )
            {
                return snapshot;
            }
        }
        catch ( TaskQueueException e )
        {
            // The original code assumed this can never happen; report it rather than
            // dereferencing a null snapshot afterwards.
            log.warn( "Unable to get a snapshot of the repository scanning queue: {}", e.getMessage(), e );
        }
        return Collections.emptyList();
    }

    /**
     * Adds the task to the scanning queue unless an equal task is already queued.
     *
     * @param task the task to queue
     * @throws TaskQueueException if the queue rejects the task
     */
    @Override
    public void queueTask( RepositoryTask task )
        throws TaskQueueException
    {
        synchronized ( repositoryScanningQueue )
        {
            if ( isProcessingRepositoryTask( task ) )
            {
                log.debug( "Repository task '{}' is already queued. Skipping task.", task );
            }
            else
            {
                // add check if the task is already queued if it is a file scan
                repositoryScanningQueue.put( task );
            }
        }
    }

    /**
     * Removes the task from the scanning queue if it is currently queued.
     *
     * @param task the task to remove
     * @return {@code false} if the task was not queued, otherwise the result of the queue's remove operation
     * @throws TaskQueueException if the queue fails to remove the task
     */
    @Override
    public boolean unQueueTask( RepositoryTask task )
        throws TaskQueueException
    {
        synchronized ( repositoryScanningQueue )
        {
            if ( !isProcessingRepositoryTask( task ) )
            {
                log.info( "cannot unqueue Repository task '{}' not already queued.", task );
                return false;
            }
            else
            {
                return repositoryScanningQueue.remove( task );
            }
        }
    }

    /**
     * On {@link ConfigurationEvent#SAVED}, drops all registered jobs and re-creates them from the
     * current configuration. Failures are logged per job and do not stop the remaining work.
     *
     * @param event the configuration event
     */
    @Override
    public void configurationEvent( ConfigurationEvent event )
    {
        if ( event.getType() == ConfigurationEvent.SAVED )
        {
            for ( String job : jobs )
            {
                try
                {
                    scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
                }
                catch ( SchedulerException e )
                {
                    // Pass the exception so the cause is not lost.
                    log.error( "Error restarting the repository scanning job after property change.", e );
                }
            }
            jobs.clear();

            List<ManagedRepositoryConfiguration> repositories =
                archivaConfiguration.getConfiguration().getManagedRepositories();

            for ( ManagedRepositoryConfiguration repoConfig : repositories )
            {
                if ( repoConfig.getRefreshCronExpression() != null )
                {
                    try
                    {
                        scheduleRepositoryJobs( repoConfig );
                    }
                    catch ( SchedulerException e )
                    {
                        log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId(), e );
                    }
                }
            }
        }
    }

    /**
     * Tells whether statistics exist for the repository, which is taken as "already scanned".
     *
     * @param repoConfig         the repository to check
     * @param metadataRepository not used by the current implementation
     * @return {@code true} if the statistics manager reports statistics for the repository id
     * @throws MetadataRepositoryException if the statistics lookup fails
     */
    private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
                                         MetadataRepository metadataRepository )
        throws MetadataRepositoryException
    {
        long start = System.currentTimeMillis();

        boolean res = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );

        long end = System.currentTimeMillis();

        log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );

        return res;
    }

    // MRM-848: Pre-configured repository initially appear to be empty
    /**
     * Queues a scan task for the repository unless this component has already queued one for it.
     * A queueing failure is logged and does not propagate.
     *
     * @param repoConfig the repository to scan
     */
    private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
    {
        String repoId = repoConfig.getId();
        RepositoryTask task = new RepositoryTask();
        task.setRepositoryId( repoId );

        if ( !queuedRepos.contains( repoId ) )
        {
            log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );

            try
            {
                this.queueTask( task );
                // Record the repository only once queueing succeeded.
                queuedRepos.add( repoId );
            }
            catch ( TaskQueueException e )
            {
                // Argument order follows the placeholders: repository id first, then the message.
                log.error( "Error occurred while queueing repository [{}] task : {}", repoId, e.getMessage(), e );
            }
        }
    }

    /**
     * Creates and schedules the cron-triggered scan job for one repository.
     *
     * <p>Nothing is scheduled if the repository has no cron expression or is not marked as scanned.
     * An expression rejected by the validator is replaced with {@link #CRON_HOURLY}. A
     * {@link RuntimeException} raised while building the trigger or scheduling is logged and not
     * rethrown.</p>
     *
     * @param repoConfig the repository to schedule
     * @throws SchedulerException if the scheduler rejects the job
     */
    private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
        throws SchedulerException
    {
        if ( repoConfig.getRefreshCronExpression() == null )
        {
            log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
            return;
        }

        if ( !repoConfig.isScanned() )
        {
            log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
            return;
        }

        // get the cron string for these database scanning jobs
        String cronString = repoConfig.getRefreshCronExpression();

        if ( !cronValidator.validate( cronString ) )
        {
            log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
                      repoConfig.getId() );
            cronString = CRON_HOURLY;
        }

        JobDataMap jobDataMap = new JobDataMap( );
        jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
        jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );

        // setup the unprocessed artifact job
        JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
            .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
            .setJobData( jobDataMap )
            .build();

        try
        {
            CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
                .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
                .build();

            jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
            scheduler.scheduleJob( repositoryJob, trigger );
        }
        catch ( RuntimeException e )
        {
            // Trailing throwable keeps the stack trace in the log.
            log.error(
                "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
                repoConfig.getId(), e.getMessage(), e );
        }

    }
}