This project has retired. For details please refer to its Attic page.
Source code
001package org.apache.archiva.scheduler.repository;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import org.apache.archiva.common.ArchivaException;
023import org.apache.archiva.configuration.ArchivaConfiguration;
024import org.apache.archiva.configuration.ConfigurationEvent;
025import org.apache.archiva.configuration.ConfigurationListener;
026import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
027import org.apache.archiva.metadata.repository.MetadataRepository;
028import org.apache.archiva.metadata.repository.MetadataRepositoryException;
029import org.apache.archiva.metadata.repository.RepositorySession;
030import org.apache.archiva.metadata.repository.RepositorySessionFactory;
031import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
032import org.apache.archiva.components.scheduler.CronExpressionValidator;
033import org.apache.archiva.components.scheduler.Scheduler;
034import org.apache.archiva.components.taskqueue.TaskQueue;
035import org.apache.archiva.components.taskqueue.TaskQueueException;
036import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
037import org.apache.archiva.scheduler.repository.model.RepositoryTask;
038import org.apache.commons.lang3.time.StopWatch;
039import org.quartz.CronScheduleBuilder;
040import org.quartz.CronTrigger;
041import org.quartz.JobBuilder;
042import org.quartz.JobDataMap;
043import org.quartz.JobDetail;
044import org.quartz.SchedulerException;
045import org.quartz.TriggerBuilder;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.springframework.stereotype.Service;
049
050import javax.annotation.PostConstruct;
051import javax.annotation.PreDestroy;
052import javax.inject.Inject;
053import javax.inject.Named;
054import java.util.ArrayList;
055import java.util.HashSet;
056import java.util.List;
057import java.util.Set;
058
059/**
060 * Default implementation of a scheduling component for archiva.
061 */
062@Service( "archivaTaskScheduler#repository" )
063public class DefaultRepositoryArchivaTaskScheduler
064    implements RepositoryArchivaTaskScheduler, ConfigurationListener
065{
066    private Logger log = LoggerFactory.getLogger( getClass() );
067
068    @Inject
069    private Scheduler scheduler;
070
071    @Inject
072    private CronExpressionValidator cronValidator;
073
074    @Inject
075    @Named( value = "taskQueue#repository-scanning" )
076    private TaskQueue<RepositoryTask> repositoryScanningQueue;
077
078    @Inject
079    private ArchivaConfiguration archivaConfiguration;
080
081    @Inject
082    @Named( value = "repositoryStatisticsManager#default" )
083    private RepositoryStatisticsManager repositoryStatisticsManager;
084
085    /**
086     * TODO: could have multiple implementations
087     */
088    @Inject
089    private RepositorySessionFactory repositorySessionFactory;
090
091    private static final String REPOSITORY_SCAN_GROUP = "rg";
092
093    private static final String REPOSITORY_JOB = "rj";
094
095    private static final String REPOSITORY_JOB_TRIGGER = "rjt";
096
097    static final String TASK_QUEUE = "TASK_QUEUE";
098
099    static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100
101    public static final String CRON_HOURLY = "0 0 * * * ?";
102
103    private Set<String> jobs = new HashSet<>();
104
105    private List<String> queuedRepos = new ArrayList<>();
106
107    @PostConstruct
108    public void startup()
109        throws ArchivaException
110    {
111
112        StopWatch stopWatch = new StopWatch();
113        stopWatch.start();
114
115        archivaConfiguration.addListener( this );
116
117        List<ManagedRepositoryConfiguration> repositories =
118            archivaConfiguration.getConfiguration().getManagedRepositories();
119
120        RepositorySession repositorySession = null;
121        try
122        {
123            repositorySession = repositorySessionFactory.createSession();
124        }
125        catch ( MetadataRepositoryException e )
126        {
127            e.printStackTrace( );
128        }
129        try
130        {
131            MetadataRepository metadataRepository = repositorySession.getRepository();
132            for ( ManagedRepositoryConfiguration repoConfig : repositories )
133            {
134                if ( repoConfig.isScanned() )
135                {
136                    try
137                    {
138                        scheduleRepositoryJobs( repoConfig );
139                    }
140                    catch ( SchedulerException e )
141                    {
142                        throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
143                    }
144
145                    try
146                    {
147                        if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
148                        {
149                            queueInitialRepoScan( repoConfig );
150                        }
151                    }
152                    catch ( MetadataRepositoryException e )
153                    {
154                        log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
155                                  e.getMessage(), e );
156                    }
157                }
158            }
159        }
160        finally
161        {
162            repositorySession.close();
163        }
164
165        stopWatch.stop();
166        log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
167    }
168
169
170    @PreDestroy
171    public void stop()
172        throws SchedulerException
173    {
174        for ( String job : jobs )
175        {
176            scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
177        }
178        jobs.clear();
179        queuedRepos.clear();
180
181    }
182
183    @SuppressWarnings( "unchecked" )
184    @Override
185    public boolean isProcessingRepositoryTask( String repositoryId )
186    {
187        synchronized ( repositoryScanningQueue )
188        {
189            List<RepositoryTask> queue = null;
190
191            try
192            {
193                queue = repositoryScanningQueue.getQueueSnapshot();
194            }
195            catch ( TaskQueueException e )
196            {
197                // not possible with plexus-taskqueue implementation, ignore
198            }
199
200            for ( RepositoryTask queuedTask : queue )
201            {
202                if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203                {
204                    return true;
205                }
206            }
207            return false;
208        }
209    }
210
211    @Override
212    public boolean isProcessingRepositoryTask( RepositoryTask task )
213    {
214        synchronized ( repositoryScanningQueue )
215        {
216            List<RepositoryTask> queue = null;
217
218            try
219            {
220                queue = repositoryScanningQueue.getQueueSnapshot();
221            }
222            catch ( TaskQueueException e )
223            {
224                // not possible with plexus-taskqueue implementation, ignore
225            }
226
227            for ( RepositoryTask queuedTask : queue )
228            {
229                if ( task.equals( queuedTask ) )
230                {
231                    return true;
232                }
233            }
234            return false;
235        }
236    }
237
238    @Override
239    public void queueTask( RepositoryTask task )
240        throws TaskQueueException
241    {
242        synchronized ( repositoryScanningQueue )
243        {
244            if ( isProcessingRepositoryTask( task ) )
245            {
246                log.debug( "Repository task '{}' is already queued. Skipping task.", task );
247            }
248            else
249            {
250                // add check if the task is already queued if it is a file scan
251                repositoryScanningQueue.put( task );
252            }
253        }
254    }
255
256    @Override
257    public boolean unQueueTask( RepositoryTask task )
258        throws TaskQueueException
259    {
260        synchronized ( repositoryScanningQueue )
261        {
262            if ( !isProcessingRepositoryTask( task ) )
263            {
264                log.info( "cannot unqueue Repository task '{}' not already queued.", task );
265                return false;
266            }
267            else
268            {
269                return repositoryScanningQueue.remove( task );
270            }
271        }
272    }
273
274    @Override
275    public void configurationEvent( ConfigurationEvent event )
276    {
277        if ( event.getType() == ConfigurationEvent.SAVED )
278        {
279            for ( String job : jobs )
280            {
281                try
282                {
283                    scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
284                }
285                catch ( SchedulerException e )
286                {
287                    log.error( "Error restarting the repository scanning job after property change." );
288                }
289            }
290            jobs.clear();
291
292            List<ManagedRepositoryConfiguration> repositories =
293                archivaConfiguration.getConfiguration().getManagedRepositories();
294
295            for ( ManagedRepositoryConfiguration repoConfig : repositories )
296            {
297                if ( repoConfig.getRefreshCronExpression() != null )
298                {
299                    try
300                    {
301                        scheduleRepositoryJobs( repoConfig );
302                    }
303                    catch ( SchedulerException e )
304                    {
305                        log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
306                    }
307                }
308            }
309        }
310    }
311
312    private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
313                                         MetadataRepository metadataRepository )
314        throws MetadataRepositoryException
315    {
316        long start = System.currentTimeMillis();
317
318        boolean res = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );
319
320        long end = System.currentTimeMillis();
321
322        log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
323
324        return res;
325    }
326
327    // MRM-848: Pre-configured repository initially appear to be empty
328    private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
329    {
330        String repoId = repoConfig.getId();
331        RepositoryTask task = new RepositoryTask();
332        task.setRepositoryId( repoId );
333
334        if ( !queuedRepos.contains( repoId ) )
335        {
336            log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
337
338            try
339            {
340                queuedRepos.add( repoConfig.getId() );
341                this.queueTask( task );
342            }
343            catch ( TaskQueueException e )
344            {
345                log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
346            }
347        }
348    }
349
350    private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
351        throws SchedulerException
352    {
353        if ( repoConfig.getRefreshCronExpression() == null )
354        {
355            log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
356            return;
357        }
358
359        if ( !repoConfig.isScanned() )
360        {
361            log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
362            return;
363        }
364
365        // get the cron string for these database scanning jobs
366        String cronString = repoConfig.getRefreshCronExpression();
367
368        if ( !cronValidator.validate( cronString ) )
369        {
370            log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
371                      repoConfig.getId() );
372            cronString = CRON_HOURLY;
373        }
374
375        JobDataMap jobDataMap = new JobDataMap( );
376        jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
377        jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
378
379        // setup the unprocessed artifact job
380        JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
381                                        .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
382                                        .setJobData( jobDataMap )
383                                        .build();
384
385        try
386        {
387            CronTrigger trigger = TriggerBuilder.newTrigger()
388                    .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
389                    .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
390                    .build();
391
392            jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
393            scheduler.scheduleJob( repositoryJob, trigger );
394        }
395        catch ( RuntimeException e )
396        {
397            log.error(
398                "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
399                repoConfig.getId(), e.getMessage() );
400        }
401
402    }
403}