tasks.py 36.7 KB
Newer Older
1
import os
2
3
import hashlib
import httplib
4
5
import requests
import json
6
7
import urllib
import urlparse
8
import tempfile
9
import shutil
10
import datetime
11
import copy
12
import mimetypes
13
import re
14
import time
15

16
17
from requests.packages import urllib3

kindly's avatar
kindly committed
18
from ckan.lib.celery_app import celery
19
from ckan.lib import uploader
20
from ckan import plugins as p
21
from ckanext.archiver import interfaces as archiver_interfaces
22

23
24
toolkit = p.toolkit

25
26
ALLOWED_SCHEMES = set(('http', 'https', 'ftp'))

27
USER_AGENT = 'ckanext-archiver'
Ross Jones's avatar
Ross Jones committed
28
29


30
def load_config(ckan_ini_filepath):
    '''Load the CKAN environment from the given ini file.

    Needed because celery tasks run outside a normal CKAN request, so the
    Pylons config is not set up for this process yet.
    '''
    import paste.deploy
    import ckan

    ini_abs_path = os.path.abspath(ckan_ini_filepath)
    app_conf = paste.deploy.appconfig('config:' + ini_abs_path)
    ckan.config.environment.load_environment(app_conf.global_conf,
                                             app_conf.local_conf)
Ross Jones's avatar
Ross Jones committed
37
38
39
40
41
42
43
44
45


def register_translator():
    # Register a translator in this thread so that
    # the _() functions in logic layer can work
    from paste.registry import Registry
    from pylons import translator
    from ckan.lib.cli import MockTranslator
    # NOTE: the Registry and MockTranslator are kept alive via module-level
    # globals so they are not garbage-collected while the task runs.
    global registry
    registry = Registry()
    registry.prepare()
    global translator_obj
    translator_obj = MockTranslator()
    registry.register(translator, translator_obj)
51

52
53
class ArchiverError(Exception):
    '''Base class for all errors raised by the archiver.'''
    pass


class ArchiverErrorBeforeDownloadStarted(ArchiverError):
    '''An error occurring before any download was attempted.'''
    pass


class DownloadException(ArchiverError):
    '''The HTTP request itself failed (connection problems etc.).'''
    pass


class ArchiverErrorAfterDownloadStarted(ArchiverError):
    '''An error occurring once the download had begun.

    Params:
      msg - description of the error
      url_redirected_to - URL the request was redirected to, if any
                          (else None)
    '''
    def __init__(self, msg, url_redirected_to=None):
        # Fix: previously this called super(ArchiverError, self), i.e. named
        # the *base* class instead of this class, which skips this class's
        # bases in the MRO. super() must be given the defining class itself.
        super(ArchiverErrorAfterDownloadStarted, self).__init__(msg)
        self.url_redirected_to = url_redirected_to


class DownloadError(ArchiverErrorAfterDownloadStarted):
    '''The server reported an error status or content was unusable.'''
    pass


class ArchiveError(ArchiverErrorAfterDownloadStarted):
    '''Failure while storing the downloaded file in the archive.'''
    pass


class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
    '''The resource was deliberately not downloaded (e.g. too large).'''
    pass


class NotChanged(ArchiverErrorAfterDownloadStarted):
    '''The remote content has not changed since the last archival.'''
    pass


class LinkCheckerError(ArchiverError):
    '''Base class for link-checking errors.'''
    pass


class LinkInvalidError(LinkCheckerError):
    '''The URL is malformed or uses a disallowed scheme.'''
    pass


class LinkHeadRequestError(LinkCheckerError):
    '''A HEAD request for the link failed.'''
    pass


class LinkHeadMethodNotSupported(LinkCheckerError):
    '''The server does not support the HEAD method.'''
    pass


class CkanError(ArchiverError):
    '''An error occurred within CKAN while recording results.'''
    pass


82
83
@celery.task(name="archiver.update_resource")
def update_resource(ckan_ini_filepath, resource_id, queue='bulk'):
    '''
    Archive a resource.

    Celery entry point: loads the CKAN environment, then delegates to
    _update_resource() which does the real work.

    Params:
      ckan_ini_filepath - path to the CKAN config, needed to bootstrap
                          the environment in this worker process
      resource_id - id of the resource to archive
      queue - name of the celery queue this task was placed on
    '''
    load_config(ckan_ini_filepath)
    register_translator()

    log = update_resource.get_logger()
    log.info('Starting update_resource task: res_id=%r queue=%s', resource_id, queue)

    # HACK because of race condition #1481
    time.sleep(2)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it is easier to monitor ckan's log rather than
    # celery's task status.
    try:
        result = _update_resource(resource_id, queue, log)
        return result
    except Exception, e:
        # With DEBUG set, let the original traceback surface unlogged.
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it too
        log.error('Error occurred during archiving resource: %s\nResource: %r',
                  e, resource_id)
        raise

110
@celery.task(name="archiver.update_package")
def update_package(ckan_ini_filepath, package_id, queue='bulk'):
    '''
    Archive a package.

    Celery entry point: loads the CKAN environment, then delegates to
    _update_package() which archives each of the package's resources.

    Params:
      ckan_ini_filepath - path to the CKAN config, needed to bootstrap
                          the environment in this worker process
      package_id - id of the package (dataset) to archive
      queue - name of the celery queue this task was placed on
    '''
    load_config(ckan_ini_filepath)
    register_translator()

    log = update_package.get_logger()
    log.info('Starting update_package task: package_id=%r queue=%s',
             package_id, queue)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it is easier to monitor ckan's log rather than
    # celery's task status.
    try:
        _update_package(package_id, queue, log)
    except Exception, e:
        # With DEBUG set, let the original traceback surface unlogged.
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it
        # too
        log.error('Error occurred during archiving package: %s\nPackage: %s',
                  e, package_id)
        raise

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151

def _update_package(package_id, queue, log):
    '''Archive every resource of the given package, broadcast a package
    notification if anything was archived, and refresh the search index
    unless a downstream plugin will do that anyway.'''
    from ckan import model

    ctx = {'model': model, 'ignore_auth': True, 'session': model.Session}
    package = toolkit.get_action('package_show')(ctx, {'id': package_id})

    archived_count = 0
    for res_dict in package['resources']:
        if _update_resource(res_dict['id'], queue, log):
            archived_count += 1

    if archived_count:
        log.info("Notifying package as %d items were archived", archived_count)
        notify_package(package, queue)
    else:
        log.info("Not notifying package as 0 items were archived")

    # Refresh the index for this dataset, so that it contains the latest
    # archive info. However skip it if there are downstream plugins that will
    # do this anyway, since it is an expensive step to duplicate.
    if 'qa' not in get_plugins_waiting_on_ipipe():
        _update_search_index(package_id, log)
    else:
        log.info('Search index skipped %s', package['name'])
165
166
167
168
169
170
171
172


def _update_search_index(package_id, log):
    '''
    Tells CKAN to update its search index for a given package.
    '''
    from ckan import model
    from ckan.lib.search.index import PackageSearchIndex

    show_ctx = {'model': model, 'ignore_auth': True, 'session': model.Session,
                'use_cache': False, 'validate': False}
    pkg_dict = toolkit.get_action('package_show')(show_ctx, {'id': package_id})
    PackageSearchIndex().index_package(pkg_dict, defer_commit=False)
    log.info('Search indexed %s', pkg_dict['name'])
179

180

