Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
424 views
in Technique[技术] by (71.8m points)

python - SQLAlchemy, array_agg, and matching an input list

I am attempting to use SQLAlchemy more fully, rather than just falling back to pure SQL at the first sign of distress. In this case, I have a table in a Postgres database (9.5) which stores a set of integers as a group by associating individual items atom_id with a group identifier group_id.

Given a list of atom_ids, I'd like to be able to figure out which group_id, if any, that set of atom_ids belong to. Solving this with just the group_id and atom_id columns was straightforward.

Now I'm trying to generalize such that a 'group' is made up of not just a list of atom_ids, but other context as well. In the example below, the list is ordered by including a sequence column, but conceptually other columns could be used instead, such as a weight column which gives each atom_id a [0,1] floating point value representing that atom's 'share' of the group.

Below is most of a unit test demonstrating my issue.

First, some setup:

def test_multi_column_grouping(self):
    class MultiColumnGroups(base.Base):
        __tablename__ = 'multi_groups'

        group_id = Column(Integer)
        atom_id = Column(Integer)
        sequence = Column(Integer)  # arbitrary 'other' column.  In this case, an integer, but it could be a float (e.g. weighting factor)

    base.Base.metadata.create_all(self.engine)

    # Insert 6 rows representing 2 different 'groups' of values
    vals = [
        # Group 1
        {'group_id': 1, 'atom_id': 1, 'sequence': 1},
        {'group_id': 1, 'atom_id': 2, 'sequence': 2},
        {'group_id': 1, 'atom_id': 3, 'sequence': 3},
        # Group 2
        {'group_id': 2, 'atom_id': 1, 'sequence': 3},
        {'group_id': 2, 'atom_id': 2, 'sequence': 2},
        {'group_id': 2, 'atom_id': 3, 'sequence': 1},
    ]

    self.session.bulk_save_objects(
        [MultiColumnGroups(**x) for x in vals])
    self.session.flush()

    self.assertEqual(6, len(self.session.query(MultiColumnGroups).all()))

Now, I want to query the above table to find which group a specific set of inputs belongs to. I'm using a list of (named) tuples to represent the query parameters.

    from collections import namedtuple
    Entity = namedtuple('Entity', ['atom_id', 'sequence'])
    values_to_match = [
        # (atom_id, sequence)
        Entity(1, 3),
        Entity(2, 2),
        Entity(3, 1),
        ]
    # The above list _should_ match with `group_id == 2`

Raw SQL solution. I'd prefer not to fall back on this, as a part of this exercise is to learn more SQLAlchemy.

    r = self.session.execute('''
        select group_id
        from multi_groups
        group by group_id
        having array_agg((atom_id, sequence)) = :query_tuples
        ''', {'query_tuples': values_to_match}).fetchone()
    print(r)  # > (2,)
    self.assertEqual(2, r[0])

Here is the above raw-SQL solution converted fairly directly into a broken SQLAlchemy query. Running this produces a psycopg2 error: (psycopg2.ProgrammingError) operator does not exist: record[] = integer[]. I believe that I need to cast the array_agg into an int[]? That would work so long as the grouping columns are all integers (which, if need be, is an acceptable limitation), but ideally this would work with mixed-type input tuples / table columns.

    from sqlalchemy import tuple_
    from sqlalchemy.dialects.postgresql import array_agg

    existing_group = self.session.query(MultiColumnGroups).
        with_entities(MultiColumnGroups.group_id).
        group_by(MultiColumnGroups.group_id).
        having(array_agg(tuple_(MultiColumnGroups.atom_id, MultiColumnGroups.sequence)) == values_to_match).
        one_or_none()

    self.assertIsNotNone(existing_group)
    print('|{}|'.format(existing_group))

Is the above session.query() close? Have I blinded myself here, and am missing something super obvious that would solve this problem in some other way?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I think your solution would produce indeterminate results, because the rows within a group are in unspecified order, and so the comparison between the array aggregate and given array may produce true or false based on that:

[local]:5432 u@sopython*=> select group_id
[local] u@sopython- > from multi_groups 
[local] u@sopython- > group by group_id
[local] u@sopython- > having array_agg((atom_id, sequence)) = ARRAY[(1,3),(2,2),(3,1)];
 group_id 
----------
        2
(1 row)

[local]:5432 u@sopython*=> update multi_groups set atom_id = atom_id where atom_id = 2;
UPDATE 2
[local]:5432 u@sopython*=> select group_id                                             
from multi_groups 
group by group_id
having array_agg((atom_id, sequence)) = ARRAY[(1,3),(2,2),(3,1)];
 group_id 
----------
(0 rows)

You could apply an ordering to both, or try something entirely different: instead of array comparison you could use relational division.

In order to divide you have to form a temporary relation from your list of Entity records. Again, there are many ways to approach that. Here's one using unnested arrays:

In [112]: vtm = select([
     ...:     func.unnest(postgresql.array([
     ...:         getattr(e, f) for e in values_to_match
     ...:     ])).label(f)
     ...:     for f in Entity._fields
     ...: ]).alias()

And another using a union:

In [114]: vtm = union_all(*[
     ...:     select([literal(e.atom_id).label('atom_id'),
     ...:             literal(e.sequence).label('sequence')])
     ...:     for e in values_to_match
     ...: ]).alias()

A temporary table would do as well.

With the new relation at hand you want to find the answer to "find those multi_groups for which no entity exists that is not in the group". It's a horrible sentence, but makes sense:

In [117]: mg = aliased(MultiColumnGroups)

In [119]: session.query(MultiColumnGroups.group_id).
     ...:     filter(~exists().
     ...:         select_from(vtm).
     ...:         where(~exists().
     ...:             where(MultiColumnGroups.group_id == mg.group_id).
     ...:             where(tuple_(vtm.c.atom_id, vtm.c.sequence) ==
     ...:                   tuple_(mg.atom_id, mg.sequence)).
     ...:             correlate_except(mg))).
     ...:     distinct().
     ...:     all()
     ...: 
Out[119]: [(2)]

On the other hand you could also just select the intersection of groups with the given entities:

In [19]: gs = intersect(*[
    ...:     session.query(MultiColumnGroups.group_id).
    ...:         filter(MultiColumnGroups.atom_id == vtm.atom_id,
    ...:                MultiColumnGroups.sequence == vtm.sequence)
    ...:     for vtm in values_to_match
    ...: ])

In [20]: session.execute(gs).fetchall()
Out[20]: [(2,)]

The error

ProgrammingError: (psycopg2.ProgrammingError) operator does not exist: record[] = integer[]
LINE 3: ...gg((multi_groups.atom_id, multi_groups.sequence)) = ARRAY[AR...
                                                             ^
HINT:  No operator matches the given name and argument type(s). You might need to add explicit type casts.
 [SQL: 'SELECT multi_groups.group_id AS multi_groups_group_id 
FROM multi_groups GROUP BY multi_groups.group_id 
HAVING array_agg((multi_groups.atom_id, multi_groups.sequence)) = %(array_agg_1)s'] [parameters: {'array_agg_1': [[1, 3], [2, 2], [3, 1]]}] (Background on this error at: http://sqlalche.me/e/f405)

is a result of how your values_to_match is first converted to a list of lists (for reasons unknown) and then converted to an array by your DB-API driver. It results in an array of array of integer, not an array of record (int, int). Using a raw DB-API connection and cursor, passing a list of tuples works as you'd expect.

In SQLAlchemy if you wrap the list values_to_match with sqlalchemy.dialects.postgresql.array(), it works as you meant it to work, though remember that the results are indeterminate.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...