commands.py 17.6 KB
Newer Older
1
2
3
import logging
import os
import sys
4
import time
5
6
import re
import shutil
7
import itertools
8
import ckan.plugins as p
9

John Glover's avatar
John Glover committed
10
from pylons import config
John Glover's avatar
John Glover committed
11

David Read's avatar
David Read committed
12
13
from ckan.lib.cli import CkanCommand

14
15
REQUESTS_HEADER = {'content-type': 'application/json'}

16

John Glover's avatar
John Glover committed
17
class Archiver(CkanCommand):
18
    '''
John Glover's avatar
John Glover committed
19
20
    Download and save copies of all package resources.

21
22
    The result of each download attempt is saved to the CKAN task_status table,
    so the information can be used later for QA analysis.
John Glover's avatar
John Glover committed
23
24
25

    Usage:

Ross Jones's avatar
Ross Jones committed
26
27
28
        paster archiver init
           - Creates the database table archiver needs to run

29
30
31
        paster archiver update [{package-name/id}|{group-name/id}]
           - Archive all resources or just those belonging to a specific
             package or group, if specified
32
33
34
35
36
37

        paster archiver clean-status
           - Cleans the TaskStatus records that contain the status of each
             archived resource, whether it was successful or not, with errors.
             It does not change the cache_url etc. in the Resource

38
        paster archiver clean-cached-resources
39
40
           - Removes all cache_urls and other references to resource files on
             disk.
41

42
43
44
        paster archiver view [{dataset name/id}]
           - Views archival info, in general and if you specify one, about
             a particular dataset\'s resources.
45

46
47
48
49
50
51
52
53
54
55
56
        paster archiver report [outputfile]
           - Generates a report on orphans, either resources where the path
             does not exist, or files on disk that don't have a corresponding
             resource. The outputfile parameter is the name of the CSV output
             from running the report

        paster archiver delete-orphans [outputfile]
           - Deletes orphans that are files on disk with no corresponding
             resource. This uses the report command and will write out a
             report to [outputfile]

57
58
59
60
61
62
63
        paster archiver migrate-archive-dirs
           - Migrate the layout of the archived resource directories.
             Previous versions of ckanext-archiver stored resources on disk
             at: {resource-id}/filename.csv and this version puts them at:
             {2-chars-of-resource-id}/{resource-id}/filename.csv
             Running this moves them to the new locations and updates the
             cache_url on each resource to reflect the new location.
64
65
66
67
    '''
    # TODO
    #    paster archiver clean-files
    #       - Remove all archived resources
68

John Glover's avatar
John Glover committed
69
70
71
    # The class docstring opens with a newline, so it has to be stripped
    # before taking the first line; otherwise the summary is always the
    # empty string.
    summary = __doc__.strip().split('\n')[0]
    # The full docstring doubles as the usage text printed by command().
    usage = __doc__
    min_args = 0
    max_args = 2
John Glover's avatar
John Glover committed
73

74
    def __init__(self, name):
        """
        Create the command and register its extra command-line option.

        On top of whatever the base class sets up, a ``-q``/``--queue``
        option is added; its value ends up in ``self.options.queue``.
        """
        super(Archiver, self).__init__(name)
        queue_option = {
            'action': 'store',
            'dest': 'queue',
            'help': 'Send to a particular queue',
        }
        self.parser.add_option('-q', '--queue', **queue_option)
80

