This project has retired. For details please refer to its Attic page.
Source code
001package org.apache.archiva.scheduler.repository;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import org.apache.archiva.common.ArchivaException;
023import org.apache.archiva.configuration.ArchivaConfiguration;
024import org.apache.archiva.configuration.ConfigurationEvent;
025import org.apache.archiva.configuration.ConfigurationListener;
026import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
027import org.apache.archiva.metadata.repository.MetadataRepository;
028import org.apache.archiva.metadata.repository.MetadataRepositoryException;
029import org.apache.archiva.metadata.repository.RepositorySession;
030import org.apache.archiva.metadata.repository.RepositorySessionFactory;
031import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
032import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
033import org.apache.archiva.redback.components.scheduler.Scheduler;
034import org.apache.archiva.redback.components.taskqueue.TaskQueue;
035import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
036import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
037import org.apache.archiva.scheduler.repository.model.RepositoryTask;
038import org.apache.commons.lang.time.StopWatch;
039import org.quartz.CronScheduleBuilder;
040import org.quartz.CronTrigger;
041import org.quartz.JobBuilder;
042import org.quartz.JobDataMap;
043import org.quartz.JobDetail;
044import org.quartz.SchedulerException;
045import org.quartz.TriggerBuilder;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048import org.springframework.stereotype.Service;
049
050import javax.annotation.PostConstruct;
051import javax.annotation.PreDestroy;
052import javax.inject.Inject;
053import javax.inject.Named;
054import java.util.ArrayList;
055import java.util.HashSet;
056import java.util.List;
057import java.util.Set;
058
059/**
060 * Default implementation of a scheduling component for archiva.
061 */
062@Service( "archivaTaskScheduler#repository" )
063public class DefaultRepositoryArchivaTaskScheduler
064    implements RepositoryArchivaTaskScheduler, ConfigurationListener
065{
066    private Logger log = LoggerFactory.getLogger( getClass() );
067
068    @Inject
069    private Scheduler scheduler;
070
071    @Inject
072    private CronExpressionValidator cronValidator;
073
074    @Inject
075    @Named( value = "taskQueue#repository-scanning" )
076    private TaskQueue repositoryScanningQueue;
077
078    @Inject
079    private ArchivaConfiguration archivaConfiguration;
080
081    @Inject
082    @Named( value = "repositoryStatisticsManager#default" )
083    private RepositoryStatisticsManager repositoryStatisticsManager;
084
085    /**
086     * TODO: could have multiple implementations
087     */
088    @Inject
089    private RepositorySessionFactory repositorySessionFactory;
090
091    private static final String REPOSITORY_SCAN_GROUP = "rg";
092
093    private static final String REPOSITORY_JOB = "rj";
094
095    private static final String REPOSITORY_JOB_TRIGGER = "rjt";
096
097    static final String TASK_QUEUE = "TASK_QUEUE";
098
099    static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100
101    public static final String CRON_HOURLY = "0 0 * * * ?";
102
103    private Set<String> jobs = new HashSet<>();
104
105    private List<String> queuedRepos = new ArrayList<>();
106
107    @PostConstruct
108    public void startup()
109        throws ArchivaException
110    {
111
112        StopWatch stopWatch = new StopWatch();
113        stopWatch.start();
114
115        archivaConfiguration.addListener( this );
116
117        List<ManagedRepositoryConfiguration> repositories =
118            archivaConfiguration.getConfiguration().getManagedRepositories();
119
120        RepositorySession repositorySession = repositorySessionFactory.createSession();
121        try
122        {
123            MetadataRepository metadataRepository = repositorySession.getRepository();
124            for ( ManagedRepositoryConfiguration repoConfig : repositories )
125            {
126                if ( repoConfig.isScanned() )
127                {
128                    try
129                    {
130                        scheduleRepositoryJobs( repoConfig );
131                    }
132                    catch ( SchedulerException e )
133                    {
134                        throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
135                    }
136
137                    try
138                    {
139                        if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
140                        {
141                            queueInitialRepoScan( repoConfig );
142                        }
143                    }
144                    catch ( MetadataRepositoryException e )
145                    {
146                        log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
147                                  e.getMessage(), e );
148                    }
149                }
150            }
151        }
152        finally
153        {
154            repositorySession.close();
155        }
156
157        stopWatch.stop();
158        log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
159    }
160
161
162    @PreDestroy
163    public void stop()
164        throws SchedulerException
165    {
166        for ( String job : jobs )
167        {
168            scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
169        }
170        jobs.clear();
171        queuedRepos.clear();
172
173    }
174
175    @SuppressWarnings( "unchecked" )
176    @Override
177    public boolean isProcessingRepositoryTask( String repositoryId )
178    {
179        synchronized ( repositoryScanningQueue )
180        {
181            List<RepositoryTask> queue = null;
182
183            try
184            {
185                queue = repositoryScanningQueue.getQueueSnapshot();
186            }
187            catch ( TaskQueueException e )
188            {
189                // not possible with plexus-taskqueue implementation, ignore
190            }
191
192            for ( RepositoryTask queuedTask : queue )
193            {
194                if ( queuedTask.getRepositoryId().equals( repositoryId ) )
195                {
196                    return true;
197                }
198            }
199            return false;
200        }
201    }
202
203    @Override
204    public boolean isProcessingRepositoryTask( RepositoryTask task )
205    {
206        synchronized ( repositoryScanningQueue )
207        {
208            List<RepositoryTask> queue = null;
209
210            try
211            {
212                queue = repositoryScanningQueue.getQueueSnapshot();
213            }
214            catch ( TaskQueueException e )
215            {
216                // not possible with plexus-taskqueue implementation, ignore
217            }
218
219            for ( RepositoryTask queuedTask : queue )
220            {
221                if ( task.equals( queuedTask ) )
222                {
223                    return true;
224                }
225            }
226            return false;
227        }
228    }
229
230    @Override
231    public void queueTask( RepositoryTask task )
232        throws TaskQueueException
233    {
234        synchronized ( repositoryScanningQueue )
235        {
236            if ( isProcessingRepositoryTask( task ) )
237            {
238                log.debug( "Repository task '{}' is already queued. Skipping task.", task );
239            }
240            else
241            {
242                // add check if the task is already queued if it is a file scan
243                repositoryScanningQueue.put( task );
244            }
245        }
246    }
247
248    @Override
249    public boolean unQueueTask( RepositoryTask task )
250        throws TaskQueueException
251    {
252        synchronized ( repositoryScanningQueue )
253        {
254            if ( !isProcessingRepositoryTask( task ) )
255            {
256                log.info( "cannot unqueue Repository task '{}' not already queued.", task );
257                return false;
258            }
259            else
260            {
261                return repositoryScanningQueue.remove( task );
262            }
263        }
264    }
265
266    @Override
267    public void configurationEvent( ConfigurationEvent event )
268    {
269        if ( event.getType() == ConfigurationEvent.SAVED )
270        {
271            for ( String job : jobs )
272            {
273                try
274                {
275                    scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
276                }
277                catch ( SchedulerException e )
278                {
279                    log.error( "Error restarting the repository scanning job after property change." );
280                }
281            }
282            jobs.clear();
283
284            List<ManagedRepositoryConfiguration> repositories =
285                archivaConfiguration.getConfiguration().getManagedRepositories();
286
287            for ( ManagedRepositoryConfiguration repoConfig : repositories )
288            {
289                if ( repoConfig.getRefreshCronExpression() != null )
290                {
291                    try
292                    {
293                        scheduleRepositoryJobs( repoConfig );
294                    }
295                    catch ( SchedulerException e )
296                    {
297                        log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
298                    }
299                }
300            }
301        }
302    }
303
304    private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
305                                         MetadataRepository metadataRepository )
306        throws MetadataRepositoryException
307    {
308        long start = System.currentTimeMillis();
309
310        boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
311
312        long end = System.currentTimeMillis();
313
314        log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
315
316        return res;
317    }
318
319    // MRM-848: Pre-configured repository initially appear to be empty
320    private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
321    {
322        String repoId = repoConfig.getId();
323        RepositoryTask task = new RepositoryTask();
324        task.setRepositoryId( repoId );
325
326        if ( !queuedRepos.contains( repoId ) )
327        {
328            log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
329
330            try
331            {
332                queuedRepos.add( repoConfig.getId() );
333                this.queueTask( task );
334            }
335            catch ( TaskQueueException e )
336            {
337                log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
338            }
339        }
340    }
341
342    private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
343        throws SchedulerException
344    {
345        if ( repoConfig.getRefreshCronExpression() == null )
346        {
347            log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
348            return;
349        }
350
351        if ( !repoConfig.isScanned() )
352        {
353            log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
354            return;
355        }
356
357        // get the cron string for these database scanning jobs
358        String cronString = repoConfig.getRefreshCronExpression();
359
360        if ( !cronValidator.validate( cronString ) )
361        {
362            log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
363                      repoConfig.getId() );
364            cronString = CRON_HOURLY;
365        }
366
367        JobDataMap jobDataMap = new JobDataMap( );
368        jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
369        jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
370
371        // setup the unprocessed artifact job
372        JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
373                                        .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
374                                        .setJobData( jobDataMap )
375                                        .build();
376
377        try
378        {
379            CronTrigger trigger = TriggerBuilder.newTrigger()
380                    .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
381                    .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
382                    .build();
383
384            jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
385            scheduler.scheduleJob( repositoryJob, trigger );
386        }
387        catch ( RuntimeException e )
388        {
389            log.error(
390                "ParseException in repository scanning cron expression, disabling repository scanning for '': {}",
391                repoConfig.getId(), e.getMessage() );
392        }
393
394    }
395}