181
def _update_resource(resource_id, queue, log):
    """
    Link check and archive the given resource.
    If successful, updates the archival table with the cache_url & hash etc.
    Finally, a notification of the archival is broadcast.

    Params:
      resource - resource dict
      queue - name of the celery queue

    Should only raise on a fundamental error:
      ArchiverError
      CkanError

    Returns a JSON dict, ready to be returned from the celery task giving a
    success status:
        {
            'resource': the updated resource dict,
            'file_path': path to archived file (if archive successful), or None
        }
    If not successful, returns None.
    """
    from ckan import model
    from pylons import config
    from ckan.plugins import toolkit
    from ckanext.archiver import default_settings as settings
    from ckanext.archiver.model import Status, Archival

    get_action = toolkit.get_action

    # is_id() is defined elsewhere in this module/package (not visible here).
    assert is_id(resource_id), resource_id
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
    resource = get_action('resource_show')(context_, {'id': resource_id})

    # Make sure the top-level archive directory exists before any archiving.
    if not os.path.exists(settings.ARCHIVE_DIR):
        log.info("Creating archive directory: %s" % settings.ARCHIVE_DIR)
        os.mkdir(settings.ARCHIVE_DIR)

    def _save(status_id, exception, resource, url_redirected_to=None,
              download_result=None, archive_result=None):
        # Persist the outcome of this archival attempt and notify listeners.
        reason = '%s' % exception
        save_archival(resource, status_id,
                      reason, url_redirected_to,
                      download_result, archive_result,
                      log)
        # NOTE(review): archive_result dicts in this module use the key
        # 'cache_filepath', not 'cache_filename' - this .get() looks like it
        # always returns None. Confirm against notify_resource's consumers.
        notify_resource(
            resource,
            queue,
            archive_result.get('cache_filename') if archive_result else None)

    def _save_if_status_changed(status_id, reason, resource):
        # Only notifies when update_archival_status actually changed the row.
        cache_filename = update_archival_status(resource, status_id, reason, log)
        if cache_filename:  # if cache_filename is returned, status was updated
            notify_resource(resource, queue, cache_filename)

    # Download
    try_as_api = False
    requires_archive = True

    # Relative URLs (uploaded files) are made absolute against the site URL.
    url = resource['url']
    if not url.startswith('http'):
        url = config['ckan.site_url'].rstrip('/') + url

    hosted_externally = not url.startswith(config['ckan.site_url'])
    # if resource.get('resource_type') == 'file.upload' and not hosted_externally:
    if resource.get('url_type') == 'upload' and not hosted_externally:
        # Locally uploaded file: hash it straight off the filesystem instead
        # of downloading it over HTTP, then record a synthetic result.
        log.info("Won't attemp to archive resource uploaded locally: %s" % resource['url'])

        upload = uploader.ResourceUpload(resource)
        filepath = upload.get_path(resource['id'])

        try:
            hash, length = _file_hashnlength(filepath)
        except IOError, e:
            log.error('Error while accessing local resource %s: %s', filepath, e)

            download_status_id = Status.by_text('URL request failed')
            _save(download_status_id, e, resource)
            return

        # Guess the mimetype from the URL since no HTTP headers exist here.
        mimetype = None
        headers = None
        content_type, content_encoding = mimetypes.guess_type(url)
        if content_type:
            mimetype = _clean_content_type(content_type)
            headers = {'Content-Type': content_type}

        download_result_mock = {'mimetype': mimetype,
            'size': length,
            'hash': hash,
            'headers': headers,
            'saved_file': filepath,
            'url_redirected_to': url,
            'request_type': 'GET'}

        archive_result_mock = {'cache_filepath': filepath,
        'cache_url': url}

        # Success
        _save(Status.by_text('Archived successfully'), '', resource,
            download_result_mock['url_redirected_to'], download_result_mock, archive_result_mock)

        # The return value is only used by tests. Serialized for Celery.
        return json.dumps(dict(download_result_mock, **archive_result_mock))
        # endif: processing locally uploaded resource

    log.info("Attempting to download resource: %s" % resource['url'])
    download_result = None
    # Optimistically assume success; the except clauses below override it.
    download_status_id = Status.by_text('Archived successfully')
    context = {
        'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
        'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
        'previous': Archival.get_for_resource(resource_id)
        }
    try:
        download_result = download(context, resource)
    except NotChanged, e:
        # ETag matched the previous archival - skip re-archiving below.
        download_status_id = Status.by_text('Content has not changed')
        try_as_api = False
        requires_archive = False
    except LinkInvalidError, e:
        download_status_id = Status.by_text('URL invalid')
        try_as_api = False
    except DownloadException, e:
        download_status_id = Status.by_text('Download error')
        try_as_api = True
    except DownloadError, e:
        download_status_id = Status.by_text('Download error')
        try_as_api = True
    except ChooseNotToDownload, e:
        download_status_id = Status.by_text('Chose not to download')
        try_as_api = False
    except Exception, e:
        if os.environ.get('DEBUG'):
            raise
        log.error('Uncaught download failure: %r, %r', e, e.args)
        _save(Status.by_text('Download failure'), e, resource)
        return

    if not Status.is_ok(download_status_id):
        # 'e' is always bound here: a non-ok status is only set by one of the
        # except clauses above.
        log.info('GET error: %s - %r, %r "%s"',
                 Status.by_id(download_status_id), e, e.args,
                 resource.get('url'))

        if try_as_api:
            # Retry with common API parameters (SPARQL, WMS etc.);
            # api_request() is defined elsewhere in this module.
            download_result = api_request(context, resource)
            if download_result:
                download_status_id = Status.by_text('Archived successfully')
            # else the download_status_id (i.e. an error) is left what it was
            # from the previous download (i.e. not when we tried it as an API)

        if not try_as_api or not Status.is_ok(download_status_id):
            # NOTE(review): `'url_redirected_to' in e` tests membership in the
            # exception (its args in Python 2), not attribute presence -
            # hasattr(e, 'url_redirected_to') was probably intended. Confirm.
            extra_args = [e.url_redirected_to] if 'url_redirected_to' in e else []
            _save(download_status_id, e, resource, *extra_args)
            return

    if not requires_archive:
        # We don't need to archive if the remote content has not changed, only update status
        _save_if_status_changed(Status.by_text('Content has not changed'), '', resource)
        return None

    # Archival
    log.info('Attempting to archive resource')
    try:
        archive_result = archive_resource(context, resource, log, download_result)
    except ArchiveError, e:
        log.error('System error during archival: %r, %r', e, e.args)
        _save(Status.by_text('System error during archival'), e, resource, download_result['url_redirected_to'])
        return

    # Success
    _save(Status.by_text('Archived successfully'), '', resource,
            download_result['url_redirected_to'], download_result, archive_result)

    # The return value is only used by tests. Serialized for Celery.
    return json.dumps(dict(download_result, **archive_result))
358

359

