Примеры разных запросов к базе данных через ОРМ SQLAlchemy в python.
Как посмотреть SQL-запрос
Посмотреть сформированный запрос sql alchemy можно так:
Получение данных в SQLAlchemy
first()
Получаем одно значение с помощью first()
Если нам нужно получить не tuple, то добавляем [0]
В этом случае, если не ловить это с помощью исключений try и catch, на запрос [0], будет выкидываться ошибка, если данные не найдены.
Можно этого избежать использую scalar()
scalar()
scalar получает «чистые» данные вида:
В то время как first получил бы в этом случае:
Scalar получит первый элемент запроса или None, в случае если его нет. Если данных несколько вызовет MultipleResultsFound.
При множественном совпадении, можно так:
IN в SQLAlchemy
names = db.session.query(Market.name).filter(MarketShort.id.in_(ids)).all()
Получение данных из моделей SQLAlchemy в виде словарей в списке
Способов получения несколько, что пришлось их выделить в отдельную статью — как вернуть данные в SQLAlchemy в виде списка словарей.
OR (или)
И
User.name.like(search_1),
User.login.like(search_2)
)
«И» вместе с «IN»
Task.id == CategoriesTask.task_id,
CategoriesTask.cat_inner_id == Category.id,
Category.id.notin_(ids)
)
all()
Получение всех данных с помощью .all()
db.func.max
Получим максимальное число в таблице Setting колонки count:
Сортировки
Получим последние 10 значений из колонки price таблицы Yahoo колонки date:
Или так:
Получим последнее (отсортированное по дате) значение sp500 по названию ticker:
db.session.query(Yahoo.price).filter_by(ticker=self.ticker).order_by(
db.desc(Yahoo.date)).first()[0])
count() и сравнение
filter(3216 >Yahoo.price) — здесь сравниваем, больше ли значение в ценах таблицы Yahoo
Альтернативно можно посчитать и так, в том случае, чтобы не делать несколько запросов на выборку всего и подсчета:
x['count'] = len(count)
Выборка под дате
Например, выборка названий по дате за последние 7 дней:
return datetime.today() - timedelta(days=days)
date = get_date_before_days(7)
response = db.session.query(Operation.name).filter_by(user_id=1).filter(
Operation.date > date).order_by(db.asc(Operation.name)).distinct()
shares = [x.name for x in response]
Выборка под месяцу и году
query = query.filter(extract('year', Task.time) == year).filter(extract('month', Task.time) == month_index)
Сохранение данных в SQLAlchemy
Подготавливаем объект для сохранения:
Добавляем данные:
Фиксируем данные и завершаем сессию:
Обновление данных в SQLAlchemy
Вариант 1
Обновление данных пользователя, у которого user_id = user_id и name = name. Обновляем у него timer и percent:
{'timer': int(timer), 'percent': float(percent)}
)
Фиксируем данные и завершаем сессию:
Вариант 2
Иногда удобнее использовать другой вариант. Он подойдет, когда нужно проверить, есть ли данные, то обновить их, а если нет, то сохранить новые.
if user_setting:
user_setting.timer = timer
user_setting.percent = percent
else:
user_setting = UserSetting(user_id=user_id, name=name, timer=timer, percent=percent)
db.session.add(user_setting)
db.session.commit()
Удаление объектов в SQLAlchemy
Удаление не связанных данных
Удаление происходит через метод delete(). У него есть свойство synchronize_session.
Для удаление объекта без обновления состояние объектов в сессии (т.е. если дальше в коде не нужно обращаться к удаленному объекту) надо использовать synchronize_session=False.
Удаление связанных данных
Если попытаться удалить связанные данные в SQLAlchemy методом выше, то возникнет ошибка. Для такого удаления подойдет другой способ:
db.session.delete(x)
db.session.commit()
Получение словарей в SQLAlchemy
Об этом читайте подробнее в статье о vars.
Обработка таблиц вне моделей
'select * from table_name order by date_task DESC limit 50'
)]
В этом случае надо также сериализовать данные. Такой пример обработки данных есть здесь.
Как получить выборку по дате
В БД хранится дата в таком виде:
Мы легко можем получить данные по конкретному дню отправим дату в таком виде:
Делается это так:
date = f'{year}-{day}-{month}'
response = [x.serialize for x in
db.session.query(Task)
.filter_by(user_id=flask_login.current_user.id)
.filter(func.DATE(Task.time) == date)
.order_by(db.desc(Task.time)).all()
]
Получение пользователей по их ролям
О добавление ролей на сайт можно почитать здесь: пользователи и роли на Flask.
Так:
for role in user.roles:
if role == 'admin':
Или:
for x in users:
for role in x.roles:
if role == 'admin':
То же самое, если используется Flask-Security:
if user.has_role('user'):
Not None
Фильтрация пользователей. Забирает только тех из них у кого заполнен телеграм-id:
Not None и func.sum
Пример, в котором объединяются две таблицы, фильтруется выборка по дате, а затем суммируется значение в колонке, если оно не None.
user_id=user_id
).filter(
Task.time > first_day,
Task.time < last_day,
CategoriesTask.cat_inner_id == cat_inner_id,
CategoriesTask.task_id == Task.id,
Task.count.isnot(None)
).scalar()
Left Join
Пример с left join:
CategoryGlobalInner, CategoryGlobal.id == CategoryGlobalInner.cat_global_id
).filter(
(Task.hidden == None) | (Task.hidden == False)
)
join может не понадобится, все прописывается в моделях, вместо используется фильтр. Тогда выше код сократится до такого:
CategoryGlobal.id == CategoryGlobalInner.cat_global_id,
(Task.hidden == None) | (Task.hidden == False)
)
Существует значение
Проверяем есть ли объект в БД со значением hidden == True
user_id=user_id, task_id=task_id
).filter(
CategoriesTask.cat_inner_id == Category.id,
Category.hidden
).first()