# -*- coding: utf-8 -*-
# remove_revisions.py -- CKAN paster command that removes redundant
# intermediate resource revisions of a dataset.
# Author: Kristijan Čagran
import json
import logging

from ckan.lib.cli import CkanCommand

# No other CKAN imports allowed until _load_config is run,
# or logging is disabled


class RemoveRevisions(CkanCommand):
    """Remove redundant intermediate resource revisions of a dataset.

    Usage:
        remove_revisions <package-name>

    For every active resource of the named dataset, runs of consecutive
    resource revisions that agree on every attribute in ``attributes`` are
    collapsed: the first and the last revision of each run are kept and
    every revision in between is deleted from ``resource_revision``. The
    matching ``revision`` row is deleted too when no other resource
    revision, package revision or package extra revision refers to it.
    """

    # The docstring starts with text on its first line, but strip() keeps
    # the summary non-empty even if it is ever reformatted to start with a
    # newline (the previous version always produced an empty summary).
    summary = __doc__.strip().split('\n')[0]
    usage = __doc__
    max_args = 1
    min_args = 1
    # Resource revision attributes compared when deciding whether two
    # consecutive revisions are duplicates of each other.
    attributes = ['resource_group_id', 'url', 'format', 'description',
                  'position', 'state', 'name', 'resource_type', 'url_type']

    # Executed before / after the DELETE statements of every batch: the
    # revision_id foreign keys of these tables are dropped and re-created.
    # NOTE(review): the reason for dropping them is not visible here. On
    # PostgreSQL, re-creating a FOREIGN KEY validates existing rows, so the
    # batch fails (and is rolled back) if a deleted revision is still
    # referenced by one of these tables.
    _DROP_FKEYS_SQL = '''
        ALTER TABLE package_tag DROP CONSTRAINT package_tag_revision_id_fkey;
        ALTER TABLE package_extra DROP CONSTRAINT package_extra_revision_id_fkey;
        ALTER TABLE resource DROP CONSTRAINT resource_revision_id_fkey;
    '''
    _ADD_FKEYS_SQL = '''
        ALTER TABLE package_tag ADD CONSTRAINT package_tag_revision_id_fkey FOREIGN KEY (revision_id) REFERENCES revision(id);
        ALTER TABLE package_extra ADD CONSTRAINT package_extra_revision_id_fkey FOREIGN KEY (revision_id) REFERENCES revision(id);
        ALTER TABLE resource ADD CONSTRAINT resource_revision_id_fkey FOREIGN KEY (revision_id) REFERENCES revision(id);
    '''

    def __init__(self, name):
        # Name this class, not the base: super(CkanCommand, self) skipped
        # any initialisation CkanCommand itself might define.
        super(RemoveRevisions, self).__init__(name)

    def date_from_string(self, date_str):
        """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

        Returns None (after printing a message) when *date_str* is not a
        string in that format.
        """
        from datetime import datetime
        try:
            return datetime.strptime(date_str, '%Y-%m-%d')
        except (TypeError, ValueError):
            # Only parse failures are expected here; anything else
            # (including KeyboardInterrupt) must propagate.
            print('Wrong date format!')
        return None

    def command(self):
        """Paster entry point: load the CKAN config, then prune the
        revisions of the dataset named by the single positional argument.
        """
        self._load_config()
        self.log = logging.getLogger(__name__)
        pck_name = self.args[0]
        self.remove(pck_name)

    @staticmethod
    def _strip_url_host(url):
        """Return *url* without its scheme and host ('' for None)."""
        return (url or '').split('//', 1)[-1].split('/', 1)[-1]

    def are_equal(self, old_revision, new_revision):
        """Return True when both resource revisions agree on every
        attribute listed in ``self.attributes``.

        When both revisions are uploads (``url_type == 'upload'``), the
        ``url`` values are compared without scheme and host. The URL
        generated for an uploaded file may switch http/https or store
        localhost instead of the site url without the file changing.
        """
        both_uploads = (getattr(old_revision, 'url_type') == 'upload' and
                        getattr(new_revision, 'url_type') == 'upload')
        for attr in self.attributes:
            old_value = getattr(old_revision, attr)
            new_value = getattr(new_revision, attr)
            if attr == 'url' and both_uploads:
                old_value = self._strip_url_host(old_value)
                new_value = self._strip_url_host(new_value)
            if old_value != new_value:
                return False
        return True

    def _redundant_revisions(self, revisions):
        """Return the revisions strictly inside each run of consecutive,
        equal revisions; the first and last revision of every run are kept.

        *revisions* is expected in chronological order; the result keeps
        that order.
        """
        runs = []
        for revision in revisions:
            # Compare with the immediately preceding revision only.
            if runs and self.are_equal(runs[-1][-1], revision):
                runs[-1].append(revision)
            else:
                runs.append([revision])
        redundant = []
        for run in runs:
            # Runs of one or two revisions have no interior to drop.
            redundant.extend(run[1:-1])
        return redundant

    @staticmethod
    def _chunks(items, size):
        """Yield successive *size*-sized slices of the list *items*."""
        for start in range(0, len(items), size):
            yield items[start:start + size]

    def _delete_revisions(self, res_revs, resource_id):
        """Delete the resource revisions *res_revs* of resource
        *resource_id* with one raw-SQL batch, then commit.

        A ``revision`` row is deleted as well when this resource revision
        is its only ResourceRevision and no PackageRevision or
        PackageExtraRevision uses it. On any error the transaction is
        rolled back and the exception re-raised. The scoped session is
        removed in every case.
        """
        from ckan import model

        sql = [self._DROP_FKEYS_SQL]
        for res_rev in res_revs:
            sql.append("DELETE from resource_revision where id='%s' and revision_id='%s';\n"
                       % (resource_id, res_rev.revision_id))
            # A revision (e.g. created over the API) can also be attached
            # to other resources or to the dataset, so it is only deleted
            # when this resource revision is its sole user.
            revision_id = res_rev.revision_id
            only_this_resource = model.Session.query(model.ResourceRevision) \
                .filter_by(revision_id=revision_id).count() == 1
            if (only_this_resource and
                    model.Session.query(model.PackageRevision)
                    .filter_by(revision_id=revision_id).count() == 0 and
                    model.Session.query(model.PackageExtraRevision)
                    .filter_by(revision_id=revision_id).count() == 0):
                sql.append("DELETE from revision where id='%s';\n" % revision_id)
        sql.append(self._ADD_FKEYS_SQL)

        print("Executing sql....")
        try:
            model.Session.execute(''.join(sql))
            print("Committing sql....")
            model.Session.commit()
        except Exception:
            # Do not leave a half-applied transaction (with the foreign
            # keys dropped) behind in the session.
            model.Session.rollback()
            raise
        finally:
            model.Session.remove()
        print("Sql committed.")

    def remove(self, pck_name):
        """Prune redundant revisions of every active resource of the
        dataset named *pck_name*, in resource position order.
        """
        from ckan import model

        resources = model.Session.query(model.Resource) \
            .filter_by(state='active') \
            .join(model.ResourceGroup) \
            .join(model.Package) \
            .filter(model.Package.name == pck_name) \
            .order_by(model.Resource.position).all()
        # Keep only the ids: the scoped session is removed after each batch
        # of deletes, so every resource is re-loaded in the current session.
        resource_ids = [res.id for res in resources]

        for resource_id in resource_ids:
            resource = model.Resource.get(resource_id)
            print(resource.id)
            revisions = model.Session.query(model.ResourceRevision) \
                .filter_by(id=resource.id) \
                .order_by(model.ResourceRevision.revision_timestamp) \
                .all()
            print("Resource: %s" % resource.url)
            print("Num revisions: %d" % len(revisions))

            revisions_to_remove = self._redundant_revisions(revisions)
            print("Num revisions to remove: %d" % len(revisions_to_remove))
            print("Revisions to remove: %s" % ','.join(
                [x.revision_id for x in revisions_to_remove]))

            # Batches of at most 1000 revisions per SQL statement batch.
            for chunk_of_res_revs in self._chunks(revisions_to_remove, 1000):
                print('Preparing sql')
                self._delete_revisions(chunk_of_res_revs, resource_id)
                print('-------------------------------------------------------------------------')
            model.Session.commit()
            model.Session.remove()