001package org.apache.archiva.scheduler.repository; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import org.apache.archiva.common.ArchivaException; 023import org.apache.archiva.configuration.ArchivaConfiguration; 024import org.apache.archiva.configuration.ConfigurationEvent; 025import org.apache.archiva.configuration.ConfigurationListener; 026import org.apache.archiva.configuration.ManagedRepositoryConfiguration; 027import org.apache.archiva.metadata.repository.MetadataRepository; 028import org.apache.archiva.metadata.repository.MetadataRepositoryException; 029import org.apache.archiva.metadata.repository.RepositorySession; 030import org.apache.archiva.metadata.repository.RepositorySessionFactory; 031import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager; 032import org.apache.archiva.redback.components.scheduler.CronExpressionValidator; 033import org.apache.archiva.redback.components.scheduler.Scheduler; 034import org.apache.archiva.redback.components.taskqueue.TaskQueue; 035import org.apache.archiva.redback.components.taskqueue.TaskQueueException; 036import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler; 037import org.apache.archiva.scheduler.repository.model.RepositoryTask; 038import org.apache.commons.lang.time.StopWatch; 039import org.quartz.CronScheduleBuilder; 040import org.quartz.CronTrigger; 041import org.quartz.JobBuilder; 042import org.quartz.JobDataMap; 043import org.quartz.JobDetail; 044import org.quartz.SchedulerException; 045import org.quartz.TriggerBuilder; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048import org.springframework.stereotype.Service; 049 050import javax.annotation.PostConstruct; 051import javax.annotation.PreDestroy; 052import javax.inject.Inject; 053import javax.inject.Named; 054import java.util.ArrayList; 055import java.util.HashSet; 056import java.util.List; 057import java.util.Set; 058 059/** 060 * Default implementation of a scheduling component for archiva. 061 */ 062@Service( "archivaTaskScheduler#repository" ) 063public class DefaultRepositoryArchivaTaskScheduler 064 implements RepositoryArchivaTaskScheduler, ConfigurationListener 065{ 066 private Logger log = LoggerFactory.getLogger( getClass() ); 067 068 @Inject 069 private Scheduler scheduler; 070 071 @Inject 072 private CronExpressionValidator cronValidator; 073 074 @Inject 075 @Named( value = "taskQueue#repository-scanning" ) 076 private TaskQueue repositoryScanningQueue; 077 078 @Inject 079 private ArchivaConfiguration archivaConfiguration; 080 081 @Inject 082 @Named( value = "repositoryStatisticsManager#default" ) 083 private RepositoryStatisticsManager repositoryStatisticsManager; 084 085 /** 086 * TODO: could have multiple implementations 087 */ 088 @Inject 089 private RepositorySessionFactory repositorySessionFactory; 090 091 private static final String REPOSITORY_SCAN_GROUP = "rg"; 092 093 private static final String REPOSITORY_JOB = "rj"; 094 095 private static final String REPOSITORY_JOB_TRIGGER = "rjt"; 096 097 static final String TASK_QUEUE = "TASK_QUEUE"; 098 099 static final String TASK_REPOSITORY = "TASK_REPOSITORY"; 100 101 public static final String CRON_HOURLY = "0 0 * * * ?"; 102 103 private Set<String> jobs = new HashSet<>(); 104 105 private List<String> queuedRepos = new ArrayList<>(); 106 107 @PostConstruct 108 public void startup() 109 throws ArchivaException 110 { 111 112 StopWatch stopWatch = new StopWatch(); 113 stopWatch.start(); 114 115 archivaConfiguration.addListener( this ); 116 117 List<ManagedRepositoryConfiguration> repositories = 118 archivaConfiguration.getConfiguration().getManagedRepositories(); 119 120 RepositorySession repositorySession = repositorySessionFactory.createSession(); 121 try 122 { 123 MetadataRepository metadataRepository = repositorySession.getRepository(); 124 for ( ManagedRepositoryConfiguration repoConfig : repositories ) 125 { 126 if ( repoConfig.isScanned() ) 127 { 128 try 129 { 130 scheduleRepositoryJobs( repoConfig ); 131 } 132 catch ( SchedulerException e ) 133 { 134 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e ); 135 } 136 137 try 138 { 139 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) ) 140 { 141 queueInitialRepoScan( repoConfig ); 142 } 143 } 144 catch ( MetadataRepositoryException e ) 145 { 146 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}", 147 e.getMessage(), e ); 148 } 149 } 150 } 151 } 152 finally 153 { 154 repositorySession.close(); 155 } 156 157 stopWatch.stop(); 158 log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() ); 159 } 160 161 162 @PreDestroy 163 public void stop() 164 throws SchedulerException 165 { 166 for ( String job : jobs ) 167 { 168 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP ); 169 } 170 jobs.clear(); 171 queuedRepos.clear(); 172 173 } 174 175 @SuppressWarnings( "unchecked" ) 176 @Override 177 public boolean isProcessingRepositoryTask( String repositoryId ) 178 { 179 synchronized ( repositoryScanningQueue ) 180 { 181 List<RepositoryTask> queue = null; 182 183 try 184 { 185 queue = repositoryScanningQueue.getQueueSnapshot(); 186 } 187 catch ( TaskQueueException e ) 188 { 189 // not possible with plexus-taskqueue implementation, ignore 190 } 191 192 for ( RepositoryTask queuedTask : queue ) 193 { 194 if ( queuedTask.getRepositoryId().equals( repositoryId ) ) 195 { 196 return true; 197 } 198 } 199 return false; 200 } 201 } 202 203 @Override 204 public boolean isProcessingRepositoryTask( RepositoryTask task ) 205 { 206 synchronized ( repositoryScanningQueue ) 207 { 208 List<RepositoryTask> queue = null; 209 210 try 211 { 212 queue = repositoryScanningQueue.getQueueSnapshot(); 213 } 214 catch ( TaskQueueException e ) 215 { 216 // not possible with plexus-taskqueue implementation, ignore 217 } 218 219 for ( RepositoryTask queuedTask : queue ) 220 { 221 if ( task.equals( queuedTask ) ) 222 { 223 return true; 224 } 225 } 226 return false; 227 } 228 } 229 230 @Override 231 public void queueTask( RepositoryTask task ) 232 throws TaskQueueException 233 { 234 synchronized ( repositoryScanningQueue ) 235 { 236 if ( isProcessingRepositoryTask( task ) ) 237 { 238 log.debug( "Repository task '{}' is already queued. Skipping task.", task ); 239 } 240 else 241 { 242 // add check if the task is already queued if it is a file scan 243 repositoryScanningQueue.put( task ); 244 } 245 } 246 } 247 248 @Override 249 public boolean unQueueTask( RepositoryTask task ) 250 throws TaskQueueException 251 { 252 synchronized ( repositoryScanningQueue ) 253 { 254 if ( !isProcessingRepositoryTask( task ) ) 255 { 256 log.info( "cannot unqueue Repository task '{}' not already queued.", task ); 257 return false; 258 } 259 else 260 { 261 return repositoryScanningQueue.remove( task ); 262 } 263 } 264 } 265 266 @Override 267 public void configurationEvent( ConfigurationEvent event ) 268 { 269 if ( event.getType() == ConfigurationEvent.SAVED ) 270 { 271 for ( String job : jobs ) 272 { 273 try 274 { 275 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP ); 276 } 277 catch ( SchedulerException e ) 278 { 279 log.error( "Error restarting the repository scanning job after property change." ); 280 } 281 } 282 jobs.clear(); 283 284 List<ManagedRepositoryConfiguration> repositories = 285 archivaConfiguration.getConfiguration().getManagedRepositories(); 286 287 for ( ManagedRepositoryConfiguration repoConfig : repositories ) 288 { 289 if ( repoConfig.getRefreshCronExpression() != null ) 290 { 291 try 292 { 293 scheduleRepositoryJobs( repoConfig ); 294 } 295 catch ( SchedulerException e ) 296 { 297 log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() ); 298 } 299 } 300 } 301 } 302 } 303 304 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig, 305 MetadataRepository metadataRepository ) 306 throws MetadataRepositoryException 307 { 308 long start = System.currentTimeMillis(); 309 310 boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() ); 311 312 long end = System.currentTimeMillis(); 313 314 log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) ); 315 316 return res; 317 } 318 319 // MRM-848: Pre-configured repository initially appear to be empty 320 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig ) 321 { 322 String repoId = repoConfig.getId(); 323 RepositoryTask task = new RepositoryTask(); 324 task.setRepositoryId( repoId ); 325 326 if ( !queuedRepos.contains( repoId ) ) 327 { 328 log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId ); 329 330 try 331 { 332 queuedRepos.add( repoConfig.getId() ); 333 this.queueTask( task ); 334 } 335 catch ( TaskQueueException e ) 336 { 337 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId ); 338 } 339 } 340 } 341 342 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig ) 343 throws SchedulerException 344 { 345 if ( repoConfig.getRefreshCronExpression() == null ) 346 { 347 log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() ); 348 return; 349 } 350 351 if ( !repoConfig.isScanned() ) 352 { 353 log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() ); 354 return; 355 } 356 357 // get the cron string for these database scanning jobs 358 String cronString = repoConfig.getRefreshCronExpression(); 359 360 if ( !cronValidator.validate( cronString ) ) 361 { 362 log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString, 363 repoConfig.getId() ); 364 cronString = CRON_HOURLY; 365 } 366 367 JobDataMap jobDataMap = new JobDataMap( ); 368 jobDataMap.put( TASK_QUEUE, repositoryScanningQueue ); 369 jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() ); 370 371 // setup the unprocessed artifact job 372 JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class ) 373 .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP ) 374 .setJobData( jobDataMap ) 375 .build(); 376 377 try 378 { 379 CronTrigger trigger = TriggerBuilder.newTrigger() 380 .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP ) 381 .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) ) 382 .build(); 383 384 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() ); 385 scheduler.scheduleJob( repositoryJob, trigger ); 386 } 387 catch ( RuntimeException e ) 388 { 389 log.error( 390 "ParseException in repository scanning cron expression, disabling repository scanning for '': {}", 391 repoConfig.getId(), e.getMessage() ); 392 } 393 394 } 395}