Object

Object Auditor

class swift.obj.auditor.AuditorWorker(conf, logger, rcache, devices, zero_byte_only_at_fps=0, watcher_defs=None)

Bases: object

Walk through the file system to audit objects

audit_all_objects(mode='once', device_dirs=None)
create_recon_nested_dict(top_level_key, device_list, item)
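The name suggests this helper builds the nested dictionary written to the recon cache. A hypothetical standalone sketch of that shape (the exact semantics are an assumption, not taken from the source):

```python
def create_recon_nested_dict(top_level_key, device_list, item):
    # Hypothetical sketch: associate the same item with every device
    # under a single top-level key, e.g.
    # {"object_auditor_stats_ALL": {"sda": {...}, "sdb": {...}}}
    device_dict = {device: item for device in device_list}
    return {top_level_key: device_dict}
```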
failsafe_object_audit(location)

Entrypoint to object_audit, with a failsafe generic exception handler.

object_audit(location)

Audits the given object location.

Parameters:

location – an audit location (from diskfile.object_audit_location_generator)

record_stats(obj_size)

Based on the config's object_size_stats, keep track of how many objects fall into the specified size ranges. For example, with the following configuration

object_size_stats = 10, 100, 1024

and a system that has three objects of sizes 5, 20, and 10000 bytes, the log will look like: {"10": 1, "100": 1, "1024": 0, "OVER": 1}
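The bucketing described above can be sketched as a standalone function (an illustration of the counting rule, not the auditor's actual code):

```python
from collections import OrderedDict

def bucket_counts(object_size_stats, sizes):
    # One counter per configured upper bound, plus "OVER" for anything
    # larger than the biggest bound.
    buckets = OrderedDict((str(b), 0) for b in sorted(object_size_stats))
    buckets["OVER"] = 0
    for size in sizes:
        for bound in sorted(object_size_stats):
            if size <= bound:
                buckets[str(bound)] += 1
                break
        else:
            buckets["OVER"] += 1
    return dict(buckets)

# With buckets 10, 100, 1024 and objects of 5, 20 and 10000 bytes this
# yields {"10": 1, "100": 1, "1024": 0, "OVER": 1}, as in the example.
```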

class swift.obj.auditor.ObjectAuditor(conf, logger=None, **options)

Bases: Daemon

Audit objects.

audit_loop(parent, zbo_fps, override_devices=None, **kwargs)

Parallel audit loop

clear_recon_cache(auditor_type)

Clear recon cache entries

fork_child(zero_byte_fps=False, sleep_between_zbf_scanner=False, **kwargs)

Child execution

run_audit(**kwargs)

Run the object audit

run_forever(*args, **kwargs)

Run the object audit until stopped.

run_once(*args, **kwargs)

Run the object audit once

class swift.obj.auditor.WatcherWrapper(watcher_class, watcher_name, conf, logger)

Bases: object

Run the user-supplied watcher.

Simple and gets the job done. Note that we aren't doing anything to isolate ourselves from hangs or file descriptor leaks in the plugin.

Parameters:

logger – an instance of SwiftLogAdapter.

end()
see_object(meta, data_file_path)
start(audit_type)
swift.obj.auditor.main()

Object Backend

Disk File Interface for the Swift Object Server

The DiskFile, DiskFileWriter and DiskFileReader classes combined define the on-disk abstraction layer for supporting the object server REST API interfaces (excluding REPLICATE). Other implementations wishing to provide an alternative backend for the object server must implement these three classes. An example alternative implementation can be found in the mem_server.py and mem_diskfile.py modules alongside this one.

The DiskFileManager is a reference implementation specific class and is not part of the backend API.

The remaining methods in this module are considered implementation specific and are also not considered part of the backend API.

class swift.obj.diskfile.AuditLocation(path, device, partition, policy)

Bases: object

Represents an object location to be audited.

Other than being a bucket of data, the only useful thing this does is stringify to a filesystem path so the auditor’s logs look okay.

class swift.obj.diskfile.BaseDiskFile(mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, open_expired=False, next_part_power=None, **kwargs)

Bases: object

Manage object files.

This specific implementation manages object files on a disk formatted with a POSIX-compliant file system that supports extended attributes as metadata on a file or directory.

Note

The arguments to the constructor are considered implementation specific. The API does not define the constructor arguments.

The following path format is used for data file locations: <devices_path>/<device_dir>/<datadir>/<partdir>/<suffixdir>/<hashdir>/<datafile>.<ext>

Parameters:
  • mgr – associated DiskFileManager instance

  • device_path – path to the target device or drive

  • partition – partition on the device in which the object lives

  • account – account name for the object

  • container – container name for the object

  • obj – object name for the object

  • _datadir – override the full datadir otherwise constructed here

  • policy – the StoragePolicy instance

  • use_splice – if true, use zero-copy splice() to send data

  • pipe_size – size of pipe buffer used in zero-copy operations

  • open_expired – if True, open() will not raise a DiskFileExpired if object is expired

  • next_part_power – the next partition power to be used
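The path layout described above can be illustrated with hypothetical component values (the suffix directory is the last three characters of the object hash, as in the extract_policy example later in this module's documentation):

```python
import os

# Hypothetical values for each component of
# <devices_path>/<device_dir>/<datadir>/<partdir>/<suffixdir>/<hashdir>/<datafile>.<ext>
devices_path = "/srv/node"
device_dir = "sda1"
datadir = "objects"          # or "objects-<N>" for a non-default policy
partdir = "30"
hashdir = "485dc017205a81df3af616d917c90179"
suffixdir = hashdir[-3:]     # "179"
datafile = "1401811134.873649.data"

path = os.path.join(devices_path, device_dir, datadir, partdir,
                    suffixdir, hashdir, datafile)
# /srv/node/sda1/objects/30/179/485dc017205a81df3af616d917c90179/1401811134.873649.data
```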

property account
property container
property content_length
property content_type
property content_type_timestamp
create(size=None, extension='.data')

Context manager to create a file. We create a temporary file first, and then return a DiskFileWriter object to encapsulate the state.

Note

An implementation is not required to perform on-disk preallocations even if the parameter is specified. But if it does and it fails, it must raise a DiskFileNoSpace exception.

Parameters:
  • size – optional initial size of file to explicitly allocate on disk

  • extension – file extension to use for the newly-created file; defaults to .data for the sake of tests

Raises:

DiskFileNoSpace – if a size is specified and allocation fails

property data_timestamp
delete(timestamp)

Delete the object.

This implementation creates a tombstone file using the given timestamp, and removes any older versions of the object file. Any file that has an older timestamp than timestamp will be deleted.

Note

An implementation is free to use or ignore the timestamp parameter.

Parameters:

timestamp – timestamp to compare with each file

Raises:

DiskFileError – this implementation will raise the same errors as the create() method.

property durable_timestamp

Provides the timestamp of the newest data file found in the object directory.

Returns:

A Timestamp instance, or None if no data file was found.

Raises:

DiskFileNotOpen – if the open() method has not been previously called on this instance.

property fragments
classmethod from_hash_dir(mgr, hash_dir_path, device_path, partition, policy)
get_datafile_metadata()

Provide the datafile metadata for a previously opened object as a dictionary. This is metadata that was included when the object was first PUT, and does not include metadata set by any subsequent POST.

Returns:

object’s datafile metadata dictionary

Raises:

DiskFileNotOpen – if the swift.obj.diskfile.DiskFile.open() method was not previously invoked

get_metadata()

Provide the metadata for a previously opened object as a dictionary.

Returns:

object’s metadata dictionary

Raises:

DiskFileNotOpen – if the swift.obj.diskfile.DiskFile.open() method was not previously invoked

get_metafile_metadata()

Provide the metafile metadata for a previously opened object as a dictionary. This is metadata that was written by a POST and does not include any persistent metadata that was set by the original PUT.

Returns:

object’s .meta file metadata dictionary, or None if there is no .meta file

Raises:

DiskFileNotOpen – if the swift.obj.diskfile.DiskFile.open() method was not previously invoked

property manager
property obj
open(modernize=False, current_time=None)

Open the object.

This implementation opens the data file representing the object, reads the associated metadata in the extended attributes, additionally combining metadata from fast-POST .meta files.

Parameters:
  • modernize – if set, update this diskfile to the latest format. Currently, this means adding metadata checksums if none are present.

  • current_time – Unix time used in checking expiration. If not present, the current time will be used.

Note

An implementation is allowed to raise any of the following exceptions, but is only required to raise DiskFileNotExist when the object representation does not exist.

Raises:
Returns:

itself for use as a context manager

read_metadata(current_time=None)

Return the metadata for an object without requiring the caller to open the object first.

Parameters:

current_time – Unix time used in checking expiration. If not present, the current time will be used.

Returns:

metadata dictionary for an object

Raises:

DiskFileError – this implementation will raise the same errors as the open() method.

reader(keep_cache=False, cooperative_period=0, etag_validate_frac=1, _quarantine_hook=<function BaseDiskFile.<lambda>>)

Return a swift.common.swob.Response class compatible “app_iter” object as defined by swift.obj.diskfile.DiskFileReader.

For this implementation, the responsibility of closing the open file is passed to the swift.obj.diskfile.DiskFileReader object.

Parameters:
  • keep_cache – caller’s preference for keeping data read in the OS buffer cache

  • cooperative_period – the period parameter for cooperative yielding during file read

  • etag_validate_frac – the probability that we should perform etag validation during a complete file read

  • _quarantine_hook – 1-arg callable called when obj quarantined; the arg is the reason for quarantine. Default is to ignore it. Not needed by the REST layer.

Returns:

a swift.obj.diskfile.DiskFileReader object

reader_cls = None
property timestamp
validate_metadata()
write_metadata(metadata)

Write a block of metadata to an object without requiring the caller to create the object first. Supports fast-POST behavior semantics.

Parameters:

metadata – dictionary of metadata to be associated with the object

Raises:

DiskFileError – this implementation will raise the same errors as the create() method.

writer(size=None)
writer_cls = None
class swift.obj.diskfile.BaseDiskFileManager(conf, logger)

Bases: object

Management class for devices, providing common place for shared parameters and methods not provided by the DiskFile class (which primarily services the object server REST API layer).

The get_diskfile() method is how this implementation creates a DiskFile object.

Note

This class is reference implementation specific and not part of the pluggable on-disk backend API.

Note

TODO(portante): Not sure what the right name to recommend here, as “manager” seemed generic enough, though suggestions are welcome.

Parameters:
  • conf – caller provided configuration object

  • logger – caller provided logger

classmethod check_policy(policy)
cleanup_ondisk_files(hsh_path, **kwargs)

Clean up on-disk files that are obsolete and gather the set of valid on-disk files for an object.

Parameters:
  • hsh_path – object hash path

  • frag_index – if set, search for a specific fragment index .data file, otherwise accept the first valid .data file

Returns:

a dict that may contain: valid on disk files keyed by their filename extension; a list of obsolete files stored under the key ‘obsolete’; a list of files remaining in the directory, reverse sorted, stored under the key ‘files’.

clear_auditor_status(policy, auditor_type='ALL')
static consolidate_hashes(partition_dir)

Take what’s in hashes.pkl and hashes.invalid, combine them, write the result back to hashes.pkl, and clear out hashes.invalid.

Parameters:

partition_dir – absolute path to partition dir containing hashes.pkl and hashes.invalid

Returns:

a dict, the suffix hashes (if any), the key ‘valid’ will be False if hashes.pkl is corrupt, cannot be read or does not exist
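The merge step can be sketched in isolation (this omits the actual pickle/file handling and is only an illustration of the idea: suffixes recorded in hashes.invalid lose their cached hash so they will be recalculated on the next rehash):

```python
def consolidate(hashes, invalid_suffixes):
    # Sketch only: clear the cached hash of every invalidated suffix.
    # The real implementation reads hashes.invalid from disk, rewrites
    # hashes.pkl and truncates hashes.invalid.
    for suffix in invalid_suffixes:
        hashes[suffix] = None
    return hashes
```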

construct_dev_path(device)

Construct the path to a device without checking if it is mounted.

Parameters:

device – name of target device

Returns:

full path to the device

diskfile_cls = None
get_dev_path(device, mount_check=None)

Return the path to a device, first checking to see if either it is a proper mount point, or at least a directory depending on the mount_check configuration option.

Parameters:
  • device – name of target device

  • mount_check – whether or not to check mountedness of device. Defaults to bool(self.mount_check).

Returns:

full path to the device, None if the path to the device is not a proper mount point or directory.

get_diskfile(device, partition, account, container, obj, policy, **kwargs)

Returns a BaseDiskFile instance for an object based on the object’s partition, path parts and policy.

Parameters:
  • device – name of target device

  • partition – partition on device in which the object lives

  • account – account name for the object

  • container – container name for the object

  • obj – object name for the object

  • policy – the StoragePolicy instance

get_diskfile_and_filenames_from_hash(device, partition, object_hash, policy, **kwargs)

Returns a tuple of (a DiskFile instance for an object at the given object_hash, the basenames of the files in the object’s hash dir). Just in case someone thinks of refactoring, be sure DiskFileDeleted is not raised, but the DiskFile instance representing the tombstoned object is returned instead.

Parameters:
  • device – name of target device

  • partition – partition on the device in which the object lives

  • object_hash – the hash of an object path

  • policy – the StoragePolicy instance

Raises:

DiskFileNotExist – if the object does not exist

Returns:

a tuple comprising (an instance of BaseDiskFile, a list of file basenames)

get_diskfile_from_audit_location(audit_location)

Returns a BaseDiskFile instance for an object at the given AuditLocation.

Parameters:

audit_location – object location to be audited

get_diskfile_from_hash(device, partition, object_hash, policy, **kwargs)

Returns a DiskFile instance for an object at the given object_hash. Just in case someone thinks of refactoring, be sure DiskFileDeleted is not raised, but the DiskFile instance representing the tombstoned object is returned instead.

Parameters:
  • device – name of target device

  • partition – partition on the device in which the object lives

  • object_hash – the hash of an object path

  • policy – the StoragePolicy instance

Raises:

DiskFileNotExist – if the object does not exist

Returns:

an instance of BaseDiskFile

get_hashes(device, partition, suffixes, policy, skip_rehash=False)
Parameters:
  • device – name of target device

  • partition – partition name

  • suffixes – a list of suffix directories to be recalculated

  • policy – the StoragePolicy instance

  • skip_rehash – just mark the suffixes dirty; return None

Returns:

a dictionary that maps suffix directories to their hashes

get_ondisk_files(files, datadir, verify=True, policy=None, **kwargs)

Given a simple list of file names, determine the files that constitute a valid fileset, i.e. a set of files that defines the state of an object, and determine the files that are obsolete and could be deleted. Note that some files may fall into neither category.

If a file is considered part of a valid fileset then its info dict will be added to the results dict, keyed by <extension>_info. Any files that are no longer required will have their info dicts added to a list stored under the key ‘obsolete’.

The results dict will always contain entries with keys ‘ts_file’, ‘data_file’ and ‘meta_file’. Their values will be the fully qualified path to a file of the corresponding type if there is such a file in the valid fileset, or None.

Parameters:
  • files – a list of file names.

  • datadir – directory name files are from; this is used to construct file paths in the results, but the datadir is not modified by this method.

  • verify – if True verify that the ondisk file contract has not been violated, otherwise do not verify.

  • policy – storage policy used to store the files. Used to validate fragment indexes for EC policies.

Returns:

a dict that will contain keys

  • ts_file -> path to a .ts file or None

  • data_file -> path to a .data file or None

  • meta_file -> path to a .meta file or None

  • ctype_file -> path to a .meta file or None

and may contain keys

  • ts_info -> a file info dict for a .ts file

  • data_info -> a file info dict for a .data file

  • meta_info -> a file info dict for a .meta file

  • ctype_info -> a file info dict for a .meta file which contains the content-type value

  • unexpected -> a list of file paths for unexpected files

  • possible_reclaim -> a list of file info dicts for possible reclaimable files

  • obsolete -> a list of file info dicts for obsolete files
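A greatly simplified, standalone sketch of the selection rule for the replication policy (no reclaim or cleanup handling, and assuming plain "<timestamp>.<ext>" file names; the real method also builds info dicts and validates the fileset):

```python
def classify(files):
    # The newest .data or .ts decides the object's state; a .meta newer
    # than the .data contributes fast-POST metadata. Everything older
    # than the newest .ts/.data is obsolete.
    result = {'ts_file': None, 'data_file': None, 'meta_file': None}
    for name in sorted(files, reverse=True):  # newest timestamp first
        if result['ts_file'] or result['data_file']:
            break
        ext = name[name.rfind('.'):]
        if ext == '.ts':
            result['ts_file'] = name
        elif ext == '.data':
            result['data_file'] = name
        elif ext == '.meta' and not result['meta_file']:
            result['meta_file'] = name
    return result
```

For example, a .meta newer than the newest .data is kept, while a tombstone newer than a .data makes the .data obsolete.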

static invalidate_hash(suffix_dir)

Invalidates the hash for a suffix_dir in the partition’s hashes file.

Parameters:

suffix_dir – absolute path to suffix dir whose hash needs invalidating

make_on_disk_filename(timestamp, ext=None, ctype_timestamp=None, *a, **kw)

Returns filename for given timestamp.

Parameters:
  • timestamp – the object timestamp, an instance of Timestamp

  • ext – an optional string representing a file extension to be appended to the returned file name

  • ctype_timestamp – an optional content-type timestamp, an instance of Timestamp

Returns:

a file name
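For the replication policy the resulting name is essentially the object timestamp plus the extension. A simplified sketch (the encoding of ctype_timestamp into .meta file names is omitted here and is more involved in the real implementation):

```python
def make_on_disk_filename(timestamp, ext=None):
    # Simplified sketch: render the timestamp with five decimal places
    # and append the extension, e.g. "1401811134.87364.data".
    return '%.5f%s' % (timestamp, ext or '')
```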

object_audit_location_generator(policy, device_dirs=None, auditor_type='ALL')

Yield an AuditLocation for all objects stored under device_dirs.

Parameters:
  • policy – the StoragePolicy instance

  • device_dirs – directory of target device

  • auditor_type – either ALL or ZBF

parse_on_disk_filename(filename, policy)

Parse an on disk file name.

Parameters:
  • filename – the file name including extension

  • policy – storage policy used to store the file

Returns:

a dict, with keys for timestamp, ext and ctype_timestamp

  • timestamp is a Timestamp

  • ctype_timestamp is a Timestamp or None for .meta files, otherwise None

  • ext is a string, the file extension including the leading dot or the empty string if the filename has no extension.

Subclasses may override this method to add further keys to the returned dict.

Raises:

DiskFileError – if any part of the filename is not able to be validated.
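A minimal sketch of the replication-policy parse (no validation and no ctype_timestamp decoding, which the real method performs): split the timestamp from the extension.

```python
import os

def parse(filename):
    # Simplified sketch: "<timestamp>.<ext>" -> dict with the keys the
    # real method returns; ctype_timestamp decoding is omitted.
    base, ext = os.path.splitext(filename)
    return {'timestamp': base, 'ext': ext, 'ctype_timestamp': None}
```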

partition_lock(device, policy, partition, name=None, timeout=None)

A context manager that will lock on the partition given.

Parameters:
  • device – device targeted by the lock request

  • policy – policy targeted by the lock request

  • partition – partition targeted by the lock request

Raises:

PartitionLockTimeout – If the lock on the partition cannot be granted within the configured timeout.

pickle_async_update(device, account, container, obj, data, timestamp, policy)

Write data describing a container update notification to a pickle file in the async_pending directory.

Parameters:
  • device – name of target device

  • account – account name for the object

  • container – container name for the object

  • obj – object name for the object

  • data – update data to be written to pickle file

  • timestamp – a Timestamp

  • policy – the StoragePolicy instance

policy = None
static quarantine_renamer(device_path, corrupted_file_path)

In the case that a file is corrupted, move it to a quarantined area to allow replication to fix it.

Parameters:
  • device_path – the path to the device the corrupted file is on.

  • corrupted_file_path – the path to the file to be quarantined.

Returns:

path (str) of directory the file was moved to

Raises:

OSError – re-raises non errno.EEXIST / errno.ENOTEMPTY exceptions from rename

replication_lock(device, policy, partition)

A context manager that will lock on the partition and, if configured to do so, on the device given.

Parameters:
  • device – name of target device

  • policy – policy targeted by the replication request

  • partition – partition targeted by the replication request

Raises:

ReplicationLockTimeout – If the lock on the device cannot be granted within the configured timeout.

yield_hashes(device, partition, policy, suffixes=None, **kwargs)

Yields tuples of (hash_only, timestamps) for object information stored for the given device, partition, and (optionally) suffixes. If suffixes is None, all stored suffixes will be searched for object hashes. Note that if suffixes is not None but empty, such as [], then nothing will be yielded.

timestamps is a dict which may contain items mapping

  • ts_data -> timestamp of data or tombstone file,

  • ts_meta -> timestamp of meta file, if one exists

  • ts_ctype -> timestamp of meta file containing most recent content-type value, if one exists

  • durable -> True if data file at ts_data is durable, False otherwise

where timestamps are instances of Timestamp

Parameters:
  • device – name of target device

  • partition – partition name

  • policy – the StoragePolicy instance

  • suffixes – optional list of suffix directories to be searched

yield_suffixes(device, partition, policy)

Yields tuples of (full_path, suffix_only) for suffixes stored on the given device and partition.

Parameters:
  • device – name of target device

  • partition – partition name

  • policy – the StoragePolicy instance

class swift.obj.diskfile.BaseDiskFileReader(fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False, cooperative_period=0, etag_validate_frac=1)

Bases: object

Encapsulation of the WSGI read context for servicing GET REST API requests. Serves as the context manager object for the swift.obj.diskfile.DiskFile class’s swift.obj.diskfile.DiskFile.reader() method.

Note

The quarantining behavior of this method is considered implementation specific, and is not required of the API.

Note

The arguments to the constructor are considered implementation specific. The API does not define the constructor arguments.

Parameters:
  • fp – open file object pointer reference

  • data_file – on-disk data file name for the object

  • obj_size – verified on-disk size of the object

  • etag – expected metadata etag value for entire file

  • disk_chunk_size – size of reads from disk in bytes

  • keep_cache_size – maximum object size that will be kept in cache

  • device_path – on-disk device path, used when quarantining an obj

  • logger – logger caller wants this object to use

  • quarantine_hook – 1-arg callable called w/reason when quarantined

  • use_splice – if true, use zero-copy splice() to send data

  • pipe_size – size of pipe buffer used in zero-copy operations

  • diskfile – the diskfile creating this DiskFileReader instance

  • keep_cache – should resulting reads be kept in the buffer cache

  • cooperative_period – the period parameter for cooperative yielding during file read

  • etag_validate_frac – the probability that we should perform etag validation during a complete file read

app_iter_range(start, stop)

Returns an iterator over the data file for range (start, stop)

app_iter_ranges(ranges, content_type, boundary, size)

Returns an iterator over the data file for a set of ranges

can_zero_copy_send()
close()

Close the open file handle if present.

For this specific implementation, this method will handle quarantining the file if necessary.

property manager
zero_copy_send(wsockfd)

Does some magic with splice() and tee() to move stuff from disk to network without ever touching userspace.

Parameters:

wsockfd – file descriptor (integer) of the socket out which to send data

class swift.obj.diskfile.BaseDiskFileWriter(name, datadir, size, bytes_per_sync, diskfile, next_part_power, extension='.data')

Bases: object

Encapsulation of the write context for servicing PUT REST API requests. Serves as the context manager object for the swift.obj.diskfile.DiskFile class’s swift.obj.diskfile.DiskFile.create() method.

Note

It is the responsibility of the swift.obj.diskfile.DiskFile.create() method context manager to close the open file descriptor.

Note

The arguments to the constructor are considered implementation specific. The API does not define the constructor arguments.

Parameters:
  • name – name of object from REST API

  • datadir – on-disk directory object will end up in on swift.obj.diskfile.DiskFileWriter.put()

  • fd – open file descriptor of temporary file to receive data

  • tmppath – full path name of the opened file descriptor

  • bytes_per_sync – number bytes written between sync calls

  • diskfile – the diskfile creating this DiskFileWriter instance

  • next_part_power – the next partition power to be used

  • extension – the file extension to be used; may be used internally to distinguish between PUT/POST/DELETE operations

chunks_finished()

Expose internal stats about written chunks.

Returns:

a tuple, (upload_size, etag)

close()
commit(timestamp)

Perform any operations necessary to mark the object as durable. For replication policy type this is a no-op.

Parameters:

timestamp – object put timestamp, an instance of Timestamp

property logger
property manager
open()
put(metadata)

Finalize writing the file on disk.

Parameters:

metadata – dictionary of metadata to be associated with the object

write(chunk)

Write a chunk of data to disk. All invocations of this method must come before invoking the put() method.

For this implementation, the data is written into a temporary file.

Parameters:

chunk – the chunk of data to write as a string object

class swift.obj.diskfile.DiskFile(mgr, device_path, partition, account=None, container=None, obj=None, _datadir=None, policy=None, use_splice=False, pipe_size=None, open_expired=False, next_part_power=None, **kwargs)

Bases: BaseDiskFile

reader_cls

alias of DiskFileReader

writer_cls

alias of DiskFileWriter

class swift.obj.diskfile.DiskFileManager(conf, logger)

Bases: BaseDiskFileManager

diskfile_cls

alias of DiskFile

policy = 'replication'
class swift.obj.diskfile.DiskFileReader(fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False, cooperative_period=0, etag_validate_frac=1)

Bases: BaseDiskFileReader

class swift.obj.diskfile.DiskFileRouter(*args, **kwargs)

Bases: object

class swift.obj.diskfile.DiskFileWriter(name, datadir, size, bytes_per_sync, diskfile, next_part_power, extension='.data')

Bases: BaseDiskFileWriter

put(metadata)

Finalize writing the file on disk.

Parameters:

metadata – dictionary of metadata to be associated with the object

class swift.obj.diskfile.ECDiskFile(*args, **kwargs)

Bases: BaseDiskFile

property durable_timestamp

Provides the timestamp of the newest durable file found in the object directory.

Returns:

A Timestamp instance, or None if no durable file was found.

Raises:

DiskFileNotOpen – if the open() method has not been previously called on this instance.

property fragments

Provides information about all fragments that were found in the object directory, including fragments without a matching durable file, and including any fragment chosen to construct the opened diskfile.

Returns:

A dict mapping <Timestamp instance> -> <list of frag indexes>, or None if the diskfile has not been opened or no fragments were found.

purge(timestamp, frag_index, nondurable_purge_delay=0, meta_timestamp=None)

Remove a tombstone file matching the specified timestamp or datafile matching the specified timestamp and fragment index from the object directory.

This provides the EC reconstructor/ssync process with a way to remove a tombstone or fragment from a handoff node after reverting it to its primary node.

The hash will be invalidated, and if empty the hsh_path will be removed immediately.

Parameters:
  • timestamp – the object timestamp, an instance of Timestamp

  • frag_index – fragment archive index, must be a whole number or None.

  • nondurable_purge_delay – only remove a non-durable data file if it’s been on disk longer than this many seconds.

  • meta_timestamp – if not None then remove any meta file with this timestamp

reader_cls

alias of ECDiskFileReader

validate_metadata()
writer_cls

alias of ECDiskFileWriter

class swift.obj.diskfile.ECDiskFileManager(conf, logger)

Bases: BaseDiskFileManager

diskfile_cls

alias of ECDiskFile

make_on_disk_filename(timestamp, ext=None, frag_index=None, ctype_timestamp=None, durable=False, *a, **kw)

Returns the EC specific filename for given timestamp.

Parameters:
  • timestamp – the object timestamp, an instance of Timestamp

  • ext – an optional string representing a file extension to be appended to the returned file name

  • frag_index – a fragment archive index, used with .data extension only, must be a whole number.

  • ctype_timestamp – an optional content-type timestamp, an instance of Timestamp

  • durable – if True then include a durable marker in data filename.

Returns:

a file name

Raises:

DiskFileError – if ext==’.data’ and the kwarg frag_index is not a whole number

parse_on_disk_filename(filename, policy)

Returns timestamp(s) and other info extracted from a policy specific file name. For EC policy the data file name includes a fragment index and possibly a durable marker, both of which must be stripped off to retrieve the timestamp.

Parameters:

filename – the file name including extension

Returns:

a dict, with keys for timestamp, frag_index, durable, ext and

ctype_timestamp

  • timestamp is a Timestamp

  • frag_index is an int or None

  • ctype_timestamp is a Timestamp or None for .meta files, otherwise None

  • ext is a string, the file extension including the leading dot or the empty string if the filename has no extension

  • durable is a boolean that is True if the filename is a data file that includes a durable marker

Raises:

DiskFileError – if any part of the filename is not able to be validated.

policy = 'erasure_coding'
validate_fragment_index(frag_index, policy=None)

Return int representation of frag_index, or raise a DiskFileError if frag_index is not a whole number.

Parameters:
  • frag_index – a fragment archive index

  • policy – storage policy used to validate the index against

class swift.obj.diskfile.ECDiskFileReader(fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, keep_cache=False, cooperative_period=0, etag_validate_frac=1)

Bases: BaseDiskFileReader

class swift.obj.diskfile.ECDiskFileWriter(name, datadir, size, bytes_per_sync, diskfile, next_part_power, extension='.data')

Bases: BaseDiskFileWriter

commit(timestamp)

Finalize put by renaming the object data file to include a durable marker. We do this for EC policy because it requires a 2-phase put commit confirmation.

Parameters:

timestamp – object put timestamp, an instance of Timestamp

Raises:

DiskFileError – if the diskfile frag_index has not been set (either during initialisation or a call to put())

put(metadata)

The only difference between this method and the replication policy DiskFileWriter method is adding the frag index to the metadata.

Parameters:

metadata – dictionary of metadata to be associated with object

swift.obj.diskfile.clear_auditor_status(devices, datadir, auditor_type='ALL')
swift.obj.diskfile.consolidate_hashes(partition_dir)

Take what’s in hashes.pkl and hashes.invalid, combine them, write the result back to hashes.pkl, and clear out hashes.invalid.

Parameters:

partition_dir – absolute path to partition dir containing hashes.pkl and hashes.invalid

Returns:

a dict, the suffix hashes (if any), the key ‘valid’ will be False if hashes.pkl is corrupt, cannot be read or does not exist

swift.obj.diskfile.extract_policy(obj_path)

Extracts the policy for an object (based on the name of the objects directory) given the device-relative path to the object. Returns None in the event that the path is malformed in some way.

The device-relative path is everything after the mount point; for example, the full path

/srv/node/d42/objects-5/30/179/485dc017205a81df3af616d917c90179/1401811134.873649.data

would have device-relative path

objects-5/30/179/485dc017205a81df3af616d917c90179/1401811134.873649.data

Parameters:

obj_path – device-relative path of an object, or the full path

Returns:

a BaseStoragePolicy or None
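The naming convention can be sketched as follows. This standalone function returns only the policy index rather than a BaseStoragePolicy instance (which the real function looks up in the policy registry): "objects" maps to the legacy Policy-0 and "objects-<N>" to index N.

```python
def extract_policy_index(obj_path):
    # Sketch: scan path components for the objects directory and
    # extract the policy index from its name; None if malformed.
    for part in obj_path.lstrip('/').split('/'):
        if part == 'objects':
            return 0
        if part.startswith('objects-'):
            suffix = part[len('objects-'):]
            return int(suffix) if suffix.isdigit() else None
    return None
```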

swift.obj.diskfile.get_async_dir(policy_or_index)

Get the async dir for the given policy.

Parameters:

policy_or_index – StoragePolicy instance, or an index (string or int); if None, the legacy Policy-0 is assumed.

Returns:

async_pending or async_pending-<N> as appropriate

swift.obj.diskfile.get_auditor_status(datadir_path, logger, auditor_type)
swift.obj.diskfile.get_data_dir(policy_or_index)

Get the data dir for the given policy.

Parameters:

policy_or_index – StoragePolicy instance, or an index (string or int); if None, the legacy Policy-0 is assumed.

Returns:

objects or objects-<N> as appropriate

swift.obj.diskfile.get_part_path(dev_path, policy, partition)

Given the device path, policy, and partition, returns the full path to the partition

swift.obj.diskfile.get_tmp_dir(policy_or_index)

Get the temp dir for the given policy.

Parameters:

policy_or_index – StoragePolicy instance, or an index (string or int); if None, the legacy Policy-0 is assumed.

Returns:

tmp or tmp-<N> as appropriate
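The directory-naming rule shared by get_data_dir, get_async_dir and get_tmp_dir can be sketched with one standalone helper (an illustration of the convention, not Swift's actual code):

```python
def policy_dir(base, policy_index):
    # The legacy Policy-0 uses the bare directory name; any other
    # policy gets a "-<N>" suffix, e.g. "objects-5" or "async_pending-2".
    if policy_index in (0, None):
        return base
    return '%s-%d' % (base, int(policy_index))
```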

swift.obj.diskfile.invalidate_hash(suffix_dir)

Invalidates the hash for a suffix_dir in the partition’s hashes file.

Parameters:

suffix_dir – absolute path to suffix dir whose hash needs invalidating

swift.obj.diskfile.object_audit_location_generator(devices, datadir, mount_check=True, logger=None, device_dirs=None, auditor_type='ALL')

Given a devices path (e.g. “/srv/node”), yield an AuditLocation for all objects stored under that directory for the given datadir (policy), if device_dirs isn’t set. If device_dirs is set, only yield AuditLocation for the objects under the entries in device_dirs. The AuditLocation only knows the path to the hash directory, not to the .data file therein (if any). This is to avoid a double listdir(hash_dir); the DiskFile object will always do one, so we don’t.

Parameters:
  • devices – parent directory of the devices to be audited

  • datadir – objects directory

  • mount_check – flag to check if a mount check should be performed on devices

  • logger – a logger object

  • device_dirs – a list of directories under devices to traverse

  • auditor_type – either ALL or ZBF
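
The directory layout this generator walks can be sketched as below: devices/<device>/<datadir>/<partition>/<suffix>/<hash_dir>. This is a simplified illustration with no mount check and no auditor bookkeeping, and the function name is hypothetical.

```python
import os

# Hedged sketch of the walk behind object_audit_location_generator,
# yielding hash-dir paths (the AuditLocation path) without listing the
# .data files inside them.

def audit_locations(devices, datadir, device_dirs=None):
    for dev in sorted(device_dirs or os.listdir(devices)):
        datadir_path = os.path.join(devices, dev, datadir)
        if not os.path.isdir(datadir_path):
            continue  # device holds no data for this policy
        for part in sorted(os.listdir(datadir_path)):
            part_path = os.path.join(datadir_path, part)
            if not os.path.isdir(part_path):
                continue  # skip hashes.pkl and similar files
            for suffix in sorted(os.listdir(part_path)):
                suffix_path = os.path.join(part_path, suffix)
                if not os.path.isdir(suffix_path):
                    continue
                for hash_dir in sorted(os.listdir(suffix_path)):
                    yield os.path.join(suffix_path, hash_dir)
```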

swift.obj.diskfile.quarantine_renamer(device_path, corrupted_file_path)

In the case that a file is corrupted, move it to a quarantined area to allow replication to fix it.

Parameters:
  • device_path – the path to the device the corrupted file is on

  • corrupted_file_path – the path to the file you want quarantined

Returns:

path (str) of directory the file was moved to

Raises:

OSError – re-raises non errno.EEXIST / errno.ENOTEMPTY exceptions from rename

swift.obj.diskfile.read_hashes(partition_dir)

Read the existing hashes.pkl

Returns:

a dict, the suffix hashes (if any), the key ‘valid’ will be False if hashes.pkl is corrupt, cannot be read or does not exist

swift.obj.diskfile.read_metadata(fd, add_missing_checksum=False)

Helper function to read the pickled metadata from an object data file.

The only difference from _read_file_metadata is that this function raises DiskFileNotExist when the file cannot be read.

Parameters:
  • fd – file descriptor or filename to load the metadata from

  • add_missing_checksum – if set and checksum is missing, add it

Returns:

dictionary of metadata

Raises:

DiskFileNotExist – if the file cannot be read

swift.obj.diskfile.relink_paths(target_path, new_target_path, ignore_missing=False)

Hard-links a file located in target_path using the second path new_target_path. Creates intermediate directories if required.

Parameters:
  • target_path – current absolute filename

  • new_target_path – new absolute filename for the hardlink

  • ignore_missing – if True then no exception is raised if the link could not be made because target_path did not exist, otherwise an OSError will be raised.

Raises:

OSError – if the hard link could not be created, unless the intended hard link already exists or the target_path does not exist and ignore_missing is True.

Returns:

True if the link was created by the call to this method, False otherwise.

swift.obj.diskfile.update_auditor_status(datadir_path, logger, partitions, auditor_type)
swift.obj.diskfile.valid_suffix(value)
swift.obj.diskfile.write_hashes(partition_dir, hashes)

Write hashes to hashes.pkl

The updated key is added to hashes before it is written.

swift.obj.diskfile.write_metadata(fd, metadata, xattr_size=65536)

Helper function to write pickled metadata for an object file.

Parameters:
  • fd – file descriptor or filename to write the metadata

  • metadata – metadata to write

Object Replicator

class swift.obj.replicator.ObjectReplicator(conf, logger=None)

Bases: Daemon

Replicate objects.

Encapsulates most logic and data needed by the object replication process. Each call to .replicate() performs one replication pass. It’s up to the caller to do this in a loop.

aggregate_recon_update()
build_replication_jobs(policy, ips, override_devices=None, override_partitions=None)

Helper function for collect_jobs to build jobs for replication using replication style storage policy

check_ring(object_ring)

Check to see if the ring has been updated

Parameters:

object_ring – the ring to check

Returns:

boolean indicating whether or not the ring has changed

collect_jobs(override_devices=None, override_partitions=None, override_policies=None)

Returns a sorted list of jobs (dictionaries) that specify the partitions, nodes, etc to be rsynced.

Parameters:
  • override_devices – if set, only jobs on these devices will be returned

  • override_partitions – if set, only jobs on these partitions will be returned

  • override_policies – if set, only jobs in these storage policies will be returned

delete_handoff_objs(job, delete_objs)
delete_partition(path)
get_local_devices()

Returns a set of all local devices in all replication-type storage policies.

This is the device names, e.g. “sdq” or “d1234” or something, not the full ring entries.

get_worker_args(once=False, **kwargs)

For each worker yield a (possibly empty) dict of kwargs to pass along to the daemon’s run() method after fork. The length of elements returned from this method will determine the number of processes created.

If the returned iterable is empty, the Strategy will fallback to run-inline strategy.

Parameters:
  • once – False if the worker(s) will be daemonized, True if the worker(s) will be run once

  • kwargs – plumbed through via command line argparser

Returns:

an iterable of dicts, each element represents the kwargs to be passed to a single worker’s run() method after fork.

heartbeat()

Loop that runs in the background during replication. It periodically logs progress.

is_healthy()

Check whether our set of local devices remains the same.

If devices have been added or removed, then we return False here so that we can kill off any worker processes and then distribute the new set of local devices across a new set of workers so that all devices are, once again, being worked on.

This function may also cause recon stats to be updated.

Returns:

False if any local devices have been added or removed, True otherwise

load_object_ring(policy)

Make sure the policy’s rings are loaded.

Parameters:

policy – the StoragePolicy instance

Returns:

appropriate ring object

post_multiprocess_run()

Override this to do something after running using multiple worker processes. This method is called in the parent process.

This is probably only useful for run-once mode since there is no “after running” in run-forever mode.

replicate(override_devices=None, override_partitions=None, override_policies=None, start_time=None)

Run a replication pass

revert(job)

High-level method that replicates a single partition that doesn’t belong on this node.

Parameters:

job – a dict containing info about the partition to be replicated

rsync(node, job, suffixes)

Uses rsync to implement the sync method. This was the first sync method in Swift.

run_forever(multiprocess_worker_index=None, override_devices=None, *args, **kwargs)

Override this to run forever

run_once(multiprocess_worker_index=None, have_overrides=False, *args, **kwargs)

Override this to run the script once

ssync(node, job, suffixes, remote_check_objs=None)
stats_line()

Logs various stats for the currently running replication pass.

sync(node, job, suffixes, *args, **kwargs)

Synchronize local suffix directories from a partition with a remote node.

Parameters:
  • node – the “dev” entry for the remote node to sync with

  • job – information about the partition being synced

  • suffixes – a list of suffixes which need to be pushed

Returns:

boolean and dictionary, boolean indicating success or failure

property total_stats
update(job)

High-level method that replicates a single partition.

Parameters:

job – a dict containing info about the partition to be replicated

update_recon(total, end_time, override_devices)
class swift.obj.replicator.Stats(attempted=0, failure=0, hashmatch=0, remove=0, rsync=0, success=0, suffix_count=0, suffix_hash=0, suffix_sync=0, failure_nodes=None)

Bases: object

add_failure_stats(failures)

Note the failure of one or more devices.

Parameters:

failures – a list of (ip, device-name) pairs that failed

fields = ['attempted', 'failure', 'hashmatch', 'remove', 'rsync', 'success', 'suffix_count', 'suffix_hash', 'suffix_sync', 'failure_nodes']
classmethod from_recon(dct)
to_recon()
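
The from_recon()/to_recon() pair can be sketched as a round-trip between a stats object and the flat dict stored in the recon cache. The class below is an illustrative analogue, not Swift's code, using only a subset of the fields listed above.

```python
from dataclasses import dataclass, asdict

# Hedged sketch: counters round-trip through a plain dict suitable for
# JSON serialization into the recon cache.

@dataclass
class ReplStats:
    attempted: int = 0
    failure: int = 0
    success: int = 0

    def to_recon(self):
        return asdict(self)

    @classmethod
    def from_recon(cls, dct):
        # tolerate missing keys in an old or partial recon entry
        return cls(**{f: dct.get(f, 0) for f in ('attempted', 'failure', 'success')})

stats = ReplStats(attempted=5, success=4, failure=1)
assert ReplStats.from_recon(stats.to_recon()) == stats
```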
swift.obj.replicator.main()
class swift.obj.ssync_sender.Sender(daemon, node, job, suffixes, remote_check_objs=None, include_non_durable=False, max_objects=0)

Bases: object

Sends SSYNC requests to the object server.

These requests are eventually handled by ssync_receiver and full documentation about the process is there.

connect()

Establishes a connection and starts an SSYNC request with the object server.

disconnect(connection)

Closes down the connection to the object server once done with the SSYNC request.

missing_check(connection, response)

Handles the sender-side of the MISSING_CHECK step of a SSYNC request.

Full documentation of this can be found at Receiver.missing_check().

send_delete(connection, url_path, timestamp)

Sends a DELETE subrequest with the given information.

send_post(connection, url_path, df)
send_put(connection, url_path, df, durable=True)

Sends a PUT subrequest for the url_path using the source df (DiskFile) and content_length.

send_subrequest(connection, method, url_path, headers, df)
updates(connection, response, send_map)

Handles the sender-side of the UPDATES step of an SSYNC request.

Full documentation of this can be found at Receiver.updates().

class swift.obj.ssync_sender.SsyncBufferedHTTPConnection(host, port=None, timeout=<object object>, source_address=None)

Bases: BufferedHTTPConnection

response_class

alias of SsyncBufferedHTTPResponse

class swift.obj.ssync_sender.SsyncBufferedHTTPResponse(*args, **kwargs)

Bases: BufferedHTTPResponse, object

readline(size=1024)

Reads a line from the SSYNC response body.

httplib has no readline and will block on read(x) until x is read, so we have to do the work ourselves. A bit of this is taken from Python’s httplib itself.

swift.obj.ssync_sender.decode_wanted(parts)

Parse missing_check line parts to determine which parts of local diskfile were wanted by the receiver.

The encoder for parts is encode_wanted()

swift.obj.ssync_sender.encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None, **kwargs)

Returns a string representing the object hash, its data file timestamp, the delta forwards to its metafile and content-type timestamps, if non-zero, and its durability, in the form: <hash> <ts_data> [m:<hex delta to ts_meta>[,t:<hex delta to ts_ctype>]] [,durable:False]

The decoder for this line is decode_missing()
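
The line format above can be illustrated with a simplified encoder. This is a sketch, not Swift's encode_missing: it uses plain float timestamps and a fixed 1e-5-second resolution for the hex delta, and omits the t: content-type delta.

```python
# Hedged sketch of a missing_check line:
#   <hash> <ts_data> [m:<hex delta to ts_meta>][,durable:False]

def encode_missing_line(object_hash, ts_data, ts_meta=None, durable=True):
    line = '%s %.5f' % (object_hash, ts_data)
    extra = []
    if ts_meta is not None and ts_meta != ts_data:
        # forward delta from ts_data to ts_meta, in hex, 1e-5 s units
        extra.append('m:%x' % round((ts_meta - ts_data) * 100000))
    if not durable:
        extra.append('durable:False')
    if extra:
        line += ' ' + ','.join(extra)
    return line

print(encode_missing_line('abc', 2.0, ts_meta=2.5))  # -> abc 2.00000 m:c350
```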

class swift.obj.ssync_receiver.Receiver(app, request)

Bases: object

Handles incoming SSYNC requests to the object server.

These requests come from the object-replicator daemon that uses ssync_sender.

The number of concurrent SSYNC requests is restricted by use of a replication_semaphore and can be configured with the object-server.conf [object-server] replication_concurrency setting.

An SSYNC request is really just an HTTP conduit for sender/receiver replication communication. The overall SSYNC request should always succeed, but it will contain multiple requests within its request and response bodies. This “hack” is done so that replication concurrency can be managed.

The general process inside an SSYNC request is

  1. Initialize the request: Basic request validation, mount check, acquire semaphore lock, etc..

  2. Missing check: Sender sends the hashes and timestamps of the object information it can send, receiver sends back the hashes it wants (doesn’t have or has an older timestamp).

  3. Updates: Sender sends the object information requested.

  4. Close down: Release semaphore lock, etc.

initialize_request()

Basic validation of request and mount check.

This function will be called before attempting to acquire a replication semaphore lock, so contains only quick checks.

missing_check()

Handles the receiver-side of the MISSING_CHECK step of a SSYNC request.

Receives a list of hashes and timestamps of object information the sender can provide and responds with a list of hashes desired, either because they’re missing or have an older timestamp locally.

The process is generally

  1. Sender sends :MISSING_CHECK: START and begins sending hash timestamp lines.

  2. Receiver gets :MISSING_CHECK: START and begins reading the hash timestamp lines, collecting the hashes of those it desires.

  3. Sender sends :MISSING_CHECK: END.

  4. Receiver gets :MISSING_CHECK: END, responds with :MISSING_CHECK: START, followed by the list of <wanted_hash> specifiers it collected as being wanted (one per line), :MISSING_CHECK: END, and flushes any buffers.

    Each <wanted_hash> specifier has the form <hash>[ <parts>] where <parts> is a string containing characters ‘d’ and/or ‘m’ indicating that only data or meta part of object respectively is required to be sync’d.

  5. Sender gets :MISSING_CHECK: START and reads the list of hashes desired by the receiver until reading :MISSING_CHECK: END.

The collection and then response is so the sender doesn’t have to read while it writes to ensure network buffers don’t fill up and block everything.
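
Step 4's <wanted_hash> specifiers can be parsed as sketched below. This is an illustrative helper, not Swift's decode_wanted: it maps the optional 'd'/'m' parts string onto data/meta flags, defaulting to wanting both when no parts are given.

```python
# Hedged sketch: parse one "<hash>[ <parts>]" wanted line from the
# MISSING_CHECK response, where <parts> contains 'd' and/or 'm'.

def parse_wanted(line):
    parts = line.split()
    obj_hash = parts[0]
    flags = parts[1] if len(parts) > 1 else 'dm'  # absent parts: want all
    return {'hash': obj_hash, 'data': 'd' in flags, 'meta': 'm' in flags}

print(parse_wanted('9d41ac d'))
# -> {'hash': '9d41ac', 'data': True, 'meta': False}
```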

updates()

Handles the UPDATES step of an SSYNC request.

Receives a set of PUT and DELETE subrequests that will be routed to the object server itself for processing. These contain the information requested by the MISSING_CHECK step.

The PUT and DELETE subrequests are formatted pretty much exactly like regular HTTP requests, excepting the HTTP version on the first request line.

The process is generally

  1. Sender sends :UPDATES: START and begins sending the PUT and DELETE subrequests.

  2. Receiver gets :UPDATES: START and begins routing the subrequests to the object server.

  3. Sender sends :UPDATES: END.

  4. Receiver gets :UPDATES: END and sends :UPDATES: START and :UPDATES: END (assuming no errors).

  5. Sender gets :UPDATES: START and :UPDATES: END.

If too many subrequests fail, as configured by replication_failure_threshold and replication_failure_ratio, the receiver will hang up the request early so as to not waste any more time.

At step 4, the receiver will send back an error if there were any failures (that didn’t cause a hangup due to the above thresholds) so the sender knows the whole was not entirely a success. This is so the sender knows if it can remove an out of place partition, for example.
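
The hang-up condition described above can be sketched as a combined absolute-count and ratio check. The function below is illustrative; the exact arithmetic and defaults are assumptions, not Swift's implementation.

```python
# Hedged sketch: hang up the SSYNC request early once subrequest
# failures exceed both replication_failure_threshold (absolute) and
# replication_failure_ratio (failures relative to successes).

def should_hang_up(successes, failures,
                   failure_threshold=100, failure_ratio=1.0):
    return (failures >= failure_threshold and
            failures / max(successes, 1) > failure_ratio)

print(should_hang_up(10, 50))   # False: under the absolute threshold
print(should_hang_up(50, 150))  # True: 150 >= 100 and 150/50 > 1.0
```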

exception swift.obj.ssync_receiver.SsyncClientDisconnected

Bases: Exception

class swift.obj.ssync_receiver.SsyncInputProxy(wsgi_input, chunk_size, timeout)

Bases: object

Wraps a WSGI input to provide ssync-specific read methods.

If any exception or timeout is raised while reading the input then subsequent calls will raise the same exception. A caller is thereby prevented from reading the input after it has raised an exception, at which point its state may be indeterminate. This makes it safe to share the input between multiple callers (typically an ssync Receiver and an ObjectController) that might otherwise be unaware that the other has experienced an exception.

Parameters:
  • wsgi_input – a wsgi input

  • chunk_size – the number of bytes to read at a time

  • timeout – a timeout in seconds applied to each read

make_subreq_input(context, content_length)

Returns a wsgi input that will read at most content_length bytes from the wrapped wsgi input.

Parameters:
  • context – a string used to annotate any raised exceptions

  • content_length – the maximum number of bytes to read

read_line(context)

Attempt to read a line from the wsgi input; annotate any timeout or read errors with a description of the calling context.

Parameters:

context – a string used to annotate any raised exceptions

swift.obj.ssync_receiver.decode_missing(line)

Parses a string of the form generated by encode_missing() and returns a dict with keys object_hash, ts_data, ts_meta, ts_ctype, durable.

The encoder for this line is encode_missing()

swift.obj.ssync_receiver.encode_wanted(remote, local)

Compare remote and local results and generate a wanted line.

Parameters:
  • remote – a dict, with ts_data and ts_meta keys in the form returned by decode_missing()

  • local – a dict, possibly empty, with ts_data and ts_meta keys in the form returned by Receiver._check_local()

The decoder for this line is decode_wanted()

Object Reconstructor

class swift.obj.reconstructor.ObjectReconstructor(conf, logger=None)

Bases: Daemon

Reconstruct objects using erasure code. And also rebalance EC fragment archive objects off handoff nodes.

Encapsulates most logic and data needed by the object reconstruction process. Each call to .reconstruct() performs one pass. It's up to the caller to do this in a loop.

aggregate_recon_update()

Aggregate per-disk rcache updates from child workers.

build_reconstruction_jobs(part_info)

Helper function for collect_jobs to build jobs for reconstruction using EC style storage policy

N.B. If this function ever returns an empty list of jobs the entire partition will be deleted.

check_ring(object_ring)

Check to see if the ring has been updated

Parameters:

object_ring – the ring to check

Returns:

boolean indicating whether or not the ring has changed

collect_parts(override_devices=None, override_partitions=None)

Helper for getting partitions in the top level reconstructor

In handoffs_only mode, primary partitions will not be included in the returned (possibly empty) list.

delete_partition(path)
delete_reverted_objs(job, objects)

For EC we can potentially revert only some of a partition, so we'll delete reverted objects here. Note that we delete the fragment index of the file we sent to the remote node.

Parameters:
  • job – the job being processed

  • objects – a dict of objects to be deleted, each entry maps hash=>timestamp

detect_lockups()

In testing, the pool.waitall() call very occasionally failed to return. This is an attempt to make sure the reconstructor finishes its reconstruction pass in any eventuality.

final_recon_dump(total, override_devices=None, **kwargs)

Add stats for this worker's run to the recon cache.

When in worker mode (per_disk_stats == True) this worker's stats are added per device instead of in the top level keys (aggregation is serialized in the parent process).

Parameters:
  • total – the runtime of the cycle in minutes

  • override_devices – (optional) list of devices that are being reconstructed

get_local_devices()

Returns a set of all local devices in all EC policies.

get_policy2devices()
get_suffix_delta(local_suff, local_index, remote_suff, remote_index)

Compare the local suffix hashes with the remote suffix hashes for the given local and remote fragment indexes. Return those suffixes which should be synced.

Parameters:
  • local_suff – the local suffix hashes (from _get_hashes)

  • local_index – the local fragment index for the job

  • remote_suff – the remote suffix hashes (from the remote REPLICATE request)

  • remote_index – the remote fragment index for the job

Returns:

a list of strings, the suffix dirs to sync

get_worker_args(once=False, **kwargs)

Take the set of all local devices for this node from all the EC policies rings, and distribute them evenly into the number of workers to be spawned according to the configured worker count. If devices is given in kwargs then distribute only those devices.

Parameters:
  • once – False if the worker(s) will be daemonized, True if the worker(s) will be run once

  • kwargs – optional overrides from the command line
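
The even distribution of local devices across workers can be sketched with a round-robin split. The helper below is illustrative, not Swift's implementation; the override_devices kwarg name mirrors the per-worker override seen elsewhere in this module.

```python
# Hedged sketch: spread sorted device names round-robin across at most
# worker_count workers, yielding one kwargs dict per worker.

def distribute_devices(devices, worker_count):
    workers = [[] for _ in range(min(worker_count, len(devices)) or 1)]
    for i, dev in enumerate(sorted(devices)):
        workers[i % len(workers)].append(dev)
    return [{'override_devices': devs} for devs in workers]

print(distribute_devices(['sda', 'sdb', 'sdc', 'sdd', 'sde'], 2))
# -> [{'override_devices': ['sda', 'sdc', 'sde']},
#     {'override_devices': ['sdb', 'sdd']}]
```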

heartbeat()

Loop that runs in the background during reconstruction. It periodically logs progress.

is_healthy()

Check whether rings have changed, and maybe do a recon update.

Returns:

False if any ec ring has changed

kill_coros()

Utility function that kills all coroutines currently running.

load_object_ring(policy)

Make sure the policy’s rings are loaded.

Parameters:

policy – the StoragePolicy instance

Returns:

appropriate ring object

make_rebuilt_fragment_iter(responses, path, policy, frag_index)

Turn a set of connections from backend object servers into a generator that yields up the rebuilt fragment archive for frag_index.

post_multiprocess_run()

Override this to do something after running using multiple worker processes. This method is called in the parent process.

This is probably only useful for run-once mode since there is no “after running” in run-forever mode.

process_job(job)

Sync the local partition with the remote node(s) according to the parameters of the job. For primary nodes, the SYNC job type will define both left and right hand sync_to nodes to sync with, as defined by this primary node's index in the node list based on the fragment index found in the partition. For non-primary nodes (either handoff revert or rebalance) the REVERT job will define a single node in sync_to which is the proper/new home for the fragment index.

N.B. ring rebalancing can be time consuming and handoff nodes' fragment indexes do not have a stable order; it's possible to have more than one REVERT job for a partition, and in some rare failure conditions there may even also be a SYNC job for the same partition - but each one will be processed separately because each job will define a separate list of node(s) to sync to.

Parameters:

job – the job dict, with the keys defined in _get_job_info

reconstruct(**kwargs)

Run a reconstruction pass

reconstruct_fa(job, node, df)

Reconstructs a fragment archive - this method is called from ssync after a remote node responds that it is missing this object - the local diskfile is opened to provide metadata - but to reconstruct the missing fragment archive we must connect to multiple object servers.

Parameters:
  • job – job from ssync_sender.

  • node – node that we're rebuilding to.

  • df – an instance of BaseDiskFile.

Returns:

a DiskFile like class for use by ssync.

Raises:

DiskFileError – if the fragment archive cannot be reconstructed

run_forever(multiprocess_worker_index=None, *args, **kwargs)

Override this to run forever

run_once(multiprocess_worker_index=None, *args, **kwargs)

Override this to run the script once

stats_line()

Logs various stats for the currently running reconstruction pass.

class swift.obj.reconstructor.RebuildingECDiskFileStream(datafile_metadata, frag_index, rebuilt_fragment_iter)

Bases: object

This class wraps the reconstructed fragment archive data and metadata in the DiskFile interface for ssync.

property content_length
get_datafile_metadata()
get_metadata()
reader()
class swift.obj.reconstructor.ResponseBucket

Bases: object

Encapsulates fragment GET response data related to a single timestamp.

swift.obj.reconstructor.main()

Object Server

Object Server for Swift

class swift.obj.server.EventletPlungerString

Bases: bytes

Eventlet won't send headers until it's accumulated at least eventlet.wsgi.MINIMUM_CHUNK_SIZE bytes or the app iter is exhausted. If we want to send the response body behind Eventlet's back, perhaps with some zero-copy wizardry, then we have to unclog the plumbing in eventlet.wsgi to force the headers out, so we use an EventletPlungerString to empty out all of Eventlet's buffers.

class swift.obj.server.ObjectController(conf, logger=None)

Bases: BaseStorageServer

Implements the WSGI application for the Swift Object Server.

DELETE(request, timing_stats_labels)

Handle HTTP DELETE requests for the Swift Object Server.

GET(request, timing_stats_labels)

Handle HTTP GET requests for the Swift Object Server.

HEAD(request, timing_stats_labels)

Handle HTTP HEAD requests for the Swift Object Server.

POST(request, timing_stats_labels)

Handle HTTP POST requests for the Swift Object Server.

PUT(request, timing_stats_labels)

Handle HTTP PUT requests for the Swift Object Server.

REPLICATE(request, timing_stats_labels)

Handle REPLICATE requests for the Swift Object Server. This is used by the object replicator to get hashes for directories.

Note that the name REPLICATE is preserved for historical reasons as this verb really just returns the hashes information for the specified parameters and is used, for example, by both replication and EC.

SSYNC(request, timing_stats_labels)
async_update(op, account, container, obj, host, partition, contdevice, headers_out, objdevice, policy, logger_thread_locals=None, container_path=None, db_state=None)

Sends or saves an async update.

Parameters:
  • op – operation performed (e.g. "PUT" or "DELETE")

  • account – account name for the object

  • container – container name for the object

  • obj – object name

  • host – host that the container is on

  • partition – partition that the container is on

  • contdevice – device name that the container is on

  • headers_out – dictionary of headers to send in the container request

  • objdevice – device name that the object is in

  • policy – the associated BaseStoragePolicy instance

  • logger_thread_locals – the thread local values to be set on the self.logger to retain transaction logging information.

  • container_path – (optional) the path to which the update should be sent, of the form <account/container>; if given this path will be used instead of constructing a path from the account and container params.

  • db_state – the current database state of the container as supplied to us by the proxy.

container_update(op, account, container, obj, request, headers_out, objdevice, policy)

Update the container when objects are updated.

Parameters:
  • op – operation performed (e.g. "PUT" or "DELETE")

  • account – account name for the object

  • container – container name for the object

  • obj – object name

  • request – the original request object driving the update

  • headers_out – dictionary of headers to send in the container request

  • objdevice – device name that the object is in

  • policy – the BaseStoragePolicy instance

delete_at_update(op, delete_at, account, container, obj, request, objdevice, policy, extra_headers=None)

Update the expiring objects container when objects are updated.

Parameters:
  • op – operation performed (e.g. "PUT" or "DELETE")

  • delete_at – scheduled time of deletion as a UNIX timestamp in seconds, as an int

  • account – account name for the object

  • container – container name for the object

  • obj – object name

  • request – the original request driving the update

  • objdevice – device name that the object is in

  • policy – the BaseStoragePolicy instance (used for tmp dir)

  • extra_headers – dict of additional headers for the update

get_diskfile(device, partition, account, container, obj, policy, **kwargs)

Utility method for instantiating a DiskFile object supporting a given REST API.

An implementation of the object server that wants to use a different DiskFile class would simply over-ride this method to provide that behavior.

server_type = 'object-server'
setup(conf)

Implementation specific setup. This method is called at the very end by the constructor to allow a specific implementation to modify existing attributes or add its own attributes.

Parameters:

conf – WSGI configuration parameter

swift.obj.server.app_factory(global_conf, **local_conf)

paste.deploy app factory for creating WSGI object server apps

swift.obj.server.drain(file_like, read_size, timeout)

Read and discard any bytes from file_like.

Parameters:
  • file_like – file-like object to read from

  • read_size – how big a chunk to read at a time

  • timeout – how long to wait for a read (use None for no timeout)

Raises:

ChunkReadTimeout – if no chunk was read in time

swift.obj.server.get_obj_name_and_placement(request)

Split and validate path for an object.

Parameters:

request – a swob request

Returns:

a tuple of path parts and storage policy

swift.obj.server.global_conf_callback(preloaded_app_conf, global_conf)

Callback for swift.common.wsgi.run_wsgi during the global_conf creation so that we can add our replication_semaphore, used to limit the number of concurrent SSYNC_REQUESTS across all workers.

Parameters:
  • preloaded_app_conf – the preloaded conf for the WSGI app. This conf instance will go away, so just read from it, don't write.

  • global_conf – the global conf that will eventually be passed to the app_factory function later. This conf is created before the worker subprocesses are forked, so can be useful to set up semaphores, shared memory, etc.

swift.obj.server.iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size)
swift.obj.server.main()

Object Updater

class swift.obj.updater.BucketizedUpdateSkippingLimiter(update_iterable, logger, stats, num_buckets=1000, max_elements_per_group_per_second=50, max_deferred_elements=0, drain_until=0)

Bases: object

Wrap an iterator to rate-limit updates on a per-bucket basis, where updates are mapped to buckets by hashing their destination path. If an update is rate-limited then it is placed on a deferral queue and may be sent later if the wrapped iterator is exhausted before the drain_until time is reached.

The deferral queue has a fixed maximum size; when the queue is full, an update is evicted using a first-in-first-out policy. This policy is used because updates on the queue may have been made obsolete by newer updates written to disk, and this is more likely for updates that have been on the queue longest.

The iterator increments stats as follows:

  • The deferrals stat is incremented for each update that is rate-limited. Note that an individual update is rate-limited at most once.

  • The skips stat is incremented for each rate-limited update that is not eventually yielded. This includes updates that are evicted from the deferral queue and all updates that remain in the deferral queue when the drain_until time is reached and the iterator terminates.

  • The drains stat is incremented for each rate-limited update that is eventually yielded.

Consequently, when this iterator terminates, the sum of skips and drains is equal to the number of deferrals.

Parameters:
  • update_iterable – an iterable of async pending updates

  • logger – a logger instance

  • stats – a SweepStats instance

  • num_buckets – number of buckets to divide container hashes into; the more buckets, the fewer containers per bucket (once a busy container slows down a bucket the whole bucket starts deferring)

  • max_elements_per_group_per_second – tunable for when deferring kicks in

  • max_deferred_elements – maximum number of deferred elements before skipping starts. Each bucket may defer updates, but once the total number of deferred updates summed across all buckets reaches this value then all buckets will skip subsequent updates.

  • drain_until – time at which any remaining deferred elements must be skipped and the iterator stops. Once the wrapped iterator has been exhausted, this iterator will drain deferred elements from its buckets until either all buckets have drained or this time is reached.
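
The path-to-bucket mapping described above can be sketched as a stable hash modulo the bucket count. This is an illustrative helper, not Swift's implementation (which derives the hash differently); the point is only that the same container path always lands in the same bucket.

```python
import hashlib

# Hedged sketch: hash an update's destination path into one of
# num_buckets rate-limiting buckets.

def bucket_for(path, num_buckets=1000):
    digest = hashlib.md5(path.encode('utf-8')).hexdigest()
    return int(digest, 16) % num_buckets

b1 = bucket_for('/AUTH_test/container_a')
b2 = bucket_for('/AUTH_test/container_a')
assert b1 == b2  # a busy container always slows down the same bucket
```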

class swift.obj.updater.ObjectUpdater(conf, logger=None)

Bases: Daemon

Update object information in container listings.

aggregate_and_dump_recon(devices, elapsed, now)

Aggregate recon stats across devices and dump the result to the recon cache.

dump_device_recon(device)

Dump recon stats for a single device.

get_container_ring()

Get the container ring. Load it, if it hasn't been yet.

object_sweep(device)

If there are async pendings on the device, walk each one and update.

Parameters:

device – path to device

object_update(node, part, op, path, headers_out)

Perform the object update to the container

Parameters:
  • node – node dictionary from the container ring

  • part – partition that holds the container

  • op – operation performed (e.g. "PUT" or "DELETE")

  • path – /<acct>/<cont>/<obj> path being updated

  • headers_out – headers to send with the update

Returns:

a tuple of (success, node_id, redirect), where success is True if the update succeeded, node_id is the id of the node updated, and redirect is either None or a tuple of (a path, a timestamp string).

process_object_update(update_path, device, policy, update, **kwargs)

Process the object information to be updated and update.

Parameters:
  • update_path – path to the serialized object update file

  • device – path to device

  • policy – storage policy of the object update

  • update – the deserialized update data

  • kwargs – unused keys from update_ctx

run_forever(*args, **kwargs)

Run the updater continuously.

run_once(*args, **kwargs)

Run the updater once.

class swift.obj.updater.OldestAsyncPendingTracker(max_entries)

Bases: object

Manages tracking of the oldest async pending updates for each account-container pair using a sorted list. Evicts the newest pairs when max_entries is reached. Supports retrieving the N oldest async pendings or calculating the age of the oldest pending update.

add_update(account, container, timestamp)

Add or update a timestamp for a given account and container.

Parameters:
  • account – (str) the account name.

  • container – (str) the container name.

  • timestamp – (float) the timestamp to add or update.

get_memory_usage()
get_n_oldest_timestamp_acs(n)
get_oldest_timestamp()
get_oldest_timestamp_age()
reset()
class swift.obj.updater.RateLimiterBucket(max_updates_per_second)

Bases: EventletRateLimiter

Extends EventletRateLimiter to also maintain a deque of items that have been deferred due to rate-limiting, and provides a comparator for sorting instances by readiness.

class swift.obj.updater.SweepStats(errors=0, failures=0, quarantines=0, successes=0, unlinks=0, outdated_unlinks=0, redirects=0, skips=0, deferrals=0, drains=0)

Bases: object

Stats bucket for an update sweep

A measure of the rate at which updates are being rate-limited is

deferrals / (deferrals + successes + failures - drains)

A measure of the rate at which updates are not being sent during a sweep is

skips / (skips + successes + failures)
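
The two ratios above can be computed directly from the sweep counters. A small worked example, using made-up counts, assuming the invariant from BucketizedUpdateSkippingLimiter that skips + drains equals deferrals:

```python
# Worked example of the two SweepStats ratios defined above.

def deferral_rate(deferrals, successes, failures, drains):
    return deferrals / (deferrals + successes + failures - drains)

def skip_rate(skips, successes, failures):
    return skips / (skips + successes + failures)

# e.g. 20 deferrals of which 15 drained (so 5 skipped),
# alongside 70 successes and 10 failures:
print(round(deferral_rate(20, 70, 10, 15), 4))  # -> 0.2353  (20/85)
print(round(skip_rate(5, 70, 10), 4))           # -> 0.0588  (5/85)
```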
copy()
reset()
since(other)
swift.obj.updater.main()
swift.obj.updater.random() → x in the interval [0, 1).
swift.obj.updater.split_update_path(update)

Split the account and container parts out of the async update data.

N.B. updates to shards set the container_path key, while the account and container keys are always the root.