Engineering Blog
Staggering Restarts in Ceph
How we restart OSDs in our Ceph clusters to avoid customer impact.
When customers use a disk in our cloud, they talk to one of our Ceph storage clusters. Every byte written is sent to one of them, and every byte read is received from one. The underlying physical hardware is abstracted away, shielding the VMs from unexpected disk failures and planned maintenance procedures.
To manage our clusters, we sometimes have to restart their services. Here's how we do that while minimizing customer impact.
Impact of Restarts
When a single disk dies, it typically takes one or two OSDs with it. Our customers rarely notice this - after all, tolerating disk failures is what Ceph is all about.
However, restarting a lot of OSDs concurrently causes throughput drops and latency spikes. A drop in throughput can be noticed when a PostgreSQL server suddenly becomes much slower at scanning its tables; a spike in latency is especially noticeable in clusters like Etcd, where high latency might cause leader elections.
💡 Tip
If you use Etcd in the cloud, tuning its time parameters may help avoid unnecessary leader elections.
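For example, raising etcd's heartbeat interval and election timeout (100 ms and 1000 ms by default) gives the cluster more slack before it declares a leader dead. The values below are purely illustrative:
# added to the existing etcd invocation, or set via the corresponding
# ETCD_HEARTBEAT_INTERVAL / ETCD_ELECTION_TIMEOUT environment variables
etcd --heartbeat-interval=300 --election-timeout=3000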
If we restart as many OSDs as possible simultaneously (a third of a cluster), the impact on customer VMs is quite visible in benchmarks:
If we manually stagger the OSD restarts five seconds apart, the impact is spread out over time, which is especially noticeable when it comes to latency:
The effect on throughput is not too dramatic, but that's a function of the cluster size. On our smaller, internal clusters, the effect is more pronounced:
By staggering starts, we spread out the negative effects:
Note that staggering stops has no positive impact. Ceph uses kill -9 on its OSDs when stopping them, and it is designed to deal well with suddenly disappearing OSDs.
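The benchmarks above measure the impact as seen from inside a VM. To get a feel for the effect yourself, a small fio job with queue depth 1 against a file on the attached volume works as a simple latency probe; this is a minimal sketch with illustrative paths and values, not the exact benchmark behind our charts:
$ fio --name=latency-probe --ioengine=libaio --direct=1 \
      --rw=randwrite --bs=4k --iodepth=1 \
      --size=2g --runtime=120 --time_based \
      --filename=/mnt/volume/fio-probe.dat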
Automating Staggered Restarts
While we are able to start OSDs in a staggered fashion by hand, there are situations where we cannot: when a host unexpectedly reboots, when it gets reinstalled, when we do a package upgrade, and so on. Ideally, we don't want to have to think about it at all.
To achieve this, we wrote a Python script that uses cluster-wide locking to ensure that OSDs are started slightly apart:
stagger-osds.py
#!/usr/bin/env python3
"""
About
-----
A set of commands to force OSDs to start in a staggered fashion, avoiding
excessive peering load on the cluster.
Implementation
--------------
Uses an exclusive lock on an image in RADOS:
1. The script starts two processes: a main and a detached lock process.
2. The main process waits for the lock process to acquire a lock.
3. The main process calls `exec` on the additional commandline arguments.
4. The lock process runs until the lock expires.
Signals
-------
Signals are handled separately by the lock and the exec process.
It is possible to use a signal to kill the exec process before it runs the
command. In this case the command will not be executed. Once the command is
executing, signal handling is up to that command.
The lock process can also be killed, which causes it to release the lock
early.
Short-Lived Commands
--------------------
This command is meant for OSDs that may be up for weeks or months. If used
on a short-lived command (or an OSD that exits immediately), the lock is
not held for the whole duration.
The lock process detects if the parent pid has exited. In this case, the
lock is released early and the lock process exits.
This behavior relies on the fact that the exec process's pid is not used by
another process. This is generally not a problem with short lock durations
and a normal number of processes. Should the pid be reused for some reason,
the effect will be a lock that is held for too long.
To avoid that race condition, we could use pidfd_open, but that is only
available in Python 3.9.
Requirements
------------
To run, this command expects the following packages to be present:
- librados for python: https://docs.ceph.com/en/reef/rados/api/librados-intro
- click
- pydantic
The current Python target is Python 3.8.
"""
from __future__ import annotations
import click
import code
import json
import logging
import mmap
import os
import random
import secrets
import shlex
import signal
import socket
import sys
import time
import yaml
from configparser import ConfigParser
from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
from pydantic import BaseModel
from pydantic import Field
from pydantic.types import FilePath
from rados import Ioctx
from rados import ObjectBusy
from rados import ObjectNotFound
from rados import Rados
from threading import Event
from types import FrameType
from typing import cast
from typing import Iterator
from typing import List
from typing import Optional
# The default config path
DEFAULT_CONFIG = '/etc/stagger-osds.yml'
# The ID of the client
CLIENT = 'stagger-osds'
config_option = click.option(
'--config',
default=DEFAULT_CONFIG,
type=click.Path(exists=True, dir_okay=False),
)
log = logging.getLogger(CLIENT)
@click.group()
def cli() -> None:
pass
class Config(BaseModel):
# The path to the global ceph config
ceph_config: FilePath
# Keyring to authenticate with
keyring: FilePath
# Name of the pool to use
pool: str = Field(
...,
min_length=3,
# Mypy 1.7+ fails here, the correct solution would be to use
# Annotated[str, Field(min_length=3, strip_whitespace=True], but that
# is not supported on the proxmox host.
# type: ignore[call-arg]
strip_whitespace=True,
)
# Key of the object in the selected pool
key: str = Field(
...,
min_length=3,
# Mypy 1.7+ fails here, the correct solution would be to use
# Annotated[str, Field(min_length=3, strip_whitespace=True], but that
# is not supported on the proxmox host.
# type: ignore[call-arg]
strip_whitespace=True,
)
# How long to maximally keep the lock active (in seconds)
duration: int = Field(..., gt=0, lt=61)
# How long to try and acquire a lock (in seconds)
timeout: int = Field(..., gt=0, lt=3601)
@classmethod
def from_yaml(cls, path: str) -> Config:
with open(path, 'r') as f:
return cls.parse_obj(yaml.safe_load(f))
def configure_logging(self, process: str, verbose: bool) -> None:
level = verbose and logging.DEBUG or logging.WARNING
fmt = f"{process} [%(levelname)s] %(message)s"
logging.basicConfig(format=fmt, level=level)
@contextmanager
def cluster_connection(self) -> Iterator[Rados]:
""" Yields a connection to the cluster, by using the global ceph
config for mon discovery, the keyring for client authentication.
"""
conf = ConfigParser()
conf.read(self.ceph_config)
conf = dict(conf["global"].items())
client = f"client.{CLIENT}"
conffile = str(self.keyring)
with Rados(name=client, conf=conf, conffile=conffile) as cluster:
yield cluster
@contextmanager
def ioctx(self) -> Iterator[Ioctx]:
""" Yields an ioctx. It is imperative that this is done on a new
        cluster connection each time, as there is some internal state regarding
        application metadata, causing values to be cached aggressively.
"""
with self.cluster_connection() as cluster:
with cluster.open_ioctx(self.pool) as ioctx:
yield ioctx
def metadata_get(self, ioctx: Ioctx, key: str, default: str) -> str:
""" Gets the given metadata key, or a default value. """
try:
return cast(str, ioctx.application_metadata_get(CLIENT, key))
except KeyError:
return default
def metadata_set(self, ioctx: Ioctx, key: str, val: str) -> None:
""" Sets the given metadata key. Strings longer than 100 characters
are truncated, as they otherwise cause errors.
"""
ioctx.application_metadata_set(CLIENT, key, val[:100])
def metadata_remove(self, ioctx: Ioctx, key: str) -> None:
""" Deletes the given metadata key (idempotent). """
ioctx.application_metadata_remove(CLIENT, key)
def disable(self, ioctx: Ioctx) -> None:
""" Disables lock acquisition with immediate effect. The current lock
is released, and all OSDs waiting for a lock are started. As long
as osd-staggering is disabled, locking is skipped completely.
"""
self.metadata_set(ioctx, 'disabled', '1')
def enable(self, ioctx: Ioctx) -> None:
""" Removes the 'disabled' state. """
self.metadata_remove(ioctx, 'disabled')
@property
def is_disabled(self) -> bool:
""" True if locking is disabled. Uses its own connection to the cluster
as the cache might otherwise return outdated values.
"""
with self.ioctx() as ioctx:
return self.metadata_get(ioctx, 'disabled', '0') == '1'
class LockHolder(BaseModel):
""" Metadata about the current lock holder. """
# The hostname of the lock holder
host: str
# The command being executed as an argument list as expected by exec*
command: List[str]
# The PID of the exec process
ppid: int
# The time in UTC when this lock expires
expires: datetime
@property
def is_expired(self) -> bool:
return datetime.utcnow() > self.expires
class SharedBool:
""" Anonymous memory mapped bool, that can be used by multiple processes
at the same time. Currently used for a single-writer model - not sure if
there are race-conditions in a multi-writer scenario.
"""
def __init__(self, default: bool = False):
self.mmap = mmap.mmap(-1, 1)
self.store(default)
def __bool__(self) -> bool:
self.mmap.seek(0)
return self.mmap.read(1) == b'1'
def __repr__(self) -> str:
return f"SharedBool({self and 'True' or 'False'})"
def store(self, value: bool) -> None:
self.mmap.seek(0)
self.mmap.write(value and b'1' or b'0')
def signal_handler() -> Event:
""" Provides signal handling for each process. Should be called after
forking, as each process must have its own signal handling.
"""
interrupt = Event()
def quit(signum: int, frame: Optional[FrameType]) -> None:
interrupt.set()
for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
signal.signal(sig, quit)
return interrupt
def ensure_session_leader() -> None:
""" Ensures that the current process is promoted session leader, if it
is not one already.
"""
if os.getpid() != os.getpgid(0):
os.setsid()
def is_pid_running(pid: int) -> bool:
""" Returns True if the given PID is of a running process. """
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
def wait_for_lock_then_hold(start_osd: SharedBool, ppid: int, config: Config,
command: List[str]) -> None:
""" Tries to acquire a lock on the object with the given name, then holds
it for the given duration.
The lock is released early under the following circumstances:
- The process with pid `ppid` no longer exists.
- An interrupt is received.
Meant to be run in a separate process.
"""
# Ensure we get separate signal handling
ensure_session_leader()
interrupt = signal_handler()
# Lock configuration
lock_cfg = {
# The key of the object that is locked
'key': config.key,
# The name of the lock on the object
'name': CLIENT,
# Unique string belonging to the owner of the lock. This string is
        # required to unlock the lock prematurely.
'cookie': secrets.token_hex(8),
}
try:
with config.ioctx() as ioctx:
# Exit condition for the acquisition and hold loops
def keep_running(until: float, warn_on_timeout: bool) -> bool:
if time.monotonic() > until:
if warn_on_timeout:
log.warning("Timeout expired")
return False
if interrupt.is_set():
log.info("Interrupt received")
return False
if not is_pid_running(ppid):
log.info("Parent process exited")
return False
if config.is_disabled:
log.info("Stagger disabled globally")
return False
# Randomize the wait time slightly, to be extra sure that we
# are not creating a thundering herd (not that that is
# likely).
return not interrupt.wait(random.uniform(0.9, 1.1))
# Try to acquire the lock
log.info(f"Acquiring lock (timeout {config.timeout}s)")
until = time.monotonic() + config.timeout
while keep_running(until, warn_on_timeout=True):
try:
ioctx.lock_exclusive(**lock_cfg, duration=config.duration)
log.info("Acquired lock, cleared for launch")
start_osd.store(True)
break
except ObjectBusy:
continue
except Exception:
log.exception("Unexpected error while trying to get lock")
break
if not start_osd:
# If we cannot get a lock due to a timeout or an error, we
# signal to the OSD to start anyway.
log.warning("Could not acquire lock, cleared for launch")
start_osd.store(True)
return
# Write some information about the current lock holder
holder = LockHolder(
host=socket.gethostname(),
command=command,
ppid=ppid,
expires=datetime.utcnow() + timedelta(seconds=config.duration)
)
ioctx.write_full(config.key, holder.json().encode('utf-8'))
# Keep the lock for the given duration
try:
log.info(f"Holding lock for up to {config.duration}s")
until = time.monotonic() + config.duration
while keep_running(until, warn_on_timeout=False):
pass
finally:
                # The lock vanishes on its own once the duration is up. Since
                # we might be too late by now, we can only try to unlock and
                # ignore a lock that has already expired.
try:
ioctx.unlock(**lock_cfg)
except ObjectNotFound:
pass
# Release the lock
log.info("Released lock")
except Exception:
# If there are *any* issues, we let the OSDs start.
start_osd.store(True)
log.exception(f"Could not connect to cluster using {config.keyring}")
sys.exit(1)
def wait_for_lock(start_osd: SharedBool, timeout: int) -> bool:
""" Waits for the `start_osd` variable to change to `True`, returning
`True` if the OSD should be started, `False` otherwise.
"""
# Ensure we get separate signal handling
ensure_session_leader()
interrupt = signal_handler()
# Wait to acquire the lock
log.info("Awaiting lock")
    # The same timeout that eventually abandons the lock can be used here,
# so we don't get stuck if the lock process dies.
until = time.monotonic() + timeout
while not (start_osd or interrupt.is_set()) and time.monotonic() < until:
interrupt.wait(0.5)
# If the interrupt has been received, stop:
if interrupt.is_set():
log.info("Interrupt received: Aborting")
return False
return True
@cli.command(context_settings={
"allow_extra_args": True,
"ignore_unknown_options": True
})
@click.pass_context
@config_option
@click.option('--verbose', is_flag=True, default=False)
def start(ctx: click.Context, config: str, verbose: bool) -> None:
""" Wraps the given command with a cluster-wide lock, forcing the command
to start at a staggered interval (spread out), even if run on multiple
machines.
"""
# Load config
config = Config.from_yaml(config)
# Load command
command = ctx.args
if not command:
print("You must specify a command to run, after '--'", file=sys.stderr)
sys.exit(1)
# Shared lock
start_osd = SharedBool(False)
# The PID of the process destined to be the command
ppid = os.getpid()
# Fork a process that launches the lock
if os.fork() == 0:
# Fork the lock process
if os.fork() == 0:
config.configure_logging(process="lock-process", verbose=verbose)
wait_for_lock_then_hold(
start_osd=start_osd,
ppid=ppid,
command=command,
config=config)
        # The launcher exits right away and is reaped by the parent's
        # os.wait() below; the orphaned lock process is reaped by init.
sys.exit(0)
# Wait for the process that launches the lock to exit (this is quick)
os.wait()
config.configure_logging(process="exec-process", verbose=verbose)
if wait_for_lock(start_osd=start_osd, timeout=config.timeout):
log.info(shlex.join(command))
os.execvp(command[0], command)
@cli.command()
@config_option
def status(config: str) -> None:
""" Show the current status. """
config = Config.from_yaml(config)
print("Status:", config.is_disabled and "Disabled" or "Enabled")
with config.ioctx() as ioctx:
try:
holder = ioctx.read(config.key)
except KeyError:
holder = None
else:
holder = holder and LockHolder.parse_obj(json.loads(holder))
if not holder or holder.is_expired:
print("Last Lock: Expired")
else:
print(f"Last Lock: Acquired on {holder.host}")
print(f"PPID: {holder.ppid}")
print(f"Command: {shlex.join(holder.command)}")
@cli.command()
@config_option
def enable(config: str) -> None:
""" Enable OSD staggering. """
config = Config.from_yaml(config)
with config.ioctx() as ioctx:
config.enable(ioctx)
@cli.command()
@config_option
def disable(config: str) -> None:
""" Disable OSD staggering, immediately seizing all locking, even
for locks that are currently in progress.
"""
config = Config.from_yaml(config)
with config.ioctx() as ioctx:
config.disable(ioctx)
@cli.command(hidden=True)
@config_option
def shell(config: str) -> None:
""" Drop into a Python shell, with an active ioctx. Only use if you know
what you are doing!
"""
config = Config.from_yaml(config)
with config.ioctx() as ioctx:
code.interact(local={**globals(), **locals()})
if __name__ == '__main__':
cli()
This gives us the stagger-osds CLI tool, which we can use to wrap commands that are then run with a cluster-wide lock on a chosen object in a designated pool. In a sense, it works a lot like flock: you give it a command, and it is run under an exclusive lock on the cluster:
$ stagger-osds start --verbose -- sleep 5
exec-process [INFO] Awaiting lock
lock-process [INFO] Acquiring lock (timeout 50s)
lock-process [INFO] Acquired lock, cleared for launch
lock-process [INFO] Holding lock for up to 4s
exec-process [INFO] sleep 5
lock-process [INFO] Released lock
While the lock is held, it shows up in the status command:
$ stagger-osds status
Status: Enabled
Last Lock: Acquired on lab-nvme1-a-lpg1
PPID: 3927556
Command: sleep 5
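Behind the scenes, the lock process writes this information as JSON into the locked object itself (see LockHolder in the script), which is what the status command reads back. The stored data looks roughly like this, with an illustrative expiry timestamp:
{
    "host": "lab-nvme1-a-lpg1",
    "command": ["sleep", "5"],
    "ppid": 3927556,
    "expires": "2024-05-06T09:41:27.103154"
}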
With stagger-osds disable we can disable locking, causing all processes currently waiting for a lock to be started instantly.
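Enabling staggering again is just as simple. Both commands toggle a flag in the pool's application metadata, so they take effect on all hosts at once:
$ stagger-osds disable   # releases the current lock, waiting OSDs start right away
$ stagger-osds enable    # resumes staggered starts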
Systemd Integration
To start all ceph-osd@.service units wrapped in the stagger-osds wrapper, we use the following drop-in:
[Service]
ExecStart=
ExecStart=/usr/local/bin/stagger-osds start --verbose -- /usr/bin/ceph-osd -f --cluster ${CLUSTER} --id %i --setuser ceph --setgroup ceph
SyslogIdentifier=ceph-osd
TimeoutSec=148
The timeout needs to be configured to accommodate the start of 1/3 of all OSDs on the cluster (once the timeout expires, any OSDs still waiting are started right away). Here is the kind of config one might use on a cluster with 600 OSDs:
- 1/3 of the Cluster: 600 OSDs / 3 = 200 OSDs
- Timeout: 200 OSDs x 4s = 800s
ceph_config: /etc/ceph/ceph.conf
keyring: /etc/ceph/ceph.client.stagger-osds.keyring
pool: sys
key: stagger-osds
duration: 4
timeout: 800
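To activate the drop-in, it goes into the usual drop-in directory of the templated unit, followed by a daemon reload. How the file gets there is up to your configuration management; the manual variant, with an example file name, looks like this:
$ mkdir -p /etc/systemd/system/ceph-osd@.service.d
$ cp stagger.conf /etc/systemd/system/ceph-osd@.service.d/
$ systemctl daemon-reload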
The keyring was created as follows:
ceph auth get-or-create \
client.stagger-osds \
mon "allow rw" \
osd "allow rwx pool stagger-osds" \
-o /etc/ceph/ceph.client.stagger-osds.keyring
Conclusion
Stagger OSDs is active on all our Ceph clusters, and since we started using it, our clusters have run with more predictable performance, even during major upgrades.
Using built-in Ceph tools, we are also able to take global locks without relying on the likes of Zookeeper, Etcd or Redis.
If you have comments or corrections to share, you can reach our engineers at engineering-blog@cloudscale.ch.