360
def download(context, resource, url_timeout=30,
             max_content_length='default',
             method='GET'):
    '''Given a resource, tries to download it.

    Params:
      resource - dict of the resource
      url_timeout - seconds before the HTTP request times out
      max_content_length - byte limit for the download, or 'default' to use
                           settings.MAX_CONTENT_LENGTH
      method - 'GET' or 'POST'

    Exceptions from tidy_url may be propagated:
       LinkInvalidError if the URL is invalid

    If there is an error performing the download, raises:
       DownloadException - connection problems etc.
       DownloadError - HTTP status code is an error or 0 length

    If download is not suitable (e.g. too large), raises:
       ChooseNotToDownload

    If the basic GET fails then it will try it with common API
    parameters (SPARQL, WMS etc) to get a better response.

    Returns a dict of results of a successful download:
      mimetype, size, hash, headers, saved_file, url_redirected_to
    '''
    from ckanext.archiver import default_settings as settings
    from pylons import config
    log = update_resource.get_logger()

    if max_content_length == 'default':
        max_content_length = settings.MAX_CONTENT_LENGTH

    url = resource['url']
    url = tidy_url(url)

    # Uploaded files have relative URLs - make them absolute.
    if (resource.get('url_type') == 'upload' and
            not url.startswith('http')):
        url = context['site_url'].rstrip('/') + url

    hosted_externally = not url.startswith(config['ckan.site_url'])
    if resource.get('url_type') == 'upload' and hosted_externally:
        # ckanext-cloudstorage for example does that

        # enable ckanext-archiver.archive_cloud for qa to work on cloud resources
        # till https://github.com/ckan/ckanext-qa/issues/48 is resolved
        # Warning: this will result in double storage of all files below archival filesize limit

        if not config.get('ckanext-archiver.archive_cloud', False):
            raise ChooseNotToDownload('Skipping resource hosted externally to download resource: %s'
                                      % url, url)

    headers = _set_user_agent_string({})

    # start the download - just get the headers
    # May raise DownloadException
    method_func = {'GET': requests.get, 'POST': requests.post}[method]
    res = requests_wrapper(log, method_func, url, timeout=url_timeout,
                           stream=True, headers=headers,
                           verify=verify_https(),
                           )
    # None means "no redirect happened".
    url_redirected_to = res.url if url != res.url else None

    # If the server's ETag matches the previous archival, skip the body.
    if context.get('previous') and ('etag' in res.headers):
        if context.get('previous').etag == res.headers['etag']:
            log.info("ETAG matches, not downloading content")
            raise NotChanged("etag suggests content has not changed")

    if not res.ok:  # i.e. 404 or something
        raise DownloadError('Server reported status error: %s %s' %
                            (res.status_code, res.reason),
                            url_redirected_to)
    log.info('GET started successfully. Content headers: %r', res.headers)

    # record headers
    mimetype = _clean_content_type(res.headers.get('content-type', '').lower())

    # make sure resource content-length does not exceed our maximum
    content_length = res.headers.get('content-length')

    if content_length:
        try:
            content_length = int(content_length)
        except ValueError:
            # if there are multiple Content-Length headers, requests
            # will return all the values, comma separated
            if ',' in content_length:
                try:
                    content_length = int(content_length.split(',')[0])
                except ValueError:
                    pass
    if isinstance(content_length, int) and \
       int(content_length) >= max_content_length:
            # record fact that resource is too large to archive
            log.warning('Resource too large to download: %s > max (%s). '
                        'Resource: %s %r', content_length,
                        max_content_length, resource['id'], url)
            raise ChooseNotToDownload('Content-length %s exceeds maximum '
                                      'allowed value %s' %
                                      (content_length, max_content_length),
                                      url_redirected_to)
    # content_length in the headers is useful but can be unreliable, so when we
    # download, we will monitor it doesn't go over the max.

    # continue the download - stream the response body
    # NOTE(review): res.content loads the whole body into memory here, and
    # _save_resource() below iterates the response again via iter_content -
    # presumably served from requests' cached content. Confirm this does not
    # re-download.
    def get_content():
        return res.content
    log.info('Downloading the body')
    content = requests_wrapper(log, get_content)

    # APIs can return status 200, but contain an error message in the body
    # (response_is_an_api_error is defined elsewhere in this module).
    if response_is_an_api_error(content):
        raise DownloadError('Server content contained an API error message: %s' % \
                            content[:250],
                            url_redirected_to)

    content_length = len(content)
    if content_length > max_content_length:
        raise ChooseNotToDownload("Content-length %s exceeds maximum allowed value %s" %
                                  (content_length, max_content_length),
                                  url_redirected_to)

    log.info('Saving resource')
    try:
        length, hash, saved_file_path = _save_resource(resource, res, max_content_length)
    except ChooseNotToDownload, e:
        # Re-raise with the redirect URL attached, which _save_resource lacks.
        raise ChooseNotToDownload(str(e), url_redirected_to)
    log.info('Resource saved. Length: %s File: %s', length, saved_file_path)

    # zero length (or just one byte) indicates a problem
    if length < 2:
        # record fact that resource is zero length
        log.warning('Resource found was length %i - not archiving. Resource: %s %r',
                 length, resource['id'], url)
        raise DownloadError("Content-length after streaming was %i" % length,
                            url_redirected_to)

    log.info('Resource downloaded: id=%s url=%r cache_filename=%s length=%s hash=%s',
             resource['id'], url, saved_file_path, length, hash)

    return {'mimetype': mimetype,
            'size': length,
            'hash': hash,
            'headers': dict(res.headers),
            'saved_file': saved_file_path,
            'url_redirected_to': url_redirected_to,
            'request_type': method}
505

506
507
508
509
510
511
512
513
514
515
516
517
518
519
def _file_hashnlength(local_path):
    '''Return (sha1_hex_digest, byte_length) of the file at local_path.

    Reads the file in fixed-size chunks so large files do not have to fit
    in memory. May raise IOError if the file cannot be read.
    '''
    BLOCKSIZE = 65536
    hasher = hashlib.sha1()
    length = 0

    with open(local_path, 'rb') as afile:
        buf = afile.read(BLOCKSIZE)
        while len(buf) > 0:
            hasher.update(buf)
            length += len(buf)

            buf = afile.read(BLOCKSIZE)

    # The hash is stored as a unicode string (Python 2 builtin), matching
    # the hash values produced by _save_resource().
    return (unicode(hasher.hexdigest()), length)
520

