Пример запросов на psycopg2

Admin Python

Пример реализации класса на psycopg2 для PostgreSQL. А также несколько примеров без классов.

Это очень старые мои заметки, но я решил их опубликовать, возможно когда-нибудь понадобятся.

Чистые запросы через psycopg2 быстрее чем через SqlAlchemy (надстройку над psycopg2). И там где нужна скорость логично применять чистые psycopg2.

Дальше есть 2 варианта – каждый раз писать полный синтаксис или сделать под свои нужны класс для быстрого использования psycopg2. Ниже пример такого класса. В каждом проекте он может выглядеть по-своему.

Пример класса на psycopg2

import psycopg2
from psycopg2.extras import NamedTupleCursor


class DbPsycopg:
    def __init__(self):
        self.conn = psycopg2.connect(
            # Для localhost достаточно dbname
            dbname='ploshadka',

            # пример удаленного соединения
            # user='ploshadka',
            # password='ploshadka.net',
            # host='name_of_bd.cixfs.us-east-2.rds.amazonaws.com'
        )
        self.conn.autocommit = True

    def fetch_one(self, query, arg=None, factory=None, clean=None):
        """ Получает только одно ЕДИНСТВЕННОЕ значение (не ряд!) из таблицы
        :param query: Запрос
        :param arg: Переменные
        :param factory: dic (возвращает словарь - ключ/значение) или list (возвращает list)
        :param clean: С параметром вернет только значение. Без параметра вернет значение в кортеже.
        """

        try:
            cur = self.__connection(factory)
            self.__execute(cur, query, arg)
            return self.__fetch(cur, clean)

        except (Exception, psycopg2.Error) as error:
            self.__error(error)

    def fetch_all(self, query, arg=None, factory=None):
        """ Получает множетсвенные данные из таблицы
        :param query: Запрос
        :param arg: Переменные
        :param factory: dic (возвращает словарь - ключ/значение) или list (возвращает list)
        """

        try:
            cur = self.__connection(factory)
            self.__execute(cur, query, arg)
            return cur.fetchall()

        except (Exception, psycopg2.Error) as error:
            self.__error(error)

    def query_update(self, query, arg, message=None):
        """ Обновляет данные в таблице и возвращает сообщение об успешной операции """
        try:
            cur = self.conn.cursor()
            cur.execute(query, arg)
            return message

        except (Exception, psycopg2.Error) as error:
            self.__error(error)

    def close(self):
        cur = self.conn.cursor()
        cur.close()
        self.conn.close()

    def __connection(self, factory=None):
        # Dic - возвращает словарь - ключ/значение
        if factory == 'dic':
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        # List - возвращает list (хотя и называется DictCursor)
        elif factory == 'list':
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Tuple
        else:
            cur = self.conn.cursor()

        return cur

    @staticmethod
    def __execute(cur, query, arg=None):
        # Метод 'execute' всегда возвращает None
        if arg:
            cur.execute(query, arg)
        else:
            cur.execute(query)

    @staticmethod
    def __fetch(cur, clean):
        # Если запрос был выполнен успешно, получим данные с помощью 'fetchone'
        if clean == 'no':
            # Вернет:
            #   Название:
            #       ('Royal Caribbean Cruises',)
            #   Дата:
            #       (datetime.datetime(2020, 6, 2, 13, 36, 35, 61052, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=0, name=None)),)
            fetch = cur.fetchone()
        else:
            # Вернет:
            #   Название:
            #       Royal Caribbean Cruises
            #   Дата:
            #       (da2020-06-02 13:36:35.061052+00:00
            fetch = cur.fetchone()[0]
        return fetch

    @staticmethod
    def __error(error):
        # В том числе, если в БД данных нет, будет ошибка на этапе fetchone
        print('Данных в бд нет или ошибка: {}'.format(error))
        return None

Примеры использования класса psycopg2

Класс инициализируем в главном файле __init__.py:

# Для прямых запросов (хардкор)
db_psycopg = DbPsycopg()

Затем там где будем использовать класс импортируем:

from app import db_psycopg

fetch_one

Функция:

def __get_time_from_last_insert_row():
    """ Получить время с момента последней записи в таблицу """
    query = 'SELECT date FROM operations ORDER BY date DESC limit 1'
    return db_psycopg.fetch_one(query)

Получит:

2020-10-29 21:31:15.934801+03:00

Тот же запрос на SQLAlchemy:

db.session.query(Operation.date).order_by(db.desc(Operation.date)).limit(1).scalar()

query_update

Пример обновления данных:

def __save_operations(operations, user_id):
    """ Add operation to DB """
    for operation in operations:
        query = """
            INSERT INTO operations (user_id, name, price, date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        ;"""


        db_psycopg.query_update(query, (user_id,
                                        operation['name'],
                                        operation['price'],
                                        operation['date']))
Если надо обновлять данные при конфликте на новые, то вместо NOTHING поставить UPDATE.

Еще может это выглядеть так:

def update_price(self, figi, buy, sell):
    """ Сохраняет последнюю цену покупки и продажи акции """
    query = 'UPDATE portfolio SET price_last_buy = %s, price_last_sell = %s WHERE figi = %s'
    response = self.db.query_update(query, (buy, sell, figi), 'Цена обновлена')

Используем тот же апдейт метод, но на удаление:

def clear_portfolio_table(self, name):
    # Удаляет исчезнувшую акцию из портфеля
    for i in name:
        query = """
            DELETE FROM portfolio WHERE name = %s;
        """

        response = self.db.query_update(query, (i,), 'Акция ' + i + ' удалена из портфеля')

Пример на SQLAlchemy

Добавить новую операцию на SQLAlchemy:

def __save_operations(operations, user_id):
    for operation in operations:
        exist = db.session.query(Operation.id).filter_by(id=operation['id']).first()

        if not exist:
            new = Operation(
                user_id=user_id,
                id=operation['id'],
                price=operation['price'],
                date=operation['date']
            )
            db.session.add(new)
   db.session.commit()

psycopg2 без класса

Код на обновление данных в БД

def update_portfolio(self, portfolio):
    try:
        with self.connect.cursor() as cursor:
            self.connect.autocommit = True

            for share in portfolio:
                query = """
                    INSERT INTO portfolio (figi, name, average_price, currency, lots, expected_yield)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    ON CONFLICT (figi)
                    DO UPDATE SET
                    (average_price, lots, expected_yield, updated_at)
                    = (EXCLUDED.average_price, EXCLUDED.lots, EXCLUDED.expected_yield, EXCLUDED.updated_at)
                ;"""


                cursor.execute(query, (share.figi,
                                       share.name,
                                       share.average_position_price.value,
                                       share.average_position_price.currency,
                                       share.lots,
                                       share.expected_yield.value))

            return 'Последнее портфолио было сохранено в таблице portfolio'

    except (Exception, psycopg2.Error) as error:
        return 'Failed inserting record into portfolio table from Api {}'.format(error)

Код на вставку данных в БД через psycopg2

def add_shares(self, dic):
    with self.connect.cursor() as cursor:
        self.connect.autocommit = True

        for item in dic:
            values = [(
                item['id_tinkoff'],
                item['figi'],
                item['name'],
                item['currency'],
                item['price'],
                item['quantity'],
                item['payment'],
                item['commission'],
                item['operation_type'],
                item['date']
            )]

            query = sql.SQL(
                'INSERT INTO shares (id_tinkoff, figi, name, currency, price, quantity, payment, commission, operation_type, date) '
                'VALUES {} ON CONFLICT DO NOTHING').format(sql.SQL(',').join(map(sql.Literal, values)))
            cursor.execute(query)

Пример инициализации psycopg2 в одном файле

import psycopg2
from psycopg2 import sql

class PostgreSQL:

    def __init__(self):
        self.connect = psycopg2.connect(
            dbname='',
            user='',
            password='',
            host=''
        )

    def some_function(self, dic):
        with self.connect.cursor() as cursor:
            self.connect.autocommit = True

# Тут далее код запроса в БД
Метки:

Кстати, на сайте нет рекламы. У сайта нет цели самоокупаться, но если вам пригодилась информация можете задонатить мне на чашечку кофе в макдаке. Лайкнуть страницу или просто поблагодарить. Карма вам зачтется.

Добавить комментарий

Напишите свой комментарий, если вам есть что добавить/поправить/спросить по теме текущей статьи:
"Пример запросов на psycopg2"