John Glover's avatar
John Glover committed
81
82
83
84
85
    def command(self):
        """
        Work out which sub-command was requested and hand over to it.

        With no arguments, or with a help flag as the first argument, the
        usage text is printed and the process exits with status 1. An
        unrecognised sub-command is logged as an error.
        """
        help_flags = ['--help', '-h', 'help']
        if not self.args or self.args[0] in help_flags:
            print(self.usage)
            sys.exit(1)

        cmd = self.args[0]
        self._load_config()

        # The logger has to be created after the config is loaded,
        # otherwise it ends up disabled.
        self.log = logging.getLogger(__name__)

        # Sub-commands that are called without any parameter from here.
        plain_commands = {
            'update': self.update,
            'clean-status': self.clean_status,
            'clean-cached-resources': self.clean_cached_resources,
            'migrate-archive-dirs': self.migrate_archive_dirs,
        }
        if cmd in plain_commands:
            plain_commands[cmd]()
            return

        if cmd == 'view':
            # The dataset reference is only honoured when it is the sole
            # extra argument; view(None) is the same as view().
            package_ref = self.args[1] if len(self.args) == 2 else None
            self.view(package_ref)
            return

        if cmd in ('report', 'delete-orphans'):
            if len(self.args) != 2:
                self.log.error('Command requires a parameter, the name of the output')
                return
            # Both commands share report(); only delete-orphans removes files.
            self.report(self.args[1], delete=(cmd == 'delete-orphans'))
            return

        if cmd == 'init':
            import ckan.model as model
            from ckanext.archiver.model import init_tables
            init_tables(model.meta.engine)
            self.log.info('Archiver tables are initialized')
            return

        self.log.error('Command %s not recognized' % (cmd,))

    def update(self):
        """
        Queue archival tasks for datasets and/or individual resources.

        Every argument after ``update`` is tried, in this order, as a group
        (or organization) id/name, a package id/name and a resource id. With
        no such arguments, all active packages (ordered by name) are queued.

        If no queue was given with ``-q``, the first recognised argument
        decides the default: 'bulk' for a group or for "everything",
        'priority' for a single package or resource.

        Exits with status 1 when an argument cannot be recognised or when
        there is nothing to process.
        """
        from ckan import model
        from ckanext.archiver import lib

        packages = []
        resources = []
        refs = self.args[1:]
        if refs:
            for arg in refs:
                # try arg as a group id/name
                group = model.Group.get(arg)
                if group:
                    if group.is_organization:
                        packages.extend(
                            model.Session.query(model.Package)
                                 .filter_by(owner_org=group.id))
                    else:
                        packages.extend(group.packages(with_private=True))
                    if not self.options.queue:
                        self.options.queue = 'bulk'
                    continue
                # try arg as a package id/name
                pkg = model.Package.get(arg)
                if pkg:
                    packages.append(pkg)
                    if not self.options.queue:
                        self.options.queue = 'priority'
                    continue
                # try arg as a resource id
                res = model.Resource.get(arg)
                if res:
                    resources.append(res)
                    if not self.options.queue:
                        self.options.queue = 'priority'
                    continue
                self.log.error('Could not recognize as a group, package '
                               'or resource: %r', arg)
                sys.exit(1)
        else:
            # all packages
            pkgs = model.Session.query(model.Package)\
                        .filter_by(state='active')\
                        .order_by('name').all()
            packages.extend(pkgs)
            if not self.options.queue:
                self.options.queue = 'bulk'

        if packages:
            self.log.info('Datasets to archive: %d', len(packages))
        if resources:
            self.log.info('Resources to archive: %d', len(resources))
        if not (packages or resources):
            self.log.error('No datasets or resources to process')
            sys.exit(1)

        # The CKAN version cannot change while the command runs, so the check
        # is done once here rather than once per package/resource.
        # Earlier CKANs (<= 2.2.x) had ResourceGroup between package and
        # resource.
        has_resource_groups = \
            p.toolkit.check_ckan_version(max_version='2.2.99')

        self.log.info('Queue: %s', self.options.queue)
        for package in packages:
            if has_resource_groups:
                all_resources = itertools.chain.from_iterable(
                    rg.resources_all for rg in package.resource_groups_all)
            else:
                all_resources = package.resources_all
            # Only used to report how many resources the dataset has.
            pkg_resources = [res for res in all_resources
                             if res.state == 'active']
            self.log.info('Queuing dataset %s (%s resources)',
                          package.name, len(pkg_resources))
            lib.create_archiver_package_task(package, self.options.queue)
            time.sleep(0.1)  # to try to avoid Redis getting overloaded

        for resource in resources:
            if has_resource_groups:
                package = resource.resource_group.package
            else:
                package = resource.package
            self.log.info('Queuing resource %s/%s', package.name, resource.id)
            lib.create_archiver_resource_task(resource, self.options.queue)
            time.sleep(0.05)  # to try to avoid Redis getting overloaded

        self.log.info('Completed queueing')