521
def archive_resource(context, resource, log, result=None, url_timeout=30):
    """
    Archive the given resource. Moves the file from the temporary location
    given in download().

    Params:
       result - result of the download(), containing keys: length, saved_file

    If there is a failure, raises ArchiveError.

    Returns: {cache_filepath, cache_url}
    """
    from ckanext.archiver import default_settings as settings
    # Files are sharded into directories by the first 2 chars of the
    # resource id, e.g. ARCHIVE_DIR/ab/ab123.../<file_name>
    relative_archive_path = os.path.join(resource['id'][:2], resource['id'])
    archive_dir = os.path.join(settings.ARCHIVE_DIR, relative_archive_path)
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    # try to get a file name from the url
    parsed_url = urlparse.urlparse(resource.get('url'))
    try:
        file_name = parsed_url.path.split('/')[-1] or 'resource'
        file_name = file_name.strip()  # trailing spaces cause problems
        file_name = file_name.encode('ascii', 'ignore')  # e.g. u'\xa3' signs
    except:
        # Bare except: any problem deriving a name falls back to a default.
        file_name = "resource"

    # move the temp file to the resource's archival directory
    saved_file = os.path.join(archive_dir, file_name)
    shutil.move(result['saved_file'], saved_file)
    log.info('Going to do chmod: %s', saved_file)
    try:
        os.chmod(saved_file, 0644)  # allow other users to read it
    except Exception, e:
        log.error('chmod failed %s: %s', saved_file, e)
        raise
    log.info('Archived resource as: %s', saved_file)

    # calculate the cache_url
    if not context.get('cache_url_root'):
        log.warning('Not saved cache_url because no value for '
                    'ckanext-archiver.cache_url_root in config')
        raise ArchiveError('No value for ckanext-archiver.cache_url_root in config')
    cache_url = urlparse.urljoin(context['cache_url_root'],
                                 '%s/%s' % (relative_archive_path, file_name))
    return {'cache_filepath': saved_file,
            'cache_url': cache_url}
567
568


David Read's avatar
David Read committed
569
def notify_resource(resource, queue, cache_filepath):
    '''
    Broadcasts an IPipe notification that an resource archival has taken place
    (or at least the archival object is changed somehow).
    '''
    payload = dict(resource_id=resource['id'],
                   queue=queue,
                   cache_filepath=cache_filepath)
    archiver_interfaces.IPipe.send_data('archived', **payload)

579

580
def notify_package(package, queue):
    '''
    Broadcasts an IPipe notification that a package archival has taken place
    (or at least the archival object is changed somehow). e.g.
    ckanext-packagezip listens for this
    '''
    payload = dict(package_id=package['id'], queue=queue)
    archiver_interfaces.IPipe.send_data('package-archived', **payload)
589
590


591
592
593
594
595
def get_plugins_waiting_on_ipipe():
    '''Return the names of all loaded plugins implementing IPipe.'''
    names = []
    for observer in p.PluginImplementations(archiver_interfaces.IPipe):
        names.append(observer.name)
    return names


596
597
598
599
600
def verify_https():
    '''Whether to verify HTTPS certificates when downloading, per the
    ckanext-archiver.verify_https config option (default: true).'''
    from pylons import config
    setting = config.get('ckanext-archiver.verify_https', True)
    return toolkit.asbool(setting)


601
602
603
604
605
606
607
608
def _clean_content_type(ct):
    # For now we should remove the charset from the content type and
    # handle it better, differently, later on.
    if 'charset' in ct:
        return ct[:ct.index(';')]
    return ct


609
610
611
612
613
def _set_user_agent_string(headers):
    '''
    Update the passed headers object with a `User-Agent` key, if there is a
    USER_AGENT_STRING option in settings.
    '''
    from ckanext.archiver import default_settings as settings

    user_agent = settings.USER_AGENT_STRING
    if user_agent is None:
        return headers
    headers['User-Agent'] = user_agent
    return headers


621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
def tidy_url(url):
    '''
    Given a URL it does various checks before returning a tidied version
    suitable for calling.

    It may raise LinkInvalidError if the URL has a problem.
    '''

    # Find out if it has unicode characters, and if it does, quote them
    # so we are left with an ascii string
    try:
        url = url.decode('ascii')
    except:
        # Non-ascii: percent-encode the netloc and path components.
        # NOTE(review): quoting parts[1] (netloc) will also encode any ':'
        # of a port or '@' of credentials - presumably URLs here never carry
        # those; confirm.
        parts = list(urlparse.urlparse(url))
        parts[2] = urllib.quote(parts[2].encode('utf-8'))
        parts[1] = urllib.quote(parts[1].encode('utf-8'))
        url = urlparse.urlunparse(parts)
    url = str(url)

    # strip whitespace from url
    # (browsers appear to do this)
    url = url.strip()

    # Use urllib3 to parse the url ahead of time, since that is what
    # requests uses, but when it does it during a GET, errors are not
    # caught well
    try:
        parsed_url = urllib3.util.parse_url(url)
    except urllib3.exceptions.LocationParseError, e:
        raise LinkInvalidError('URL parsing failure: %s' % e)

    # Check we aren't using any schemes we shouldn't be.
    # Scheme is case-insensitive.
    if not parsed_url.scheme or not parsed_url.scheme.lower() in ALLOWED_SCHEMES:
        raise LinkInvalidError('Invalid url scheme. Please use one of: %s' %
                               ' '.join(ALLOWED_SCHEMES))

    if not parsed_url.host:
        raise LinkInvalidError('URL parsing failure - did not find a host name')

    return url


