Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Ownership and Global Scheduling

Problem Statement

For a magpie instance to execute a nanotransaction on a set of objects, it first needs to be the "owner" of the objects the nanotransaction depends on. Ownership directly translates to permission to perform writes on a given object.

This restriction of the execution model requires us to introduce a system to manage object ownership. As ownership management and the logic around compute and data rendezvous are tightly coupled, we combine the two functionalities into a discreet system layer responsible for cluster-wide scheduling. Magpie instances are still entirely responsible for the local scheduling and execution of nanotransactions; the scheduling layer faciliates discovery (so that instances can schedule nanotransactions over objects they know nothing about) and object-level ownership transfer between hosts to facilitate the execution of "cross-partition" nanotransactions.

Prototype Overview

For the first prototype, we will structure the system as a layer of workers (the existing magpie instances) whose work is orchestrated at a coarse granularity by a scheduling layer which for now only contains a single scheduler instance.

Basic Architecture Diagram

Workers publish newly allocated objects to the global scheduler over the w_s::publish() interface (1). This establishes them as the owners of the objects being published, and also makes the published objects visible to discovery, so that cluster-wide nanotransactions may be scheduled over them.

Upon receiving a request to execute a nanotransaction (see the execution engine page), the receiving worker first attempts to schedule a nanotransaction locally. If it can't (because it does not own the nanotransaction's object dependencies, for example), it inspects its locally cached (and potentially stale) ownership information to determine if it knows of another worker that can handle the request (i.e. is the owner of all associated dependencies).

If such a host exists, the original worker forwards the nanotransaction activation request over the w_w::schedule() interface (3), and execution proceeds normally. If such a host cannot be determined, then the original worker forwards the activation request to the global scheduler over the w_s::schedule() interface (1).

The scheduler inspects the current ownership state of the nanotransaction's dependencies. If it can find a host that is the owner of all objects, it replies to the requesting worker with the host's ID. Otherwise, it first attempts to consolidate the nanotransactions' dependencies on a single host (through parallel calls over the s_w::move_ownership() interface (2)), before letting the original worker know of the new owning host's identity so that it may forward its execution request.

Global Scheduler Functionality

The scheduler's core piece of functionality is to consolidate nanotransaction dependencies on a single owning host before scheduling a nanotransaction. Below is some pseudocode for the consolidation process.

#![allow(unused)]
fn main() {
fn consolidate(dependencies) -> HostID {
    // Run the cost model to compute an optimal activation location
    activation_site = compute_location(dependencies);
    parallel for (dependency, old_owner) in dependencies {
        success, whomstone_version = s_w::move_ownership(
            old_owner, dependency, new_owner,
        );
        // Trigger ownership change between workers, wait for ack
        if !success {
            signal failure;
        }
        ownership_state.update(/* */);
    }

    if any migration failed {
        signal partial failure.
    }

    return activation_site
}

fn compute_location(dependencies) -> HostID {
    / * cost model */
}
}

In order for consolidate() to make principled choices, it relies on the cost model to compute an activation location that (at least) minimizes object moves and service disruption.

Ownership State

The scheduler maintains ownership state in two data structures:

  • A registry of current owners, capable of answering questions of the form "which host owns object O?", to facilitate nanotransaction scheduling (as well as ownership changes). Along with object metadata, this constitutes the underlying state the cost model uses for its execution strategy ranking.
  • A per-object record of the object's past ownership changes. This is used to serve read requests for past versions of an object (as well as epic scheduling).

Each worker maintains its own version of ownership state.

  • First, each worker has, at any given point in time, complete knowledge of the set of objects for which it is the current owner (and hence can answer questions of this kind authoritatively).
  • Each worker also has historical knowledge around the ownership lineage of the objects that it was an owner of in the past. Ownership changes are recorded in special locations in the ceding owner's transaction log, tentatively called "whomstones".
  • Finally, it has a potentially stale cache of ownership info for objects that it has scheduled nanotransactions over, permitting each worker to engage in best-effort scheduling of nanotransactions without necessarily having to make a call to the scheduling layer.

An interesting point in this design is that none of the scheduler's state constituteds "hard state" that we need to persist to external storage as it can be entirely reconstructed by aggregating ownership state from the worker layer.

Functionality Description

Interface #1: Worker to Scheduler

publish() Scheduler Handler

Workers periodically publish objects that were allocated locally to the global scheduler, to make them discoverable across all workers in the worker layer. The scheduler checks its local state to determine if this object has already been published by a different host. If so, it signals the conflict to the originating worker. Otherwise, it updates its internal state and appropriately notifies the publishing worker. It is only after this point that the object's ownership may change.

#![allow(unused)]
fn main() {
fn w_s::publish(object, publisher: HostID) -> Result<(), Error> {
    if object is published {
        if publisher is not caller {
            return Error(AlreadyPublished);
        }
        return Ok;
    }
    /* update internal state with current ownership */
    return Ok;
}
}

schedule() Scheduler Handler

A worker that determines it cannot execute a nanotransaction locally and cannot determine an appropriate activation site locally (see worker's scheduling logic) will forward the nanotransaction intent to the global scheduler. The global scheduler in turn will compute an activation location, potentially consolidating objects onto a single host if it has to, and will subsequently send that location it back to the worker that invoked schedule(), so that the worker may forward the intent to the activation site.

#![allow(unused)]
fn main() {
fn w_s::schedule(intent) -> HostID {
    // returns a value if deps are co-located
    activation_site := location(intent.args);
    if !activation_site {
        activation_site = consolidate(intent.args);
    }
    return activation_site;
}
}

Interface #2: Scheduler to Worker

The scheduler requests object ownership changes as a result of calls to consolidate(). On successful ownership transfer, the worker that owned the object before this request was made replies with its local "whomstone version" of the object -- the last version it was an owner of. This information is recorded in the scheduler's local state as part of consolidate(), and is also propagated to the worker that triggered this consolidation. This will then permit the worker to forward its request to the new owner and supply a "minimum acceptable version" for its dependency.

move_ownership Scheduler Client

#![allow(unused)]
fn main() {
fn s_w::move_ownership(current_owner, object, new_owner) -> (bool, Version) {
    // Send request to current owner
    resp := current_owner.move_ownership(object, new_owner);
    if !resp.ok() {
        return (false, 0);
    }
    return (true, resp.whomstone_version);
}
}

move_ownership Worker Handler

The worker that is asked to give up ownership of an object is responsible for first "flushing" the workqueue of the object that needs to migrate. This is accomplished by submitting a special system nanotransaction through the mark_object_whomstone() call on the transaction manager. The successful completion of this nanotransaction results in the retrieval of an object's whomstone version, the last version the current host was the owner of. The worker is then responsible for contacting its peer (the new owner), asking it to assume ownership of the object past the whomstone version. Only when that call returns successfully may the current worker acknowledge the ownership transfer to the global scheduler.

#![allow(unused)]
fn main() {
fn s_w::move_ownership(object, new_owner) -> Version {
    // mark the object as under-migration, so that we don't
    // schedule any more local nanotransactions on it
    ownership.mark_under_migration(object, new_owner);
    // "seal" the object locally
    whomstone_version := tm.mark_object_whomstone(object);
    // make ownership change request to new owner
    resp := w_w::assume_ownership(new_owner, object, whomstone_version + 1);
    if !resp.ok() {
        // Assuming no worker can refuse ownership, errors here
        // mean the remote worker was unreachable etc. so
        // handle appropriately
    }
    ownership.mark_migrated(object, whomstone_version);

    // Start asynchronously shipping object bytes to the new owner.
    spawn(w_w::move_data(object, whomstone_version, new_owner));
    return whomstone_version;
}
}

Interface #3: Worker to Worker

schedule Worker Handler

If a worker cannot execute a given nanotransaction locally, it first tries to use its cached ownership information to make a best-effort guess about another host that could potentially execute the given transaction. If no such host can be found, or if the computed host cannot actually execute the nanotransaction, the worker contacts the global scheduler to compute a valid activation location for the given nanotransaction.

#![allow(unused)]
fn main() {
fn schedule(intent) {
    if !can_execute_locally(intent) {
        // compute activation location based on cached ownership data,
        // might get redirected
        maybe_activation_location := compute_activation_location(intent);
        if maybe_activation_location {
            /* forward intent to computed location */
        }

        // got redirected or could not compute activation location based on cache,
        // try the scheduler
        activation_location := w_s::schedule(intent);
        /* forward intent to location we got from the scheduler */
    }

    /* existing execution logic */
}
}

assume_ownership() Worker Handler

The worker that is assuming ownership of a given object has to do relatively little. It needs to update its internal ownership state to reflect the fact that it is the new owner of the given object, starting from the version specified by first_owned_version. It could also potentially submit a system nanotransaction to place the dual of a whomstone in its local transaction log -- the pair of this entry with a corresponding whomstone in a subsequent position in the log clearly delineates the object's ownership period by the worker.

#![allow(unused)]
fn main() {
fn assume_ownership(old_owner, first_owned_version) {
    // mark the object as under migration towards this host, so
    // that we can start enqueueing operations on it
    ownership.mark_owned(object, first_owned_version);
}
}

Open Questions

  • How do we reduce synchronization costs around hot data structures for the scheduler?
  • How does the scheduler maintain semi-fungible ownership state (to tolerate worker failures)?
  • How do we scale the scheduling layer?
  • How do we project ownership caches on workers?
  • How should the scheduler ask a worker for its ownership state?