210
211
212

    def view(self, package_ref=None):
        """
        Print a summary of resources and archivals.

        Always prints the number of active resources, the number of Archival
        records (and how many have a non-empty cache_url) and the time of the
        most recently updated archival.

        :param package_ref: optional dataset name or id; when given, the
            Archival records of each of that dataset's resources are printed
            as well. If no such dataset exists, a message is printed and the
            process exits with status 1.
        """
        from ckan import model
        from ckanext.archiver.model import Archival

        r_q = model.Session.query(model.Resource).filter_by(state='active')
        print('Resources: %i total' % r_q.count())
        a_q = model.Session.query(Archival)
        print('Archived resources: %i total' % a_q.count())
        num_with_cache_url = a_q.filter(Archival.cache_url != '').count()
        print('                    %i with cache_url' % num_with_cache_url)
        last_updated_res = a_q.order_by(Archival.updated.desc()).first()
        if last_updated_res:
            latest = last_updated_res.updated.strftime('%Y-%m-%d %H:%M')
        else:
            latest = '(no)'
        print('Latest archival: %s' % latest)

        if not package_ref:
            return

        pkg = model.Package.get(package_ref)
        if not pkg:
            # Previously this fell through to pkg.name and died with an
            # AttributeError traceback.
            print('Package not found: %s' % package_ref)
            sys.exit(1)

        print('Package %s %s' % (pkg.name, pkg.id))
        for res in pkg.resources:
            print('Resource %s' % res.id)
            for archival in a_q.filter_by(resource_id=res.id):
                print('* %r' % archival)
231
232
233

    def clean_status(self):
        """
        Delete all Archival records.

        The summary from view() is printed before and after the deletion so
        the effect can be seen.
        """
        from ckan import model
        from ckanext.archiver.model import Archival

        print('Before:')
        self.view()

        # An unfiltered query, so the delete covers every Archival row.
        model.Session.query(Archival).delete()
        model.Session.commit()

        print('After:')
        self.view()
245

246
247
    def clean_cached_resources(self):
        """
        Blank the cache-related fields of every Archival with a cache_url.

        The fields cache_url, cache_filepath, size, mimetype and hash are set
        to None. Progress is printed, and the work committed, every 1000
        records. The summary from view() is printed before and after.
        """
        from ckan import model
        from ckanext.archiver.model import Archival

        print('Before:')
        self.view()

        fields_to_blank = ('cache_url', 'cache_filepath', 'size',
                           'mimetype', 'hash')
        archivals = model.Session.query(Archival) \
                         .filter(Archival.cache_url != '') \
                         .all()
        total = len(archivals)
        for done, archival in enumerate(archivals, 1):
            for field in fields_to_blank:
                setattr(archival, field, None)
            # Commit in batches rather than holding everything until the end.
            if done % 1000 == 0:
                print('Done %i/%i' % (done, total))
                model.Session.commit()
        model.Session.commit()
        model.Session.remove()

        print('After:')
        self.view()

