Commit 5e8d7d8b authored by David Read's avatar David Read
Browse files

Merge pull request #23 from ckan/update-test

"update-test" command added
parents 7b649300 cdbd984b
......@@ -34,6 +34,10 @@ class Archiver(CkanCommand):
- Archive all resources or just those belonging to a specific
package or group, if specified
paster archiver update-test [{package-name/id}|{group-name/id}]
- Does an archive in the current process i.e. avoiding Celery queue
so that you can test on the command-line more easily.
paster archiver clean-status
- Cleans the TaskStatus records that contain the status of each
archived resource, whether it was successful or not, with errors.
......@@ -102,6 +106,8 @@ class Archiver(CkanCommand):
if cmd == 'update':
self.update()
elif cmd == 'update-test':
self.update_test()
elif cmd == 'clean-status':
self.clean_status()
elif cmd == 'clean-cached-resources':
......@@ -134,12 +140,65 @@ class Archiver(CkanCommand):
self.log.error('Command %s not recognized' % (cmd,))
def update(self):
from ckan import model
from ckanext.archiver import lib
for pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res in \
self._get_packages_and_resources_in_args(self.args[1:]):
if is_pkg:
package = pkg_or_res
self.log.info('Queuing dataset %s (%s resources)',
package.name, num_resources_for_pkg)
lib.create_archiver_package_task(package, self.options.queue)
time.sleep(0.1) # to try to avoid Redis getting overloaded
else:
resource = pkg_or_res
package = pkg_for_res
self.log.info('Queuing resource %s/%s',
package.name, resource.id)
lib.create_archiver_resource_task(resource, self.options.queue)
time.sleep(0.05) # to try to avoid Redis getting overloaded
self.log.info('Completed queueing')
def update_test(self):
from ckanext.archiver import tasks
# Prevent it loading config again
tasks.load_config = lambda x: None
log = logging.getLogger('ckanext.archiver')
for pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res in \
self._get_packages_and_resources_in_args(self.args[1:]):
if is_pkg:
package = pkg_or_res
self.log.info('Archiving dataset %s (%s resources)',
package.name, num_resources_for_pkg)
tasks._update_package(package.id, self.options.queue, log)
else:
resource = pkg_or_res
package = pkg_for_res
self.log.info('Queuing resource %s/%s',
package.name, resource.id)
tasks._update_resource(resource.id, self.options.queue, log)
self.log.info('Completed test update')
def _get_packages_and_resources_in_args(self, args):
'''Given command-line arguments that specify one or more datasets or
resources, it generates a list of those packages & resources with some
basic properties.
Returns a tuple:
(pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res)
When is_pkg=True:
pkg_or_res - package object
num_resources_for_pkg - number of resources it has
pkg_for_res - None
When is_pkg=False:
pkg_or_res - resource object
num_resources_for_pkg - None
pkg_for_res - package object relating to the given resource
'''
from ckan import model
packages = []
resources = []
if len(self.args) > 1:
for arg in self.args[1:]:
if args:
for arg in args:
# try arg as a group id/name
group = model.Group.get(arg)
if group:
......@@ -202,21 +261,14 @@ class Archiver(CkanCommand):
pkg_resources = \
[res for res in package.resources_all
if res.state == 'active']
self.log.info('Queuing dataset %s (%s resources)',
package.name, len(pkg_resources))
lib.create_archiver_package_task(package, self.options.queue)
time.sleep(0.1) # to try to avoid Redis getting overloaded
yield (package, True, len(pkg_resources), None)
for resource in resources:
if p.toolkit.check_ckan_version(max_version='2.2.99'):
package = resource.resource_group.package
else:
package = resource.package
self.log.info('Queuing resource %s/%s', package.name, resource.id)
lib.create_archiver_resource_task(resource, self.options.queue)
time.sleep(0.05) # to try to avoid Redis getting overloaded
self.log.info('Completed queueing')
yield (resource, False, None, package)
def view(self, package_ref=None):
from ckan import model
......
......@@ -82,6 +82,9 @@ def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
'''
Archive a resource.
'''
load_config(ckan_ini_filepath)
register_translator()
log = update_resource.get_logger()
log.info('Starting update_resource task: res_id=%r queue=%s', resource_id, queue)
......@@ -92,7 +95,7 @@ def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
try:
result = _update_resource(ckan_ini_filepath, resource_id, queue)
result = _update_resource(resource_id, queue, log)
return result
except Exception, e:
if os.environ.get('DEBUG'):
......@@ -107,40 +110,46 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
'''
Archive a package.
'''
from ckan import model
get_action = toolkit.get_action
load_config(ckan_ini_filepath)
register_translator()
log = update_package.get_logger()
log.info('Starting update_package task: package_id=%r queue=%s', package_id, queue)
log.info('Starting update_package task: package_id=%r queue=%s',
package_id, queue)
num_archived = 0
# Do all work in a sub-routine since it can then be tested without celery.
# Also put try/except around it is easier to monitor ckan's log rather than
# celery's task status.
try:
context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
package = get_action('package_show')(context_, {'id': package_id})
for resource in package['resources']:
resource_id = resource['id']
res = _update_resource(ckan_ini_filepath, resource_id, queue)
if res:
num_archived += 1
_update_package(package_id, queue, log)
except Exception, e:
if os.environ.get('DEBUG'):
raise
# Any problem at all is logged and reraised so that celery can log it too
log.error('Error occurred during archiving package: %s\nPackage: %r %r',
e, package_id, package['name'] if 'package' in dir() else '')
# Any problem at all is logged and reraised so that celery can log it
# too
log.error('Error occurred during archiving package: %s\nPackage: %s',
e, package_id)
raise
def _update_package(package_id, queue, log):
from ckan import model
get_action = toolkit.get_action
num_archived = 0
context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
package = get_action('package_show')(context_, {'id': package_id})
for resource in package['resources']:
resource_id = resource['id']
res = _update_resource(resource_id, queue, log)
if res:
num_archived += 1
if num_archived > 0:
log.info("Notifying package as %d items were archived", num_archived)
notify_package(package, queue, ckan_ini_filepath)
notify_package(package, queue)
else:
log.info("Not notifying package as 0 items were archived")
......@@ -167,7 +176,7 @@ def _update_search_index(package_id, log):
log.info('Search indexed %s', package['name'])
def _update_resource(ckan_ini_filepath, resource_id, queue):
def _update_resource(resource_id, queue, log):
"""
Link check and archive the given resource.
If successful, updates the archival table with the cache_url & hash etc.
......@@ -189,11 +198,6 @@ def _update_resource(ckan_ini_filepath, resource_id, queue):
}
If not successful, returns None.
"""
log = update_resource.get_logger()
load_config(ckan_ini_filepath)
register_translator()
from ckan import model
from pylons import config
from ckan.plugins import toolkit
......@@ -489,7 +493,7 @@ def notify_resource(resource, queue, cache_filepath):
cache_filepath=cache_filepath)
def notify_package(package, queue, cache_filepath):
def notify_package(package, queue):
'''
Broadcasts an IPipe notification that a package archival has taken place
(or at least the archival object is changed somehow). e.g.
......@@ -497,8 +501,7 @@ def notify_package(package, queue, cache_filepath):
'''
archiver_interfaces.IPipe.send_data('package-archived',
package_id=package['id'],
queue=queue,
cache_filepath=cache_filepath)
queue=queue)
def get_plugins_waiting_on_ipipe():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment