Trees | Indices | Help |
---|
|
1 import time 2 3 from coprs import db 4 from coprs import exceptions 5 from coprs import helpers 6 from coprs import models 7 from coprs import signals 8 from coprs import whoosheers 9 from coprs.logic import users_logic12 """Used for manipulating Coprs. All methods accept user object as a first argument, as this may be needed in future.""" 13 14 @classmethod162 163 @classmethod16 with_builds = kwargs.get('with_builds', False) 17 with_mock_chroots = kwargs.get('with_mock_chroots', False) 18 19 query = db.session.query(models.Copr).\ 20 join(models.Copr.owner).\ 21 options(db.contains_eager(models.Copr.owner)).\ 22 filter(models.Copr.name == coprname).\ 23 filter(models.User.openid_name == models.User.openidize_name(username)).\ 24 filter(models.Copr.deleted==False) 25 26 if with_builds: 27 query = query.outerjoin(models.Copr.builds).\ 28 options(db.contains_eager(models.Copr.builds)).\ 29 order_by(models.Build.submitted_on.desc()) 30 if with_mock_chroots: 31 query = query.outerjoin(*models.Copr.mock_chroots.attr).\ 32 options(db.contains_eager(*models.Copr.mock_chroots.attr)).\ 33 order_by(models.MockChroot.os_release.asc()).\ 34 order_by(models.MockChroot.os_version.asc()).\ 35 order_by(models.MockChroot.arch.asc()) 36 37 return query38 39 @classmethod41 user_relation = kwargs.get('user_relation', None) 42 username = kwargs.get('username', None) 43 with_mock_chroots = kwargs.get('with_mock_chroots', None) 44 incl_deleted = kwargs.get('incl_deleted', None) 45 ids = kwargs.get('ids', None) 46 47 query = db.session.query(models.Copr).\ 48 join(models.Copr.owner).\ 49 options(db.contains_eager(models.Copr.owner)).\ 50 order_by(models.Copr.id.desc()) 51 52 if not incl_deleted: 53 query = query.filter(models.Copr.deleted==False) 54 55 if isinstance(ids, list): # can be an empty list 56 query = query.filter(models.Copr.id.in_(ids)) 57 58 if user_relation == 'owned': 59 query = query.filter(models.User.openid_name == models.User.openidize_name(username)) 60 elif user_relation == 'allowed': 61 aliased_user = db.aliased(models.User) 62 query = query.join(models.CoprPermission, models.Copr.copr_permissions).\ 63 filter(models.CoprPermission.copr_builder == helpers.PermissionEnum('approved')).\ 64 join(aliased_user, models.CoprPermission.user).\ 65 filter(aliased_user.openid_name == models.User.openidize_name(username)) 66 if with_mock_chroots: 67 query = query.outerjoin(*models.Copr.mock_chroots.attr).\ 68 options(db.contains_eager(*models.Copr.mock_chroots.attr)).\ 69 order_by(models.MockChroot.os_release.asc()).\ 70 order_by(models.MockChroot.os_version.asc()).\ 71 order_by(models.MockChroot.arch.asc()) 72 73 return query74 75 @classmethod77 query = models.Copr.query.join(models.User).\ 78 filter(models.Copr.deleted==False).\ 79 whooshee_search(search_string) 80 return query81 82 @classmethod83 - def add(cls, user, name, repos, selected_chroots, description, 84 instructions, check_for_duplicates=False):85 copr = models.Copr(name=name, 86 repos=repos, 87 owner=user, 88 description=description, 89 instructions=instructions, 90 created_on=int(time.time())) 91 CoprsLogic.new(user, copr, 92 check_for_duplicates=check_for_duplicates) # form validation checks for duplicates 93 CoprChrootsLogic.new_from_names(user, copr, 94 selected_chroots) 95 return copr96 97 @classmethod99 if check_for_duplicates and cls.exists_for_user(user, copr.name).all(): 100 raise exceptions.DuplicateException( 101 'Copr: "{0}" already exists'.format(copr.name)) 102 signals.copr_created.send(cls, copr=copr) 103 db.session.add(copr)104 105 @classmethod107 cls.raise_if_unfinished_blocking_action(user, copr, 108 'Can\'t change this Copr name, another operation is in progress: {action}') 109 users_logic.UsersLogic.raise_if_cant_update_copr(user, copr, 110 'Only owners and admins may update their Coprs.') 111 112 existing = cls.exists_for_user(copr.owner, copr.name).first() 113 if existing: 114 if check_for_duplicates and existing.id != copr.id: 115 raise exceptions.DuplicateException('Copr: "{0}" already exists'.format(copr.name)) 116 else: # we're renaming 117 # if we fire a models.Copr.query, it will use the modified copr in session 118 # -> workaround this by just getting the name 119 old_copr_name = db.session.query(models.Copr.name).filter(models.Copr.id==copr.id).\ 120 filter(models.Copr.deleted==False).\ 121 first()[0] 122 action = models.Action(action_type=helpers.ActionTypeEnum('rename'), 123 object_type='copr', 124 object_id=copr.id, 125 old_value='{0}/{1}'.format(copr.owner.name, old_copr_name), 126 new_value='{0}/{1}'.format(copr.owner.name, copr.name), 127 created_on=int(time.time())) 128 db.session.add(action) 129 db.session.add(copr)130 131 @classmethod133 cls.raise_if_cant_delete(user, copr) 134 # TODO: do we want to dump the information somewhere, so that we can search it in future? 135 cls.raise_if_unfinished_blocking_action(user, copr, 136 'Can\'t delete this Copr, another operation is in progress: {action}') 137 action = models.Action(action_type=helpers.ActionTypeEnum('delete'), 138 object_type='copr', 139 object_id=copr.id, 140 old_value='{0}/{1}'.format(copr.owner.name, copr.name), 141 new_value='', 142 created_on=int(time.time())) 143 copr.deleted = True 144 145 db.session.add(action) 146 147 return copr148 149 @classmethod151 existing = models.Copr.query.filter(models.Copr.name==coprname).\ 152 filter(models.Copr.owner_id==user.id) 153 if not incl_deleted: 154 existing = existing.filter(models.Copr.deleted==False) 155 156 return existing157 158 @classmethod159 - def increment_build_count(cls, user, copr): # TODO API of this method is different, maybe change?160 models.Copr.query.filter(models.Copr.id == copr.id).\ 161 update({models.Copr.build_count: models.Copr.build_count + 1})165 blocking_actions = [helpers.ActionTypeEnum('rename'), 166 helpers.ActionTypeEnum('delete')] 167 actions = models.Action.query.filter(models.Action.object_type=='copr').\ 168 filter(models.Action.object_id==copr.id).\ 169 filter(models.Action.result==helpers.BackendResultEnum('waiting')).\ 170 filter(models.Action.action_type.in_(blocking_actions)) 171 172 return actions173 174 @classmethod176 """This method raises ActionInProgressException if given copr has an unfinished 177 action. Returns None otherwise. 178 """ 179 unfinished_actions = cls.unfinished_blocking_actions_for(user, copr).all() 180 if unfinished_actions: 181 raise exceptions.ActionInProgressException(message, unfinished_actions[0])182 183 @classmethod185 """This method raises InsufficientRightsException if given copr cant be deleted 186 by given user. Returns None otherwise. 187 """ 188 if not user.admin and user != copr.owner: 189 raise exceptions.InsufficientRightsException('Only owners may delete their Coprs.')190192 @classmethod233194 query = models.CoprPermission.query.filter(models.CoprPermission.copr == copr).\ 195 filter(models.CoprPermission.user == searched_user) 196 197 return query198 199 @classmethod201 query = models.CoprPermission.query.filter(models.CoprPermission.copr == copr) 202 203 return query204 205 @classmethod 208 209 @classmethod211 users_logic.UsersLogic.raise_if_cant_update_copr(user, copr, 212 'Only owners and admins may update their Coprs permissions.') 213 models.CoprPermission.query.filter(models.CoprPermission.copr_id == copr.id).\ 214 filter(models.CoprPermission.user_id == copr_permission.user_id).\ 215 update({'copr_builder': new_builder, 216 'copr_admin': new_admin})217 218 @classmethod220 if copr_permission: 221 # preserve approved permissions if set 222 if not new_builder or copr_permission.copr_builder != helpers.PermissionEnum('approved'): 223 copr_permission.copr_builder = new_builder 224 if not new_admin or copr_permission.copr_admin != helpers.PermissionEnum('approved'): 225 copr_permission.copr_admin = new_admin 226 else: 227 perm = models.CoprPermission(user = user, copr = copr, copr_builder = new_builder, copr_admin = new_admin) 228 cls.new(user, perm)229 230 @classmethod235 @classmethod272237 db_chroots = models.MockChroot.query.all() 238 mock_chroots = [] 239 for ch in db_chroots: 240 if ch.chroot_name in names: 241 mock_chroots.append(ch) 242 243 return mock_chroots244 245 @classmethod 248 249 @classmethod251 for mock_chroot in cls.mock_chroots_from_names(user, names): 252 db.session.add(models.CoprChroot(copr=copr, mock_chroot=mock_chroot))253 254 @classmethod256 current_chroots = copr.mock_chroots 257 new_chroots = cls.mock_chroots_from_names(user, names) 258 # add non-existing 259 for mock_chroot in new_chroots: 260 if mock_chroot not in current_chroots: 261 db.session.add(models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 262 263 # delete no more present 264 to_remove = [] 265 for mock_chroot in current_chroots: 266 if mock_chroot not in new_chroots: 267 # can't delete here, it would change current_chroots and break iteration 268 to_remove.append(mock_chroot) 269 270 for mc in to_remove: 271 copr.mock_chroots.remove(mc)274 @classmethod344276 return models.MockChroot.query.filter(models.MockChroot.os_release==os_release, 277 models.MockChroot.os_version==os_version, 278 models.MockChroot.arch==arch)279 280 @classmethod282 query = models.MockChroot.query 283 if active_only: 284 query = query.filter(models.MockChroot.is_active==True) 285 return query286 287 @classmethod289 name_tuple = cls.tuple_from_name(user, name) 290 if cls.get(user, *name_tuple).first(): 291 raise exceptions.DuplicateException('Mock chroot with this name already exists.') 292 new_chroot = models.MockChroot(os_release=name_tuple[0], 293 os_version=name_tuple[1], 294 arch=name_tuple[2]) 295 cls.new(user, new_chroot) 296 return new_chroot297 298 @classmethod 301 302 @classmethod304 name_tuple = cls.tuple_from_name(user, name) 305 mock_chroot = cls.get(user, *name_tuple).first() 306 if not mock_chroot: 307 raise exceptions.NotFoundException('Mock chroot with this name doesn\'t exist.') 308 309 mock_chroot.is_active = is_active 310 cls.update(user, mock_chroot) 311 return mock_chroot312 313 @classmethod 316 317 318 @classmethod320 name_tuple = cls.tuple_from_name(user, name) 321 mock_chroot = cls.get(user, *name_tuple).first() 322 if not mock_chroot: 323 raise exceptions.NotFoundException('Mock chroot with this name doesn\'t exist.') 324 325 cls.delete(user, mock_chroot)326 327 @classmethod 330 331 @classmethod333 split_name = name.rsplit('-', 1) 334 if len(split_name) < 2: 335 raise exceptions.MalformedArgumentException( 336 'Chroot Name doesn\'t contain dash, can\'t determine chroot architecure.') 337 if '-' in split_name[0]: 338 os_release, os_version = split_name[0].rsplit('-') 339 else: 340 os_release, os_version = split_name[0], '' 341 342 arch = split_name[1] 343 return (os_release, os_version, arch)
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Sep 10 09:30:20 2013 | http://epydoc.sourceforge.net |