Getting to Know a Database Continued: a Showcase of Some Custom Functions

hacking skills
Author

zenggyu

Published

2018-08-22

Abstract
A follow-up post on getting to know a database.

Introduction

A previous post has introduced the information schema, which provides various information about a database1. Based on that knowledge, I created three custom functions written in PL/pgSQL to simplify the retrieval process. This post is intended to showcase these functions, whose source code is presented in the appendix.

1 There are other sources where similar information can be retrieved (e.g., system catalogs). However, the information schema is a SQL standard and is therefore more portable.

All the functions are created and have been tested in PostgreSQL v10.5. However, it should be noted that the functions are intended for interactive use only and they may not cover every use case.

Some example objects

Before introducing the usage of the functions, we need to define some objects to be used as examples. Here I created two tables (t1 and t2) and two functions (f1 and f2):

create table t1 (c1 int,
                 c2 text,
                 c3 varchar,
                 c4 date not null,
                 c5 decimal,
                 unique(c1, c2),
                 primary key (c3),
                 check (c5 > 0));

create table t2 (c1 int,
                 c2 text,
                 c3 varchar,
                 c4 date,
                 c5 decimal,
                 foreign key (c1, c2) references t1 (c1, c2),
                 primary key (c3, c4),
                 check (c5 > 0 and c5 > c1::decimal));

create function f1 (x1 int, x2 int) returns int as $body$
declare
  output int;
begin
  output := x1 + x2;
  return output;
end;
$body$ language plpgsql;

create function f2 () returns void as $body$
begin
  raise notice 'Hello world!';
end;
$body$ language plpgsql;

skim_tbl_col()

The skim_tbl_col() function takes a table name and a schema name (the schema where the table exists; defaults to the current schema) as inputs, and returns the name and data type of each column in the table.

For example:

select skim_tbl_col('t1') as col;

-- The result is:
/*
col
------------------------------
c1 integer
c2 text
c3 character varying
c4 date
c5 numeric
*/

skim_tbl_con()

The skim_tbl_con() function takes the same inputs as the skim_tbl_col() function. However, instead of returning column names and types, it returns the constraints of the specified table.

For example:

select skim_tbl_con('t1') as con;

-- The result is:
/*
con
------------------------------
UNIQUE (c1, c2)
PRIMARY KEY (c3)
CHECK ((c5 > (0)::numeric))
CHECK c3 IS NOT NULL
CHECK c4 IS NOT NULL
*/

Notice that NOT NULL constraints are stored as CHECK constraints with clause IS NOT NULL in the information schema.

Another example:

select skim_tbl_con('t2') as con;

-- The result is:
/*
con
------------------------------
PRIMARY KEY (c3, c4)
FOREIGN KEY (c1, c2) REFERNCES (public.t1.c1, public.t1.c2)
CHECK (((c5 > (0)::numeric) AND (c5 > (c1)::numeric)))
CHECK c3 IS NOT NULL
CHECK c4 IS NOT NULL
*/

Note that the identifier public in the foreign key definition is the name of the schema where table t1 exists.

skim_fun()

The skim_fun() function takes a function name and a schema name (the schema where the function exists; defaults to the current schema) as inputs, and returns the source code of the function.

For example:

select skim_fun('f1') as src;

--The result is:
/*
src
------------------------------
CREATE OR REPLACE FUNCTION f1 (IN x1 integer, IN x2 integer) returns integer as 
$body$

declare
  output int;
begin
  output := x1 + x2;
  return output;
end;

$body$
language PLPGSQL;
*/

Another example:

select skim_fun('f2') as src;

-- The result is:
/*
src
------------------------------
CREATE OR REPLACE FUNCTION f2 () returns void as 
$body$

begin
  raise notice 'Hello world!';
end;

$body$
language PLPGSQL;
*/

Note that the information schema does not seem to provide information about whether a function returns a set (which is specified with the SETOF keyword in the function definition) or a scalar. Therefore, skim_fun() cannot distinguish the two kinds of functions.

Appendix

Source code of the skim_tbl_col() function:

create or replace function skim_tbl_col (v_tbl varchar,
                                         v_sch varchar default current_schema,
                                         out col text)
returns setof text as
$body$
/*
Author: zenggyu
Description:
  - This function can be used to retrieve the columns and the associated data type of a table.
Input:
  - v_tbl: Name of the table whose columns are to be skimmed.
  - v_sch: Name of the schema where the table exists.
Output:
  - col: Column name and data type of the specified table, each on a separate row.
Known issues: -
 */
begin
  return query select column_name || ' ' || data_type
               from information_schema.columns
               where table_name = v_tbl and table_schema = v_sch
               order by ordinal_position;
end;
$body$
language plpgsql;

Source code of the skim_tbl_con() function:

create or replace function skim_tbl_con(v_tbl varchar,
                                        v_sch varchar default current_schema,
                                        out con text)
returns setof text as
$body$
/*
Author: zenggyu
Description:
  - This function retrieves the constraints of a table.
Input:
  - v_tbl: Name of the table whose constraints are to be retrieved.
  - v_sch: Name of the schema where the table exists.
Output:
  - con: table constraints.
Known issues: -
 */
declare
  v_con record;
begin

  <<s00>> -- Check input.
  declare
    v_n int;
  begin
    v_n := (select count(1)
            from information_schema.tables
            where table_name = v_tbl and table_schema = v_sch);
    if v_n = 0 then
      raise exception 'Cannot locate table ''%'' in schema ''%'';
                       please make sure you specify the right name and have required permissions.',
                       v_tbl, v_sch;
    end if;
  end;

  <<s01>> -- Obtain the list of constraints.
  begin
    create temp table t_con_list on commit drop as
    select constraint_type as type,
           constraint_name as name,
           constraint_schema as schema,
           row_number() over (partition by 1 order by constraint_type desc) as row
    from information_schema.table_constraints
    where table_name = v_tbl and
          table_schema = v_sch;
    if (select count(1) from t_con_list) = 0 then
      con := NULL;
      return;
    end if;
  end;

  for i in 1..(select count(1) from t_con_list) loop
    select type, name, schema from t_con_list where row = i into v_con;
    con := v_con.type;

    <<s02>> -- Build up the definition of each constraint.
    declare
      v_col_list text array;
      v_expr text default '';
    begin
      if v_con.type in ('PRIMARY KEY', 'UNIQUE', 'FOREIGN KEY') then
        v_col_list := array(select column_name::text
                             from information_schema.key_column_usage
                             where table_name = v_tbl and
                                   table_schema = v_sch and
                                   constraint_name = v_con.name and
                                   constraint_schema = v_con.schema
                             order by ordinal_position);
        for j in array_lower(v_col_list, 1)..array_upper(v_col_list, 1) loop
          if j = array_lower(v_col_list, 1) then
            v_expr := v_col_list[j];
          else
            v_expr := v_expr || ', ' || v_col_list[j];
          end if;
        end loop;
        v_expr := ' (' || v_expr || ')';
        con := con || v_expr;
        if v_con.type = 'FOREIGN KEY' then
          v_col_list := array(select table_schema||'.'||table_name||'.'||column_name
                             from (select unique_constraint_schema, unique_constraint_name
                                   from information_schema.referential_constraints
                                   where constraint_name = v_con.name and
                                         constraint_schema = v_con.schema) as t1
                                    left join information_schema.key_column_usage as t2
                                      on t2.constraint_name = t1.unique_constraint_name and
                                         t2.constraint_schema = t1.unique_constraint_schema
                             order by ordinal_position);
          for j in array_lower(v_col_list, 1)..array_upper(v_col_list, 1) loop
            if j = array_lower(v_col_list, 1) then
              v_expr := v_col_list[j];
            else
              v_expr := v_expr || ', ' || v_col_list[j];
            end if;
          end loop;
          con := con || ' REFERNCES ' || '(' || v_expr || ')';
        end if;
      elsif v_con.type = 'CHECK' then
        v_expr := (select check_clause::text
                   from information_schema.check_constraints
                   where constraint_name = v_con.name and
                         constraint_schema = v_con.schema);
        con := con || ' ' || v_expr;
      else
        con := 'UNKNOWN CONSTRAINT';
      end if;
    end;

    <<s99>> -- Return the result.
    begin
      return next;
    end;

  end loop;

end;
$body$
language plpgsql;

Source code of the skim_fun() function:

create or replace function skim_fun (v_fun varchar,
                                     v_sch varchar default current_schema,
                                     out src text)
returns setof text as
$body$
/*
Author: zenggyu
Description:
  - This function can be used to retrieve the source code of other functions.
Input:
  - v_fun: Name of the function whose source code is to be retrieved.
  - v_sch: Name of the schema where the function exists.
Output:
  - src: The source code of the specified function. There can be more than one result due to function overload.
Known issues:
  - Cannot determine whether a function returns a set or not.
 */
declare
  v_name varchar array;
  v_head text default '';
  v_body text default '';
  v_tail text default '';
begin
  v_name := array(select specific_name::varchar
                  from information_schema.routines
                  where routine_name = v_fun and routine_schema = v_sch);

  <<s00>>
  -- Step00: perform input checking.
  begin
    if array_length(v_name, 1) is null then
      raise exception 'Cannot locate function ''%'' in schema ''%'';
                       please make sure you enter the right name and have required permissions.',
                       v_fun, v_sch;
    end if;
  end;

  for i in array_lower(v_name, 1)..array_upper(v_name, 1) loop
  -- Multiple functions can share the same name due to function overload.
  -- For each function, perform the following steps.

    <<s01>>
    -- Step01: retrieve the head of the function definition.
    declare
      v_col_def text array;
      v_type text;
    begin
      v_head := 'CREATE OR REPLACE FUNCTION '||v_fun||' (';
      v_col_def := array(select parameter_mode||' '||parameter_name||' '||data_type as col_def
                         from information_schema.parameters
                         where specific_name = v_name[i]
                         order by ordinal_position);
      if array_length(v_col_def, 1) is not null then
        for j in array_lower(v_col_def, 1)..array_upper(v_col_def, 1) loop
          if j = array_upper(v_col_def, 1) then
            v_head := v_head || v_col_def[j];
          else
            v_head := v_head || v_col_def[j] || ', ';
          end if;
        end loop;
      end if;
      v_head := v_head || ')';
      v_type := (select data_type
                 from information_schema.routines
                 where specific_name = v_name[i]);
      v_head := v_head || ' returns ' || v_type || ' as ';

      <<s02>>
      -- Step02: retrieve the body of the function definition.
      begin
        v_body := (select routine_definition
                   from information_schema.routines
                   where specific_name = v_name[i]);
        if v_body is null  then
          v_body := '/*The current user does not have read access to the body of this function.*/';
        end if;
        v_body := E'\$body\$' || E'\n' || v_body || E'\n' || E'\$body\$';
      end;

      <<s03>>
      -- Step03: retrieve the tail of the function definition.
      begin
        v_tail := 'language ' ||
                  (select external_language
                   from information_schema.routines
                   where specific_name = v_name[i]) ||
                  ';';
      end;
    end;

    <<s99>>
    -- Step99: concatenate various parts to form the final result.
    begin
      src := v_head || E'\n' || v_body || E'\n' || v_tail;
    end;

    return next;
  end loop;
end;
$body$
language plpgsql;

Additional notes

When I first started writing the aforementioned functions, I wasn’t aware of some useful builtin functions of PostgreSQL that can simplify the code. For example, instead of using the syntax string1 || ', ' || string2 || ', ' || string3 to concatenate the strings with a delimiter, one can also use the concat_ws() function, which has the syntax concat(', ', string1, string2, string3).

Another useful builtin function that can simplify the code is the array_agg() function. It is an aggregate function that can concatenate a column of strings into one single string, each separated by a delimiter. This function avoids the need of using a for loop, as is shown in the code above.

Conversion between string, set and array is a common task in programing with PL/pgSQL. The following builtin functions provide such functionalities and can make things easier.

  • array_to_string()
  • string_to_array()
  • unnest()
  • array_agg()
  • string_agg()
  • regexp_split_to_array()
  • regexp_split_to_table()