Engineering Blog

Published on November 27, 2024 by

Staggering Restarts in Ceph

How we restart OSDs in our Ceph clusters to avoid customer impact.

When customers use a disk in our cloud, they talk to one of our Ceph storage clusters. Every byte written is sent to such a cluster, and every byte read is received from one. The underlying physical hardware is abstracted away, shielding the VMs from unexpected disk failures and planned maintenance procedures.

To manage our clusters, we sometimes have to restart their services. Here's how we do that while minimizing customer impact.

Impact of Restarts

When a single disk dies, it typically takes one or two OSDs with it. This can rarely be detected by our customers - after all, this is what Ceph is all about.

However, restarting many OSDs concurrently causes throughput drops and latency spikes. A drop in throughput is noticeable because, for example, a PostgreSQL server might suddenly scan its tables much more slowly. A latency spike is especially noticeable in clusters like Etcd, where high latency might trigger leader elections.

💡 Tip

If you use Etcd in the cloud, tuning its time parameters may help avoid unnecessary leader elections:

If we restart as many OSDs as possible simultaneously (a third of a cluster), the impact on customer VMs is quite visible in benchmarks:

Concurrent OSD restarts - reads.
Concurrent OSD restarts - writes.

If we manually stagger the OSD restarts five seconds apart, we get more spread out numbers, especially when it comes to latency:

5s staggered OSD restarts - reads
5s staggered OSD restarts - writes

The effect on throughput is not too dramatic, but that's a function of the cluster size. On our smaller, internal clusters, the effect is more pronounced:

Concurrent OSD restarts on small cluster - reads
Concurrent OSD restarts on small cluster - writes

By staggering starts, we spread out the negative effects:

5s staggered OSD restarts on small cluster - reads
5s staggered OSD restarts on small cluster - writes

Note that staggering stops has no positive impact. Ceph uses kill -9 on its OSDs when stopping them and it is designed to deal well with suddenly disappearing OSDs.
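
The five-second manual stagger can be approximated with a few lines of Python. This is a minimal sketch, not the procedure used for the benchmarks above: the helper name `start_staggered` and its injectable `run` and `sleep` parameters are hypothetical, and it assumes the OSDs run as `ceph-osd@<id>.service` systemd units.

```python
import subprocess
import time
from typing import Callable, Iterable, List, Sequence


def start_staggered(
    osd_ids: Iterable[int],
    delay: float = 5.0,
    run: Callable[[Sequence[str]], object] = subprocess.check_call,
    sleep: Callable[[float], None] = time.sleep,
) -> List[List[str]]:
    """Start each OSD unit in turn, pausing `delay` seconds between starts.

    Returns the commands that were issued, in order.
    """
    issued: List[List[str]] = []

    for index, osd_id in enumerate(osd_ids):
        # No pause before the first OSD, only between consecutive starts.
        if index > 0:
            sleep(delay)

        # Assumed unit naming: ceph-osd@<id>.service
        command = ["systemctl", "start", f"ceph-osd@{osd_id}.service"]
        run(command)
        issued.append(command)

    return issued
```

Passing `run` and `sleep` as parameters makes it possible to dry-run the schedule without touching systemd. A loop like this only covers one host and needs an operator to run it.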

Automating Staggered Restarts

While we are able to manually start OSDs in a staggered fashion, we have situations where we cannot do that. We want staggered OSD starts when a host unexpectedly reboots, when it gets reinstalled, when we do a package upgrade and so on. Ideally, we don't want to have to think about it.

To achieve this, we wrote a Python script that uses cluster-wide locking to ensure that OSDs are started slightly apart:

stagger-osds.py
#!/usr/bin/env python3
"""

About
-----

A set of commands to force OSDs to start in a staggered fashion, avoiding
excessive peering load on the cluster.

Implementation
--------------

Uses an exclusive lock on an image in RADOS:

1. The script starts two processes: a main and a detached lock process.
2. The main process waits for the lock process to acquire a lock.
3. The main process calls `exec` on the additional commandline arguments.
4. The lock process runs until the lock expires.

Signals
-------

Signals are handled separately by the lock and the exec process.

It is possible to use a signal to kill the exec process before it runs the
command. In this case the command will not be executed. Once the command is
executing, signal handling is up to that command.

The lock process can also be killed, which causes it to release the lock
early.

Short-Lived Commands
--------------------

This command is meant for OSDs that may be up for weeks or months. If used
on a short-lived command (or an OSD that exits immediately), the lock is
not held for the whole duration.

The lock process detects if the parent pid has exited. In this case, the
lock is released early and the lock process exits.

This behavior relies on the fact that the exec process's pid is not used by
another process. This is generally no problem with short lock durations,
and a normal amount of processes. Should the pid be reused for some reason,
the effect will be a lock that is held for too long.

To avoid that race condition, we could use pidfd_open, but that is only
available in Python 3.9 and later.

Requirements
------------

To run, this command expects the following packages to be present:

- librados for python: https://docs.ceph.com/en/reef/rados/api/librados-intro
- click
- pydantic

The current Python target is Python 3.8.

"""
from __future__ import annotations

import click
import code
import json
import logging
import mmap
import os
import random
import secrets
import shlex
import signal
import socket
import sys
import time
import yaml

from configparser import ConfigParser
from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
from pydantic import BaseModel
from pydantic import Field
from pydantic.types import FilePath
from rados import Ioctx
from rados import ObjectBusy
from rados import ObjectNotFound
from rados import Rados
from threading import Event
from types import FrameType
from typing import cast
from typing import Iterator
from typing import List
from typing import Optional

# The default config path
DEFAULT_CONFIG = '/etc/stagger-osds.yml'

# The ID of the client
CLIENT = 'stagger-osds'

config_option = click.option(
    '--config',
    default=DEFAULT_CONFIG,
    type=click.Path(exists=True, dir_okay=False),
)

log = logging.getLogger(CLIENT)


@click.group()
def cli() -> None:
    pass


class Config(BaseModel):

    # The path to the global ceph config
    ceph_config: FilePath

    # Keyring to authenticate with
    keyring: FilePath

    # Name of the pool to use
    pool: str = Field(
        ...,
        min_length=3,

        # Mypy 1.7+ fails here, the correct solution would be to use
        # Annotated[str, Field(min_length=3, strip_whitespace=True)], but that
        # is not supported on the proxmox host.
        # type: ignore[call-arg]
        strip_whitespace=True,
    )

    # Key of the object in the selected pool
    key: str = Field(
        ...,
        min_length=3,

        # Mypy 1.7+ fails here, the correct solution would be to use
        # Annotated[str, Field(min_length=3, strip_whitespace=True)], but that
        # is not supported on the proxmox host.
        # type: ignore[call-arg]
        strip_whitespace=True,
    )

    # How long to maximally keep the lock active (in seconds)
    duration: int = Field(..., gt=0, lt=61)

    # How long to try and acquire a lock (in seconds)
    timeout: int = Field(..., gt=0, lt=3601)

    @classmethod
    def from_yaml(cls, path: str) -> Config:
        with open(path, 'r') as f:
            return cls.parse_obj(yaml.safe_load(f))

    def configure_logging(self, process: str, verbose: bool) -> None:
        level = verbose and logging.DEBUG or logging.WARNING
        fmt = f"{process} [%(levelname)s] %(message)s"

        logging.basicConfig(format=fmt, level=level)

    @contextmanager
    def cluster_connection(self) -> Iterator[Rados]:
        """ Yields a connection to the cluster, by using the global ceph
        config for mon discovery, the keyring for client authentication.

        """
        conf = ConfigParser()
        conf.read(self.ceph_config)
        conf = dict(conf["global"].items())

        client = f"client.{CLIENT}"
        conffile = str(self.keyring)

        with Rados(name=client, conf=conf, conffile=conffile) as cluster:
            yield cluster

    @contextmanager
    def ioctx(self) -> Iterator[Ioctx]:
        """ Yields an ioctx. It is imperative that this is done on a new
        cluster connection each time, as there's some internal state in regards
        to application metadata, causing values to be cached aggressively.

        """
        with self.cluster_connection() as cluster:
            with cluster.open_ioctx(self.pool) as ioctx:
                yield ioctx

    def metadata_get(self, ioctx: Ioctx, key: str, default: str) -> str:
        """ Gets the given metadata key, or a default value. """

        try:
            return cast(str, ioctx.application_metadata_get(CLIENT, key))
        except KeyError:
            return default

    def metadata_set(self, ioctx: Ioctx, key: str, val: str) -> None:
        """ Sets the given metadata key. Strings longer than 100 characters
        are truncated, as they otherwise cause errors.

        """
        ioctx.application_metadata_set(CLIENT, key, val[:100])

    def metadata_remove(self, ioctx: Ioctx, key: str) -> None:
        """ Deletes the given metadata key (idempotent). """

        ioctx.application_metadata_remove(CLIENT, key)

    def disable(self, ioctx: Ioctx) -> None:
        """ Disables lock acquisition with immediate effect. The current lock
        is released, and all OSDs waiting for a lock are started. As long
        as osd-staggering is disabled, locking is skipped completely.

        """
        self.metadata_set(ioctx, 'disabled', '1')

    def enable(self, ioctx: Ioctx) -> None:
        """ Removes the 'disabled' state. """

        self.metadata_remove(ioctx, 'disabled')

    @property
    def is_disabled(self) -> bool:
        """ True if locking is disabled. Uses its own connection to the cluster
        as the cache might otherwise return outdated values.

        """

        with self.ioctx() as ioctx:
            return self.metadata_get(ioctx, 'disabled', '0') == '1'


class LockHolder(BaseModel):
    """ Metadata about the current lock holder. """

    # The hostname of the lock holder
    host: str

    # The command being executed as an argument list as expected by exec*
    command: List[str]

    # The PID of the exec process
    ppid: int

    # The time in UTC when this lock expires
    expires: datetime

    @property
    def is_expired(self) -> bool:
        return datetime.utcnow() > self.expires


class SharedBool:
    """ Anonymous memory mapped bool, that can be used by multiple processes
    at the same time. Currently used for a single-writer model - not sure if
    there are race-conditions in a multi-writer scenario.

    """

    def __init__(self, default: bool = False):
        self.mmap = mmap.mmap(-1, 1)
        self.store(default)

    def __bool__(self) -> bool:
        self.mmap.seek(0)
        return self.mmap.read(1) == b'1'

    def __repr__(self) -> str:
        return f"SharedBool({self and 'True' or 'False'})"

    def store(self, value: bool) -> None:
        self.mmap.seek(0)
        self.mmap.write(value and b'1' or b'0')


def signal_handler() -> Event:
    """ Provides signal handling for each process. Should be called after
    forking, as each process must have its own signal handling.

    """
    interrupt = Event()

    def quit(signum: int, frame: Optional[FrameType]) -> None:
        interrupt.set()

    for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
        signal.signal(sig, quit)

    return interrupt


def ensure_session_leader() -> None:
    """ Ensures that the current process is promoted session leader, if it
    is not one already.

    """

    if os.getpid() != os.getpgid(0):
        os.setsid()


def is_pid_running(pid: int) -> bool:
    """ Returns True if the given PID is of a running process. """

    try:
        os.kill(pid, 0)
    except OSError:
        return False
    else:
        return True


def wait_for_lock_then_hold(start_osd: SharedBool, ppid: int, config: Config,
                            command: List[str]) -> None:
    """ Tries to acquire a lock on the object with the given name, then holds
    it for the given duration.

    The lock is released early under the following circumstances:
        - The process with pid `ppid` no longer exists.
        - An interrupt is received.

    Meant to be run in a separate process.

    """

    # Ensure we get separate signal handling
    ensure_session_leader()
    interrupt = signal_handler()

    # Lock configuration
    lock_cfg = {
        # The key of the object that is locked
        'key': config.key,

        # The name of the lock on the object
        'name': CLIENT,

        # Unique string belonging to the owner of the lock. This string is
        # required to unlock the lock prematurely.
        'cookie': secrets.token_hex(8),
    }

    try:
        with config.ioctx() as ioctx:

            # Exit condition for the acquisition and hold loops
            def keep_running(until: float, warn_on_timeout: bool) -> bool:
                if time.monotonic() > until:
                    if warn_on_timeout:
                        log.warning("Timeout expired")
                    return False

                if interrupt.is_set():
                    log.info("Interrupt received")
                    return False

                if not is_pid_running(ppid):
                    log.info("Parent process exited")
                    return False

                if config.is_disabled:
                    log.info("Stagger disabled globally")
                    return False

                # Randomize the wait time slightly, to be extra sure that we
                # are not creating a thundering herd (not that that is
                # likely).
                return not interrupt.wait(random.uniform(0.9, 1.1))

            # Try to acquire the lock
            log.info(f"Acquiring lock (timeout {config.timeout}s)")
            until = time.monotonic() + config.timeout

            while keep_running(until, warn_on_timeout=True):
                try:
                    ioctx.lock_exclusive(**lock_cfg, duration=config.duration)
                    log.info("Acquired lock, cleared for launch")
                    start_osd.store(True)
                    break
                except ObjectBusy:
                    continue
                except Exception:
                    log.exception("Unexpected error while trying to get lock")
                    break

            if not start_osd:

                # If we cannot get a lock due to a timeout or an error, we
                # signal to the OSD to start anyway.
                log.warning("Could not acquire lock, cleared for launch")
                start_osd.store(True)
                return

            # Write some information about the current lock holder
            holder = LockHolder(
                host=socket.gethostname(),
                command=command,
                ppid=ppid,
                expires=datetime.utcnow() + timedelta(seconds=config.duration)
            )

            ioctx.write_full(config.key, holder.json().encode('utf-8'))

            # Keep the lock for the given duration
            try:
                log.info(f"Holding lock for up to {config.duration}s")

                until = time.monotonic() + config.duration
                while keep_running(until, warn_on_timeout=False):
                    pass

            finally:

                # The lock vanishes on its own once the duration is up, so
                # by the time we get here it may already be gone. We can
                # only try to unlock and ignore a missing lock.
                try:
                    ioctx.unlock(**lock_cfg)
                except ObjectNotFound:
                    pass

            # Release the lock
            log.info("Released lock")

    except Exception:

        # If there are *any* issues, we let the OSDs start.
        start_osd.store(True)
        log.exception(f"Could not connect to cluster using {config.keyring}")
        sys.exit(1)


def wait_for_lock(start_osd: SharedBool, timeout: int) -> bool:
    """ Waits for the `start_osd` variable to change to `True`, returning
    `True` if the OSD should be started, `False` otherwise.
    """

    # Ensure we get separate signal handling
    ensure_session_leader()
    interrupt = signal_handler()

    # Wait to acquire the lock
    log.info("Awaiting lock")

    # The same timeout that eventually abandons the lock, can be used here,
    # so we don't get stuck if the lock process dies.
    until = time.monotonic() + timeout
    while not (start_osd or interrupt.is_set()) and time.monotonic() < until:
        interrupt.wait(0.5)

    # If the interrupt has been received, stop:
    if interrupt.is_set():
        log.info("Interrupt received: Aborting")
        return False

    return True


@cli.command(context_settings={
    "allow_extra_args": True,
    "ignore_unknown_options": True
})
@click.pass_context
@config_option
@click.option('--verbose', is_flag=True, default=False)
def start(ctx: click.Context, config: str, verbose: bool) -> None:
    """ Wraps the given command with a cluster-wide lock, forcing the command
    to start at a staggered interval (spread out), even if run on multiple
    machines.

    """

    # Load config
    config = Config.from_yaml(config)

    # Load command
    command = ctx.args

    if not command:
        print("You must specify a command to run, after '--'", file=sys.stderr)
        sys.exit(1)

    # Shared lock
    start_osd = SharedBool(False)

    # The PID of the process destined to be the command
    ppid = os.getpid()

    # Fork a process that launches the lock
    if os.fork() == 0:

        # Fork the lock process
        if os.fork() == 0:
            config.configure_logging(process="lock-process", verbose=verbose)
            wait_for_lock_then_hold(
                start_osd=start_osd,
                ppid=ppid,
                command=command,
                config=config)

        # Launcher and lock process are both awaited by the init process
        sys.exit(0)

    # Wait for the process that launches the lock to exit (this is quick)
    os.wait()

    config.configure_logging(process="exec-process", verbose=verbose)
    if wait_for_lock(start_osd=start_osd, timeout=config.timeout):
        log.info(shlex.join(command))
        os.execvp(command[0], command)


@cli.command()
@config_option
def status(config: str) -> None:
    """ Show the current status. """

    config = Config.from_yaml(config)
    print("Status:", config.is_disabled and "Disabled" or "Enabled")

    with config.ioctx() as ioctx:
        try:
            holder = ioctx.read(config.key)
        # A missing object is reported by rados as ObjectNotFound
        except (KeyError, ObjectNotFound):
            holder = None
        else:
            holder = holder and LockHolder.parse_obj(json.loads(holder))

    if not holder or holder.is_expired:
        print("Last Lock: Expired")
    else:
        print(f"Last Lock: Acquired on {holder.host}")
        print(f"PPID: {holder.ppid}")
        print(f"Command: {shlex.join(holder.command)}")


@cli.command()
@config_option
def enable(config: str) -> None:
    """ Enable OSD staggering. """

    config = Config.from_yaml(config)

    with config.ioctx() as ioctx:
        config.enable(ioctx)


@cli.command()
@config_option
def disable(config: str) -> None:
    """ Disable OSD staggering, immediately ceasing all locking, even
    for locks that are currently in progress.

    """

    config = Config.from_yaml(config)

    with config.ioctx() as ioctx:
        config.disable(ioctx)


@cli.command(hidden=True)
@config_option
def shell(config: str) -> None:
    """ Drop into a Python shell, with an active ioctx. Only use if you know
    what you are doing!

    """

    config = Config.from_yaml(config)

    with config.ioctx() as ioctx:
        code.interact(local={**globals(), **locals()})


if __name__ == '__main__':
    cli()

This gives us the stagger-osds CLI tool, which wraps a command and runs it under a cluster-wide lock on a chosen object in a designated pool.
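
The handshake between the two processes can be illustrated in isolation. The following is a simplified sketch, not an excerpt from the script: `run_with_gate` is a hypothetical name, the child merely sleeps instead of talking to RADOS, and it forks once and waits for the child, whereas the script forks twice and finally calls `exec`. It shows how an anonymous memory map, as used by `SharedBool`, lets a forked helper tell the waiting process that it may proceed.

```python
import mmap
import os
import time


def run_with_gate(hold_seconds: float, poll: float = 0.05,
                  timeout: float = 5.0) -> bool:
    """Fork a helper that opens a shared 'gate' after `hold_seconds`.

    Returns True if the parent saw the gate open before `timeout` expired.
    """
    # Anonymous shared mapping: both processes see the same byte after fork.
    gate = mmap.mmap(-1, 1)
    gate.write(b"0")

    pid = os.fork()
    if pid == 0:
        # Child: stands in for the lock process. It pretends to wait for a
        # lock, then signals the parent that it may proceed.
        time.sleep(hold_seconds)
        gate.seek(0)
        gate.write(b"1")
        os._exit(0)

    # Parent: stands in for the exec process. It polls until the gate opens
    # or the timeout expires.
    deadline = time.monotonic() + timeout
    opened = False
    while time.monotonic() < deadline:
        gate.seek(0)
        if gate.read(1) == b"1":
            opened = True
            break
        time.sleep(poll)

    # Reap the child so it does not linger as a zombie.
    os.waitpid(pid, 0)
    return opened
```

The parent never blocks indefinitely: if the helper dies or takes too long, the timeout ends the wait. In the script, the command is started in that case as well.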

In a sense, stagger-osds works a lot like flock. You give it a command, which is then run while holding an exclusive, cluster-wide lock:

$ stagger-osds start --verbose -- sleep 5
exec-process [INFO] Awaiting lock
lock-process [INFO] Acquiring lock (timeout 50s)
lock-process [INFO] Acquired lock, cleared for launch
lock-process [INFO] Holding lock for up to 4s
exec-process [INFO] sleep 5
lock-process [INFO] Released lock
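
For comparison, the host-local flock pattern looks roughly like this in Python. This is an illustrative sketch using the standard `fcntl` module; `run_locked` is a hypothetical helper and not part of stagger-osds.

```python
import fcntl
import subprocess
from typing import Sequence


def run_locked(lock_path: str, command: Sequence[str]) -> int:
    """Run `command` while holding an exclusive flock on `lock_path`.

    Returns the exit code of the command.
    """
    with open(lock_path, "w") as lock_file:
        # Blocks until no other process holds the lock on this file.
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            return subprocess.call(command)
        finally:
            # Release explicitly; closing the file would release it as well.
            fcntl.flock(lock_file, fcntl.LOCK_UN)
```

There are two differences to keep in mind. A flock only serializes processes on one host, and it is held until the command exits. stagger-osds holds its lock cluster-wide and only for the configured duration, because an OSD may keep running for months.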

While the lock is held, it shows up in the output of the status command:

$ stagger-osds status
Status: Enabled
Last Lock: Acquired on lab-nvme1-a-lpg1
PPID: 3927556
Command: sleep 5

With stagger-osds disable we can disable the lock, causing all processes currently waiting for the lock to be started instantly.
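
The acquisition loop behind this behavior can be sketched without a cluster. The names below (`acquire_or_give_up`, `try_lock`, `is_disabled`) are hypothetical stand-ins for the RADOS calls in the script, and the sketch omits the interrupt and parent-PID checks.

```python
import random
import time
from typing import Callable


def acquire_or_give_up(
    try_lock: Callable[[], bool],
    is_disabled: Callable[[], bool],
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Retry `try_lock` until it succeeds, staggering is disabled, or the
    timeout expires. Returns 'acquired', 'disabled' or 'timeout'.
    """
    deadline = clock() + timeout

    while clock() <= deadline:
        # Checked on every iteration, so disabling takes effect quickly.
        if is_disabled():
            return "disabled"

        if try_lock():
            return "acquired"

        # Jitter the retry interval so waiters do not poll in lockstep.
        sleep(random.uniform(0.9, 1.1))

    return "timeout"
```

In the script, all three outcomes lead to the wrapped command being started. Only the acquired case goes on to hold the lock, which is what delays the next waiter.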

Systemd Integration

To start all ceph-osd@.service units wrapped in the stagger-osds wrapper, we use the following drop-in:

[Service]
ExecStart=
ExecStart=/usr/local/bin/stagger-osds start --verbose -- /usr/bin/ceph-osd -f --cluster ${CLUSTER} --id %i --setuser ceph --setgroup ceph
SyslogIdentifier=ceph-osd
TimeoutSec=148

The timeout needs to be configured to accommodate the start of 1/3 of all OSDs on the cluster (after the timeout is over, all OSDs are started right away). Here is the kind of config one might use on a cluster with 600 OSDs:

  • 1/3 of the Cluster: 600 OSDs / 3 = 200 OSDs
  • Timeout: 200 OSDs x 4s = 800s

ceph_config: /etc/ceph/ceph.conf
keyring: /etc/ceph/ceph.client.stagger-osds.keyring
pool: sys
key: stagger-osds
duration: 4
timeout: 800
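
The same arithmetic can be written as a small helper for clusters of other sizes. This is a sketch under stated assumptions: `stagger_timeout` is a hypothetical function, and the two upper bounds mirror the `duration` and `timeout` limits of the `Config` model in the script.

```python
# Upper bounds taken from the Config model in stagger-osds.py.
MAX_DURATION = 60
MAX_TIMEOUT = 3600


def stagger_timeout(total_osds: int, duration: int, divisor: int = 3) -> int:
    """Return the seconds needed to start 1/divisor of all OSDs, one lock
    duration apart. Raises ValueError if the Config model would reject it.
    """
    if not 0 < duration <= MAX_DURATION:
        raise ValueError(f"duration must be between 1 and {MAX_DURATION}")

    # Ceiling division, so a partial share still counts as a full slot.
    osds_to_start = (total_osds + divisor - 1) // divisor
    timeout = osds_to_start * duration

    if not 0 < timeout <= MAX_TIMEOUT:
        raise ValueError(f"timeout {timeout}s is outside 1..{MAX_TIMEOUT}s")

    return timeout
```

For the example above, `stagger_timeout(600, 4)` returns 800. The helper raises an error once the result exceeds one hour, the largest timeout the config model accepts.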

The keyring was created as follows:

ceph auth get-or-create \
    client.stagger-osds \
    mon "allow rw" \
    osd "allow rwx pool stagger-osds" \
    -o /etc/ceph/ceph.client.stagger-osds.keyring

Conclusion

Stagger OSDs is active on all our Ceph clusters, and since we have started using it, clusters run with more predictable performance, even during major upgrades.

Using built-in Ceph tools, we also get global locks without needing the likes of Zookeeper, Etcd or Redis.


If you have comments or corrections to share, you can reach our engineers at engineering-blog@cloudscale.ch.
