Auth Server 和中间件

创建您自己的 Auth Server 和中间件

包含的 swift/common/middleware/tempauth.py 是一个很好的例子,展示了如何使用代理服务器身份验证中间件创建身份验证子系统。主要要点是,身份验证中间件可以在请求到达 Swift Proxy 应用程序之前就拒绝请求,之后在代理服务器发出回调以验证授权时进行验证。

通常最好将身份验证和授权过程分开。身份验证验证请求是否来自其声称的来源。授权验证“谁”有权访问请求想要访问的资源。

身份验证在请求到达 Swift Proxy 应用程序之前执行。身份信息从请求中提取,以某种方式验证,并将验证信息添加到 WSGI 环境中,供后续的授权过程使用。添加到 WSGI 环境中的具体内容完全取决于已安装的授权过程需要什么;Swift Proxy 应用程序本身不需要任何特定信息,它只是传递信息。约定是将 environ[‘REMOTE_USER’] 设置为经过身份验证的用户字符串,但通常比这需要更多信息。

包含的 TempAuth 会将 REMOTE_USER 设置为用户所属的逗号分隔组列表。第一个组将是“用户的组”,一个只有该用户所属的组。第二个组将是“帐户的组”,一个包含该身份验证帐户所有用户的组(与存储帐户不同)。第三个组是可选的,是存储帐户字符串。如果用户没有帐户的管理员访问权限,则将省略第三个组。

强烈建议身份验证服务器实现者为其创建的令牌和 Swift 存储帐户使用可配置的经销商前缀(默认情况下包含在 TempAuth 中的 AUTH_)。这将避免与其他可能使用相同 Swift 集群的身份验证服务器发生冲突。否则,Swift 集群将不得不尝试所有经销商,直到有一个验证令牌或全部失败。

组名的一个限制是,不应有任何组名以句点 ‘.’ 开头,因为该句点保留供 Swift 内部使用(例如,您稍后将看到的 .r 用于推荐人指定)。

使用 TempAuth 进行身份验证示例

  • 令牌 AUTH_tkabcd 通过请求的 X-Auth-Token 标头提供给 TempAuth 中间件。

  • TempAuth 中间件验证令牌 AUTH_tkabcd 并发现它与存储帐户“AUTH_storage_xyz”的“test”帐户中的“tester”用户匹配。

  • TempAuth 中间件将 REMOTE_USER 设置为“test:tester,test,AUTH_storage_xyz”

  • 现在,此用户将具有对 AUTH_storage_xyz Swift 存储帐户的完全访问权限(通过后续的授权过程),并且可以访问其他存储帐户中的容器,前提是存储帐户以相同的 AUTH_ 经销商前缀开头,并且容器的 ACL 指定了这三个组中的至少一个。

授权是通过 Swift Proxy 服务器对 WSGI 环境的 swift.authorize 值进行回调来执行的,如果已设置。swift.authorize 值应该只是一个函数,它接受一个 Request 作为参数,如果授予访问权限则返回 None,如果拒绝访问权限则返回一个可调用(environ, start_response)对象。这个可调用对象是一个标准的 WSGI 可调用对象。通常,对于经过身份验证的用户的请求,您应该返回 403 Forbidden,对于未经身份验证的请求,您应该返回 401 Unauthorized。例如,这是一个只允许 GET 的授权函数(在这种情况下,您可能应该返回 405 Method Not Allowed,但暂时忽略这一点)。

from swift.common.swob import HTTPForbidden, HTTPUnauthorized


def authorize(req):
    if req.method == 'GET':
        return None
    if req.remote_user:
        return HTTPForbidden(request=req)
    else:
        return HTTPUnauthorized(request=req)

添加 swift.authorize 回调通常由身份验证中间件完成,因为身份验证和授权通常配对在一起。但是,您可以创建单独的授权中间件,只需在传递请求之前设置回调即可。为了继续上面的例子

from swift.common.swob import HTTPForbidden, HTTPUnauthorized


class Authorization(object):

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf

    def __call__(self, environ, start_response):
        environ['swift.authorize'] = self.authorize
        return self.app(environ, start_response)

    def authorize(self, req):
        if req.method == 'GET':
            return None
        if req.remote_user:
            return HTTPForbidden(request=req)
        else:
            return HTTPUnauthorized(request=req)


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)
    def auth_filter(app):
        return Authorization(app, conf)
    return auth_filter

Swift Proxy 服务器将在完成一些初始工作后调用 swift.authorize,但在真正尝试处理请求之前。此时的积极授权将导致请求立即被完全处理。此时的拒绝将立即发送拒绝响应,适用于大多数操作。

但是对于某些可能需要更多信息才能批准的操作,将收集更多信息并将其添加到 WSGI 环境中,然后再次调用 swift.authorize。这些被称为 delay_denial 请求,目前包括容器读取请求和对象读取和写入请求。对于这些请求,将获取读取或写入访问控制字符串(X-Container-Read 和 X-Container-Write)并将其设置为传递给 swift.authorize 的 Request 中的“acl”属性。

delay_denial 过程允许跳过可能代价高昂的访问控制字符串检索,用于可以无需该信息批准的请求,例如管理员或帐户所有者请求。

为了进一步说明我们的例子,现在我们将批准所有访问控制字符串设置为与经过身份验证的用户字符串相同值的请求。请注意,您可能不会完全这样做,因为访问控制字符串表示一个列表而不是单个用户,但对于这个例子来说就足够了

from swift.common.swob import HTTPForbidden, HTTPUnauthorized


class Authorization(object):

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf

    def __call__(self, environ, start_response):
        environ['swift.authorize'] = self.authorize
        return self.app(environ, start_response)

    def authorize(self, req):
        # Allow anyone to perform GET requests
        if req.method == 'GET':
            return None
        # Allow any request where the acl equals the authenticated user
        if getattr(req, 'acl', None) == req.remote_user:
            return None
        if req.remote_user:
            return HTTPForbidden(request=req)
        else:
            return HTTPUnauthorized(request=req)


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)
    def auth_filter(app):
        return Authorization(app, conf)
    return auth_filter

访问控制字符串具有 Swift 包含的标准格式,尽管如果需要可以覆盖该格式。可以使用 swift.common.middleware.acl.parse_acl 解析标准格式,该格式将字符串转换为两个字符串数组:(referrers, groups)。referrers 允许将请求的 Referer 标头与控制访问进行比较。groups 允许将 request.remote_user(或其他组信息来源)与控制访问进行比较。可以使用 swift.common.middleware.acl.referrer_allowed 函数完成推荐人访问检查。组访问检查通常是一个简单的字符串比较。

让我们继续我们的例子,使用 parse_acl 和 referrer_allowed。现在我们只允许在推荐人检查之后进行 GET,并在组检查之后进行任何请求

from swift.common.middleware.acl import parse_acl, referrer_allowed
from swift.common.swob import HTTPForbidden, HTTPUnauthorized


class Authorization(object):

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf

    def __call__(self, environ, start_response):
        environ['swift.authorize'] = self.authorize
        return self.app(environ, start_response)

    def authorize(self, req):
        if hasattr(req, 'acl'):
            referrers, groups = parse_acl(req.acl)
            if req.method == 'GET' and referrer_allowed(req, referrers):
                return None
            if req.remote_user and groups and req.remote_user in groups:
                return None
        if req.remote_user:
            return HTTPForbidden(request=req)
        else:
            return HTTPUnauthorized(request=req)


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)
    def auth_filter(app):
        return Authorization(app, conf)
    return auth_filter

访问控制字符串是通过对容器进行 PUT 和 POST 操作,并使用 X-Container-Read 和 X-Container-Write 标头设置的。Swift 允许将这些字符串设置为任何值,但验证字符串是否满足所需的格式并向用户返回有用的错误非常有用。

为了支持此验证,Swift Proxy 应用程序将在写入其中一个标头时调用 WSGI 环境的 swift.clean_acl 回调。回调应该接受标头名称和值作为其参数。如果有效,它应该返回要保存的清理后的值,如果无效,则应该引发带有合理错误消息的 ValueError。

有一个包含的 swift.common.middleware.acl.clean_acl 验证标准的 Swift 格式。让我们通过使用它来改进我们的例子

from swift.common.middleware.acl import \
    clean_acl, parse_acl, referrer_allowed
from swift.common.swob import HTTPForbidden, HTTPUnauthorized


class Authorization(object):

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf

    def __call__(self, environ, start_response):
        environ['swift.authorize'] = self.authorize
        environ['swift.clean_acl'] = clean_acl
        return self.app(environ, start_response)

    def authorize(self, req):
        if hasattr(req, 'acl'):
            referrers, groups = parse_acl(req.acl)
            if req.method == 'GET' and referrer_allowed(req, referrers):
                return None
            if req.remote_user and groups and req.remote_user in groups:
                return None
        if req.remote_user:
            return HTTPForbidden(request=req)
        else:
            return HTTPUnauthorized(request=req)


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)
    def auth_filter(app):
        return Authorization(app, conf)
    return auth_filter

现在,如果您想覆盖访问控制字符串的格式,您必须提供自己的 clean_acl 函数,并且必须对该格式进行自己的解析和授权检查。强烈建议您使用标准格式,以简单地支持最广泛的外部工具,但有时满足某些 ACL 要求比这更重要。

与 repoze.what 集成

这是一个与 repoze.what 集成的例子,但说实话,我根本不是 repoze.what 的专家;这只是为了希望给那些想要使用 repoze.what 的人提供他们自己的代码的起点

from time import time

from eventlet.timeout import Timeout
from repoze.what.adapters import BaseSourceAdapter
from repoze.what.middleware import setup_auth
from repoze.what.predicates import in_any_group, NotAuthorizedError
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed
from swift.common.utils import cache_from_env, split_path
from swift.common.swob import HTTPForbidden, HTTPUnauthorized


class DevAuthorization(object):

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf

    def __call__(self, environ, start_response):
        environ['swift.authorize'] = self.authorize
        environ['swift.clean_acl'] = clean_acl
        return self.app(environ, start_response)

    def authorize(self, req):
        version, account, container, obj = split_path(req.path, 1, 4, True)
        if not account:
            return self.denied_response(req)
        referrers, groups = parse_acl(getattr(req, 'acl', None))
        if referrer_allowed(req, referrers):
            return None
        try:
            in_any_group(account, *groups).check_authorization(req.environ)
        except NotAuthorizedError:
            return self.denied_response(req)
        return None

    def denied_response(self, req):
        if req.remote_user:
            return HTTPForbidden(request=req)
        else:
            return HTTPUnauthorized(request=req)


class DevIdentifier(object):

    def __init__(self, conf):
        self.conf = conf

    def identify(self, env):
        return {'token':
                env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN'))}

    def remember(self, env, identity):
        return []

    def forget(self, env, identity):
        return []


class DevAuthenticator(object):

    def __init__(self, conf):
        self.conf = conf
        self.auth_host = conf.get('ip', '127.0.0.1')
        self.auth_port = int(conf.get('port', 11000))
        self.ssl = \
            conf.get('ssl', 'false').lower() in ('true', 'on', '1', 'yes')
        self.auth_prefix = conf.get('prefix', '/')
        self.timeout = float(conf.get('node_timeout', 10))

    def authenticate(self, env, identity):
        token = identity.get('token')
        if not token:
            return None
        memcache_client = cache_from_env(env)
        key = 'devauth/%s' % token
        cached_auth_data = memcache_client.get(key)
        if cached_auth_data:
            start, expiration, user = cached_auth_data
            if time() - start <= expiration:
                return user
        with Timeout(self.timeout):
            conn = http_connect(self.auth_host, self.auth_port, 'GET',
                    '%stoken/%s' % (self.auth_prefix, token), ssl=self.ssl)
            resp = conn.getresponse()
            resp.read()
            conn.close()
        if resp.status == 204:
            expiration = float(resp.getheader('x-auth-ttl'))
            user = resp.getheader('x-auth-user')
            memcache_client.set(key, (time(), expiration, user),
                                time=expiration)
            return user
        return None


class DevChallenger(object):

    def __init__(self, conf):
        self.conf = conf

    def challenge(self, env, status, app_headers, forget_headers):
        def no_challenge(env, start_response):
            start_response(str(status), [])
            return []
        return no_challenge


class DevGroupSourceAdapter(BaseSourceAdapter):

    def __init__(self, *args, **kwargs):
        super(DevGroupSourceAdapter, self).__init__(*args, **kwargs)
        self.sections = {}

    def _get_all_sections(self):
        return self.sections

    def _get_section_items(self, section):
        return self.sections[section]

    def _find_sections(self, credentials):
        return credentials['repoze.what.userid'].split(',')

    def _include_items(self, section, items):
        self.sections[section] |= items

    def _exclude_items(self, section, items):
        for item in items:
            self.sections[section].remove(item)

    def _item_is_included(self, section, item):
        return item in self.sections[section]

    def _create_section(self, section):
        self.sections[section] = set()

    def _edit_section(self, section, new_section):
        self.sections[new_section] = self.sections[section]
        del self.sections[section]

    def _delete_section(self, section):
        del self.sections[section]

    def _section_exists(self, section):
        return self.sections.has_key(section)


class DevPermissionSourceAdapter(BaseSourceAdapter):

    def __init__(self, *args, **kwargs):
        super(DevPermissionSourceAdapter, self).__init__(*args, **kwargs)
        self.sections = {}

    def _get_all_sections(self):
        return self.sections

    def _get_section_items(self, section):
        return self.sections[section]

    def _find_sections(self, group_name):
        return set([n for (n, p) in self.sections.items()
                    if group_name in p])

    def _include_items(self, section, items):
        self.sections[section] |= items

    def _exclude_items(self, section, items):
        for item in items:
            self.sections[section].remove(item)

    def _item_is_included(self, section, item):
        return item in self.sections[section]

    def _create_section(self, section):
        self.sections[section] = set()

    def _edit_section(self, section, new_section):
        self.sections[new_section] = self.sections[section]
        del self.sections[section]

    def _delete_section(self, section):
        del self.sections[section]

    def _section_exists(self, section):
        return self.sections.has_key(section)


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)
    def auth_filter(app):
        return setup_auth(DevAuthorization(app, conf),
            group_adapters={'all_groups': DevGroupSourceAdapter()},
            permission_adapters={'all_perms': DevPermissionSourceAdapter()},
            identifiers=[('devauth', DevIdentifier(conf))],
            authenticators=[('devauth', DevAuthenticator(conf))],
            challengers=[('devauth', DevChallenger(conf))])
    return auth_filter

使用 Auth 允许 CORS

跨域资源共享 (CORS) 要求身份验证系统允许 OPTIONS 方法在没有令牌的情况下通过。预检请求将对对象或容器进行 OPTIONS 调用,如果身份验证系统停止它,将无法工作。请参阅 TempAuth,了解如何处理 OPTIONS 请求的示例。