ETag checking
NB Previously you needed both ckanext-archiver and ckanext-qa to see the broken
python ckanext/archiver/bin/ --write production.ini
Migrations post 2.0
Over time the database structure may change. When it does, use the migrate command to update the database schema.
paster --plugin=ckanext-archiver archiver migrate -c <path to CKAN ini file>
This is only necessary if you update ckanext-archiver and already have the database tables in place.
Installing a Celery queue backend
import ckan.plugins as p
from pylons import config
from ckan.lib.cli import CkanCommand
from ckan.lib.helpers import OrderedDict
REQUESTS_HEADER = {'content-type': 'application/json'}
Running this moves them to the new locations and updates the
Running this moves them to the new locations and updates the
cache_url on each resource to reflect the new location.
paster archiver migrate
- Updates the database schema to include new fields.
# paster archiver clean-files
# paster archiver clean-files
elif cmd == 'migrate-archive-dirs':
elif cmd == 'migrate':
self.log.error('Command %s not recognized' % (cmd,))
self.log.error('Command %s not recognized' % (cmd,))
print " No cache_filepath: {0}".format(not_cached_deleted)
print " cache_filepath not on disk: {0}".format(file_not_found_deleted)
def migrate(self):
""" Adds any missing columns to the database table for Archival by
checking the schema and adding those that are missing.
If you wish to add a column, add the column name and sql
statement to MIGRATIONS_ADD which will check that the column is
not present before running the query.
If you wish to modify or delete a column, add the column name and
query to the MIGRATIONS_MODIFY which only runs if the column
does exist.
from ckan import model
MIGRATIONS_ADD = OrderedDict({
"etag": "ALTER TABLE archival ADD COLUMN etag character varying",
"last_modified": "ALTER TABLE archival ADD COLUMN last_modified character varying"
q = "select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = 'archival';"
current_cols = list([m[0] for m in model.Session.execute(q)])
for k, v in MIGRATIONS_ADD.iteritems():
if not k in current_cols:"Adding column '{0}'".format(k))"Executing '{0}'".format(v))
for k, v in MIGRATIONS_MODIFY.iteritems():
if k in current_cols:"Removing column '{0}'".format(k))"Executing '{0}'".format(v))
model.Session.commit()"Migrations complete")
def migrate_archive_dirs(self):
from ckan import model
from ckan.logic import get_action
class Status:
not_broken = {
# is_broken = False
0: 'Archived successfully',
1: 'Content has not changed',
broken = {
# is_broken = True
def is_ok(cls, status_id):
def is_ok(cls, status_id):
return status_id == 0
return status_id in [0, 1]
broken_enum = {True: 'Broken',
None: 'Not sure if broken',
size = Column(types.BigInteger, default=0)
size = Column(types.BigInteger, default=0)
mimetype = Column(types.UnicodeText)
hash = Column(types.UnicodeText)
etag = Column(types.UnicodeText)
last_modified = Column(types.UnicodeText)
# History
first_failure = Column(types.DateTime)
# IActions
# IActions
def get_actions(self):
return dict((name, function) for name, function
in action.__dict__.items()
if callable(function))
return {
'archiver_resource_show': action.archiver_resource_show,
'archiver_dataset_show': action.archiver_dataset_show,
# IAuthFunctions
def get_auth_functions(self):
return dict((name, function) for name, function
in auth.__dict__.items()
if callable(function))
return {
'archiver_resource_show': auth.archiver_resource_show,
'archiver_dataset_show': auth.archiver_dataset_show,
# ITemplateHelpers
# ITemplateHelpers
class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
class NotChanged(ArchiverErrorAfterDownloadStarted):
class LinkCheckerError(ArchiverError):
class LinkInvalidError(LinkCheckerError):
class LinkCheckerError(ArchiverError):
log = update_package.get_logger()'Starting update_package task: package_id=%r queue=%s', package_id, queue)
num_archived = 0
# Do all work in a sub-routine since it can then be tested without celery.
# Also put try/except around it, because it is easier to monitor ckan's log
# than celery's task status.
for resource in package['resources']:
for resource in package['resources']:
resource_id = resource['id']
_update_resource(ckan_ini_filepath, resource_id, queue)
res = _update_resource(ckan_ini_filepath, resource_id, queue)
if res:
num_archived += 1
except Exception, e:
if os.environ.get('DEBUG'):
e, package_id, package['name'] if 'package' in dir() else '')
e, package_id, package['name'] if 'package' in dir() else '')
notify_package(package, queue, ckan_ini_filepath)
if num_archived > 0:"Notifying package as %d items were archived", num_archived)
notify_package(package, queue, ckan_ini_filepath)
else:"Not notifying package as 0 items were archived")
# Refresh the index for this dataset, so that it contains the latest
# archive info. However skip it if there are downstream plugins that will
archive_result.get('cache_filename') if archive_result else None)
archive_result.get('cache_filename') if archive_result else None)
# Download
try_as_api = False
requires_archive = True"Attempting to download resource: %s" % resource['url'])
download_result = None
from ckanext.archiver.model import Status
from ckanext.archiver.model import Status, Archival
download_status_id = Status.by_text('Archived successfully')
context = {
'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
'previous': Archival.get_for_resource(resource_id)
download_result = download(context, resource)
except NotChanged, e:
download_status_id = Status.by_text('Content has not changed')
try_as_api = False
requires_archive = False
except LinkInvalidError, e:
download_status_id = Status.by_text('URL invalid')
try_as_api = False
try_as_api = False
_save(download_status_id, e, resource, *extra_args)
if not requires_archive:
# We don't need to archive if the remote content has not changed
return None
# Archival'Attempting to archive resource')
# Archival
# Success
_save(Status.by_text('Archived successfully'), '', resource,
download_result['url_redirected_to'], download_result, archive_result)
download_result['url_redirected_to'], download_result, archive_result)
# The return value is only used by tests. Serialized for Celery.
return json.dumps(dict(download_result, **archive_result))
......@@ -320,6 +342,12 @@ def download(context, resource, url_timeout=30,
res = requests_wrapper(log, method_func, url, timeout=url_timeout,
stream=True, headers=headers)
url_redirected_to = res.url if url != res.url else None
if context.get('previous') and ('etag' in res.headers):
if context.get('previous').etag == res.headers['etag']:"ETAG matches, not downloading content")
raise NotChanged("etag suggests content has not changed")
if not res.ok: # i.e. 404 or something
raise DownloadError('Server reported status error: %s %s' %
(res.status_code, res.reason),
archival.size = download_result['size']
archival.size = download_result['size']
archival.mimetype = download_result['mimetype']
archival.hash = download_result['hash']
archival.etag = download_result['headers'].get('etag')
archival.last_modified = download_result['headers'].get('last-modified')
# History
if archival.is_broken is False:
{% endfor %}
{% endfor %}
{% endif %}
{% if c.options['organization'] != None %}
{% else %}
<li>Broken datasets: {{['num_broken_packages'] }} / {{['num_packages'] }} ({{'broken_package_percent') }}%)</li>
<li>Broken links: {{['num_broken_resources'] }} / {{['num_resources'] }} ({{'broken_resource_percent') }}%)</li>
