Source code for sacrud.preprocessing

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2014 uralbash <root@uralbash.ru>
#
# Distributed under terms of the MIT license.

"""
Preprocessing
"""
import ast
import json
import inspect
from datetime import date, datetime

import six
import sqlalchemy

from .common import get_columns, column_to_attr_name


[docs]def list_of_lists_to_dict(l): """ Convert list of key,value lists to dict [['id', 1], ['id', 2], ['id', 3], ['foo': 4]] {'id': [1, 2, 3], 'foo': [4]} """ d = {} for key, val in l: d.setdefault(key, []).append(val) return d
[docs]def get_m2m_objs(session, relation, id_from_request): mapper = relation.mapper pk_list = mapper.primary_key ids = [] if id_from_request: if isinstance(id_from_request, six.string_types): id_from_request = [id_from_request, ] for id in id_from_request: try: ids.append(json.loads(id)) except ValueError: pass except TypeError: return None ids = list_of_lists_to_dict(ids) else: ids = {} objs = session.query(mapper) for pk in pk_list: objs = objs.filter(pk.in_(ids.get(pk.name, []))) return objs.all()
[docs]def get_m2m_value(session, request, obj): """ Set m2m value for model obj from request params like "group[]" :Parameters: - `session`: SQLAlchemy DBSession - `request`: request as dict - `obj`: model instance """ params = {} ''' m2m_request: {u'company[]': [u'["id", 1]'], u'professions[]': [u'["id", 2]', u'["id", 3]']} ''' m2m_request = {k: v for k, v in list(request.items()) if k.endswith('[]')} for k, v in list(m2m_request.items()): key = k[:-2] relation = getattr(obj.__class__, key, False) if not relation: relation = getattr(obj, key, False) if not relation: continue # pragma: no cover value = get_m2m_objs(session, relation, v) if value is None: continue if relation.property.uselist is False: if value: value = value[0] else: value = None params[key] = value return params
[docs]class RequestPreprocessing(object): def __init__(self, request): self.request = request self.types_list = {'Boolean': self._check_boolean, 'HSTORE': self._check_hstore, 'JSON': self._check_hstore, 'JSONB': self._check_hstore, 'Date': self._check_date, 'DateTime': self._check_date, 'BYTEA': self._check_bytea, 'LargeBinary': self._check_bytea, 'TIMESTAMP': self._check_date } def _check_boolean(self, value): if not value: return False if value in ('0', 'False', 'None', 'false'): return False return True def _check_bytea(self, value): return bytearray(value, 'utf-8') def _check_hstore(self, value): if not value: return None try: return json.loads(str(value)) except Exception: try: return ast.literal_eval(str(value)) except Exception: raise TypeError( "[HSTORE, JSON, JSONB]: does't suppot '%s' format. %s" % (value, 'Valid example: {"foo": "bar", u"baz": u"biz"}')) def _check_date(self, value): if isinstance(value, (date, datetime)): return value # XXX: I feel the dissonance here try: return datetime.strptime(value, '%Y-%m-%d') except ValueError: try: return datetime.strptime(value, '%Y-%m-%d %H:%M') except ValueError: return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
[docs] def check_type(self, table, key): self.key = key self.column = None obj_columns = get_columns(table) if key in obj_columns: self.column = obj_columns[key] else: self.column = getattr(table, key) column_type = self.column.type.__class__.__name__ value = self.request[key] if type(value) in (list, tuple) and\ column_type not in ['JSON', 'JSONB', 'HSTORE']: value = value[0] if not isinstance(value, sqlalchemy.sql.elements.Null) and not value\ and not hasattr(value, 'filename')\ and not column_type == 'Boolean': if self.column.default or self.column.primary_key: return None if column_type in list(self.types_list.keys()): check = self.types_list[column_type] return check(value) return value
[docs]class ObjPreprocessing(object): def __init__(self, obj): self.obj = obj
[docs] def add(self, session, data, table): request_preprocessing = RequestPreprocessing(data) # filter data for object for key in list(data.keys()): obj_columns = get_columns(self.obj) # check if columns not exist if key not in obj_columns and not hasattr(self.obj, key): if not key.endswith('[]'): data.pop(key, None) continue # pragma: no cover value = request_preprocessing.check_type(table, key) if value is None: data.pop(key, None) continue data[key] = value params = {column_to_attr_name(k, table): v for k, v in data.items() if not k.endswith('[]')} m2m_params = get_m2m_value(session, data, self.obj) params = dict(list(params.items()) + list(m2m_params.items())) if inspect.isclass(self.obj): return self.obj(**params) for k, v in params.items(): setattr(self.obj, k, v) return self.obj
[docs] def delete(self): return self.obj
  Read the Docs
v: latest  
Versions
latest
stable
master
Free document hosting provided by Read the Docs.