Пример запросов на psycopg2

Admin Python

Пример реализации класса на psycopg2 для PostgreSQL. А также несколько примеров без классов.

Это очень старые мои заметки, но я решил их опубликовать, возможно когда-нибудь понадобятся.

Чистые запросы через psycopg2 быстрее чем через SqlAlchemy (надстройку над psycopg2). И там где нужна скорость логично применять чистые psycopg2.

Дальше есть 2 варианта – каждый раз писать полный синтаксис или сделать под свои нужны класс для быстрого использования psycopg2. Ниже пример такого класса. В каждом проекте он может выглядеть по-своему.

Пример класса на psycopg2

import psycopg2
from psycopg2.extras import NamedTupleCursor


class DbPsycopg:
    def __init__(self):
        self.conn = psycopg2.connect(
            # Для localhost достаточно dbname
            dbname='ploshadka',

            # пример удаленного соединения
            # user='ploshadka',
            # password='ploshadka.net',
            # host='name_of_bd.cixfs.us-east-2.rds.amazonaws.com'
        )
        self.conn.autocommit = True

    def fetch_one(self, query, arg=None, factory=None, clean=None):
        """ Получает только одно ЕДИНСТВЕННОЕ значение (не ряд!) из таблицы
        :param query: Запрос
        :param arg: Переменные
        :param factory: dic (возвращает словарь - ключ/значение) или list (возвращает list)
        :param clean: С параметром вернет только значение. Без параметра вернет значение в кортеже.
        """

        try:
            cur = self.__connection(factory)
            self.__execute(cur, query, arg)
            return self.__fetch(cur, clean)

        except (Exception, psycopg2.Error) as error:
            self.__error(error)

    def fetch_all(self, query, arg=None, factory=None):
        """ Получает множетсвенные данные из таблицы
        :param query: Запрос
        :param arg: Переменные
        :param factory: dic (возвращает словарь - ключ/значение) или list (возвращает list)
        """

        try:
            cur = self.__connection(factory)
            self.__execute(cur, query, arg)
            return cur.fetchall()

        except (Exception, psycopg2.Error) as error:
            self.__error(error)

    def query_update(self, query, arg, message=None):
        """ Обновляет данные в таблице и возвращает сообщение об успешной операции """
        try:
            cur = self.conn.cursor()
            cur.execute(query, arg)
            return message

        except (Exception, psycopg2.Error) as error:
            self.__error(error)

    def close(self):
        cur = self.conn.cursor()
        cur.close()
        self.conn.close()

    def __connection(self, factory=None):
        # Dic - возвращает словарь - ключ/значение
        if factory == 'dic':
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        # List - возвращает list (хотя и называется DictCursor)
        elif factory == 'list':
            cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Tuple
        else:
            cur = self.conn.cursor()

        return cur

    @staticmethod
    def __execute(cur, query, arg=None):
        # Метод 'execute' всегда возвращает None
        if arg:
            cur.execute(query, arg)
        else:
            cur.execute(query)

    @staticmethod
    def __fetch(cur, clean):
        # Если запрос был выполнен успешно, получим данные с помощью 'fetchone'
        if clean == 'no':
            # Вернет:
            #   Название:
            #       ('Royal Caribbean Cruises',)
            #   Дата:
            #       (datetime.datetime(2020, 6, 2, 13, 36, 35, 61052, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=0, name=None)),)
            fetch = cur.fetchone()
        else:
            # Вернет:
            #   Название:
            #       Royal Caribbean Cruises
            #   Дата:
            #       (da2020-06-02 13:36:35.061052+00:00
            fetch = cur.fetchone()[0]
        return fetch

    @staticmethod
    def __error(error):
        # В том числе, если в БД данных нет, будет ошибка на этапе fetchone
        print('Данных в бд нет или ошибка: {}'.format(error))
        return None

Примеры использования класса psycopg2

Класс инициализируем в главном файле __init__.py:

# Для прямых запросов (хардкор)
db_psycopg = DbPsycopg()

Затем там где будем использовать класс импортируем:

from app import db_psycopg

fetch_one

Функция:

def __get_time_from_last_insert_row():
    """ Получить время с момента последней записи в таблицу """
    query = 'SELECT date FROM operations ORDER BY date DESC limit 1'
    return db_psycopg.fetch_one(query)

