Note: I'm migrating from gonzalo123.com to here. When I finish I'll swap the DNS to here. The "official" blog will be always gonzalo123.com

      Database utils for psycopg2

      I normally need to perform raw queries when I’m working with Python. Even when I’m using Django and it’s ORM I need to execute sql against the database. As I use (almost always) PostgreSQL and I use (as we all do) psycopg2. Psycopg2 is very complete and easy to use but sometimes I need a few helpers to make my usual tasks easier. I don’t want to create a complex library on top of psycopg2, only a helper.

      Table of Contents

      1. Connection
      2. Fetch all
      3. Fetch one
      4. Select
      5. Insert
      6. Update
      7. Delete
      8. Upsert
      9. Stored procedures
      10. Transactions
      11. Return values
      12. Unit tests
      13. Install

      Something that I miss from PHP and DBAL library is the way that DBAL helps me to create simple CRUD operations. The idea of this helper functions is to do something similar. Let’s start

      I’ve created procedural functions to do all and also a Db class. Normally all functions needs the cursor parameter.

      def fetch_all(cursor, sql, params=None):
          cursor.execute(
              query=sql,
              vars={} if params is None else params)
       
          return cursor.fetchall()
      

      The Db class accepts cursor in the constructor

      db = Db(cursor=cursor)
      data = db.fetch_all("SELECT * FROM table")
      

      Connection

      Nothing especial in the connection. I like to use “connection_factory=NamedTupleConnection” to allow me to access to the recordset as an Object. I’ve created simple factory helper to create connections:

      def get_conn(dsn, named_tuple=False, autocommit=False):
          conn = psycopg2.connect(
              dsn=dsn,
              connection_factory=NamedTupleConnection if named_tuple else None,
          )
          conn.autocommit = autocommit
       
          return conn
      

      Sometimes I use NamedTuple in the cursor instead connection, so I’ve created a simple helper

      def get_cursor(conn, named_tuple=True):
          return conn.cursor(cursor_factory=NamedTupleCursor if named_tuple else None)
      

      Fetch all

      db = Db(cursor)
      for reg in db.fetch_all(sql="SELECT email, name from users"):
          assert 'user1' == reg.name
          assert 'user1@email.com' == reg.email
      

      Fetch one

      data == db.fetch_one(sql=SQL)
      

      Select

      data = db.select(
              table='users',
              where={'email': 'user1@email.com'})
      

      This helper only allows me to perform simple where statements (joined with AND). If I need one complex Where, then I use fetch

      Insert

      Insert one row

      db.insert(
              table='users',
              values={'email': 'user1@email.com', 'name': 'user1'})
      

      Or insert multiple rows (using spycopg2’s executemany)

      db.insert(
              table='users',
              values=[
                  {'email': 'user2@email.com', 'name': 'user2'},
                  {'email': 'user3@email.com', 'name': 'user3'}
              ])
      

      We also can use insert_batch to insert all rows within one command

      db.insert_batch(
              table='users',
              values=[
                  {'email': 'user2@email.com', 'name': 'user2'},
                  {'email': 'user3@email.com', 'name': 'user3'}
              ])
      

      Update

      db.update(
              table='users',
              data={'name': 'xxxx'},
              identifier={'email': 'user1@email.com'},
          )
      

      Delete

      db.delete(table='users', where={'email': 'user1@email.com'})
      

      Upsert

      Sometimes we need to insert one row or update the row if the primary key already exists. We can do that with two statements: One select and if there isn’t any result one insert. Else on update. We can do that with one sql statement. I’ve created a helper to to that:

      def _get_upsert_sql(data, identifier, table):
          raw_sql = """
              WITH
                  upsert AS (
                      UPDATE {tbl}
                      SET {t_set}
                      WHERE {t_where}
                      RETURNING {tbl}.*),
                  inserted AS (
                      INSERT INTO {tbl} ({t_fields})
                      SELECT {t_select_fields}
                      WHERE NOT EXISTS (SELECT 1 FROM upsert)
                      RETURNING *)
              SELECT * FROM upsert
              UNION ALL
              SELECT * FROM inserted
          """
          merger_data = {**data, **identifier}
          sql = psycopg2_sql.SQL(raw_sql).format(
              tbl=psycopg2_sql.Identifier(table),
              t_set=psycopg2_sql.SQL(', ').join(_get_conditions(data)),
              t_where=psycopg2_sql.SQL(' and ').join(_get_conditions(identifier)),
              t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, merger_data.keys())),
              t_select_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, merger_data.keys())))
       
          return merger_data, sql
       
       
      def upsert(cursor, table, data, identifier):
          merger_data, sql = _get_upsert_sql(data, identifier, table)
       
          cursor.execute(sql, merger_data)
          return cursor.rowcount
      

      Now we can execute a upsert in a simple way:

      db.upsert(
              table='users',
              data={'name': 'yyyy'},
              identifier={'email': 'user1@email.com'})
      

      Stored procedures

      data = db.sp_fetch_one(
          function='hello',
          params={'name': 'Gonzalo'})
      
      data = db.sp_fetch_all(
          function='hello',
          params={'name': 'Gonzalo'})
      

      Transactions

      In PHP and DBAL to create one transaction we can use this syntax

      <?php
      $conn->transactional(function($conn) {
          // do stuff
      });
      

      In Python we can do the same with context management, but I prefer to use this syntax:

      with transactional(conn) as db:
          assert 1 == db.insert(
              table='users',
              values={'email': 'user1@email.com', 'name': 'user1'})
      

      The transactional function is like that (I’ve created two functions. One with raw cursor and another one with my Db class):

      def _get_transactional(conn, named_tuple, callback):
          try:
              with conn as connection:
                  with get_cursor(conn=connection, named_tuple=named_tuple) as cursor:
                      yield callback(cursor)
                  conn.commit()
          except Exception as e:
              conn.rollback()
              raise e
       
      @contextmanager
      def transactional(conn, named_tuple=False):
          return _get_transactional(conn, named_tuple, lambda cursor: Db(cursor))
       
       
      @contextmanager
      def transactional_cursor(conn, named_tuple=False):
          return _get_transactional(conn, named_tuple, lambda cursor: cursor)
      

      Return values

      Select and fetch helpers returns the recordset, but insert, update, delete and upsert returns the number of affected rows. For example that’s the insert function:

      def _get_insert_sql(table, values):
          keys = values[0].keys() if type(values) is list else values.keys()
       
          raw_sql = "insert into {tbl} ({t_fields}) values ({t_values})"
          return psycopg2_sql.SQL(raw_sql).format(
              tbl=psycopg2_sql.Identifier(table),
              t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, keys)),
              t_values=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, keys)))
       
       
      def insert(cursor, table, values):
          sql = _get_insert_sql(table=table, values=values)
          cursor.executemany(query=sql, vars=values) if type(values) is list else cursor.execute(sql, values)
       
          return cursor.rowcount
      

      Unit tests

      You can try the library running the unit tests. I’ve also provide one docker-compose.yml file to set up a PostgreSQL database to run the tests.

      version: '3.6'
       
      services:
        pg:
          build:
            context: .docker/pg
            dockerfile: Dockerfile
          ports:
            - 5432:5432
          environment:
            POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
            POSTGRES_USER: ${POSTGRES_USER}
            POSTGRES_DB: ${POSTGRES_DB}
            PGDATA: /var/lib/postgresql/data/pgdata
      

      Install

      You can install the library using pip

      pip install dbutils-gonzalo123
      

      Full code in my github

      comments powered by Disqus