oslo_db.sqlalchemy 包

子包

子模块

oslo_db.sqlalchemy.asyncio_facade 模块

oslo_db.sqlalchemy.asyncio_facade.async_transaction_context_provider(klass)

sessionconnection 属性装饰一个类。

oslo_db.sqlalchemy.asyncio_facade.configure(**kw)

将配置选项应用于全局工厂。

此方法只能在任何特定的事务开始方法被调用之前调用。

参见

_TransactionFactory.configure()

oslo_db.sqlalchemy.asyncio_facade.reader = <oslo_db.sqlalchemy.asyncio_facade._AsyncTransactionContextManager 对象>

全局 ‘reader’ 起始点。

oslo_db.sqlalchemy.asyncio_facade.transaction_context()

构造一个本地事务上下文。

oslo_db.sqlalchemy.asyncio_facade.writer = <oslo_db.sqlalchemy.asyncio_facade._AsyncTransactionContextManager 对象>

全局 ‘writer’ 起始点。

oslo_db.sqlalchemy.enginefacade 模块

异常 oslo_db.sqlalchemy.enginefacade.AlreadyStartedError

基类: TypeError

当工厂被要求第二次初始化时引发。

为了兼容性,继承 TypeError

oslo_db.sqlalchemy.enginefacade.LegacyEngineFacade(sql_connection, slave_connection=None, sqlite_fk=False, expire_on_commit=False, _conf=None, _factory=None, **kwargs)

基类: object

一个辅助类,用于从 oslo.db 中移除全局引擎实例。

自 1.12.0 版本弃用: 请使用 oslo_db.sqlalchemy.enginefacade 进行新开发。

作为一个库,oslo.db 无法决定在哪里存储/何时创建引擎和 sessionmaker 实例,因此这必须留给目标应用程序来决定。

另一方面,为了简化 oslo.db 更改的采用,我们将提供一个辅助类,该类在其实例化时创建引擎和 sessionmaker,并提供与目标项目中当前存在的相应实用程序函数兼容的 get_engine()/get_session() 方法,例如在 Nova 中。

引擎/sessionmaker 实例仍然是全局的(并且应该全局的),但它们将存储在应用程序上下文中,而不是在 oslo.db 上下文中。

需要记住的两点

  1. 引擎实例实际上是数据库连接池,因此应该共享(并且是线程安全的)。

  2. 会话实例不应该共享,代表数据库事务上下文(即不是线程安全的)。sessionmaker 是会话的工厂。

参数:
  • sql_connection (字符串) – 要使用的数据库的连接字符串

  • slave_connection (字符串) – 要使用的 ‘slave’ 数据库的连接字符串。如果未提供,则 ‘master’ 数据库将用于所有操作。注意:这旨在用于将读取操作卸载到异步复制的 slave 以减少 master 数据库上的负载。

  • sqlite_fk (布尔值) – 启用 SQLite 中的外键

  • expire_on_commit (布尔值) – 在提交时使会话对象失效

关键字参数

参数:
  • mysql_sql_mode – 要用于 MySQL 会话的 SQL 模式。(默认为 TRADITIONAL)

  • mysql_wsrep_sync_wait – Galera 的 wsrep_sync_wait 的值(默认为 None,表示不会传递任何设置)

  • connection_recycle_time – 连接在结账时回收的时间段(默认为 3600)

  • connection_debug – SQL 调试信息的详细程度。-1=关闭,0=无,100=全部(默认为 0)

  • max_pool_size – 保持在池中打开的 SQL 连接的最大数量(默认为 SQLAlchemy 设置)

  • max_overflow – 如果设置,则使用 sqlalchemy 的 max_overflow 的此值(默认为 SQLAlchemy 设置)

  • pool_timeout – 如果设置,则使用 sqlalchemy 的 pool_timeout 的此值(默认为 SQLAlchemy 设置)

  • sqlite_synchronous – 如果为 True,SQLite 使用同步模式(默认为 True)

  • connection_trace – 将 python 堆栈跟踪作为注释字符串添加到 SQL(默认为 False)

  • max_retries – 启动期间数据库连接重试的最大次数。(设置 -1 表示无限重试次数)(默认为 10)

  • retry_interval – 重新打开 sql 连接的间隔(默认为 10)

  • thread_checkin – 一个布尔值,指示在每次引擎结账事件之间是否会发生 sleep(0) 以允许其他绿线程运行(默认为 True)

类方法 from_config(conf, sqlite_fk=False, expire_on_commit=False)

使用 oslo.config 配置实例选项初始化 EngineFacade。

参数:
  • conf (oslo_config.cfg.ConfigOpts) – oslo.config 配置实例

  • sqlite_fk (布尔值) – 启用 SQLite 中的外键

  • expire_on_commit (布尔值) – 在提交时使会话对象失效

get_engine(use_slave=False)

获取引擎实例(注意,它是共享的)。

参数:

use_slave (布尔值) – 如果可能,为此引擎使用 ‘slave’ 数据库。如果未提供 slave 数据库的连接字符串,则将返回 ‘master’ 引擎。(默认为 False)

get_session(use_slave=False, **kwargs)

获取一个 Session 实例。

参数:

use_slave (布尔值) – 如果可能,为这个会话使用 ‘slave’ 数据库连接。如果未提供 slave 数据库的连接字符串,则将返回绑定到 ‘master’ 引擎的会话。(默认为 False)

关键字参数将按原样传递给 sessionmaker 实例(如果传递,它们将覆盖在创建 sessionmaker 实例时使用的参数)。有关详细信息,请参阅 SQLAlchemy Session 文档。

get_sessionmaker(use_slave=False)

获取用于创建 Session 的 sessionmaker 实例。

当需要临时将 sessionmaker() 注入一些状态(例如特定连接)时,可以调用此方法。

oslo_db.sqlalchemy.enginefacade.configure(**kw)

将配置选项应用于全局工厂。

此方法只能在任何特定的事务开始方法被调用之前调用。

参见

_TransactionFactory.configure()

oslo_db.sqlalchemy.enginefacade.get_legacy_facade()

返回一个 LegacyEngineFacade,用于全局工厂。

此 facade 将使用与此工厂相同的引擎和 sessionmaker,但是不会共享相同的事务上下文;legacy facade 继续以旧方式工作,即每次调用 get_session() 时返回一个新的 Session。

oslo_db.sqlalchemy.enginefacade.reader = <oslo_db.sqlalchemy.enginefacade._TransactionContextManager 对象>

全局 ‘reader’ 起始点。

oslo_db.sqlalchemy.enginefacade.transaction_context()

构造一个本地事务上下文。

oslo_db.sqlalchemy.enginefacade.transaction_context_provider(klass)

sessionconnection 属性装饰一个类。

oslo_db.sqlalchemy.enginefacade.writer = <oslo_db.sqlalchemy.enginefacade._TransactionContextManager 对象>

全局 ‘writer’ 起始点。

oslo_db.sqlalchemy.engines 模块

核心 SQLAlchemy 连接例程。

oslo_db.sqlalchemy.engines.create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, mysql_wsrep_sync_wait=None, connection_recycle_time=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, connection_trace=False, max_retries=10, retry_interval=10, thread_checkin=True, logging_name=None, json_serializer=None, json_deserializer=None, connection_parameters=None, _engine_target=<function _create_engine>, _test_connection=<function _test_connection>)

返回一个新的 SQLAlchemy 引擎。

oslo_db.sqlalchemy.exc_filters 模块

定义 SQLAlchemy DBAPI 异常的异常重定义。

oslo_db.sqlalchemy.exc_filters.filters(dbname, exception_type, regex)

将一个函数标记为接收过滤后的异常。

参数:
  • dbname – 字符串数据库名称,例如 ‘mysql’

  • exception_type – 一个 SQLAlchemy 数据库异常类,它扩展自 sqlalchemy.exc.DBAPIError

  • regex – 一个字符串,或一个字符串元组,它将被处理为匹配正则表达式。

oslo_db.sqlalchemy.exc_filters.handler(context)

遍历可用的过滤器并调用匹配的过滤器。

第一个引发异常的过滤器获胜。尝试过滤器的顺序按特异性排序 - 方言名称或 “*”,异常类按方法解析顺序 (__mro__)。使用方法解析顺序,以便指示更具体的异常类的过滤器规则首先尝试。

oslo_db.sqlalchemy.exc_filters.register_engine(engine)

oslo_db.sqlalchemy.models 模块

SQLAlchemy 模型。

class oslo_db.sqlalchemy.models.ModelBase

基类: object

模型的基类。

get(key, default=None)
items()

使模型对象表现得像一个字典。

iteritems()

使模型对象表现得像一个字典。

keys()

使模型对象表现得像一个字典。

save(session)

保存此对象。

update(values)

使模型对象表现得像一个字典。

class oslo_db.sqlalchemy.models.ModelIterator(model, columns)

基类: object

class oslo_db.sqlalchemy.models.SoftDeleteMixin

基类: object

deleted = Column(None, SoftDeleteInteger(), table=None, default=ScalarElementColumnDefault(0))
deleted_at = Column(None, DateTime(), table=None)
soft_delete(session)

将此对象标记为已删除。

class oslo_db.sqlalchemy.models.TimestampMixin

基类: object

created_at = Column(None, DateTime(), table=None, default=CallableColumnDefault(<function TimestampMixin.<lambda>>))
updated_at = Column(None, DateTime(), table=None, onupdate=CallableColumnDefault(<function TimestampMixin.<lambda>>))

oslo_db.sqlalchemy.orm 模块

SQLAlchemy ORM 连接性和查询结构。

class oslo_db.sqlalchemy.orm.Query(entities: _ColumnsClauseArgument[Any] | Sequence[_ColumnsClauseArgument[Any]], session: Session | None = None)

Bases: Query

带有 soft_delete() 方法的 sqlalchemy.query 子类。

soft_delete(synchronize_session='evaluate')
update_on_match(specimen, surrogate_key, values, **kw)

发出匹配给定样本的 UPDATE 语句。

这是 oslo_db.sqlalchemy.update_match.update_on_match() 的方法版本;有关用法详情,请参阅该函数。

update_returning_pk(values, surrogate_key)

执行 UPDATE,返回匹配行的主键。

这是 oslo_db.sqlalchemy.update_match.update_returning_pk() 的方法版本;有关用法详情,请参阅该函数。

class oslo_db.sqlalchemy.orm.Session(bind: _SessionBind | None = None, *, autoflush: bool = True, future: Literal[True] = True, expire_on_commit: bool = True, autobegin: bool = True, twophase: bool = False, binds: Dict[_SessionBindKey, _SessionBind] | None = None, enable_baked_queries: bool = True, info: _InfoType | None = None, query_cls: Type[Query[Any]] | None = None, autocommit: Literal[False] = False, join_transaction_mode: JoinTransactionMode = 'conditional_savepoint', close_resets_only: bool | _NoArg = _NoArg.NO_ARG)

Bases: Session

oslo.db 特定的 Session 子类。

oslo_db.sqlalchemy.orm.get_maker(engine, autocommit=False, expire_on_commit=False)

使用给定的引擎返回 SQLAlchemy sessionmaker。

oslo_db.sqlalchemy.provision 模块

为特定 DB 后端配置测试环境

class oslo_db.sqlalchemy.provision.Backend(database_type, url)

基类: object

表示可以配置的特定数据库后端。

Backend 对象维护一个数据库类型(例如,没有特定驱动程序类型的数据库,例如“sqlite”、“postgresql”等)、一个目标 URL、一个可用于配置数据库的该 URL 对象的基 Engine 以及一个知道如何对这种类型的 Engine 执行操作的 BackendImpl

classmethod all_viable_backends()

返回一个迭代器,其中包含所有存在的

并且可以配置的 Backend 对象。

classmethod backend_for_database_type(database_type)

返回给定数据库类型的 Backend

backends_by_database_type = {'mysql': <oslo_db.sqlalchemy.provision.Backend object>, 'postgresql': <oslo_db.sqlalchemy.provision.Backend object>, 'sqlite': <oslo_db.sqlalchemy.provision.Backend object>}
create_named_database(ident, conditional=False)

创建具有给定名称的数据库。

database_exists(ident)

如果存在具有给定名称的数据库,则返回 True。

drop_all_objects(engine)

删除所有数据库对象。

删除给定引擎的默认模式上剩余的所有数据库对象。

drop_named_database(ident, conditional=False)

删除具有给定名称的数据库。

provisioned_database_url(ident)

给定匿名数据库的标识符,返回一个 URL。

对于基于主机名的 URL,这通常涉及仅将 URL 的“database”部分替换为给定的名称并创建一个 URL。

对于 SQLite URL,标识符可用于创建文件名,或者在内存数据库的情况下,可以忽略该标识符。

class oslo_db.sqlalchemy.provision.BackendImpl(drivername)

基类: object

提供关键配置的数据库特定实现

功能。

BackendImpl 由一个 Backend 实例拥有,该实例将其委托给它以处理所有数据库特定功能。

classmethod all_impls()

返回所有可能的 BackendImpl 对象的迭代器。

这些是已实现的 BackendImpl,但不一定可配置。

abstract create_named_database(engine, ident, conditional=False)

创建具有给定名称的数据库。

abstract create_opportunistic_driver_url()

生成一个字符串 URL,称为“opportunistic” URL。

此 URL 对应于 OpenStack 建立的约定,用于预先建立的数据库登录,当在本地环境中检测到可用时,将自动用作特定类型驱动程序的测试平台。

default_engine_kwargs = {}
dispose(engine)
drop_additional_objects(conn)
drop_all_objects(engine)

删除所有数据库对象。

删除给定引擎的默认模式上剩余的所有数据库对象。

每个数据库的实现还需要删除特定于这些系统的项目,例如序列、自定义类型(例如 pg ENUM)等。

abstract drop_named_database(engine, ident, conditional=False)

删除具有给定名称的数据库。

impl = <oslo_db.sqlalchemy.utils.DialectSingleFunctionDispatcher object>
provisioned_database_url(base_url, ident)

返回配置的数据库 URL。

给定特定数据库后端的 URL 和该后端中特定“database”的字符串名称,返回直接引用该命名数据库的 URL。

对于基于主机名的 URL,这通常涉及仅将 URL 的“database”部分替换为给定的名称并创建引擎。

对于处理 DSN 的 URL,规则可能更自定义;例如,引擎可能需要连接到根 URL,然后发出切换到命名数据库的命令。

supports_drop_fk = True
class oslo_db.sqlalchemy.provision.BackendResource(database_type, ad_hoc_url=None)

基类: TestResourceManager

clean(resource)

覆盖此类方法以挂接到资源删除。

isDirty()

如果此管理器的缓存资源已损坏,则返回 True。

在资源当前未保留时调用具有未定义行为。

make(dependency_resources)

覆盖此方法以构造资源。

参数:

dependency_resources – 映射名称 -> 资源的字典,用于指定的依赖项资源。

返回值:

创建的资源。

class oslo_db.sqlalchemy.provision.DatabaseResource(database_type, _enginefacade=None, provision_new_database=True, ad_hoc_url=None)

基类: TestResourceManager

数据库资源,它连接和断开到 URL。

对于 SQLite,这意味着数据库隐式创建,这是 SQLite 通常行为的结果。如果数据库是基于文件的 URL,则在资源被拆解后将保留。

对于所有其他类型的数据库,资源指示连接和断开到该数据库。

clean(resource)

覆盖此类方法以挂接到资源删除。

isDirty()

如果此管理器的缓存资源已损坏,则返回 True。

在资源当前未保留时调用具有未定义行为。

make(dependency_resources)

覆盖此方法以构造资源。

参数:

dependency_resources – 映射名称 -> 资源的字典,用于指定的依赖项资源。

返回值:

创建的资源。

class oslo_db.sqlalchemy.provision.ProvisionedDatabase(backend, enginefacade, engine, db_token)

基类: object

表示指向准备好运行测试的 DB 的数据库引擎。

backend: Backend 的一个实例

enginefacade: _TransactionFactory 的一个实例

engine: 一个 SQLAlchemy Engine

db_token: 如果使用了 provision_new_database,这是随机

生成的数据库名称。请注意,对于 SQLite 内存连接,此令牌将被忽略。对于未实际创建的数据库,将为 None。

backend
db_token
engine
enginefacade
class oslo_db.sqlalchemy.provision.Schema

基类: object

“表示已填充或将要填充的数据库模式。

这是 testresources 所需的标记对象,否则不充当任何目的。

database
class oslo_db.sqlalchemy.provision.SchemaResource(database_resource, generate_schema, teardown=False)

基类: TestResourceManager

clean(resource)

覆盖此类方法以挂接到资源删除。

isDirty()

如果此管理器的缓存资源已损坏,则返回 True。

在资源当前未保留时调用具有未定义行为。

make(dependency_resources)

覆盖此方法以构造资源。

参数:

dependency_resources – 映射名称 -> 资源的字典,用于指定的依赖项资源。

返回值:

创建的资源。

oslo_db.sqlalchemy.session 模块

SQLAlchemy 后端的会话处理。

在此框架内使用会话的推荐方法

  • 使用 enginefacade 系统进行连接、会话和事务管理

    from oslo_db.sqlalchemy import enginefacade
    
    @enginefacade.reader
    def get_foo(context, foo):
        return (model_query(models.Foo, context.session).
                filter_by(foo=foo).
                first())
    
    @enginefacade.writer
    def update_foo(context, id, newfoo):
        (model_query(models.Foo, context.session).
                filter_by(id=id).
                update({'foo': newfoo}))
    
    @enginefacade.writer
    def create_foo(context, values):
        foo_ref = models.Foo()
        foo_ref.update(values)
        foo_ref.save(context.session)
        return foo_ref
    

    在上述系统中,事务会自动提交,并在所有依赖数据库方法之间共享。确保将“写入”数据的的方法封装在 @writer 块中。

    注意

    会话范围内的语句不会自动重试。

  • 如果在会话中创建模型,则需要添加它们,但不需要调用 model.save()

    @enginefacade.writer
    def create_many_foo(context, foos):
        for foo in foos:
            foo_ref = models.Foo()
            foo_ref.update(foo)
            context.session.add(foo_ref)
    
    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        foo_ref = (model_query(models.Foo, context.session).
                    filter_by(id=foo_id).
                    first())
        (model_query(models.Bar, context.session).
                    filter_by(id=foo_ref['bar_id']).
                    update({'bar': newbar}))
    

    update_bar 中的两个查询可以 alternatively 使用单个查询来表示,这取决于场景,可能会更有效。

    @enginefacade.writer
    def update_bar(context, foo_id, newbar):
        subq = (model_query(models.Foo.id, context.session).
                filter_by(id=foo_id).
                limit(1).
                subquery())
        (model_query(models.Bar, context.session).
                filter_by(id=subq.as_scalar()).
                update({'bar': newbar}))
    

    作为参考,这将发出近似以下 SQL 语句

    UPDATE bar SET bar = '${newbar}'
        WHERE id=(SELECT bar_id FROM foo WHERE id = '${foo_id}' LIMIT 1);
    

    注意

    create_duplicate_foo 是在上下文管理器管理的单个会话中捕获异常的简单示例。这里我们创建两个具有相同主键的重复实例,必须在上下文之外捕获异常

    @enginefacade.writer
    def create_duplicate_foo(context):
        foo1 = models.Foo()
        foo2 = models.Foo()
        foo1.id = foo2.id = 1
        try:
            with context.session.begin_nested():
                session.add(foo1)
                session.add(foo2)
        except exception.DBDuplicateEntry as e:
            handle_error(e)
    
  • enginefacade 系统消除了在方法之间传递会话的必要性。所有方法都应共享一个公共上下文对象;enginefacade 系统将维护跨方法调用的事务。

    @enginefacade.writer
    def myfunc(context, foo):
        # do some database things
        bar = _private_func(context, foo)
        return bar
    
    def _private_func(context, foo):
        with enginefacade.using_writer(context) as session:
            # do some other database things
            session.add(SomeObject())
        return bar
    
  • 避免在可能的情况下使用 with_lockmode('UPDATE')

    FOR UPDATE 与 MySQL/Galera 不兼容。相反,应使用“机会主义”方法,即如果 UPDATE 失败,则应重试整个事务。@wrap_db_retry 装饰器就是一个可以用来实现这一点的系统。

启用软删除

  • 要使用/启用软删除,可以使用 SoftDeleteMixin。例如

    class NovaBase(models.SoftDeleteMixin, models.ModelBase):
        pass
    

高效使用软删除

  • 虽然有一个 model.soft_delete() 方法,但更推荐使用 query.soft_delete()。一些例子

    @enginefacade.writer
    def soft_delete_bar(context):
        # synchronize_session=False will prevent the ORM from attempting
        # to search the Session for instances matching the DELETE;
        # this is typically not necessary for small operations.
        count = model_query(BarModel, context.session).\
            find(some_condition).soft_delete(synchronize_session=False)
        if count == 0:
            raise Exception("0 entries were soft deleted")
    
    @enginefacade.writer
    def complex_soft_delete_with_synchronization_bar(context):
        # use synchronize_session='evaluate' when you'd like to attempt
        # to update the state of the Session to match that of the DELETE.
        # This is potentially helpful if the operation is complex and
        # continues to work with instances that were loaded, though
        # not usually needed.
        count = (model_query(BarModel, context.session).
                    find(some_condition).
                    soft_delete(synchronize_session='evaulate'))
        if count == 0:
            raise Exception("0 entries were soft deleted")
    
oslo_db.sqlalchemy.session.EngineFacade

LegacyEngineFacade 的别名

class oslo_db.sqlalchemy.session.Query(entities: _ColumnsClauseArgument[Any] | Sequence[_ColumnsClauseArgument[Any]], session: Session | None = None)

Bases: Query

带有 soft_delete() 方法的 sqlalchemy.query 子类。

soft_delete(synchronize_session='evaluate')
update_on_match(specimen, surrogate_key, values, **kw)

发出匹配给定样本的 UPDATE 语句。

这是 oslo_db.sqlalchemy.update_match.update_on_match() 的方法版本;有关用法详情,请参阅该函数。

update_returning_pk(values, surrogate_key)

执行 UPDATE,返回匹配行的主键。

这是 oslo_db.sqlalchemy.update_match.update_returning_pk() 的方法版本;有关用法详情,请参阅该函数。

class oslo_db.sqlalchemy.session.Session(bind: _SessionBind | None = None, *, autoflush: bool = True, future: Literal[True] = True, expire_on_commit: bool = True, autobegin: bool = True, twophase: bool = False, binds: Dict[_SessionBindKey, _SessionBind] | None = None, enable_baked_queries: bool = True, info: _InfoType | None = None, query_cls: Type[Query[Any]] | None = None, autocommit: Literal[False] = False, join_transaction_mode: JoinTransactionMode = 'conditional_savepoint', close_resets_only: bool | _NoArg = _NoArg.NO_ARG)

Bases: Session

oslo.db 特定的 Session 子类。

oslo_db.sqlalchemy.session.create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, mysql_wsrep_sync_wait=None, connection_recycle_time=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, connection_trace=False, max_retries=10, retry_interval=10, thread_checkin=True, logging_name=None, json_serializer=None, json_deserializer=None, connection_parameters=None, _engine_target=<function _create_engine>, _test_connection=<function _test_connection>)

返回一个新的 SQLAlchemy 引擎。

oslo_db.sqlalchemy.session.get_maker(engine, autocommit=False, expire_on_commit=False)

使用给定的引擎返回 SQLAlchemy sessionmaker。

oslo_db.sqlalchemy.test_base 模块

oslo_db.sqlalchemy.test_base.backend_specific(*dialects)

装饰器,用于在不适当的引擎上跳过后端特定测试。

::dialects: 测试将启动的方言名称列表。

oslo_db.sqlalchemy.test_fixtures 模块

class oslo_db.sqlalchemy.test_fixtures.AdHocDbFixture(url=None)

继承自 SimpleDbFixture

一个fixture,它会为每个测试创建一个并释放数据库引擎。

还允许传递一个特定的 URL,这意味着该 fixture 可以硬编码到特定的 SQLite 文件。

对于 SQLite,此 fixture 会在设置时创建命名的数据库并在拆卸时将其删除。对于其他数据库,假定数据库已经存在,并且会在拆卸后保留。

class oslo_db.sqlalchemy.test_fixtures.BaseDbFixture(driver=None, ident=None)

基础: Fixture

基础数据库配置 fixture。

这作为其他 fixture 的基类,但本身不实现 _setUp()。它为各种能力 mixin(GenerateSchema、DeletesFromSchema 等)实现的标志提供基础,并提供对特定于 testresources 的配置对象的抽象。总体而言,该 fixture 的使用者只需要使用正确的类,就可以处理 testresources 机制。

DRIVER = 'sqlite'
get_enginefacade()

返回一个 enginefacade._TransactionContextManager。

这通常是 db/api.py 模块中声明的全局变量,例如“context_manager”,并且是 enginefacade.transaction_context() 返回的对象。

如果未实现,则使用全局 enginefacade 管理器。

对于像 Gnocchi 这样使用每个对象或每个测试 enginefacade 的项目,还应实现 get_per_test_enginefacade() 方法。

get_per_test_enginefacade()

返回每个测试的 enginefacade._TransactionContextManager。

此 facade 应该是测试期望代码使用的 facade。通常,这与 get_engineafacade() 返回的 facade 相同,即默认值。对于像 Gnocchi 这样的特殊应用程序,可以覆盖此方法以提供实例级别的 facade。

class oslo_db.sqlalchemy.test_fixtures.DeletesFromSchema

继承自 ResetsData

定义一个可以就地从所有表中删除数据的fixture。

当 DeletesFromSchema 存在于一个fixture中时,_DROP_SCHEMA_PER_TEST 现在为 False;这意味着 provision.SchemaResource 的“teardown”标志将为 False,这将阻止 SchemaResource 在每个测试后删除模式内的所有对象。

这是一个“能力”mixin,与包含 BaseDbFixture 作为基础的类一起工作。

delete_from_schema(engine)

一个钩子,应该删除现有模式中的所有数据。

不应该删除任何对象,只需删除需要在测试之间重置的表中的数据。

reset_schema_data(engine, facade)

重置模式中的数据。

class oslo_db.sqlalchemy.test_fixtures.GeneratesSchema

基类: object

定义一个使用 create_all() 生成模式的fixture的mixin。

这是一个“能力”mixin,与包含 BaseDbFixture 作为基础的类一起工作。

generate_schema_create_all(engine)

一个钩子,应该使用 create_all() 生成模型模式。

此钩子在创建数据库的范围内调用,假设 BUILD_WITH_MIGRATIONS 为 False。

class oslo_db.sqlalchemy.test_fixtures.GeneratesSchemaFromMigrations

Bases: GeneratesSchema

定义一个使用迁移生成模式的fixture的mixin。

这是一个“能力”mixin,与包含 BaseDbFixture 作为基础的类一起工作。

generate_schema_migrations(engine)

一个钩子,应该使用迁移生成模型模式。

此钩子在创建数据库的范围内调用,假设 BUILD_WITH_MIGRATIONS 为 True。

class oslo_db.sqlalchemy.test_fixtures.MySQLOpportunisticFixture(test, driver=None, ident=None)

Bases: OpportunisticDbFixture

DRIVER = 'mysql'
class oslo_db.sqlalchemy.test_fixtures.OpportunisticDBTestMixin

基类: object

测试mixin,将测试套件与testresources集成。

这个系统的目标有三个

  1. 允许创建“stub”测试套件,这些套件将针对特定类型的数据库(例如,Mysql、Postgresql)运行父套件中的所有测试,如果该目标类型的数据库对套件不可用,则跳过整个套件。

  2. 为测试提供一个进程本地的、匿名命名的模式,位于目标数据库中,以便测试可以与其他测试并发运行而不会发生数据冲突

  3. 与testresources.OptimisingTestSuite兼容,该TestSuite会提前将TestCase实例组织成组,所有这些组都使用相同类型的数据库,并在任何数量的测试范围内一次性设置和拆除数据库模式。当针对非SQLite数据库进行测试时,此技术至关重要,因为构建模式的成本很高,并且最理想的情况是使用应用程序的模式迁移,而模式迁移比直接的create_all()更慢。

此mixin提供testresources在使用OptimisingTestSuite时所需的.resources属性。 .resources属性然后提供一个testresources.TestResourceManager对象集合,这些对象在这里oslo_db.sqlalchemy.provision中定义。这些对象知道如何找到可用的数据库后端,构建临时数据库,并调用模式生成和拆除指令。构建模式对象和可选的“删除所有表”步骤的实际部分由实现应用程序本身提供。

FIXTURE

别名 OpportunisticDbFixture

SKIP_ON_UNAVAILABLE_DB = True
generate_fixtures()
property resources

提供TestResourceManager对象集合。

集合在这里被缓存,在测试用例级别以及提供这些资源的fixture对象中。

setUp()
class oslo_db.sqlalchemy.test_fixtures.OpportunisticDbFixture(test, driver=None, ident=None)

Bases: BaseDbFixture

使用testresources完全优化的运行的fixture。

此fixture依赖于使用OpportunisticDBTestMixin提供test.resources属性,并且与testresources.OptimisingTestSuite结合使用效果更好。optimize_package_test_loader()函数应在模块和包级别使用,以优化跨多个测试的数据库配置。

class oslo_db.sqlalchemy.test_fixtures.PostgresqlOpportunisticFixture(test, driver=None, ident=None)

Bases: OpportunisticDbFixture

DRIVER = 'postgresql'
class oslo_db.sqlalchemy.test_fixtures.ReplaceEngineFacadeFixture(enginefacade, replace_with_enginefacade)

基础: Fixture

一个将一个enginefacade的引擎插入到另一个enginefacade的fixture。

此fixture可用于已经具有自己的非oslo_db数据库设置/拆除方案的测试套件,以将任何URL或测试导向的enginefacade按原样插入到enginefacade导向的API中。

对于使用oslo.db的测试fixture的应用程序,ReplaceEngineFacade fixture在内部使用。

例如:

class MyDBTest(TestCase):

    def setUp(self):
        from myapplication.api import main_enginefacade

        my_test_enginefacade = enginefacade.transaction_context()
        my_test_enginefacade.configure(connection=my_test_url)

        self.useFixture(
            ReplaceEngineFacadeFixture(
                main_enginefacade, my_test_enginefacade))

以上,main_enginefacade对象是正常的应用程序级别对象,而my_test_enginefacade是我们创建的一个指向某些测试数据库的本地对象。在fixture的设置过程中,应用程序级别的enginefacade将使用测试enginefacade的引擎工厂和引擎,并在fixture拆除时替换回。

class oslo_db.sqlalchemy.test_fixtures.ResetsData

基类: object

定义一个重置模式数据而不删除的fixture的mixin。

reset_schema_data(engine, enginefacade)

重置模式中的数据。

setup_for_reset(engine, enginefacade)

“执行测试运行前可能需要的设置。”

class oslo_db.sqlalchemy.test_fixtures.SimpleDbFixture(driver=None, ident=None)

Bases: BaseDbFixture

提供来自固定URL的engine的fixture。

SimpleDbFixture通常仅适用于SQLite内存数据库,因为此数据库自然与其他进程隔离,不需要管理模式。对于需要专门针对MySQL或Postgresql运行测试的测试,OpportunisticDbFixture更合适。

数据库连接信息本身来自配置系统,将所需的驱动程序(通常是sqlite)与配置系统为此驱动程序提供的默认URL匹配(对于sqlite,它是SQLite内存URL,例如sqlite://。对于MySQL和Postgresql,它是熟悉的“openstack_citest”URL在localhost上)。

有多种创建/删除方案可以发生

  • 默认情况下,是在设置时获取数据库连接,并在拆除时发出删除模式中所有对象(例如,表、索引)的指令。 SQLAlchemy engine本身在类级别引用,以便后续重用。

  • 当实现GeneratesSchema或GeneratesSchemaFromMigrations mixin时,在fixture设置时也会调用适当的generate_schema方法,默认情况下,这是每个测试。

  • 当实现DeletesFromSchema mixin时,generate_schema方法现在仅调用一次,并且“删除所有对象”系统被delete_from_schema方法替换。 这允许相同的数据库保持设置状态,所有模式对象保持不变,因此不需要在每个测试上运行昂贵的迁移。

  • fixture在测试结束时会释放engine。 假设相同的engine将在许多测试中多次重用。 AdHocDbFixture扩展了此fixture,以在测试结束时提供engine.dispose()。

此fixture旨在无需引用测试本身即可工作,因此无法利用OptimisingTestSuite。

oslo_db.sqlalchemy.test_fixtures.optimize_module_test_loader()

将模块级别的测试组织到testresources.OptimisingTestSuite中。

此函数为给定的模块提供兼容unittest的load_tests钩子;对于包级别,请使用optimize_package_test_loader()函数。

当使用unittest或subunit风格的测试运行器时,将调用此函数以返回包含要运行的测试的TestSuite;此函数确保此TestSuite是OptimisingTestSuite,它将一次性组织跨多个测试的测试资源配置。

该函数被调用为

from oslo_db.sqlalchemy import test_fixtures

load_tests = test_fixtures.optimize_module_test_loader()

loader必须存在于单个模块中,而不是包级别的__init__.py中。

该函数还应用testscenarios扩展到所有测试集合。 这样,即使替换为此函数,现有的需要从load_tests调用构建TestScenarios的测试套件也可以继续进行。

oslo_db.sqlalchemy.test_fixtures.optimize_package_test_loader(file_)

将包级别的测试组织到testresources.OptimisingTestSuite中。

此函数为给定的包提供兼容unittest的load_tests钩子;对于模块级别,请使用optimize_module_test_loader()函数。

当使用unittest或subunit风格的测试运行器时,将调用此函数以返回包含要运行的测试的TestSuite;此函数确保此TestSuite是OptimisingTestSuite,它将一次性组织跨多个测试的测试资源配置。

该函数被调用为

from oslo_db.sqlalchemy import test_fixtures

load_tests = test_fixtures.optimize_package_test_loader(__file__)

loader必须存在于包级别的__init__.py中。

该函数还应用testscenarios扩展到所有测试集合。 这样,即使替换为此函数,现有的需要从load_tests调用构建TestScenarios的测试套件也可以继续进行。

oslo_db.sqlalchemy.test_migrations 模块

class oslo_db.sqlalchemy.test_migrations.ModelsMigrationsSync

基类: object

用于比较数据库迁移脚本和模型的辅助类。

它旨在被目标项目的测试用例继承。它们需要提供测试内部使用的方法的实现(因为我们无法在此处实现它们)。

test_model_sync() 将为提供的引擎运行迁移脚本,然后将给定的元数据与从数据库反射的元数据进行比较。 MODELS 和 MIGRATION 脚本之间的差异将被打印出来,如果差异不为空,则测试将失败。返回值实际上是一个操作列表,应该执行这些操作以使当前数据库模式状态(即迁移脚本)与模型定义保持一致。开发者需要分析输出并决定是修改模型定义还是迁移脚本以使其保持一致。

输出

[(
    'add_table',
    description of the table from models
),
(
    'remove_table',
    description of the table from database
),
(
    'add_column',
    schema,
    table name,
    column description from models
),
(
    'remove_column',
    schema,
    table name,
    column description from database
),
(
    'add_index',
    description of the index from models
),
(
    'remove_index',
    description of the index from database
),
(
    'add_constraint',
    description of constraint from models
),
(
    'remove_constraint,
    description of constraint from database
),
(
    'modify_nullable',
    schema,
    table name,
    column name,
    {
        'existing_type': type of the column from database,
        'existing_server_default': default value from database
    },
    nullable from database,
    nullable from models
),
(
    'modify_type',
    schema,
    table name,
    column name,
    {
        'existing_nullable': database nullable,
        'existing_server_default': default value from database
    },
    database column type,
    type of the column from models
),
(
    'modify_default',
    schema,
    table name,
    column name,
    {
        'existing_nullable': database nullable,
        'existing_type': type of the column from database
    },
    connection column default value,
    default from models
)]

可以覆盖 include_object() 方法以从比较中排除某些表(例如 migrate_repo)。

compare_server_default(ctxt, ins_col, meta_col, insp_def, meta_def, rendered_meta_def)

比较模型和数据库表之间的默认值。

如果默认值不同,则返回 True;如果相同,则返回 False;或者返回 None 以允许默认实现比较这些默认值。

参数:
  • ctxt – alembic MigrationContext 实例

  • insp_col – 反射的列

  • meta_col – 模型中的列

  • insp_def – 反射的列的默认值

  • meta_def – 模型中的列的默认值

  • rendered_meta_def – 渲染后的列的默认值(来自模型)

compare_type(ctxt, insp_col, meta_col, insp_type, meta_type)

如果类型不同,则返回 True;如果相同,则返回 False。

返回 None 以允许默认实现比较这些类型。

参数:
  • ctxt – alembic MigrationContext 实例

  • insp_col – 反射的列

  • meta_col – 模型中的列

  • insp_type – 反射的列的类型

  • meta_type – 模型中的列的类型

abstract db_sync(engine)

使用给定的引擎实例运行迁移脚本。

此方法必须在子类中实现,并为连接到给定引擎的数据库运行迁移脚本。

filter_metadata_diff(diff)

过滤 test_models_sync() 中的断言之前的更改。

允许子类白名单/黑名单更改。默认情况下,不执行任何过滤,更改将按原样返回。

参数:

diff – 差异列表(有关格式的详细信息,请参阅 compare_metadata() 文档)

返回值:

差异列表

abstract get_engine()

返回在运行测试时要使用的引擎实例。

此方法必须在子类中实现,并返回在运行测试时要使用的引擎实例。

abstract get_metadata()

返回要用于模式比较的元数据实例。

此方法必须在子类中实现,并返回附加到 BASE 模型的元数据实例。

include_object(object_, name, type_, reflected, compare_to)

如果应该比较对象,则返回 True。

参数:
  • object – SchemaItem 对象,例如 Table 或 Column 对象

  • name – 对象的名称

  • type – 描述对象类型的字符串(例如“table”)

  • reflected – 如果给定的对象是基于表反射生成的,则为 True;否则为 False

  • compare_to – 要比较的对象,如果可用,则为 None

test_models_sync()

oslo_db.sqlalchemy.types 模块

class oslo_db.sqlalchemy.types.JsonEncodedDict(mysql_as_long=False, mysql_as_medium=False)

继承自:JsonEncodedType

表示在数据库中序列化为 json 编码字符串的字典。

请注意,此类型不跟踪更改。如果想要更新它,必须将现有值分配给临时变量,更新,然后重新分配。有关更强大的解决方法,请参阅此页面:https://docs.sqlalchemy.org.cn/en/rel_1_0/orm/extensions/mutable.html

cache_ok: bool | None = True

此类型可以安全地缓存。

type

别名 dict

class oslo_db.sqlalchemy.types.JsonEncodedList(mysql_as_long=False, mysql_as_medium=False)

继承自:JsonEncodedType

表示在数据库中序列化为 json 编码字符串的列表。

请注意,此类型不跟踪更改。如果想要更新它,必须将现有值分配给临时变量,更新,然后重新分配。有关更强大的解决方法,请参阅此页面:https://docs.sqlalchemy.org.cn/en/rel_1_0/orm/extensions/mutable.html

cache_ok: bool | None = True

此类型可以安全地缓存。

type

别名 list

class oslo_db.sqlalchemy.types.JsonEncodedType(mysql_as_long=False, mysql_as_medium=False)

继承自:TypeDecorator

作为数据库中 JSON 编码字符串序列化的数据的基本列类型。

cache_ok: bool | None = True

此类型可以安全地缓存。

impl

别名 Text

process_bind_param(value, dialect)

绑定参数到处理。

process_result_value(value, dialect)

处理结果值。

type = None
class oslo_db.sqlalchemy.types.SoftDeleteInteger(*args: Any, **kwargs: Any)

继承自:TypeDecorator

强制绑定参数在传递给 DBAPI 之前成为适当的整数。

像 PostgreSQL 这样的某些后端对于类型非常严格,并且不会执行自动类型转换,例如,当尝试将布尔值(如 false)插入到整数列时。通过 DB 层中的自定义 SQLAlchemy 类型装饰器强制绑定参数可以确保我们始终将适当的整数值传递给 DBAPI 实现。

这并不是一个通用的布尔整数类型,因为它专门允许超出布尔整数范围(0、1、False、True)的任意正整数,以便能够对包括 deleted 在内的多个列进行复合唯一约束(例如,为了在 Nova 中软删除具有相同名称的 flavor,而不会触发约束违规):deleted 被设置为删除时的 PK int 值,0 表示未删除的行。

cache_ok: bool | None = True

此类型可以安全地缓存。

impl

别名 Integer

process_bind_param(value, dialect)

返回绑定参数。

oslo_db.sqlalchemy.update_match 模块

exception oslo_db.sqlalchemy.update_match.CantUpdateException

基础: Exception

exception oslo_db.sqlalchemy.update_match.MultiRowsMatched

继承自:CantUpdateException

exception oslo_db.sqlalchemy.update_match.NoRowsMatched

继承自:CantUpdateException

oslo_db.sqlalchemy.update_match.manufacture_criteria(mapped, values)

给定一个映射器/类和一个值命名空间,生成一个 WHERE 子句。

该类应该是已映射的类,字典中的条目对应于该类上映射的属性名称。

一个值也可以是一个元组,在这种情况下,该特定属性将使用 IN 进行比较。标量值或元组也可以包含 None,它转换为 IS NULL,如果合适,则与 IN 表达式一起使用 OR 正确连接。

参数:
  • cls – 一个已映射的类,或实际的 Mapper 对象。

  • values – 值的字典。

oslo_db.sqlalchemy.update_match.manufacture_entity_criteria(entity, include_only=None, exclude=None)

给定一个已映射的实例,生成一个 WHERE 子句。

设置在实例上的属性将组合起来生成 SQL 表达式,以映射的 SQL 表达式作为比较的基础。

实例上的值可以设置为元组,在这种情况下,标准将生成一个 IN 子句。None 也可以作为标量或元组条目接受,这将生成 IS NULL,如果合适,则与 IN 表达式一起使用 OR 正确连接。

参数:
  • entity – 一个已映射的实体。

  • include_only – 可选的键序列,用于限制包含哪些键。

  • exclude – 要排除的键序列

oslo_db.sqlalchemy.update_match.manufacture_persistent_object(session, specimen, values=None, primary_key=None)

在会话中使 ORM 映射对象持久化,无需 SQL。

返回持久化对象。

如果给定会话中已存在匹配对象,则将样本合并到其中并返回持久化对象。否则,使样本本身持久化并返回它。

该对象必须包含完整的 первичный ключ,或通过 values 或 primary_key 参数提供它。该对象以“干净”状态持久化到会话中,没有待处理的更改。

参数:
  • session – 一个会话对象。

  • specimen – 一个通常是瞬态的已映射对象。

  • values – 要应用于样本的字典值,除了已经存在的状态。将设置属性,以便不创建历史记录;对象保持干净。

  • primary_key – 可选的基于元组的 первичный ключ。如果存在,也将将其应用于实例。

oslo_db.sqlalchemy.update_match.update_on_match(query, specimen, surrogate_key, values=None, attempts=3, include_only=None, process_query=None, handle_failure=None)

发出匹配给定样本的 UPDATE 语句。

例如:

with enginefacade.writer() as session:
    specimen = MyInstance(
        uuid='ccea54f',
        interface_id='ad33fea',
        vm_state='SOME_VM_STATE',
    )

    values = {
        'vm_state': 'SOME_NEW_VM_STATE'
    }

    base_query = model_query(
        context, models.Instance,
        project_only=True, session=session)

    hostname_query = model_query(
            context, models.Instance, session=session,
            read_deleted='no').
        filter(func.lower(models.Instance.hostname) == 'SOMEHOSTNAME')

    surrogate_key = ('uuid', )

    def process_query(query):
        return query.where(~exists(hostname_query))

    def handle_failure(query):
        try:
            instance = base_query.one()
        except NoResultFound:
            raise exception.InstanceNotFound(instance_id=instance_uuid)

        if session.query(hostname_query.exists()).scalar():
            raise exception.InstanceExists(
                name=values['hostname'].lower())

        # try again
        return False

    persistent_instance = base_query.update_on_match(
        specimen,
        surrogate_key,
        values=values,
        process_query=process_query,
        handle_failure=handle_failure
    )

UPDATE 语句是根据给定的样本使用存在的构造 WHERE 子句的值构建的。如果样本包含要忽略的其他值,则可以传递 include_only 参数,该参数指示用于构造 WHERE 的属性序列。

UPDATE 在 ORM 查询上执行,该查询是从给定的 Session 创建的,或者通过传递引用现有查询的 `query 参数来执行。

在调用查询之前,它也通过作为 process_query 发送的可调用对象传递,如果存在。此钩子允许在创建后但在调用之前将其他标准添加到查询。

该函数然后调用 UPDATE 语句并检查“成功”一次或多次,最多达到作为 attempts 传递的数量。

从 UPDATE 语句的“成功”的初始检查是返回的行数匹配 1。如果匹配零行,则假定 UPDATE 语句“失败”,并且开始失败处理阶段。

失败处理阶段包括调用给定的 handle_failure 函数(如果有)。此处理程序可以执行其他查询以尝试弄清楚为什么 UPDATE 没有匹配任何行。处理程序在检测到确切的失败条件时,应引发异常以退出;如果它没有,它可以返回 True 或 False,其中 False 表示未处理该错误,而 True 表示实际上没有错误,并且该函数应成功返回。

如果未提供失败处理程序,或者在 attempts 次尝试后返回 False,则该函数整体引发 CantUpdateException。如果处理程序返回 True,则该函数返回而不出错。

该函数的返回值是给定样本的持久化版本;这可能是样本本身,如果没有匹配的对象已经存在于会话中;否则,将返回现有对象,并将样本的状态合并到其中。返回的持久化对象将填充给定值到对象中。

该对象作为“持久化”返回,这意味着它与给定的会话关联,并且具有身份密钥(即,真实的 первичный ключ 值)。

为了生成此身份密钥,必须使用一种策略来尽可能高效和安全地确定它

  1. 如果给定的样本已经包含其 первичный ключ 属性完全填充,则这些属性用作 UPDATE 中的标准,因此我们具有 первичный ключ 值;它直接填充。

  2. 如果目标后端支持 RETURNING,则当使用 RETURNING 子句执行 update() 查询时,将原子地返回匹配的 первичный ключ。这目前包括 Postgresql、Oracle 和其他后端(特别是 MySQL 或 SQLite)。

  3. 如果目标后端是 MySQL,并且给定的模型使用单列、AUTO_INCREMENT 整数 первичный ключ 值(如 Nova 的情况),则 MySQL 推荐的方法是利用 LAST_INSERT_ID(expr) 在 UPDATE 的范围内原子地获取匹配的 первичный ключ 值,然后在立即使用 SELECT LAST_INSERT_ID() 获取它。https://dev.mysqlserver.cn/doc/refman/5.0/en/information- functions.html#function_last-insert-id

  4. 否则,对于 MySQL 上的复合键或其他后端(如 SQLite),必须重新获取已更新的行才能获取 первичный ключ 值。为此使用 surrogate_key 参数,以便重新获取该行;这是一个具有已知唯一值的列名,该对象可以在其中被获取。

oslo_db.sqlalchemy.update_match.update_returning_pk(query, values, surrogate_key)

执行 UPDATE,返回匹配行的主键。

使用多种策略返回 первичный ключ

  • 如果数据库支持 RETURNING,则使用 RETURNING 内联检索 первичный ключ 值。

  • 如果数据库是 MySQL 并且实体映射到单个整数 первичный ключ 列,则 MySQL 的 last_insert_id() 函数在 UPDATE 内部以及第二个 SELECT 中内联使用以获取该值。

  • 否则,使用“重新获取”策略,其中给定的“surrogate”密钥值(通常是实体上的 UUID 列)用于针对该 UUID 运行新的 SELECT。此 UUID 也放置在 UPDATE 查询中,以确保行匹配。

参数:
  • query – 具有现有标准的查询对象,针对单个实体。

  • values – 要更新到行的值的字典。

  • surrogate_key – 一个元组 (attrname, value),引用一个 UNIQUE 属性,该属性也将匹配该行。当不存在优化的策略时,此属性用于通过 SELECT 检索该行。

返回值:

返回 первичный ключ,作为元组。只有当匹配的行数为 1 时才返回。否则,将引发 CantUpdateException。

oslo_db.sqlalchemy.utils 模块

class oslo_db.sqlalchemy.utils.DialectFunctionDispatcher

基类: object

dispatch_for(expr)
classmethod dispatch_for_dialect(expr, multiple=False)

在不同的函数中提供特定于方言的功能。

例如:

@dispatch_for_dialect("*")
def set_special_option(engine):
    pass

@set_special_option.dispatch_for("sqlite")
def set_sqlite_special_option(engine):
    return engine.execute("sqlite thing")

@set_special_option.dispatch_for("mysql+mysqldb")
def set_mysqldb_special_option(engine):
    return engine.execute("mysqldb thing")

在上述注册之后,set_special_option() 函数现在是一个调度器,给定 SQLAlchemy EngineConnection、URL 字符串或 sqlalchemy.engine.URL 对象

eng = create_engine('...')
result = set_special_option(eng)

筛选系统支持两种模式:“multiple”和“single”。默认值为“single”,要求对于给定的后端,只有一个函数匹配。在此模式下,该函数也可以具有返回值,该返回值将由顶级调用返回。

“multiple”模式,另一方面,不支持返回值,但允许任何数量的匹配函数,其中每个函数都将被调用

# the initial call sets this up as a "multiple" dispatcher
@dispatch_for_dialect("*", multiple=True)
def set_options(engine):
    # set options that apply to *all* engines

@set_options.dispatch_for("postgresql")
def set_postgresql_options(engine):
    # set options that apply to all Postgresql engines

@set_options.dispatch_for("postgresql+psycopg2")
def set_postgresql_psycopg2_options(engine):
    # set options that apply only to "postgresql+psycopg2"

@set_options.dispatch_for("*+pyodbc")
def set_pyodbc_options(engine):
    # set options that apply to all pyodbc backends

请注意,在这两种模式下,成员函数都可以接受任何数量的附加参数。例如,为了填充选项字典,可以传递它

@dispatch_for_dialect("*", multiple=True)
def set_engine_options(url, opts):
    pass

@set_engine_options.dispatch_for("mysql+mysqldb")
def _mysql_set_default_charset_to_utf8(url, opts):
    opts.setdefault('charset', 'utf-8')

@set_engine_options.dispatch_for("sqlite")
def _set_sqlite_in_memory_check_same_thread(url, opts):
    if url.database in (None, 'memory'):
        opts['check_same_thread'] = False

opts = {}
set_engine_options(url, opts)

驱动程序说明符的形式为:<database | *>[+<driver | *>]。也就是说,数据库名称或“*”,后跟可选的 + 符号和驱动程序或“*”。省略驱动程序名称表示该数据库的所有驱动程序。

dispatch_on_drivername(drivername)

返回给定驱动程序名称的子调度器。

这提供了一种调用不同函数的方式,例如“*”函数,用于通常引用子函数的给定目标对象。

class oslo_db.sqlalchemy.utils.DialectMultiFunctionDispatcher

继承自 DialectFunctionDispatcher

class oslo_db.sqlalchemy.utils.DialectSingleFunctionDispatcher

继承自 DialectFunctionDispatcher

oslo_db.sqlalchemy.utils.add_index(engine, table_name, index_name, idx_columns)

为给定的列创建索引。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

  • index_name – 索引名

  • idx_columns – 包含要索引的列名的元组

oslo_db.sqlalchemy.utils.change_index_columns(engine, table_name, index_name, new_columns)

更改由给定索引索引的列集。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

  • index_name – 索引名

  • new_columns – 包含要索引的列名的元组

oslo_db.sqlalchemy.utils.column_exists(engine, table_name, column)

检查表是否具有给定的列。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

  • column – 列名

oslo_db.sqlalchemy.utils.dispatch_for_dialect(expr, multiple=False)

在不同的函数中提供特定于方言的功能。

例如:

@dispatch_for_dialect("*")
def set_special_option(engine):
    pass

@set_special_option.dispatch_for("sqlite")
def set_sqlite_special_option(engine):
    return engine.execute("sqlite thing")

@set_special_option.dispatch_for("mysql+mysqldb")
def set_mysqldb_special_option(engine):
    return engine.execute("mysqldb thing")

在上述注册之后,set_special_option() 函数现在是一个调度器,给定 SQLAlchemy EngineConnection、URL 字符串或 sqlalchemy.engine.URL 对象

eng = create_engine('...')
result = set_special_option(eng)

筛选系统支持两种模式:“multiple”和“single”。默认值为“single”,要求对于给定的后端,只有一个函数匹配。在此模式下,该函数也可以具有返回值,该返回值将由顶级调用返回。

“multiple”模式,另一方面,不支持返回值,但允许任何数量的匹配函数,其中每个函数都将被调用

# the initial call sets this up as a "multiple" dispatcher
@dispatch_for_dialect("*", multiple=True)
def set_options(engine):
    # set options that apply to *all* engines

@set_options.dispatch_for("postgresql")
def set_postgresql_options(engine):
    # set options that apply to all Postgresql engines

@set_options.dispatch_for("postgresql+psycopg2")
def set_postgresql_psycopg2_options(engine):
    # set options that apply only to "postgresql+psycopg2"

@set_options.dispatch_for("*+pyodbc")
def set_pyodbc_options(engine):
    # set options that apply to all pyodbc backends

请注意,在这两种模式下,成员函数都可以接受任何数量的附加参数。例如,为了填充选项字典,可以传递它

@dispatch_for_dialect("*", multiple=True)
def set_engine_options(url, opts):
    pass

@set_engine_options.dispatch_for("mysql+mysqldb")
def _mysql_set_default_charset_to_utf8(url, opts):
    opts.setdefault('charset', 'utf-8')

@set_engine_options.dispatch_for("sqlite")
def _set_sqlite_in_memory_check_same_thread(url, opts):
    if url.database in (None, 'memory'):
        opts['check_same_thread'] = False

opts = {}
set_engine_options(url, opts)

驱动程序说明符的形式为:<database | *>[+<driver | *>]。也就是说,数据库名称或“*”,后跟可选的 + 符号和驱动程序或“*”。省略驱动程序名称表示该数据库的所有驱动程序。

oslo_db.sqlalchemy.utils.drop_index(engine, table_name, index_name)

删除具有给定名称的索引。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

  • index_name – 索引名

oslo_db.sqlalchemy.utils.drop_old_duplicate_entries_from_table(engine, table_name, use_soft_delete, *uc_column_names)

删除表中 uc_columns 列中具有相同值的旧行。

此方法删除(或标记为 deleted 如果 use_soft_delete 为 True)名为 table_name 的表中的旧重复行。

参数:
  • engine – SQLAlchemy 引擎

  • table_name – 包含重复项的表

  • use_soft_delete – 如果为 True - 值将被标记为 deleted,如果为 False - 值将被从表中删除

  • uc_column_names – 唯一约束列

oslo_db.sqlalchemy.utils.get_db_connection_info(conn_pieces)
oslo_db.sqlalchemy.utils.get_foreign_key_constraint_name(engine, table_name, column_name)

查找表中给定约束列的外键名称。

参数:
  • engine – SQLAlchemy 引擎(或连接)

  • table_name – 包含约束的表名

  • column_name – 受外键约束的列名。

返回值:

给定表中约束给定列的第一个外键约束的名称。

oslo_db.sqlalchemy.utils.get_indexes(engine, table_name)

获取给定表中的所有索引列表。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

oslo_db.sqlalchemy.utils.get_non_innodb_tables(connectable, skip_tables=('migrate_version', 'alembic_version'))

获取不使用 InnoDB 存储引擎的表列表。

参数:
  • connectable – SQLAlchemy Engine 或 Connection 实例

  • skip_tables – 可能具有不同存储引擎的表列表

oslo_db.sqlalchemy.utils.get_table(engine, name)

从数据库中动态返回 sqlalchemy 表。

由于模型对我们来说在迁移中不起作用,因为模型将与当前数据相差甚远,因此需要此方法。

警告

不要在数据库迁移中创建外键时使用此方法,因为 sqlalchemy 需要相同的 MetaData 对象来保存有关父表和外键中的引用表的的信息。此方法为每个表对象使用唯一的 MetaData 对象,因此它不能与外键创建一起使用。

oslo_db.sqlalchemy.utils.get_unique_keys(model)

获取一组唯一的模型键列表。

参数:

model – ORM 模型类

返回类型:

字符串集合的列表

返回值:

唯一的模型键,如果无法找到它们,则为 None

oslo_db.sqlalchemy.utils.index_exists(engine, table_name, index_name)

检查给定的索引是否存在。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

  • index_name – 索引名

oslo_db.sqlalchemy.utils.index_exists_on_columns(engine, table_name, columns)

检查给定列上的索引是否存在。

参数:
  • engine – sqlalchemy 引擎

  • table_name – 表名

  • columns – 要检查的列的列表类型

oslo_db.sqlalchemy.utils.make_url(target)

返回 url.URL 对象

oslo_db.sqlalchemy.utils.model_query(model, session, args=None, *kwargs)

db.sqlalchemy api 方法的查询助手。

这考虑了 deletedproject_id 字段。

参数:
  • model (models.ModelBase) – 要查询的模型。必须是 ModelBase 的子类。

  • session (sqlalchemy.orm.session.Session) – 要使用的会话。

  • args (tuple) – 查询参数。如果为 None - 使用模型。

关键字参数

参数:
  • project_id (iterable, model.__table__.columns.project_id.type, None type) – 如果存在,允许按 project_id 过滤。可以是 project_id 值、project_id 值的可迭代对象或 None。如果传递了可迭代对象,则仅返回 project_id 列的值位于 project_id 列表上的行。如果传递了 None,则仅返回不绑定到任何项目的行。

  • deleted (bool) – 如果存在,允许按 deleted 字段过滤。如果传递了 True,则仅返回已删除的条目,如果传递了 False - 仅返回存在的条目。

用法

from oslo_db.sqlalchemy import utils


def get_instance_by_uuid(uuid):
    session = get_session()
    with session.begin()
        return (utils.model_query(models.Instance, session=session)
                     .filter(models.Instance.uuid == uuid)
                     .first())

def get_nodes_stat():
    data = (Node.id, Node.cpu, Node.ram, Node.hdd)

    session = get_session()
    with session.begin()
        return utils.model_query(Node, session=session, args=data).all()

您还可以基于 utils.model_query() 创建自己的助手。例如,如果您计划从项目的 context 使用 project_iddeleted 参数,这可能很有用

from oslo_db.sqlalchemy import utils


def _model_query(context, model, session=None, args=None,
                 project_id=None, project_only=False,
                 read_deleted=None):

    # We suppose, that functions ``_get_project_id()`` and
    # ``_get_deleted()`` should handle passed parameters and
    # context object (for example, decide, if we need to restrict a user
    # to query his own entries by project_id or only allow admin to read
    # deleted entries). For return values, we expect to get
    # ``project_id`` and ``deleted``, which are suitable for the
    # ``model_query()`` signature.
    kwargs = {}
    if project_id is not None:
        kwargs['project_id'] = _get_project_id(context, project_id,
                                               project_only)
    if read_deleted is not None:
        kwargs['deleted'] = _get_deleted_dict(context, read_deleted)
    session = session or get_session()

    with session.begin():
        return utils.model_query(model, session=session,
                                 args=args, **kwargs)

def get_instance_by_uuid(context, uuid):
    return (_model_query(context, models.Instance, read_deleted='yes')
                  .filter(models.Instance.uuid == uuid)
                  .first())

def get_nodes_data(context, project_id, project_only='allow_none'):
    data = (Node.id, Node.cpu, Node.ram, Node.hdd)

    return (_model_query(context, Node, args=data, project_id=project_id,
                         project_only=project_only)
                  .all())
oslo_db.sqlalchemy.utils.paginate_query(query, model, limit, sort_keys, marker=None, sort_dir=None, sort_dirs=None)

返回添加了排序/分页标准的查询。

分页的工作原理是需要一个唯一的 sort_key,由 sort_keys 指定。(如果 sort_keys 不是唯一的,我们就有循环遍历值的风险。)我们使用上一页中的最后一行作为分页的“marker”。因此,我们必须返回遵循传递的 marker 的值,按顺序排列。对于单个值的 sort_key,这将很容易:sort_key > X。对于复合值的 sort_key,(k1, k2, k3) 我们必须这样做来重复词法顺序:(k1 > X1) 或 (k1 == X1 && k2 > X2) 或 (k1 == X1 && k2 == X2 && k3 > X3)

我们还必须应对不同的 sort_directions 以及 k2、k3、… 可为空的情况。

通常,最后一行的 id 用作面向客户端的分页 marker,然后必须从数据库中获取实际的 marker 对象并作为 marker 传递给我们。

“offset” 参数被故意避免。由于 offset 每次都需要完全扫描前面的结果,因此首选基于标准的分页。请参阅 http://use-the-index-luke.com/no-offset 以获取更多背景信息。

参数:
  • query – 我们应该添加分页/排序的查询对象

  • model – ORM 模型类

  • limit – 要返回的最大项目数

  • sort_keys – 结果应按其排序的属性数组

  • marker – 上一页的最后一个项目;我们返回此值后的下一个结果。

  • sort_dir – 结果应排序的方向(asc、desc)后缀 -nullsfirst、-nullslast 可用于定义空值的排序

  • sort_dirs – 与 sort_keys 对应的每列的 sort_dirs 数组

返回类型:

sqlalchemy.orm.query.Query

返回值:

添加了排序/分页的查询。

oslo_db.sqlalchemy.utils.sanitize_db_url(url)
oslo_db.sqlalchemy.utils.to_list(x, default=None)

模块内容