Получит:

2020-10-29 21:31:15.934801+03:00

Тот же запрос на SQLAlchemy:

db.session.query(Operation.date).order_by(db.desc(Operation.date)).limit(1).scalar()

query_update

Пример обновления данных:

def __save_operations(operations, user_id):
    """ Add operation to DB """
    for operation in operations:
        query = """
            INSERT INTO operations (user_id, name, price, date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        ;"""


        db_psycopg.query_update(query, (user_id,
                                        operation['name'],
                                        operation['price'],
                                        operation['date']))
Если надо обновлять данные при конфликте на новые, то вместо NOTHING поставить UPDATE.

Еще может это выглядеть так:

def update_price(self, figi, buy, sell):
    """ Сохраняет последнюю цену покупки и продажи акции """
    query = 'UPDATE portfolio SET price_last_buy = %s, price_last_sell = %s WHERE figi = %s'
    response = self.db.query_update(query, (buy, sell, figi), 'Цена обновлена')

Используем тот же апдейт метод, но на удаление:

def clear_portfolio_table(self, name):
    # Удаляет исчезнувшую акцию из портфеля
    for i in name:
        query = """
            DELETE FROM portfolio WHERE name = %s;
        """

        response = self.db.query_update(query, (i,), 'Акция ' + i + ' удалена из портфеля')

Пример на SQLAlchemy

Добавить новую операцию на SQLAlchemy:

def __save_operations(operations, user_id):
    for operation in operations:
        exist = db.session.query(Operation.id).filter_by(id=operation['id']).first()

        if not exist:
            new = Operation(
                user_id=user_id,
                id=operation['id'],
                price=operation['price'],
                date=operation['date']
            )
            db.session.add(new)
   db.session.commit()

psycopg2 без класса

Код на обновление данных в БД

def update_portfolio(self, portfolio):
    try:
        with self.connect.cursor() as cursor:
            self.connect.autocommit = True

            for share in portfolio:
                query = """
                    INSERT INTO portfolio (figi, name, average_price, currency, lots, expected_yield)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    ON CONFLICT (figi)
                    DO UPDATE SET
                    (average_price, lots, expected_yield, updated_at)
                    = (EXCLUDED.average_price, EXCLUDED.lots, EXCLUDED.expected_yield, EXCLUDED.updated_at)
                ;"""


                cursor.execute(query, (share.figi,
                                       share.name,
                                       share.average_position_price.value,
                                       share.average_position_price.currency,
                                       share.lots,
                                       share.expected_yield.value))

            return 'Последнее портфолио было сохранено в таблице portfolio'

    except (Exception, psycopg2.Error) as error:
        return 'Failed inserting record into portfolio table from Api {}'.format(error)

Код на вставку данных в БД через psycopg2

def add_shares(self, dic):
    with self.connect.cursor() as cursor:
        self.connect.autocommit = True

        for item in dic:
            values = [(
                item['id_tinkoff'],
                item['figi'],
                item['name'],
                item['currency'],
                item['price'],
                item['quantity'],
                item['payment'],
                item['commission'],
                item['operation_type'],
                item['date']
            )]

            query = sql.SQL(
                'INSERT INTO shares (id_tinkoff, figi, name, currency, price, quantity, payment, commission, operation_type, date) '
                'VALUES {} ON CONFLICT DO NOTHING').format(sql.SQL(',').join(map(sql.Literal, values)))
            cursor.execute(query)

Пример инициализации psycopg2 в одном файле

import psycopg2
from psycopg2 import sql

class PostgreSQL:

    def __init__(self):
        self.connect = psycopg2.connect(
            dbname='',
            user='',
            password='',
            host=''
        )

    def some_function(self, dic):
        with self.connect.cursor() as cursor:
            self.connect.autocommit = True

# Тут далее код запроса в БД
Метки:

У сайта нет цели самоокупаться, поэтому на сайте нет рекламы. Но если вам пригодилась информация, можете лайкнуть страницу, оставить комментарий или отправить мне подарок на чашечку кофе.

Добавить комментарий

Напишите свой комментарий, если вам есть что добавить/поправить/спросить по теме текущей статьи:
"Пример запросов на psycopg2"