David Read's avatar
David Read committed
664
def _save_resource(resource, response, max_file_size, chunk_size=1024*16):
    """
    Write the response content to disk.

    Params:
      resource - resource dict (not used for the write itself)
      response - a requests response object, streamed via iter_content
      max_file_size - abort with ChooseNotToDownload once this many bytes
                      have been written
      chunk_size - bytes per chunk read from the response

    Returns a tuple:

        (file length: int, content hash: string, saved file path: string)

    Raises ChooseNotToDownload if the stream reaches max_file_size.
    """
    resource_hash = hashlib.sha1()
    length = 0

    fd, tmp_resource_file_path = tempfile.mkstemp()
    # Fix: close the mkstemp file descriptor immediately - we write through
    # a separate file object below. Previously os.close(fd) only ran after
    # the with-block, so the descriptor leaked whenever ChooseNotToDownload
    # was raised mid-download.
    os.close(fd)

    with open(tmp_resource_file_path, 'wb') as fp:
        for chunk in response.iter_content(chunk_size=chunk_size,
                                           decode_unicode=False):
            fp.write(chunk)
            length += len(chunk)
            resource_hash.update(chunk)

            # The declared Content-Length can lie, so enforce the cap on
            # the actual bytes received.
            if length >= max_file_size:
                raise ChooseNotToDownload(
                    "Content length %s exceeds maximum allowed value %s" %
                    (length, max_file_size))

    content_hash = unicode(resource_hash.hexdigest())
    return length, content_hash, tmp_resource_file_path
693

694
695
696
def save_archival(resource, status_id, reason, url_redirected_to,
                  download_result, archive_result, log):
    '''Writes to the archival table the result of an attempt to download
    the resource.

    Params:
      resource - resource dict
      status_id - id of the Status describing the outcome
      reason - human-readable reason string (e.g. the exception text)
      url_redirected_to - URL the request was redirected to, or None
      download_result - dict from download(), or None on failure
      archive_result - dict from archive_resource(), or None on failure
      log - logger

    May propagate a CkanError.
    '''
    now = datetime.datetime.now()

    from ckanext.archiver.model import Archival, Status
    from ckan import model

    # Find or create the Archival row for this resource.
    archival = Archival.get_for_resource(resource['id'])
    first_archival = not archival
    previous_archival_was_broken = None
    if not archival:
        archival = Archival.create(resource['id'])
        model.Session.add(archival)
    else:
        log.info('Archival from before: %r', archival)
        previous_archival_was_broken = archival.is_broken

    # Record which revision of the resource this archival corresponds to.
    revision = model.Session.query(model.Revision).get(resource['revision_id'])
    archival.resource_timestamp = revision.timestamp

    # Details of the latest archival attempt
    archival.status_id = status_id
    archival.is_broken = Status.is_status_broken(status_id)
    archival.reason = reason
    archival.url_redirected_to = url_redirected_to

    # Details of successful archival
    # (download_result/archive_result are assumed non-None when the status
    # is not broken - callers only pass None on failure statuses.)
    if archival.is_broken is False:
        archival.cache_filepath = archive_result['cache_filepath']
        archival.cache_url = archive_result['cache_url']
        archival.size = download_result['size']
        archival.mimetype = download_result['mimetype']
        archival.hash = download_result['hash']
        archival.etag = download_result['headers'].get('etag')
        archival.last_modified = download_result['headers'].get('last-modified')

    # History
    if archival.is_broken is False:
        archival.last_success = now
        archival.first_failure = None
        archival.failure_count = 0
    else:
        log.info('First_archival=%r Previous_broken=%r Failure_count=%r' %
                 (first_archival, previous_archival_was_broken,
                  archival.failure_count))
        if first_archival or previous_archival_was_broken is False:
            # i.e. this is the first failure (or the first archival)
            archival.first_failure = now
            archival.failure_count = 1
        else:
            archival.failure_count += 1

    archival.updated = now
    log.info('Archival saved: %r', archival)
    model.repo.commit_and_remove()
754

755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
def update_archival_status(resource, status_id, reason, log):
    '''Checks if archival status need updating. This is only needed
    in case where resources are not redownloaded because of ETag
    matching, but link was inaccessible for a while and is now broken.

    Returns the archival's cache_filepath when the status was actually
    updated (broken -> ok); otherwise returns None implicitly.
    '''
    now = datetime.datetime.now()

    from ckanext.archiver.model import Archival, Status
    from ckan import model

    archival = Archival.get_for_resource(resource['id'])
    if archival:
        previous_archival_was_broken = archival.is_broken
        current_archival_is_ok = Status.is_ok(status_id)
        # Only the broken -> ok transition needs recording; everything else
        # leaves the row untouched.
        if previous_archival_was_broken and current_archival_is_ok:
            log.info('Archival from before: %r', archival)
            # save the updated archival
            archival.status_id = Status.by_text('Archived successfully')
            archival.is_broken = Status.is_status_broken(status_id)
            archival.reason = reason
            archival.last_success = now
            archival.first_failure = None
            archival.failure_count = 0
            archival.updated = now
            model.Session.add(archival)
            log.info('Archival status updated: %r', archival)
            model.repo.commit_and_remove()
            if hasattr(archival, 'cache_filepath'):
                return archival.cache_filepath
