Commit 427a8ddb authored by Marko Kuder
Browse files

added force option for updating archival of dataset

parent 81584f3f
......@@ -34,6 +34,9 @@ class Archiver(CkanCommand):
paster archiver update [{package-name/id}|{group-name/id}]
- Archive all resources or just those belonging to a specific
package or group, if specified
- Use the --force option to download resources again even if they
have not changed (their ETag is the same)
paster archiver update-test [{package-name/id}|{group-name/id}]
- Does an archive in the current process i.e. avoiding Celery queue
......@@ -97,6 +100,10 @@ class Archiver(CkanCommand):
action='store',
dest='queue',
help='Send to a particular queue')
# --force is a boolean switch: it takes no value on the command line and
# defaults to False when omitted (action='store' would have demanded an
# argument after the flag).
self.parser.add_option('-f', '--force',
                       action='store_true',
                       dest='force',
                       default=False,
                       help='Force download even if ETag is the same')
def command(self):
"""
......@@ -153,20 +160,22 @@ class Archiver(CkanCommand):
def update(self):
    """Queue archival tasks for the packages/resources given as arguments.

    Iterates over whatever ``_get_packages_and_resources_in_args`` yields
    for the command-line arguments after the sub-command name and hands
    each package or resource to ``ckanext.archiver.lib``, passing along
    the ``--queue`` and ``--force`` options.
    """
    from ckanext.archiver import lib

    # Normalise an unset/empty --force value to an explicit False so the
    # queued task always receives a well-defined flag.
    if not self.options.force:
        self.options.force = False
    force = self.options.force
    queue = self.options.queue

    for pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res in \
            self._get_packages_and_resources_in_args(self.args[1:]):
        if is_pkg:
            package = pkg_or_res
            self.log.info('Queuing dataset %s (%s resources)',
                          package.name, num_resources_for_pkg)
            # Queue exactly once, with the force flag included.
            lib.create_archiver_package_task(package, queue, force)
            time.sleep(0.1)  # to try to avoid Redis getting overloaded
        else:
            resource = pkg_or_res
            package = pkg_for_res
            self.log.info('Queuing resource %s/%s',
                          package.name, resource.id)
            # Queue exactly once, with the force flag included.
            lib.create_archiver_resource_task(resource, queue, force)
            time.sleep(0.05)  # to try to avoid Redis getting overloaded

    self.log.info('Completed queueing')
......@@ -175,19 +184,21 @@ class Archiver(CkanCommand):
# Prevent it loading config again
tasks.load_config = lambda x: None
log = logging.getLogger('ckanext.archiver')
if not self.options.force:
self.options.force=False
for pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res in \
self._get_packages_and_resources_in_args(self.args[1:]):
if is_pkg:
package = pkg_or_res
self.log.info('Archiving dataset %s (%s resources)',
package.name, num_resources_for_pkg)
tasks._update_package(package.id, self.options.queue, log)
tasks._update_package(package.id, self.options.queue, log, self.options.force)
else:
resource = pkg_or_res
package = pkg_for_res
self.log.info('Queuing resource %s/%s',
package.name, resource.id)
tasks._update_resource(resource.id, self.options.queue, log)
tasks._update_resource(resource.id, self.options.queue, log, self.options.force)
self.log.info('Completed test update')
def _get_packages_and_resources_in_args(self, args):
......
......@@ -8,7 +8,7 @@ from ckan.lib.celery_app import celery
log = logging.getLogger(__name__)
def create_archiver_resource_task(resource, queue):
def create_archiver_resource_task(resource, queue, force):
from pylons import config
if p.toolkit.check_ckan_version(max_version='2.2.99'):
# earlier CKANs had ResourceGroup
......@@ -18,18 +18,18 @@ def create_archiver_resource_task(resource, queue):
task_id = '%s/%s/%s' % (package.name, resource.id[:4], make_uuid()[:4])
ckan_ini_filepath = os.path.abspath(config['__file__'])
celery.send_task('archiver.update_resource',
args=[ckan_ini_filepath, resource.id, queue],
args=[ckan_ini_filepath, resource.id, queue, force],
task_id=task_id, queue=queue)
log.debug('Archival of resource put into celery queue %s: %s/%s url=%r',
queue, package.name, resource.id, resource.url)
def create_archiver_package_task(package, queue, force=False):
    """Send an ``archiver.update_package`` task for *package* to Celery.

    :param package: object providing ``name`` and ``id`` attributes
    :param queue: name of the Celery queue to send the task to; it is
        also forwarded to the task as an argument
    :param force: forwarded to the task as its ``force`` argument
        (the command-line help describes it as forcing the download even
        if the ETag is the same). Defaults to False so callers that
        still pass only ``(package, queue)`` keep working.
    """
    from pylons import config

    # Short random suffix keeps task ids unique per submission.
    task_id = '%s/%s' % (package.name, make_uuid()[:4])
    ckan_ini_filepath = os.path.abspath(config['__file__'])
    celery.send_task('archiver.update_package',
                     args=[ckan_ini_filepath, package.id, queue, force],
                     task_id=task_id, queue=queue)
    log.debug('Archival of package put into celery queue %s: %s',
              queue, package.name)
......
......@@ -88,7 +88,7 @@ class CkanError(ArchiverError):
@celery.task(name="archiver.update_resource")
def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
def update_resource(ckan_ini_filepath, resource_id, queue='bulk', force=False):
'''
Archive a resource.
'''
......@@ -105,7 +105,7 @@ def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
try:
result = _update_resource(resource_id, queue, log)
result = _update_resource(resource_id, queue, log, force)
return result
except Exception, e:
if os.environ.get('DEBUG'):
......@@ -116,7 +116,7 @@ def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
raise
@celery.task(name="archiver.update_package")
def update_package(ckan_ini_filepath, package_id, queue='bulk'):
def update_package(ckan_ini_filepath, package_id, queue='bulk', force=False):
'''
Archive a package.
'''
......@@ -131,7 +131,7 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
try:
_update_package(package_id, queue, log)
_update_package(package_id, queue, log, force)
except Exception, e:
if os.environ.get('DEBUG'):
raise
......@@ -142,7 +142,7 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
raise
def _update_package(package_id, queue, log):
def _update_package(package_id, queue, log, force):
from ckan import model
get_action = toolkit.get_action
......@@ -153,7 +153,7 @@ def _update_package(package_id, queue, log):
for resource in package['resources']:
resource_id = resource['id']
res = _update_resource(resource_id, queue, log)
res = _update_resource(resource_id, queue, log, force)
if res:
num_archived += 1
......@@ -186,7 +186,7 @@ def _update_search_index(package_id, log):
log.info('Search indexed %s', package['name'])
def _update_resource(resource_id, queue, log):
def _update_resource(resource_id, queue, log, force):
"""
Link check and archive the given resource.
If successful, updates the archival table with the cache_url & hash etc.
......@@ -298,9 +298,13 @@ def _update_resource(resource_id, queue, log):
download_status_id = Status.by_text('Archived successfully')
context = {
'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
'previous': Archival.get_for_resource(resource_id)
'cache_url_root': config.get('ckanext-archiver.cache_url_root')
}
if not force:
context['previous'] = Archival.get_for_resource(resource_id)
else:
log.info("Ignoring previous archivals and forcing download")
try:
download_result = download(context, resource)
except NotChanged, e:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment