Package coprs :: Package logic :: Module coprs_logic
[hide private]
[frames] | no frames]

Source Code for Module coprs.logic.coprs_logic

  1  import time 
  2   
  3  from coprs import db 
  4  from coprs import exceptions 
  5  from coprs import helpers 
  6  from coprs import models 
  7  from coprs import signals 
  8  from coprs import whoosheers 
  9  from coprs.logic import users_logic 
10 11 -class CoprsLogic(object):
12 """Used for manipulating Coprs. All methods accept user object as a first argument, as this may be needed in future.""" 13 14 @classmethod
15 - def get(cls, user, username, coprname, **kwargs):
16 with_builds = kwargs.get('with_builds', False) 17 with_mock_chroots = kwargs.get('with_mock_chroots', False) 18 19 query = db.session.query(models.Copr).\ 20 join(models.Copr.owner).\ 21 options(db.contains_eager(models.Copr.owner)).\ 22 filter(models.Copr.name == coprname).\ 23 filter(models.User.openid_name == models.User.openidize_name(username)).\ 24 filter(models.Copr.deleted==False) 25 26 if with_builds: 27 query = query.outerjoin(models.Copr.builds).\ 28 options(db.contains_eager(models.Copr.builds)).\ 29 order_by(models.Build.submitted_on.desc()) 30 if with_mock_chroots: 31 query = query.outerjoin(*models.Copr.mock_chroots.attr).\ 32 options(db.contains_eager(*models.Copr.mock_chroots.attr)).\ 33 order_by(models.MockChroot.os_release.asc()).\ 34 order_by(models.MockChroot.os_version.asc()).\ 35 order_by(models.MockChroot.arch.asc()) 36 37 return query
38 39 @classmethod
40 - def get_multiple(cls, user, **kwargs):
41 user_relation = kwargs.get('user_relation', None) 42 username = kwargs.get('username', None) 43 with_mock_chroots = kwargs.get('with_mock_chroots', None) 44 incl_deleted = kwargs.get('incl_deleted', None) 45 ids = kwargs.get('ids', None) 46 47 query = db.session.query(models.Copr).\ 48 join(models.Copr.owner).\ 49 options(db.contains_eager(models.Copr.owner)).\ 50 order_by(models.Copr.id.desc()) 51 52 if not incl_deleted: 53 query = query.filter(models.Copr.deleted==False) 54 55 if isinstance(ids, list): # can be an empty list 56 query = query.filter(models.Copr.id.in_(ids)) 57 58 if user_relation == 'owned': 59 query = query.filter(models.User.openid_name == models.User.openidize_name(username)) 60 elif user_relation == 'allowed': 61 aliased_user = db.aliased(models.User) 62 query = query.join(models.CoprPermission, models.Copr.copr_permissions).\ 63 filter(models.CoprPermission.copr_builder == helpers.PermissionEnum('approved')).\ 64 join(aliased_user, models.CoprPermission.user).\ 65 filter(aliased_user.openid_name == models.User.openidize_name(username)) 66 if with_mock_chroots: 67 query = query.outerjoin(*models.Copr.mock_chroots.attr).\ 68 options(db.contains_eager(*models.Copr.mock_chroots.attr)).\ 69 order_by(models.MockChroot.os_release.asc()).\ 70 order_by(models.MockChroot.os_version.asc()).\ 71 order_by(models.MockChroot.arch.asc()) 72 73 return query
74 75 @classmethod
76 - def get_multiple_fulltext(cls, user, search_string):
77 query = models.Copr.query.join(models.User).\ 78 filter(models.Copr.deleted==False).\ 79 whooshee_search(search_string) 80 return query
81 82 @classmethod
83 - def add(cls, user, name, repos, selected_chroots, description, 84 instructions, check_for_duplicates=False):
85 copr = models.Copr(name=name, 86 repos=repos, 87 owner=user, 88 description=description, 89 instructions=instructions, 90 created_on=int(time.time())) 91 CoprsLogic.new(user, copr, 92 check_for_duplicates=check_for_duplicates) # form validation checks for duplicates 93 CoprChrootsLogic.new_from_names(user, copr, 94 selected_chroots) 95 return copr
96 97 @classmethod
98 - def new(cls, user, copr, check_for_duplicates=True):
99 if check_for_duplicates and cls.exists_for_user(user, copr.name).all(): 100 raise exceptions.DuplicateException( 101 'Copr: "{0}" already exists'.format(copr.name)) 102 signals.copr_created.send(cls, copr=copr) 103 db.session.add(copr)
104 105 @classmethod
106 - def update(cls, user, copr, check_for_duplicates = True):
107 cls.raise_if_unfinished_blocking_action(user, copr, 108 'Can\'t change this Copr name, another operation is in progress: {action}') 109 users_logic.UsersLogic.raise_if_cant_update_copr(user, copr, 110 'Only owners and admins may update their Coprs.') 111 112 existing = cls.exists_for_user(copr.owner, copr.name).first() 113 if existing: 114 if check_for_duplicates and existing.id != copr.id: 115 raise exceptions.DuplicateException('Copr: "{0}" already exists'.format(copr.name)) 116 else: # we're renaming 117 # if we fire a models.Copr.query, it will use the modified copr in session 118 # -> workaround this by just getting the name 119 old_copr_name = db.session.query(models.Copr.name).filter(models.Copr.id==copr.id).\ 120 filter(models.Copr.deleted==False).\ 121 first()[0] 122 action = models.Action(action_type=helpers.ActionTypeEnum('rename'), 123 object_type='copr', 124 object_id=copr.id, 125 old_value='{0}/{1}'.format(copr.owner.name, old_copr_name), 126 new_value='{0}/{1}'.format(copr.owner.name, copr.name), 127 created_on=int(time.time())) 128 db.session.add(action) 129 db.session.add(copr)
130 131 @classmethod
132 - def delete(cls, user, copr, check_for_duplicates=True):
133 cls.raise_if_cant_delete(user, copr) 134 # TODO: do we want to dump the information somewhere, so that we can search it in future? 135 cls.raise_if_unfinished_blocking_action(user, copr, 136 'Can\'t delete this Copr, another operation is in progress: {action}') 137 action = models.Action(action_type=helpers.ActionTypeEnum('delete'), 138 object_type='copr', 139 object_id=copr.id, 140 old_value='{0}/{1}'.format(copr.owner.name, copr.name), 141 new_value='', 142 created_on=int(time.time())) 143 copr.deleted = True 144 145 db.session.add(action) 146 147 return copr
148 149 @classmethod
150 - def exists_for_user(cls, user, coprname, incl_deleted=False):
151 existing = models.Copr.query.filter(models.Copr.name==coprname).\ 152 filter(models.Copr.owner_id==user.id) 153 if not incl_deleted: 154 existing = existing.filter(models.Copr.deleted==False) 155 156 return existing
157 158 @classmethod
159 - def increment_build_count(cls, user, copr): # TODO API of this method is different, maybe change?
160 models.Copr.query.filter(models.Copr.id == copr.id).\ 161 update({models.Copr.build_count: models.Copr.build_count + 1})
162 163 @classmethod
164 - def unfinished_blocking_actions_for(cls, user, copr):
165 blocking_actions = [helpers.ActionTypeEnum('rename'), 166 helpers.ActionTypeEnum('delete')] 167 actions = models.Action.query.filter(models.Action.object_type=='copr').\ 168 filter(models.Action.object_id==copr.id).\ 169 filter(models.Action.result==helpers.BackendResultEnum('waiting')).\ 170 filter(models.Action.action_type.in_(blocking_actions)) 171 172 return actions
173 174 @classmethod
175 - def raise_if_unfinished_blocking_action(cls, user, copr, message):
176 """This method raises ActionInProgressException if given copr has an unfinished 177 action. Returns None otherwise. 178 """ 179 unfinished_actions = cls.unfinished_blocking_actions_for(user, copr).all() 180 if unfinished_actions: 181 raise exceptions.ActionInProgressException(message, unfinished_actions[0])
182 183 @classmethod
184 - def raise_if_cant_delete(cls, user, copr):
185 """This method raises InsufficientRightsException if given copr cant be deleted 186 by given user. Returns None otherwise. 187 """ 188 if not user.admin and user != copr.owner: 189 raise exceptions.InsufficientRightsException('Only owners may delete their Coprs.')
190
191 -class CoprPermissionsLogic(object):
192 @classmethod
193 - def get(cls, user, copr, searched_user):
194 query = models.CoprPermission.query.filter(models.CoprPermission.copr == copr).\ 195 filter(models.CoprPermission.user == searched_user) 196 197 return query
198 199 @classmethod
200 - def get_for_copr(cls, user, copr):
201 query = models.CoprPermission.query.filter(models.CoprPermission.copr == copr) 202 203 return query
204 205 @classmethod
206 - def new(cls, user, copr_permission):
207 db.session.add(copr_permission)
208 209 @classmethod
210 - def update_permissions(cls, user, copr, copr_permission, new_builder, new_admin):
211 users_logic.UsersLogic.raise_if_cant_update_copr(user, copr, 212 'Only owners and admins may update their Coprs permissions.') 213 models.CoprPermission.query.filter(models.CoprPermission.copr_id == copr.id).\ 214 filter(models.CoprPermission.user_id == copr_permission.user_id).\ 215 update({'copr_builder': new_builder, 216 'copr_admin': new_admin})
217 218 @classmethod
219 - def update_permissions_by_applier(cls, user, copr, copr_permission, new_builder, new_admin):
220 if copr_permission: 221 # preserve approved permissions if set 222 if not new_builder or copr_permission.copr_builder != helpers.PermissionEnum('approved'): 223 copr_permission.copr_builder = new_builder 224 if not new_admin or copr_permission.copr_admin != helpers.PermissionEnum('approved'): 225 copr_permission.copr_admin = new_admin 226 else: 227 perm = models.CoprPermission(user = user, copr = copr, copr_builder = new_builder, copr_admin = new_admin) 228 cls.new(user, perm)
229 230 @classmethod
231 - def delete(cls, user, copr_permission):
232 db.session.delete(copr_permission)
233
234 -class CoprChrootsLogic(object):
235 @classmethod
236 - def mock_chroots_from_names(cls, user, names):
237 db_chroots = models.MockChroot.query.all() 238 mock_chroots = [] 239 for ch in db_chroots: 240 if ch.chroot_name in names: 241 mock_chroots.append(ch) 242 243 return mock_chroots
244 245 @classmethod
246 - def new(cls, user, mock_chroot):
247 db.session.add(mock_chroot)
248 249 @classmethod
250 - def new_from_names(cls, user, copr, names):
253 254 @classmethod
255 - def update_from_names(cls, user, copr, names):
256 current_chroots = copr.mock_chroots 257 new_chroots = cls.mock_chroots_from_names(user, names) 258 # add non-existing 259 for mock_chroot in new_chroots: 260 if mock_chroot not in current_chroots: 261 db.session.add(models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 262 263 # delete no more present 264 to_remove = [] 265 for mock_chroot in current_chroots: 266 if mock_chroot not in new_chroots: 267 # can't delete here, it would change current_chroots and break iteration 268 to_remove.append(mock_chroot) 269 270 for mc in to_remove: 271 copr.mock_chroots.remove(mc)
272
273 -class MockChrootsLogic(object):
274 @classmethod
275 - def get(cls, user, os_release, os_version, arch, active_only = False):
279 280 @classmethod
281 - def get_multiple(cls, user, active_only=False):
282 query = models.MockChroot.query 283 if active_only: 284 query = query.filter(models.MockChroot.is_active==True) 285 return query
286 287 @classmethod
288 - def add(cls, user, name):
289 name_tuple = cls.tuple_from_name(user, name) 290 if cls.get(user, *name_tuple).first(): 291 raise exceptions.DuplicateException('Mock chroot with this name already exists.') 292 new_chroot = models.MockChroot(os_release=name_tuple[0], 293 os_version=name_tuple[1], 294 arch=name_tuple[2]) 295 cls.new(user, new_chroot) 296 return new_chroot
297 298 @classmethod
299 - def new(cls, user, mock_chroot):
300 db.session.add(mock_chroot)
301 302 @classmethod
303 - def edit_by_name(cls, user, name, is_active):
304 name_tuple = cls.tuple_from_name(user, name) 305 mock_chroot = cls.get(user, *name_tuple).first() 306 if not mock_chroot: 307 raise exceptions.NotFoundException('Mock chroot with this name doesn\'t exist.') 308 309 mock_chroot.is_active = is_active 310 cls.update(user, mock_chroot) 311 return mock_chroot
312 313 @classmethod
314 - def update(cls, user, mock_chroot):
315 db.session.add(mock_chroot)
316 317 318 @classmethod
319 - def delete_by_name(cls, user, name):
320 name_tuple = cls.tuple_from_name(user, name) 321 mock_chroot = cls.get(user, *name_tuple).first() 322 if not mock_chroot: 323 raise exceptions.NotFoundException('Mock chroot with this name doesn\'t exist.') 324 325 cls.delete(user, mock_chroot)
326 327 @classmethod
328 - def delete(cls, user, mock_chroot):
329 db.session.delete(mock_chroot)
330 331 @classmethod
332 - def tuple_from_name(cls, user, name):
333 split_name = name.rsplit('-', 1) 334 if len(split_name) < 2: 335 raise exceptions.MalformedArgumentException( 336 'Chroot Name doesn\'t contain dash, can\'t determine chroot architecure.') 337 if '-' in split_name[0]: 338 os_release, os_version = split_name[0].rsplit('-') 339 else: 340 os_release, os_version = split_name[0], '' 341 342 arch = split_name[1] 343 return (os_release, os_version, arch)
344