784
785

def requests_wrapper(log, func, *args, **kwargs):
    '''
    Run a requests command, catching exceptions and reraising them as
    DownloadException. Status errors, such as 404 or 500 do not cause
    exceptions, instead exposed as not response.ok.
    e.g.
    >>> requests_wrapper(log, requests.get, url, timeout=url_timeout)
    runs:
        res = requests.get(url, timeout=url_timeout)

    :param log: logger to report diagnostics/retries to
    :param func: a requests callable, e.g. requests.get or requests.post
    :raises DownloadException: for any network-level failure
    :returns: the requests Response object
    '''
    from requests_ssl import SSLv3Adapter
    from pylons import config

    # Optional proxy configuration from the CKAN ini file.
    proxies = {}
    if config.get('ckanext-archiver.proxy_url_http', ''):
        proxies['http'] = config.get('ckanext-archiver.proxy_url_http')
    if config.get('ckanext-archiver.proxy_url_https', ''):
        proxies['https'] = config.get('ckanext-archiver.proxy_url_https')
    log.debug("Setting proxies: "+str(proxies))
    try:
        try:
            # Session-bound callables (named 'get_content') don't accept a
            # 'proxies' kwarg, so only pass it to the module-level functions.
            if func.__name__ != 'get_content':
                response = func(*args, proxies=proxies, **kwargs)
            else:
                response = func(*args, **kwargs)
        except requests.exceptions.ConnectionError as e:
            # Some old servers only speak SSLv3 - retry once with an
            # SSLv3-only adapter when the handshake failure suggests it.
            if 'SSL23_GET_SERVER_HELLO' not in str(e):
                raise
            log.info('SSLv23 failed so trying again using SSLv3: %r', args)
            requests_session = requests.Session()
            if proxies:
                requests_session.proxies.update(proxies)
            requests_session.mount('https://', SSLv3Adapter())
            func = {requests.get: requests_session.get,
                    requests.post: requests_session.post}[func]
            response = func(*args, **kwargs)
    except requests.exceptions.ConnectionError as e:
        raise DownloadException('Connection error: %s' % e)
    except requests.exceptions.HTTPError as e:
        raise DownloadException('Invalid HTTP response: %s' % e)
    except requests.exceptions.Timeout:
        raise DownloadException('Connection timed out after %ss' % kwargs.get('timeout', '?'))
    except requests.exceptions.TooManyRedirects:
        raise DownloadException('Too many redirects')
    except requests.exceptions.RequestException as e:
        raise DownloadException('Error downloading: %s' % e)
    except Exception as e:
        # In debug mode let unexpected errors surface with a full traceback.
        if os.environ.get('DEBUG'):
            raise
        raise DownloadException('Error with the download: %s' % e)
    return response


def ogc_request(context, resource, service, wms_version):
    '''Makes an OGC GetCapabilities request against the resource URL and
    returns the download() result.

    The resource URL is temporarily rewritten (its query string replaced by
    the GetCapabilities parameters) for the duration of the request and
    restored afterwards, so the modified URL never gets saved on the
    actual resource - even if download() raises.

    :param service: OGC service name, e.g. 'WMS' or 'WFS'
    :param wms_version: service version string, e.g. '1.3'
    '''
    original_url = url = resource['url']
    # Remove parameters
    url = url.split('?')[0]
    # Add WMS GetCapabilities parameters
    url += '?service=%s&request=GetCapabilities&version=%s' % \
           (service, wms_version)
    resource['url'] = url
    try:
        # Make the request
        response = download(context, resource)
    finally:
        # Restore the URL so that it doesn't get saved in the actual
        # resource, whether or not download() succeeded.
        resource['url'] = original_url
    return response


def wms_1_3_request(context, resource):
    """Probe the resource as a WMS 1.3 service via GetCapabilities and
    record which request type elicited the response."""
    result = ogc_request(context, resource, 'WMS', '1.3')
    result['request_type'] = 'WMS 1.3'
    return result

858

859
def wms_1_1_1_request(context, resource):
    """Probe the resource as a WMS 1.1.1 service via GetCapabilities and
    record which request type elicited the response."""
    result = ogc_request(context, resource, 'WMS', '1.1.1')
    result['request_type'] = 'WMS 1.1.1'
    return result

864

865
def wfs_request(context, resource):
    """Probe the resource as a WFS 2.0 service via GetCapabilities and
    record which request type elicited the response."""
    result = ogc_request(context, resource, 'WFS', '2.0')
    result['request_type'] = 'WFS 2.0'
    return result

870

