OK, in this case you need to look more closely at the output. There is a warning here that likely should become an exception, and I'll look into that. Here's a working version of your example:
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

tagging = Table('tagging', Base.metadata,
    Column('tag_id', Integer, ForeignKey('tag.id', ondelete='cascade'), primary_key=True),
    Column('role_id', Integer, ForeignKey('role.id', ondelete='cascade'), primary_key=True)
)

class Tag(Base):
    __tablename__ = 'tag'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True, nullable=False)

    def __init__(self, name=None):
        self.name = name

class Role(Base):
    __tablename__ = 'role'
    id = Column(Integer, primary_key=True)
    tag_names = association_proxy('tags', 'name')

    tags = relationship('Tag',
                        secondary=tagging,
                        cascade='all,delete-orphan',
                        backref=backref('roles', cascade='all'))

e = create_engine("sqlite://", echo=True)
Base.metadata.create_all(e)
s = Session(e)
r1 = Role()
r1.tag_names.extend(["t1", "t2", "t3"])
s.add(r1)
s.commit()
Now let's run:
... creates tables
/Users/classic/dev/sqlalchemy/lib/sqlalchemy/orm/properties.py:918: SAWarning: On Role.tags, delete-orphan cascade is not supported on a many-to-many or many-to-one relationship when single_parent is not set. Set single_parent=True on the relationship().
  self._determine_direction()
Traceback (most recent call last):
  ... stacktrace ...
  File "/Users/classic/dev/sqlalchemy/lib/sqlalchemy/orm/attributes.py", line 349, in hasparent
    assert self.trackparent, "This AttributeImpl is not configured to track parents."
AssertionError: This AttributeImpl is not configured to track parents.
So here's the important part: SAWarning: On Role.tags, delete-orphan cascade is not supported on a many-to-many or many-to-one relationship when single_parent is not set. Set single_parent=True on the relationship().
So the error is fixed if you say this:
tags = relationship('Tag',
                    secondary=tagging,
                    cascade='all,delete-orphan',
                    single_parent=True,
                    backref=backref('roles', cascade='all'))
But you may observe that this is not really what you want:
r1 = Role()
r2 = Role()
t1, t2 = Tag("t1"), Tag("t2")
r1.tags.extend([t1, t2])
r2.tags.append(t1)
output:
sqlalchemy.exc.InvalidRequestError: Instance <Tag at 0x101503a10> is already associated with an instance of <class '__main__.Role'> via its Role.tags attribute, and is only allowed a single parent.
That's your "single parent": the "delete-orphan" feature only works with what's called a lifecycle relationship, where the child exists entirely within the scope of its single parent. So there's virtually no point in using a many-to-many with "orphan", and it's only supported because some folks really, really wanted to get this behavior with an association table regardless (legacy DB stuff, perhaps).
Here's the doc for that:
delete-orphan cascade implies that each child object can only have one
parent at a time, so is configured in the vast majority of cases on a
one-to-many relationship. Setting it on a many-to-one or many-to-many
relationship is more awkward; for this use case, SQLAlchemy requires
that the relationship() be configured with the single_parent=True
function, which establishes Python-side validation that ensures the
object is associated with only one parent at a time.
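For contrast, here is a minimal sketch of the case the doc calls the common one: delete-orphan on a one-to-many, where the child really does live within the scope of one parent. The Parent and Child classes are made up for illustration and are not part of your example; the import follows the same sqlalchemy.ext.declarative style used above (newer SQLAlchemy releases also expose declarative_base from sqlalchemy.orm).

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship

Base = declarative_base()

class Parent(Base):  # hypothetical model, for illustration only
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    # One-to-many: each Child row points at one Parent, so delete-orphan
    # works here without single_parent=True.
    children = relationship('Child', cascade='all, delete-orphan')

class Child(Base):  # hypothetical model, for illustration only
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    parent_id = Column(Integer, ForeignKey('parent.id'))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = Session(engine)

p = Parent()
p.children.extend([Child(name='c1'), Child(name='c2')])
session.add(p)
session.commit()
count_before = session.query(Child).count()

# Removing a child from its parent's collection in memory makes it an
# orphan; the next flush deletes that one row.
p.children.remove(p.children[0])
session.commit()
count_after = session.query(Child).count()
```

Here the orphan is detected purely from the in-memory removal, which is why a single, unambiguous parent is needed.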
What is implied when you say, "I want it to clean out the orphans"? It would mean here that if you were to say r1.tags.remove(t1) and then flush, SQLAlchemy would see: "t1 has been removed from r1.tags, and if it's an orphan we need to delete it! OK, so let's go out to 'tagging' and scan the whole table for any entries that remain." To do this naively, one tag at a time, would clearly be really inefficient: if you affected a few hundred tag collections in a session, there'd be a few hundred of these potentially enormous queries. To do so less than naively would be a pretty complicated feature add, as the unit of work tends to think in terms of one collection at a time, and it would still add palpable query overhead that people might not really want.

The unit of work does what it does really well, but it tries to stay out of the business of unusual edge cases that add lots of complexity and surprises. In reality, the "delete-orphan" system only comes into play when an object B is detached from an object A in memory. There's no scanning the database or anything like that; it's much simpler, and the flush process has to keep things as simple as possible.
So what you're doing here with "delete orphans" is on the right track, but let's stick it into an event and also use a more efficient query, and delete everything we don't need in one go:
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event
Base = declarative_base()

tagging = Table('tagging', Base.metadata,
    Column('tag_id', Integer, ForeignKey('tag.id', ondelete='cascade'), primary_key=True),
    Column('role_id', Integer, ForeignKey('role.id', ondelete='cascade'), primary_key=True)
)

class Tag(Base):
    __tablename__ = 'tag'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True, nullable=False)

    def __init__(self, name=None):
        self.name = name

class Role(Base):
    __tablename__ = 'role'
    id = Column(Integer, primary_key=True)
    tag_names = association_proxy('tags', 'name')

    tags = relationship('Tag',
                        secondary=tagging,
                        backref='roles')

@event.listens_for(Session, 'after_flush')
def delete_tag_orphans(session, ctx):
    # Delete every Tag that no Role references, in one statement.
    session.query(Tag).\
        filter(~Tag.roles.any()).\
        delete(synchronize_session=False)

e = create_engine("sqlite://", echo=True)
Base.metadata.create_all(e)
s = Session(e)
r1 = Role()
r2 = Role()
r3 = Role()
t1, t2, t3, t4 = Tag("t1"), Tag("t2"), Tag("t3"), Tag("t4")
r1.tags.extend([t1, t2])
r2.tags.extend([t2, t3])
r3.tags.extend([t4])
s.add_all([r1, r2, r3])
assert s.query(Tag).count() == 4
r2.tags.remove(t2)
assert s.query(Tag).count() == 4
r1.tags.remove(t2)
assert s.query(Tag).count() == 3
r1.tags.remove(t1)
assert s.query(Tag).count() == 2
Now, with each flush, we get this query at the end:
DELETE FROM tag WHERE NOT (EXISTS (SELECT 1
FROM tagging, role
WHERE tag.id = tagging.tag_id AND role.id = tagging.role_id))
So we don't need to pull objects into memory in order to delete them when we can delete on a simple SQL criterion (relying on pulling rows into memory when the database can do an operation more efficiently has been called "row by agonizing row" programming). The "NOT EXISTS" also works very well when searching for the absence of a related row, compared to the OUTER JOIN, which tends to be more expensive in the planner.
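If you want to see the two query shapes side by side without SQLAlchemy in the picture, here is a rough sketch using the standard library's sqlite3 module. The table layout mirrors the example above, but the sample rows and the OUTER JOIN variant are my own illustration. It only shows that both forms find the same orphan on this data; it says nothing about how any particular planner costs them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE role (id INTEGER PRIMARY KEY);
    CREATE TABLE tag (id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL);
    CREATE TABLE tagging (
        tag_id INTEGER REFERENCES tag(id),
        role_id INTEGER REFERENCES role(id),
        PRIMARY KEY (tag_id, role_id)
    );
    INSERT INTO role (id) VALUES (1), (2);
    INSERT INTO tag (id, name) VALUES (1, 't1'), (2, 't2'), (3, 't3');
    -- t1 and t2 are in use; t3 has no tagging row, so it is an orphan.
    INSERT INTO tagging (tag_id, role_id) VALUES (1, 1), (2, 1), (2, 2);
""")

# Same criterion as the DELETE emitted by the after_flush listener.
not_exists = """
    SELECT tag.name FROM tag
    WHERE NOT EXISTS (
        SELECT 1 FROM tagging, role
        WHERE tag.id = tagging.tag_id AND role.id = tagging.role_id)
"""
# The OUTER JOIN way of asking "which tags have no tagging row?".
outer_join = """
    SELECT tag.name FROM tag
    LEFT OUTER JOIN tagging ON tag.id = tagging.tag_id
    WHERE tagging.tag_id IS NULL
"""
orphans_not_exists = [row[0] for row in conn.execute(not_exists)]
orphans_outer_join = [row[0] for row in conn.execute(outer_join)]

# One set-based DELETE removes all orphans; no rows are loaded into Python.
conn.execute("""
    DELETE FROM tag WHERE NOT EXISTS (
        SELECT 1 FROM tagging, role
        WHERE tag.id = tagging.tag_id AND role.id = tagging.role_id)
""")
remaining = [row[0] for row in conn.execute("SELECT name FROM tag ORDER BY id")]
```

Running EXPLAIN (or your database's equivalent) on both SELECT forms is the way to check which one is cheaper on your own data.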