Пример реализации класса на psycopg2 для PostgreSQL. А также несколько примеров без классов.
Чистые запросы через psycopg2 быстрее чем через SqlAlchemy (надстройку над psycopg2). И там где нужна скорость логично применять чистые psycopg2.
Дальше есть 2 варианта – каждый раз писать полный синтаксис или сделать под свои нужны класс для быстрого использования psycopg2. Ниже пример такого класса. В каждом проекте он может выглядеть по-своему.
Пример класса на psycopg2
from psycopg2.extras import NamedTupleCursor
class DbPsycopg:
def __init__(self):
self.conn = psycopg2.connect(
# Для localhost достаточно dbname
dbname='ploshadka',
# пример удаленного соединения
# user='ploshadka',
# password='ploshadka.net',
# host='name_of_bd.cixfs.us-east-2.rds.amazonaws.com'
)
self.conn.autocommit = True
def fetch_one(self, query, arg=None, factory=None, clean=None):
""" Получает только одно ЕДИНСТВЕННОЕ значение (не ряд!) из таблицы
:param query: Запрос
:param arg: Переменные
:param factory: dic (возвращает словарь - ключ/значение) или list (возвращает list)
:param clean: С параметром вернет только значение. Без параметра вернет значение в кортеже.
"""
try:
cur = self.__connection(factory)
self.__execute(cur, query, arg)
return self.__fetch(cur, clean)
except (Exception, psycopg2.Error) as error:
self.__error(error)
def fetch_all(self, query, arg=None, factory=None):
""" Получает множетсвенные данные из таблицы
:param query: Запрос
:param arg: Переменные
:param factory: dic (возвращает словарь - ключ/значение) или list (возвращает list)
"""
try:
cur = self.__connection(factory)
self.__execute(cur, query, arg)
return cur.fetchall()
except (Exception, psycopg2.Error) as error:
self.__error(error)
def query_update(self, query, arg, message=None):
""" Обновляет данные в таблице и возвращает сообщение об успешной операции """
try:
cur = self.conn.cursor()
cur.execute(query, arg)
return message
except (Exception, psycopg2.Error) as error:
self.__error(error)
def close(self):
cur = self.conn.cursor()
cur.close()
self.conn.close()
def __connection(self, factory=None):
# Dic - возвращает словарь - ключ/значение
if factory == 'dic':
cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# List - возвращает list (хотя и называется DictCursor)
elif factory == 'list':
cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
# Tuple
else:
cur = self.conn.cursor()
return cur
@staticmethod
def __execute(cur, query, arg=None):
# Метод 'execute' всегда возвращает None
if arg:
cur.execute(query, arg)
else:
cur.execute(query)
@staticmethod
def __fetch(cur, clean):
# Если запрос был выполнен успешно, получим данные с помощью 'fetchone'
if clean == 'no':
# Вернет:
# Название:
# ('Royal Caribbean Cruises',)
# Дата:
# (datetime.datetime(2020, 6, 2, 13, 36, 35, 61052, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=0, name=None)),)
fetch = cur.fetchone()
else:
# Вернет:
# Название:
# Royal Caribbean Cruises
# Дата:
# (da2020-06-02 13:36:35.061052+00:00
fetch = cur.fetchone()[0]
return fetch
@staticmethod
def __error(error):
# В том числе, если в БД данных нет, будет ошибка на этапе fetchone
print('Данных в бд нет или ошибка: {}'.format(error))
return None
Примеры использования класса psycopg2
Класс инициализируем в главном файле __init__.py:
db_psycopg = DbPsycopg()
Затем там где будем использовать класс импортируем:
fetch_one
Функция:
""" Получить время с момента последней записи в таблицу """
query = 'SELECT date FROM operations ORDER BY date DESC limit 1'
return db_psycopg.fetch_one(query)
Получит:
Тот же запрос на SQLAlchemy:
query_update
Пример обновления данных:
""" Add operation to DB """
for operation in operations:
query = """
INSERT INTO operations (user_id, name, price, date)
VALUES (%s, %s, %s, %s)
ON CONFLICT DO NOTHING
;"""
db_psycopg.query_update(query, (user_id,
operation['name'],
operation['price'],
operation['date']))
Еще может это выглядеть так:
""" Сохраняет последнюю цену покупки и продажи акции """
query = 'UPDATE portfolio SET price_last_buy = %s, price_last_sell = %s WHERE figi = %s'
response = self.db.query_update(query, (buy, sell, figi), 'Цена обновлена')
Используем тот же апдейт метод, но на удаление:
# Удаляет исчезнувшую акцию из портфеля
for i in name:
query = """
DELETE FROM portfolio WHERE name = %s;
"""
response = self.db.query_update(query, (i,), 'Акция ' + i + ' удалена из портфеля')
Пример на SQLAlchemy
Добавить новую операцию на SQLAlchemy:
for operation in operations:
exist = db.session.query(Operation.id).filter_by(id=operation['id']).first()
if not exist:
new = Operation(
user_id=user_id,
id=operation['id'],
price=operation['price'],
date=operation['date']
)
db.session.add(new)
db.session.commit()
psycopg2 без класса
Код на обновление данных в БД
try:
with self.connect.cursor() as cursor:
self.connect.autocommit = True
for share in portfolio:
query = """
INSERT INTO portfolio (figi, name, average_price, currency, lots, expected_yield)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (figi)
DO UPDATE SET
(average_price, lots, expected_yield, updated_at)
= (EXCLUDED.average_price, EXCLUDED.lots, EXCLUDED.expected_yield, EXCLUDED.updated_at)
;"""
cursor.execute(query, (share.figi,
share.name,
share.average_position_price.value,
share.average_position_price.currency,
share.lots,
share.expected_yield.value))
return 'Последнее портфолио было сохранено в таблице portfolio'
except (Exception, psycopg2.Error) as error:
return 'Failed inserting record into portfolio table from Api {}'.format(error)
Код на вставку данных в БД через psycopg2
with self.connect.cursor() as cursor:
self.connect.autocommit = True
for item in dic:
values = [(
item['id_tinkoff'],
item['figi'],
item['name'],
item['currency'],
item['price'],
item['quantity'],
item['payment'],
item['commission'],
item['operation_type'],
item['date']
)]
query = sql.SQL(
'INSERT INTO shares (id_tinkoff, figi, name, currency, price, quantity, payment, commission, operation_type, date) '
'VALUES {} ON CONFLICT DO NOTHING').format(sql.SQL(',').join(map(sql.Literal, values)))
cursor.execute(query)
Пример инициализации psycopg2 в одном файле
from psycopg2 import sql
class PostgreSQL:
def __init__(self):
self.connect = psycopg2.connect(
dbname='',
user='',
password='',
host=''
)
def some_function(self, dic):
with self.connect.cursor() as cursor:
self.connect.autocommit = True
# Тут далее код запроса в БД