Commit 107cef68 authored by David Read's avatar David Read
Browse files

Merge pull request #10 from datagovuk/7-only-download-if-necessary

Only download files that have changed
parents 5bafe19c 4dfdc420
......@@ -142,6 +142,17 @@ The Archiver can be used in two ways:
paster archiver --help
Migrations
----------
Over time it is possible that the database structure will change. In these cases you can use the migrate command to update the database schema.
::
paster --plugin=ckanext-archiver archiver migrate -c <path to CKAN ini file>
This is only necessary if you update ckanext-archiver and already have the database tables in place.
Testing
-------
......
......@@ -9,6 +9,7 @@ import itertools
from pylons import config
from ckan.lib.cli import CkanCommand
from ckan.lib.helpers import OrderedDict
REQUESTS_HEADER = {'content-type': 'application/json'}
......@@ -59,6 +60,10 @@ class Archiver(CkanCommand):
{2-chars-of-resource-id}/{resource-id}/filename.csv
Running this moves them to the new locations and updates the
cache_url on each resource to reflect the new location.
paster archiver migrate
- Updates the database schema to include new fields.
'''
# TODO
# paster archiver clean-files
......@@ -118,6 +123,8 @@ class Archiver(CkanCommand):
self.log.info('Archiver tables are initialized')
elif cmd == 'migrate-archive-dirs':
self.migrate_archive_dirs()
elif cmd == 'migrate':
self.migrate()
else:
self.log.error('Command %s not recognized' % (cmd,))
......@@ -352,6 +359,46 @@ class Archiver(CkanCommand):
print " No cache_filepath: {0}".format(not_cached_deleted)
print " cache_filepath not on disk: {0}".format(file_not_found_deleted)
def migrate(self):
    """Add any missing columns to the ``archival`` database table.

    The current column names of the ``archival`` table are read from
    INFORMATION_SCHEMA.COLUMNS, then two lists of migrations are applied
    in the order in which they are written:

    * MIGRATIONS_ADD - ``(column name, sql statement)`` pairs. The
      statement is only run if the column is NOT already present. Add an
      entry here if you wish to add a column.
    * MIGRATIONS_MODIFY - ``(column name, sql statement)`` pairs. The
      statement is only run if the column DOES exist. Add an entry here
      if you wish to modify or delete a column.

    The session is committed after every statement that is executed.
    """
    from ckan import model

    # Plain sequences of pairs are used instead of OrderedDict({...}):
    # wrapping a dict literal in an OrderedDict does not give a
    # deterministic order on Python 2, because the literal itself is
    # unordered before the OrderedDict ever sees it.
    MIGRATIONS_ADD = (
        ("etag",
         "ALTER TABLE archival ADD COLUMN etag character varying"),
        ("last_modified",
         "ALTER TABLE archival ADD COLUMN last_modified character varying"),
    )

    MIGRATIONS_MODIFY = ()

    q = "select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = 'archival';"
    # Only membership is tested below, so a set is sufficient.
    current_cols = set(row[0] for row in model.Session.execute(q))

    for column, statement in MIGRATIONS_ADD:
        if column not in current_cols:
            self.log.info(u"Adding column '{0}'".format(column))
            self.log.info(u"Executing '{0}'".format(statement))
            model.Session.execute(statement)
            model.Session.commit()

    for column, statement in MIGRATIONS_MODIFY:
        if column in current_cols:
            # These statements may alter as well as drop a column, so the
            # message must not claim that the column is being removed.
            self.log.info(u"Modifying column '{0}'".format(column))
            self.log.info(u"Executing '{0}'".format(statement))
            model.Session.execute(statement)
            model.Session.commit()

    self.log.info("Migrations complete")
def migrate_archive_dirs(self):
from ckan import model
from ckan.logic import get_action
......
......@@ -29,6 +29,7 @@ class Status:
not_broken = {
# is_broken = False
0: 'Archived successfully',
1: 'Content has not changed',
}
broken = {
# is_broken = True
......@@ -72,7 +73,7 @@ class Status:
@classmethod
def is_ok(cls, status_id):
    """Return True if ``status_id`` is one of the non-broken statuses.

    Both 0 ('Archived successfully') and 1 ('Content has not changed')
    count as OK. The earlier ``return status_id == 0`` has been removed:
    it ran first and made the check for status 1 unreachable.
    """
    return status_id in (0, 1)
class Archival(Base):
......@@ -99,6 +100,8 @@ class Archival(Base):
size = Column(types.BigInteger, default=0)
mimetype = Column(types.UnicodeText)
hash = Column(types.UnicodeText)
etag = Column(types.UnicodeText)
last_modified = Column(types.UnicodeText)
# History
first_failure = Column(types.DateTime)
......
......@@ -64,6 +64,8 @@ class ArchiveError(ArchiverErrorAfterDownloadStarted):
pass
class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
pass
class NotChanged(ArchiverErrorAfterDownloadStarted):
    """Signals that the remote content is unchanged since the last archival.

    Raised by download() when the response's ``etag`` header equals the
    etag stored on the previous archival. _update_resource() catches it,
    records the 'Content has not changed' status and skips the archive step.
    """
    pass
class LinkCheckerError(ArchiverError):
pass
class LinkInvalidError(LinkCheckerError):
......@@ -115,6 +117,7 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
log = update_package.get_logger()
log.info('Starting update_package task: package_id=%r queue=%s', package_id, queue)
num_archived = 0
# Do all work in a sub-routine since it can then be tested without celery.
# Also put try/except around it, since it is easier to monitor ckan's log than
# celery's task status.
......@@ -124,7 +127,9 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
for resource in package['resources']:
resource_id = resource['id']
_update_resource(ckan_ini_filepath, resource_id, queue)
res = _update_resource(ckan_ini_filepath, resource_id, queue)
if res:
num_archived += 1
except Exception, e:
if os.environ.get('DEBUG'):
raise
......@@ -133,7 +138,11 @@ def update_package(ckan_ini_filepath, package_id, queue='bulk'):
e, package_id, package['name'] if package in dir() else '')
raise
notify_package(package, queue, ckan_ini_filepath)
if num_archived > 0:
log.info("Notifying package as %d items were archived", num_archived)
notify_package(package, queue, ckan_ini_filepath)
else:
log.info("Not notifying package as 0 items were archived")
def _update_resource(ckan_ini_filepath, resource_id, queue):
......@@ -188,16 +197,24 @@ def _update_resource(ckan_ini_filepath, resource_id, queue):
archive_result.get('cache_filename') if archive_result else None)
# Download
try_as_api = False
requires_archive = True
log.info("Attempting to download resource: %s" % resource['url'])
download_result = None
from ckanext.archiver.model import Status
from ckanext.archiver.model import Status, Archival
download_status_id = Status.by_text('Archived successfully')
context = {
'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
'cache_url_root': config.get('ckan.cache_url_root'),
'previous': Archival.get_for_resource(resource_id)
}
try:
download_result = download(context, resource)
except NotChanged, e:
download_status_id = Status.by_text('Content has not changed')
try_as_api = False
requires_archive = False
except LinkInvalidError, e:
download_status_id = Status.by_text('URL invalid')
try_as_api = False
......@@ -234,22 +251,26 @@ def _update_resource(ckan_ini_filepath, resource_id, queue):
_save(download_status_id, e, resource, *extra_args)
return
# Archival
log.info('Attempting to archive resource')
try:
archive_result = archive_resource(context, resource, log, download_result)
except ArchiveError, e:
log.error('System error during archival: %r, %r', e, e.args)
_save(Status.by_text('System error during archival'), e, resource, download_result['url_redirected_to'])
return
# Archival. We don't need this if the remote content has not changed
if requires_archive:
log.info('Attempting to archive resource')
try:
archive_result = archive_resource(context, resource, log, download_result)
except ArchiveError, e:
log.error('System error during archival: %r, %r', e, e.args)
_save(Status.by_text('System error during archival'), e, resource, download_result['url_redirected_to'])
return
# Success
_save(Status.by_text('Archived successfully'), '', resource,
download_result['url_redirected_to'], download_result, archive_result)
# Success
_save(Status.by_text('Archived successfully'), '', resource,
download_result['url_redirected_to'], download_result, archive_result)
# The return value is only used by tests. Serialized for Celery.
print download_result
print archive_result
return json.dumps(dict(download_result, **archive_result))
# The return value is only used by tests. Serialized for Celery.
print download_result
print archive_result
return json.dumps(dict(download_result, **archive_result))
return None
def download(context, resource, url_timeout=30,
......@@ -292,6 +313,12 @@ def download(context, resource, url_timeout=30,
res = convert_requests_exceptions(log, method_func, url, timeout=url_timeout,
stream=True)
url_redirected_to = res.url if url != res.url else None
if 'previous' in context:
if 'etag' in res.headers and context['previous'].etag == res.headers['etag']:
log.info("ETAG matches, not downloading content")
raise NotChanged("etag suggests content has not changed")
if not res.ok: # i.e. 404 or something
raise DownloadError('Server reported status error: %s %s' %
(res.status_code, res.reason),
......@@ -560,6 +587,8 @@ def save_archival(resource, status_id, reason, url_redirected_to,
archival.size = download_result['size']
archival.mimetype = download_result['mimetype']
archival.hash = download_result['hash']
archival.etag = download_result['headers'].get('etag')
archival.last_modified = download_result['headers'].get('last-modified')
# History
if archival.is_broken is False:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment