Collector(收集器)

数据格式

在内部,CloudKitty 的数据格式比 架构文档 中找到的要更详细。

内部数据格式如下

{
    "bananas": [
        {
            "vol": {
                "unit": "banana",
                "qty": 1
            },
            "rating": {
                "price": 1
            },
            "groupby": {
                "xxx_id": "hello",
                "yyy_id": "bye",
            },
            "metadata": {
                "flavor": "chocolate",
                "eaten_by": "gorilla",
            },
       }
    ],
}

但是,实现 collector 的开发者不需要自行格式化数据,因为有辅助函数来处理这些事务。

实现

每个 collector 必须实现以下类

class cloudkitty.collector.BaseCollector(**kwargs)[source]
static check_configuration(conf)[source]

检查并验证指标配置。

需要额外参数进行指标收集的 collector 应该实现此方法,调用父类的方法,扩展 extra_args 键在 METRIC_BASE_SCHEMA 中,并根据新的 schema 验证指标配置。

abstract fetch_all(metric_name, start, end, project_id=None, q_filter=None)[source]

获取给定时间段内特定指标的信息。

此方法必须尊重在初始化时提供的指标配置中的 groupbymetadata 参数。(在 self.conf['groupby']self.conf['metadata'] 中可用)。

返回 cloudkitty.dataframe.DataPoint 对象列表。

参数:
  • metric_name (str) – 要获取的指标名称

  • start (datetime.datetime) – 时间段的开始

  • end (datetime.datetime) – 时间段的结束

  • project_id (str) – 应收集数据的范围的 ID

  • q_filter (dict) – 可选过滤器

orchestrator 调用 BaseCollector 类的 retrieve 方法。此方法调用子类的 fetch_all 方法。

要创建一个 collector,您至少需要实现 fetch_all 方法。

数据收集

collector 必须实现一个 fetch_all 方法。此方法将为每种指标类型、每个范围、每个收集周期调用。它具有以下原型

class cloudkitty.collector.BaseCollector(**kwargs)[source]
abstract fetch_all(metric_name, start, end, project_id=None, q_filter=None)[source]

获取给定时间段内特定指标的信息。

此方法必须尊重在初始化时提供的指标配置中的 groupbymetadata 参数。(在 self.conf['groupby']self.conf['metadata'] 中可用)。

返回 cloudkitty.dataframe.DataPoint 对象列表。

参数:
  • metric_name (str) – 要获取的指标名称

  • start (datetime.datetime) – 时间段的开始

  • end (datetime.datetime) – 时间段的结束

  • project_id (str) – 应收集数据的范围的 ID

  • q_filter (dict) – 可选过滤器

此方法应该返回一个 cloudkitty.dataframe.DataPoint 对象列表。

一个基本 collector 的示例代码

from cloudkitty.collector import BaseCollector

class MyCollector(BaseCollector):
    def __init__(self, **kwargs):
        super(MyCollector, self).__init__(**kwargs)

    def fetch_all(self, metric_name, start, end,
                  project_id=None, q_filter=None):
        data = []
        for CONDITION:
            # do stuff
            data.append(dataframe.DataPoint(
                unit,
                qty, # int, float, decimal.Decimal or str
                0, # price
                groupby, # dict
                metadata, # dict
            ))

        return data

project_id 可能会产生误导,因为它是一个遗留名称。它包含当前范围的 ID。配置中,在 [collect]/scope_key 下指定了与范围对应的属性。因此,所有查询都应根据此属性进行过滤。示例

from oslo_config import cfg

from cloudkitty.collector import BaseCollector

CONF = cfg.CONF

class MyCollector(BaseCollector):
    def __init__(self, **kwargs):
        super(MyCollector, self).__init__(**kwargs)

    def fetch_all(self, metric_name, start, end,
                  project_id=None, q_filter=None):
        scope_key = CONF.collect.scope_key
        filters = {'start': start, 'stop': stop, scope_key: project_id}

        data = self.client.query(
            filters=filters,
            groupby=self.conf[metric_name]['groupby'])
        # Format data etc
        return output

附加配置

如果您需要扩展指标配置(在 metrics.ymlextra_args 部分添加参数),您可以重载基本 collector 的 check_configuration 方法

class cloudkitty.collector.BaseCollector(**kwargs)[source]
static check_configuration(conf)[source]

检查并验证指标配置。

需要额外参数进行指标收集的 collector 应该实现此方法,调用父类的方法,扩展 extra_args 键在 METRIC_BASE_SCHEMA 中,并根据新的 schema 验证指标配置。

此方法使用 voluptuous 进行数据验证。每个指标的基本 schema 可以在 cloudkitty.collector.METRIC_BASE_SCHEMA 中找到。此 schema 旨在被其他 collector 扩展。来自 gnocchi collector 代码的示例

from cloudkitty import collector

GNOCCHI_EXTRA_SCHEMA = {
    Required('extra_args'): {
        Required('resource_type'): All(str, Length(min=1)),
        # Due to Gnocchi model, metric are grouped by resource.
        # This parameter allows to adapt the key of the resource identifier
        Required('resource_key', default='id'): All(str, Length(min=1)),
        Required('aggregation_method', default='max'):
            In(['max', 'mean', 'min']),
    },
}

class GnocchiCollector(collector.BaseCollector):

    collector_name = 'gnocchi'

    @staticmethod
    def check_configuration(conf):
        conf = collector.BaseCollector.check_configuration(conf)
        metric_schema = Schema(collector.METRIC_BASE_SCHEMA).extend(
            GNOCCHI_EXTRA_SCHEMA)

        output = {}
        for metric_name, metric in conf.items():
            met = output[metric_name] = metric_schema(metric)

            if met['extra_args']['resource_key'] not in met['groupby']:
                met['groupby'].append(met['extra_args']['resource_key'])

        return output

如果您的 collector 不需要任何 extra_args,则不需要重载 check_configuration 方法。