871
872
873
def api_request(context, resource):
    '''
    Tries making requests as if the resource is a well-known sort of API to try
    and get a valid response. If it does it returns the response, otherwise
    Archives the response and stores what sort of request elicited it.

    :returns: the download dict from the first API probe that succeeds,
        or None if none of them do.
    '''
    log = update_resource.get_logger()
    # 'resource' holds the results of the download and will get saved. Only if
    # an API request is successful do we want to save the details of it.
    # However download() gets altered for these API requests. So only give
    # download() a copy of 'resource'.
    for api_request_func in wms_1_3_request, wms_1_1_1_request, wfs_request:
        resource_copy = copy.deepcopy(resource)
        try:
            download_dict = api_request_func(context, resource_copy)
        except ArchiverError as e:
            # Expected - the resource is simply not this sort of API.
            log.info('API %s error: %r, %r "%s"', api_request_func,
                     e, e.args, resource.get('url'))
            continue
        except Exception as e:
            # In debug mode let unexpected errors surface with a traceback.
            if os.environ.get('DEBUG'):
                raise
            log.error('Uncaught API %s failure: %r, %r', api_request_func,
                      e, e.args)
            continue

        return download_dict

899
900
901
902
903
904
905

def is_id(id_string):
    '''Tells the client if the string looks like a revision id or not.

    A revision id is a lowercase hex UUID, e.g.
    '01234567-89ab-cdef-0123-456789abcdef'.
    '''
    uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-'
                              r'[0-9a-f]{4}-[0-9a-f]{12}$')
    return uuid_pattern.match(id_string) is not None


def response_is_an_api_error(response_body):
    '''Some APIs return errors as the response body, but HTTP status 200. So we
    need to check response bodies for these error messages.

    :param response_body: the raw body text of an HTTP 200 response
    :returns: True if the body is a recognised OGC error report,
        False otherwise
    '''
    response_sample = response_body[:250]  # to allow for <?xml> and <!DOCTYPE> lines

    # WMS spec
    # e.g. https://map.bgs.ac.uk/ArcGIS/services/BGS_Detailed_Geology/MapServer/WMSServer?service=abc
    # <?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
    # <ServiceExceptionReport version="1.3.0"
    if '<ServiceExceptionReport' in response_sample:
        return True

    # This appears to be an alternative - I can't find the spec.
    # e.g. http://sedsh13.sedsh.gov.uk/ArcGIS/services/HS/Historic_Scotland/MapServer/WFSServer?service=abc
    # <ows:ExceptionReport version='1.1.0' language='en' xmlns:ows='http://www.opengis.net/ows'><ows:Exception exceptionCode='NoApplicableCode'><ows:ExceptionText>Wrong service type.</ows:ExceptionText></ows:Exception></ows:ExceptionReport>
    if '<ows:ExceptionReport' in response_sample:
        return True

    # Explicit False rather than falling off the end (implicit None),
    # so callers get a consistent boolean.
    return False


@celery.task(name="archiver.clean")
def clean():
    """Celery task intended to remove all archived resources.

    Not implemented yet - currently it only logs an error.
    """
    logger = clean.get_logger()
    logger.error("clean task not implemented yet")


@celery.task(name="archiver.link_checker")
def link_checker(context, data):
    """
    Check that the resource's url is valid, and accepts a HEAD request.

    Redirects are not followed - they simply return 'location' in the headers.

    data is a JSON dict describing the link:
        { 'url': url,
          'url_timeout': url_timeout }

    Raises LinkInvalidError if the URL is invalid
    Raises LinkHeadRequestError if HEAD request fails
    Raises LinkHeadMethodNotSupported if server says HEAD is not supported

    Returns a json dict of the headers of the request
    """
    # Use this task's own logger (was update_resource's by mistake).
    log = link_checker.get_logger()

    data = json.loads(data)
    url_timeout = data.get('url_timeout', 30)

    error_message = ''
    headers = {'User-Agent': USER_AGENT}

    url = tidy_url(data['url'])

    # Send a head request. Previously the User-Agent headers dict was built
    # but never passed to requests.head - pass it so servers see our UA.
    try:
        res = requests.head(url, timeout=url_timeout, headers=headers)
        headers = res.headers
    except httplib.InvalidURL as ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r. This sometimes happens when using an old version of requests on a URL which issues a 301 redirect. Version=%s", url, ve, data.get('package'), requests.__version__)
        raise LinkHeadRequestError("Invalid URL or Redirect Link")
    except ValueError as ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r.", url, ve, data.get('package'))
        raise LinkHeadRequestError("Could not make HEAD request")
    except requests.exceptions.ConnectionError as e:
        raise LinkHeadRequestError('Connection error: %s' % e)
    except requests.exceptions.HTTPError as e:
        raise LinkHeadRequestError('Invalid HTTP response: %s' % e)
    except requests.exceptions.Timeout:
        raise LinkHeadRequestError('Connection timed out after %ss' % url_timeout)
    except requests.exceptions.TooManyRedirects:
        raise LinkHeadRequestError('Too many redirects')
    except requests.exceptions.RequestException as e:
        raise LinkHeadRequestError('Error during request: %s' % e)
    except Exception as e:
        raise LinkHeadRequestError('Error with the request: %s' % e)
    else:
        if res.status_code == 405:
            # this suggests a GET request may be ok, so proceed to that
            # in the download
            raise LinkHeadMethodNotSupported()
        if not res.ok or res.status_code >= 400:
            error_message = 'Server returned HTTP error status: %s %s' % \
                (res.status_code, res.reason)
            raise LinkHeadRequestError(error_message)
    return json.dumps(dict(headers))