273
274
275
276
277
278
279
280
281
    def report(self, output_file, delete=False):
        """
        Generates a report containing orphans (either files or resources)

        Two passes are made and every problem found is written as a row of
        the CSV file:

        1. every resource is checked for a ``cache_filepath`` extra that
           exists on disk and can be stat'ed;
        2. every file under the configured ``ckanext-archiver.archive_dir``
           is checked for a UUID in its path that matches a known resource
           id.

        Summary counts are printed at the end.

        :param output_file: path of the CSV file to write
        :param delete: when True, files on disk with no corresponding
            resource are deleted, and their directory is removed too if that
            leaves it empty
        """
        import csv
        from ckan import model

        archive_root = config.get('ckanext-archiver.archive_dir')
        if not archive_root:
            self.log.error("Could not find archiver root")
            return

        # We'll use this to match the UUID part of the path
        uuid_re = re.compile(
            r".*([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}).*")

        not_cached_active = 0
        not_cached_deleted = 0
        file_not_found_active = 0
        file_not_found_deleted = 0
        perm_error = 0
        file_no_resource = 0

        with open(output_file, "w") as f:
            writer = csv.writer(f)
            writer.writerow(["Resource ID", "Filepath", "Problem"])
            resource_ids = set()
            for resource in model.Session.query(model.Resource).all():
                resource_ids.add(resource.id)

                # Check the resource's cached_filepath
                fp = resource.extras.get('cache_filepath')
                if fp is None:
                    if resource.state == 'active':
                        not_cached_active += 1
                    else:
                        not_cached_deleted += 1
                    writer.writerow([resource.id, str(resource.extras), "Resource not cached: {0}".format(resource.state)])
                    continue

                # Check that the cached file is there and readable
                if not os.path.exists(fp):
                    if resource.state == 'active':
                        file_not_found_active += 1
                    else:
                        file_not_found_deleted += 1

                    writer.writerow([resource.id, fp.encode('utf-8'), "File not found: {0}".format(resource.state)])
                    continue

                try:
                    os.stat(fp)
                except OSError:
                    perm_error += 1
                    writer.writerow([resource.id, fp.encode('utf-8'), "File not readable"])
                    continue

            # Iterate over the archive root and check each file by matching the
            # resource_id part of the path to the known resource ids
            for root, _, files in os.walk(archive_root):
                for filename in files:
                    archived_path = os.path.join(root, filename)
                    m = uuid_re.match(archived_path)
                    if not m:
                        # No resource can be associated with this path, so
                        # the ID column is left empty (it used to carry the
                        # id of whichever resource the first pass saw last).
                        writer.writerow(['', archived_path, "Malformed path (no UUID)"])
                        continue

                    resource_id = m.group(1)
                    if resource_id in resource_ids:
                        continue

                    file_no_resource += 1

                    if not delete:
                        writer.writerow([resource_id, archived_path, "Resource not found"])
                        continue

                    try:
                        os.unlink(archived_path)
                    except OSError as e:
                        self.log.error("Failed to unlink {0}: {1}".format(archived_path, e))
                        continue
                    self.log.info("Unlinked {0}".format(archived_path))
                    writer.writerow([resource_id, archived_path, "Resource not found, file deleted"])

                    # Tidy up the directory separately, and only when this
                    # was its last entry: a non-empty directory is normal
                    # and must not be reported as a failed file deletion.
                    try:
                        if not os.listdir(root):
                            os.rmdir(root)
                            self.log.info("Unlinked {0}".format(root))
                    except OSError as e:
                        self.log.error("Failed to remove directory {0}: {1}".format(root, e))

        print("General info:")
        print("  Permission error reading file: {0}".format(perm_error))
        print("  file on disk but no resource: {0}".format(file_no_resource))
        print("  Total resources: {0}".format(model.Session.query(model.Resource).count()))
        print("Active resource info:")
        print("  No cache_filepath: {0}".format(not_cached_active))
        print("  cache_filepath not on disk: {0}".format(file_not_found_active))
        print("Deleted resource info:")
        print("  No cache_filepath: {0}".format(not_cached_deleted))
        print("  cache_filepath not on disk: {0}".format(file_not_found_deleted))
366

