import json
import logging

from ckan.lib.cli import CkanCommand
# No other CKAN imports allowed until _load_config is run,
# or logging is disabled


class RemoveRevisions(CkanCommand):
    """Removing old revisions

    Takes exactly one argument: the name of a dataset (package).

    For every active resource of that dataset, the resource revisions are
    walked in revision_timestamp order and split into runs of consecutive
    revisions that are equal on ``attributes``. Within each run the first
    and the last revision are kept and every revision in between is deleted
    from resource_revision (and from revision, when nothing else uses it).
    """
    summary = __doc__.split('\n')[0]
    usage = __doc__
    max_args = 1
    min_args = 1

    # Resource attributes compared by are_equal() to decide whether two
    # consecutive revisions carry the same content.
    attributes = ['resource_group_id', 'url', 'format', 'description',
                  'position', 'state', 'name', 'resource_type', 'url_type']

    # Number of resource revisions deleted per SQL batch/transaction.
    chunk_size = 1000

    def __init__(self, name):
        # Name this class (not CkanCommand) so that CkanCommand itself is not
        # skipped when resolving the parent initialiser.
        super(RemoveRevisions, self).__init__(name)

    def date_from_string(self, date_str):
        """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

        Prints a message and returns None when date_str is not a string in
        that format.
        """
        from datetime import datetime
        try:
            return datetime.strptime(date_str, '%Y-%m-%d')
        except (ValueError, TypeError):
            print('Wrong date format!')
            return None

    def command(self):
        """Entry point: load the CKAN config, then prune the dataset named
        by the single positional argument."""
        self._load_config()
        self.log = logging.getLogger(__name__)
        pck_name = self.args[0]
        self.remove(pck_name)

    @staticmethod
    def _strip_scheme_and_host(url):
        """Return url without its ``scheme://host/`` prefix (None -> '')."""
        return (url or '').split('//', 1)[-1].split('/', 1)[-1]

    def are_equal(self, old_revision, new_revision):
        """Return True if both revisions agree on every attribute listed in
        ``self.attributes``.

        When both revisions are uploads (url_type == 'upload') the url is
        compared without its protocol prefix and domain.
        """
        both_uploads = (old_revision.url_type == 'upload' and
                        new_revision.url_type == 'upload')
        for attr in self.attributes:
            old_value = getattr(old_revision, attr)
            new_value = getattr(new_revision, attr)
            if attr == 'url' and both_uploads:
                # Compare without protocol prefix and domain, because on
                # uploaded file url generation some plugins might change http
                # into https, or localhost might be stored instead of site url.
                old_value = self._strip_scheme_and_host(old_value)
                new_value = self._strip_scheme_and_host(new_value)
            if old_value != new_value:
                return False
        return True

    @staticmethod
    def _chunks(items, size):
        """Yield successive size-sized slices of the list items."""
        for start in range(0, len(items), size):
            yield items[start:start + size]

    def _group_equal_revisions(self, revisions):
        """Split revisions (already in timestamp order) into runs of
        consecutive revisions that are equal according to are_equal()."""
        groups = []
        prev = None
        for revision in revisions:
            if prev is None or not self.are_equal(prev, revision):
                groups.append([])
            groups[-1].append(revision)
            prev = revision
        return groups

    def _delete_revisions(self, res_revs, res):
        """Delete the resource revisions res_revs of resource res.

        All statements are sent as one SQL batch and committed together.
        On failure the session is rolled back and the error re-raised. On
        success the scoped session is removed, so ORM objects loaded earlier
        become detached.
        """
        from ckan import model

        sql = ['''
            ALTER TABLE package_tag DROP CONSTRAINT package_tag_revision_id_fkey;
            ALTER TABLE package_extra DROP CONSTRAINT package_extra_revision_id_fkey;
            ALTER TABLE resource DROP CONSTRAINT resource_revision_id_fkey;
        ''']
        for res_rev in res_revs:
            # NOTE(review): ids are interpolated directly into the SQL. They
            # come from the database, not from user input, but bound
            # parameters would be safer.
            sql.append("DELETE from resource_revision where id='%s' and "
                       "revision_id='%s';\n" % (res.id, res_rev.revision_id))
            # A revision created (e.g. over the API) can be connected to other
            # resources or a dataset, so only delete the revision if it is
            # only connected to this one.
            # NOTE(review): package_tag's FK is dropped above, but tag
            # revisions are not checked here; confirm a shared revision cannot
            # make the DELETE / ADD CONSTRAINT fail.
            only_this_resource = (
                model.Session.query(model.ResourceRevision)
                .filter_by(revision_id=res_rev.revision_id)
                .count() == 1 and
                model.Session.query(model.PackageRevision)
                .filter_by(revision_id=res_rev.revision_id)
                .count() == 0 and
                model.Session.query(model.PackageExtraRevision)
                .filter_by(revision_id=res_rev.revision_id)
                .count() == 0)
            if only_this_resource:
                sql.append("DELETE from revision where id='%s';\n"
                           % res_rev.revision_id)
        sql.append('''
            ALTER TABLE package_tag ADD CONSTRAINT package_tag_revision_id_fkey FOREIGN KEY (revision_id) REFERENCES revision(id);
            ALTER TABLE package_extra ADD CONSTRAINT package_extra_revision_id_fkey FOREIGN KEY (revision_id) REFERENCES revision(id);
            ALTER TABLE resource ADD CONSTRAINT resource_revision_id_fkey FOREIGN KEY (revision_id) REFERENCES revision(id);
        ''')
        print("Executing sql....")
        try:
            model.Session.execute(''.join(sql))
            print("Committing sql....")
            model.Session.commit()
        except Exception:
            # Do not leave the session stuck in a failed transaction; the
            # dropped constraints are part of the same, now aborted,
            # transaction (assuming PostgreSQL's transactional DDL).
            model.Session.rollback()
            raise
        print("Sql committed.")
        model.Session.remove()

    def remove(self, pck_name):
        """Delete redundant revisions of every active resource of the
        dataset named pck_name.

        Resources are processed in position order. For each one, the
        revisions between the first and the last of every run of equal
        consecutive revisions are deleted, in batches of ``chunk_size``.
        """
        from ckan import model

        resources = (model.Session.query(model.Resource)
                     .filter_by(state='active')
                     .join(model.ResourceGroup)
                     .join(model.Package)
                     .filter(model.Package.name == pck_name)
                     .order_by(model.Resource.position)
                     .all())
        for resource in resources:
            # _delete_revisions() removes the session, so reload the resource
            # through the current session before using it.
            resource = model.Resource.get(resource.id)
            print(resource.id)
            revisions = (model.Session.query(model.ResourceRevision)
                         .filter_by(id=resource.id)
                         .order_by(model.ResourceRevision.revision_timestamp)
                         .all())
            print("Resource: %s" % resource.url)
            print("Num revisions: %d" % len(revisions))

            # Keep the first and last revision of each run of equal ones;
            # runs of one or two revisions contribute nothing.
            revisions_to_remove = [
                revision
                for group in self._group_equal_revisions(revisions)
                for revision in group[1:-1]
            ]
            print("Num revisions to remove: %d" % len(revisions_to_remove))
            print("Revisions to remove: %s" %
                  ','.join([x.revision_id for x in revisions_to_remove]))
            for chunk_of_res_revs in self._chunks(revisions_to_remove,
                                                  self.chunk_size):
                print('Preparing sql')
                self._delete_revisions(chunk_of_res_revs, resource)
            print('-------------------------------------------------------------------------')
        model.Session.commit()
        model.Session.remove()