367
368
    def migrate_archive_dirs(self):
        """
        Migrate archived files to the new directory layout.

        For every non-deleted resource whose cache_url still has the old
        form ``{base}/{resource-id}/{filename}``, the directory
        ``{archive_dir}/{resource-id}`` is moved to
        ``{archive_dir}/{2-chars-of-resource-id}/{resource-id}`` and the
        resource's cache_url and cache_filepath are updated through the
        ``resource_update`` action, run as the site user.

        Resources are skipped (with a printed message) when they have no
        cache_url, are already in the new layout, have an unparseable URL or
        a base URL different from ``ckanext-archiver.cache_url_root``, belong
        to a deleted package, or when moving the files fails.
        """
        from ckan import model
        from ckan.logic import get_action

        site_user = get_action('get_site_user')(
            {'model': model, 'ignore_auth': True, 'defer_commit': True}, {}
        )

        site_url_base = config['ckanext-archiver.cache_url_root'].rstrip('/')
        old_dir_regex = re.compile(r'(.*)/([a-f0-9\-]+)/([^/]*)$')
        new_dir_regex = re.compile(r'(.*)/[a-f0-9]{2}/[a-f0-9\-]{36}/[^/]*$')
        # The CKAN version is fixed for the run, so check it once.
        has_resource_groups = \
            p.toolkit.check_ckan_version(max_version='2.2.99')

        for resource in model.Session.query(model.Resource).\
                filter(model.Resource.state != model.State.DELETED):
            if not resource.cache_url or resource.cache_url == 'None':
                continue
            if new_dir_regex.match(resource.cache_url):
                print('Resource with new url already: %s' % resource.cache_url)
                continue
            match = old_dir_regex.match(resource.cache_url)
            if not match:
                print('ERROR Could not match url: %s' % resource.cache_url)
                continue
            url_base, res_id, filename = match.groups()

            # check the package isn't deleted
            # Need to refresh the resource's session
            resource = model.Session.query(model.Resource).get(resource.id)
            if has_resource_groups:
                package = None
                if resource.resource_group:
                    package = resource.resource_group.package
            else:
                package = resource.package

            if package and package.state == model.State.DELETED:
                print('Package is deleted')
                continue

            if url_base != site_url_base:
                print('ERROR Base URL is incorrect: %r != %r' % (url_base, site_url_base))
                continue

            # move the file
            filepath_base = config['ckanext-archiver.archive_dir']
            old_path = os.path.join(filepath_base, resource.id)
            new_dir = os.path.join(filepath_base, resource.id[:2])
            new_path = os.path.join(filepath_base, resource.id[:2], resource.id)
            new_filepath = os.path.join(new_path, filename)
            if not os.path.exists(new_dir):
                os.mkdir(new_dir)
            if os.path.exists(new_path) and not os.path.exists(old_path):
                print('File already moved: %s' % new_path)
            else:
                print('File: "%s" -> "%s"' % (old_path, new_path))
                try:
                    shutil.move(old_path, new_path)
                except (IOError, OSError, shutil.Error) as e:
                    # shutil.move can also fail with OSError or shutil.Error
                    # (e.g. the destination already exists); one bad
                    # resource must not abort the rest of the migration.
                    print('ERROR moving resource: %s' % e)
                    continue

            # change the cache_url and cache_filepath
            new_cache_url = '/'.join((url_base, res_id[:2], res_id, filename))
            print('cache_filepath: "%s" -> "%s"' % (resource.extras.get('cache_filepath'), new_filepath))
            print('cache_url: "%s" -> "%s"' % (resource.cache_url, new_cache_url))
            context = {'model': model, 'user': site_user['name'], 'ignore_auth': True, 'session': model.Session}
            data_dict = {'id': resource.id}
            res_dict = get_action('resource_show')(context, data_dict)
            res_dict['cache_filepath'] = new_filepath
            res_dict['cache_url'] = new_cache_url
            data_dict = res_dict
            result = get_action('resource_update')(context, data_dict)
            if result.get('id') == res_id:
                print('Successfully updated resource')
            else:
                print('ERROR updating resource: %r' % result)