The Sibley Guide to magpie
Achilles Benetopoulos (University of California, Santa Cruz)
This is a work-in-progress document containing:
- Design documents for the various components of our prototype distributed runtime, magpie
- Descriptions of our target experiments
- Other relevant details for the prototyping phase
Components
Caller
Description
This component is responsible for orchestrating the rendezvous of nandos and data, and for initiating nando execution by appropriately notifying the target node’s Scheduler. This is the entry point into the runtime after a nando invocation.
External API
TODO
Implementation Notes
TODO (use this for internal implementation details, as well as the cutting room floor for what is excluded from the PoC)
Oracle
Description
This component is responsible for keeping track of (at least) object locations across the cluster and answering placement questions from the scheduler.
External API
input: list of object/code references. output: list of pairs (object/code references, location)
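The input/output shape above can be sketched as a lookup table. This is only an illustration: the `Oracle` struct, its `record` and `locate` methods, and the integer `Ref` and `HostId` types are assumptions made for the example, not names from the prototype.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins; the document does not define these types.
type Ref = u64;
type HostId = u32;

/// Toy oracle that remembers where each reference was last recorded.
#[derive(Default)]
struct Oracle {
    locations: HashMap<Ref, HostId>,
}

impl Oracle {
    /// Records (or overwrites) the location of a reference.
    fn record(&mut self, r: Ref, host: HostId) {
        self.locations.insert(r, host);
    }

    /// Returns one (reference, location) pair per input, in input order.
    /// `None` marks a reference the oracle has never seen.
    fn locate(&self, refs: &[Ref]) -> Vec<(Ref, Option<HostId>)> {
        refs.iter()
            .map(|r| (*r, self.locations.get(r).copied()))
            .collect()
    }
}
```

Returning `Option<HostId>` is one way to surface unknown references; the real API may well report them differently.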
Implementation Notes
TODO (use this for internal implementation details, as well as the cutting room floor for what is excluded from the PoC)
Scheduler
Description
The scheduler is responsible for figuring out the appropriate activation site (i.e. target host) for a nando, based on the current locations and activity of the nando’s object dependencies and the present state of the cluster (resource availability, etc.).
External API
input: list of object/location pairs * ownership module * cluster state. output: location of executor.
NOTE: no cluster state for the initial cut.
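As a rough sketch of one possible placement policy, the function below picks the host that already holds the most dependencies, which minimizes the number of objects that have to move. The policy itself, the name `pick_activation_site`, and the integer ID types are assumptions for illustration; the ownership module and cluster state inputs are left out entirely.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the runtime's identifier types.
type ObjectId = u64;
type HostId = u32;

/// Picks the host that already holds the most dependencies.
/// Ties go to the lowest host ID so that the result is deterministic.
fn pick_activation_site(deps: &[(ObjectId, HostId)]) -> Option<HostId> {
    let mut counts: BTreeMap<HostId, usize> = BTreeMap::new();
    for (_, host) in deps {
        *counts.entry(*host).or_insert(0) += 1;
    }
    let mut best: Option<(HostId, usize)> = None;
    // BTreeMap iterates in ascending host order; the strict `>` keeps
    // the first (lowest) host when counts are equal.
    for (host, count) in counts {
        if best.map_or(true, |(_, c)| count > c) {
            best = Some((host, count));
        }
    }
    best.map(|(host, _)| host)
}
```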
Implementation Notes
TODO (use this for internal implementation details, as well as the cutting room floor for what is excluded from the PoC)
Ownership Tracker
Description
This module is meant to track logical ownership. (NOTE(achilles) It remains to be seen if this needs to be an independent module or if it should somehow be merged into the scheduler and/or oracle).
External API
Two APIs:
- check ownership =
- input: list of data references.
- output: list of (data references * owner) pairs
- change ownership =
- input: list of (data references * new owner pairs).
- output: bool
Implementation Notes
TODO (use this for internal implementation details, as well as the cutting room floor for what is excluded from the PoC)
To atomically transfer ownership of an object from one host to another will (probably; at least in v0) require synchronization with the Execution Engine. In particular, consider the following scenario:
Host A invokes "change ownership" to move ownership of object O to host B. Host C is the current owner, and is currently running a nando on O. There are also enqueued nandos for O on C. There are a number of options here (abort, block, long block (all enqueued items), work stealing, etc.). But it seems important that C communicates with the OT!
Another option would be to not bother, let all that fate-shared work continue, and "fence off" the result, but this requires additional mechanism and could waste arbitrary work under contention.
Execution Engine
Description
The nando execution engine. Block until we have the data and code, run the code, return result to continuation
External API
TODO
Implementation Notes
TODO (use this for internal implementation details, as well as the cutting room floor for what is excluded from the PoC)
Component Interaction
TODO: activity diagram
Evaluation
TODO
Rough Architecture Diagram
Overview
This chapter outlines the design of the different kinds of allocators implemented on top of persistent objects as part of Magpie.
The allocator is the interface between magpie's logical description of an object, and the underlying operating system's allocation mechanisms. In the case of the linux variant, this is the mmap-based library for file-backed memory.
Implemented Allocators
Bump Allocator
This is an implementation of a simple bump allocator. It maintains a reference to a handle to the underlying storage (at present, an mmap-ed file) and manages its size as a response to allocation and deallocation requests. Namely, when an allocation request is made, it extends the underlying file and returns a pointer to the region of memory that was just appended to the file.
In the future, this allocator's logic will be platform dependent, and in the Twizzler variant will be one of the components interacting with the pager to satisfy memory requests on behalf of applications accessing object data.
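A minimal sketch of the bump-allocation idea follows. It makes several simplifying assumptions: a `Vec<u8>` stands in for the mmap-ed file, `allocate` returns a byte offset rather than a pointer, alignment is ignored, and deallocation is omitted. The method names are illustrative only.

```rust
/// In-memory stand-in for the mmap-ed backing file (an assumption of
/// this sketch; the prototype manages a real file).
struct BumpAllocator {
    storage: Vec<u8>,
}

impl BumpAllocator {
    fn new() -> Self {
        BumpAllocator { storage: Vec::new() }
    }

    /// Extends the backing storage by `size` bytes and returns the
    /// offset of the region that was just appended.
    fn allocate(&mut self, size: usize) -> usize {
        let offset = self.storage.len();
        self.storage.resize(offset + size, 0);
        offset
    }

    /// Gives mutable access to a previously allocated region.
    fn region_mut(&mut self, offset: usize, size: usize) -> &mut [u8] {
        &mut self.storage[offset..offset + size]
    }

    /// Current size of the backing storage in bytes.
    fn len(&self) -> usize {
        self.storage.len()
    }
}
```

Because every allocation is appended at the end, consecutive requests land back to back, which is the property the persistable vector design relies on.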
High-level description
The "owner" of a given data object is defined as the (logical) host that is responsible for applying operations on that object. All requests for nanotransaction executions for a given object must be directed to its owner. Furthermore, data movement requests should be sent to its owner.
The current version of the ownership tracking module is v0.
Properties
A host can be in either of two states of ownership for a given data object:
- The host maintains strong ownership of the object, meaning that no ownership changes are allowed at the present moment. This is our way of preventing object thrashing through constant movement. In the absence of requests to renew a host's strong ownership "lease", this mode of ownership expires after a configurable amount of time.
- The host maintains weak ownership of the object. This happens after the strong ownership TTL mentioned above expires. At this point, the owning host may be asked to perform an operation on the given object if there is no scheduling pressure to relocate the given object (given that we want to minimize data movement), but ownership can change hands if need be.
Iterations
Ownership Management - v0
System Breakdown
There are two distinct components to the ownership tracking subsystem:
- A "server" component that maintains the mapping of object id to current owner, together with some piece of information that encodes the "lease" of ownership (during which ownership is not permitted to change)
- Client-side libraries (where clients are magpie instances) for instances to contact the server.
No ownership state is maintained within magpie itself, at least in the current iteration. It might be beneficial to cache ownership results (given that we can retrieve the lease time from the server) to reduce the number of round trips, but the necessity for this remains to be decided.
How the APIs exposed by the ownership subsystem are leveraged by the rest of the system (for scheduling, for example) is not covered by this document. That should be included under the respective components' design documents.
Prototype Implementation
For the purposes of the v0 prototype, we will host ownership information in Redis, which will be the server component described above, mapping object IDs to host IDs.
We will use two logical databases (indexes 0 and 1) to maintain ownership information. Database 0
is the one that ownership-tracker reads from and writes to. The key/value pairs present in it at
any given point in time represent all the current strong ownership relations. Database 1 is the
database that location-manager communicates with. It will contain a mapping from objects to
hosts, but these pairs represent object locations. When an entry in database 0 expires, the system
will fall back to querying the location database to determine weak ownership relations.
The above scheme is less a matter of "proper design" and more of a side-effect of choosing to use redis and wanting to keep interactions with it at a minimum, as well as not wanting to store complex serialized data structures as values. Under this scheme, magpie has to do the bare minimum to maintain ownership relations and to enforce multi-key atomic ownership updates, as described in the protocol section below.
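The fallback from strong to weak ownership can be sketched with two in-memory maps standing in for the two Redis databases. Everything here is an assumption made for illustration: the `OwnershipView` type, the `owner_at` method, and the use of an abstract tick counter with an explicit expiry in place of Redis key expiration.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the runtime's identifier types.
type ObjectId = u64;
type HostId = u32;

#[derive(Debug, PartialEq)]
enum Ownership {
    Strong(HostId),
    Weak(HostId),
}

struct OwnershipView {
    /// Models database 0: object -> (owner, lease expiry in abstract ticks).
    strong: HashMap<ObjectId, (HostId, u64)>,
    /// Models database 1: object -> current location.
    locations: HashMap<ObjectId, HostId>,
}

impl OwnershipView {
    /// A live lease yields strong ownership; otherwise the object's
    /// current location determines its weak owner.
    fn owner_at(&self, object: ObjectId, now: u64) -> Option<Ownership> {
        match self.strong.get(&object) {
            Some(&(host, expires_at)) if now < expires_at => Some(Ownership::Strong(host)),
            _ => self.locations.get(&object).map(|h| Ownership::Weak(*h)),
        }
    }
}
```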
Protocols
Location Manager
There are two cases when the location manager of a magpie instance will need to contact the location database:
- Whenever a new object is created. This in essence publishes the object to the entire cluster.
- Whenever an object is moved. At this point, the location manager of the receiving node will update the value associated with an existing object as soon as the data transfer is complete. Ownership of the object should be atomically updated before the data movement is triggered by the node initiating the data movement, as described in the ownership tracker's protocol section below.
The API exposed by the location manager for this piece of functionality can be boiled down to the following two functions.
    fn get_location(object_id: ObjectId) -> Result<HostId>
    fn set_location(object_id: ObjectId, host_id: HostId) -> Result<()>
Both operations are expected to succeed in the absence of connectivity issues to the redis instance. Updating the location of a given object is unconditional.
NOTE It is probably the case that we will not need to update a set of object locations atomically, but if we end up in that position, the above signatures will be updated to operate over collections of objects.
Ownership Tracker
The ownership tracker exposes the following operations, all of which should be considered atomic over the set of keys passed in as arguments.
    fn get_owners(object_ids: Vec<ObjectId>) -> Result<Vec<(ObjectId, HostId)>>
    fn set_owner(object_ids: Vec<ObjectId>, host_id: HostId) -> Result<()>
    fn renew_ownership(object_ids: Vec<ObjectId>) -> Result<()>
A call to set_owner() will fail as a whole if at least one of the provided objects is already
strongly owned by a host, even if it is the currently owning host. To renew an owner's lease,
renew_ownership() should be used.
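The all-or-nothing behaviour of these operations can be sketched in memory as follows. This is a hypothetical model, not the Redis-backed implementation: leases are tracked against an explicit `now` tick, the error type is invented, and `renew_ownership` takes the calling host as an extra argument because the sketch has no notion of "the current instance".

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the runtime's identifier types.
type ObjectId = u64;
type HostId = u32;

#[derive(Debug, PartialEq)]
enum OwnershipError {
    StronglyOwned(ObjectId),
    NotStrongOwner(ObjectId),
}

struct OwnershipTracker {
    lease_ticks: u64,
    /// object -> (owner, lease expiry tick)
    leases: HashMap<ObjectId, (HostId, u64)>,
}

impl OwnershipTracker {
    fn is_strong(&self, object: ObjectId, now: u64) -> bool {
        self.leases.get(&object).map_or(false, |&(_, exp)| now < exp)
    }

    /// Fails without changing anything if any object has a live lease,
    /// even one held by `host` itself.
    fn set_owner(&mut self, objects: &[ObjectId], host: HostId, now: u64) -> Result<(), OwnershipError> {
        // Validate first so that a failure leaves every lease untouched.
        if let Some(&o) = objects.iter().find(|o| self.is_strong(**o, now)) {
            return Err(OwnershipError::StronglyOwned(o));
        }
        for &o in objects {
            self.leases.insert(o, (host, now + self.lease_ticks));
        }
        Ok(())
    }

    /// Extends the leases only if `host` strongly owns every object.
    fn renew_ownership(&mut self, objects: &[ObjectId], host: HostId, now: u64) -> Result<(), OwnershipError> {
        for &o in objects {
            match self.leases.get(&o) {
                Some(&(h, exp)) if h == host && now < exp => {}
                _ => return Err(OwnershipError::NotStrongOwner(o)),
            }
        }
        for &o in objects {
            self.leases.insert(o, (host, now + self.lease_ticks));
        }
        Ok(())
    }
}
```

Checking every key before writing any of them is what gives the multi-key atomicity in this single-threaded model; against a shared store, the check and the write would have to happen in one server-side transaction or script.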
High-level description
This chapter contains the design docs for the persistable collection types implemented within magpie, to be used when defining data that is meant to be stored in objects operated on from within nanotransactions.
The problem
To model most interesting problems, we will require non-scalar types like sets, dynamic arrays etc. As things currently stand, we cannot leverage Rust's built-in collection types, because of how they lay out their memory.
Take the built-in Vec type as an example. A
Rust vector is composed of a header detailing the underlying array's length and current capacity.
This header also contains a pointer to the buffer that holds the array's contents; this pointer is
in the virtual address space of the process that instantiated the vector. What this means is that,
if we were using such a vector as part of a persistable data structure in our Magpie program, the
"transparent" abstraction of manipulating data in an mmap-ed file would break, as the contents
of the vector would reside outside of the file (given that the Rust runtime uses the global
allocator to allocate the buffer on the process' heap).
The solution
In an ideal world, all standard library collection types would allow us to set the allocator for
only the instances we are interested in (i.e. those that are meant to end up transparently
persisted) to a custom allocator that "understands" how to lay memory out for the task at hand. Even
though the allocator_api RFC is a step in this
direction, it is unclear when (and if) it is actually going to become more than just a facility that
is in place, and when the standard library collection types are going to leverage it to support
parameterization through custom allocators.
Because of this, we are opting to implement (some) collection types ourselves. We implement custom allocators, and leverage those to create collection types that expose a subset of the standard library APIs to users (so that they can be dropped-in to programs that would use standard library collection types and work as users expect them to without many surprises), but play nicely with the rest of the infrastructure we have set up around transparent operations over persistent data and data movement.
Collection Types
Vectors (Dynamic Arrays)
This page covers the design of the PVec type provided by magpie.
From the outside, a PVec<T> should be thought of as identical in functionality to a Vec<T>. The
difference from the user's perspective is that they explicitly need to supply a BumpAllocator
instance so that the vector implementation can request file-backed memory within the same object.
In the rest of this page, the term "persistable vector" will be used to refer to a PVec<T>.
Design
The approximate definition of a persistable vector is found in the following code listing.
    pub struct PVec<T> {
        len: usize,
        capacity: usize,
        buf: usize,
        allocator: Option<BumpAllocator>,
    }
The "header" of a vector contains information about its allocated capacity and its current length,
along with a buf entry that is the offset, in bytes, to the first allocated element of the vector.
In this way, we do not need to store pointer information as part of our "serialized" vector
representation in the underlying file. Any instance of Magpie that loads the underlying file (and,
of course, casts it to the correct datatype) will be able to load and safely access the same vector
in memory, as all references are encoded as relative offsets within the file.
A persistable vector does not model its underlying buffer as a contiguous region of memory. This is
a conscious decision, in an effort to avoid internal fragmentation of the backing storage whenever
the vector needs to be resized. Instead, a persistable vector is composed of one or more sections. A
section is created as the result of a call to the bump allocator's allocate() method, whenever
space needs dictate the need for the underlying buffer to be resized. The newly allocated region of
memory is, itself, contiguous within the process' virtual address space.
Entries within the persistable vector are of the following type:
    enum Entry<T> {
        SectionBegin { section_len: usize },
        SectionEnd { next_section_offset: usize },
        Elem(T),
    }
Each section contains two "bookends": the initial element, SectionBegin describes the section
immediately following it (namely, how many elements it can hold), and the last element,
SectionEnd, contains the offset to the next allocated section (if any). The memory in between
those elements is the memory that user data occupies, wrapped within an Entry::Elem enum.
Memory Layout
The vector uses a bump allocator to request contiguous regions of memory on resize events. The
vector grows geometrically, doubling its capacity on each resize request. The image below
illustrates the sample layout in memory (and consequently, within a backing file) of a vector with a
single section (in green, delimited by SectionBegin and SectionEnd entries), which has been
allocated after some other data stored by the object.
The below image shows the effect of calling push() on the above vector. Because the allocated
buffer was at capacity, a new allocation was requested. The new allocation occurred immediately after
the previous buffer, since no other allocation had intervened. The old SectionEnd pointer was
updated with the new section's offset, and the original element was inserted into the newly
allocated section.
To index into the section, the implementation reads the offset contained within buf and constructs
a pointer into the loaded file by calculating, in essence, &buf + *buf.
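The offset arithmetic can be illustrated without raw pointers by treating the loaded file as a byte slice. In this sketch (all names hypothetical), the header stores the distance in bytes from its own position to the target, encoded as a little-endian `u64`; resolving it is the slice-index analogue of `&buf + *buf`.

```rust
/// Writes a header at `header_at` whose value is the distance, in
/// bytes, from the header itself to `target`.
fn write_relative_offset(file: &mut [u8], header_at: usize, target: usize) {
    let rel = (target - header_at) as u64;
    file[header_at..header_at + 8].copy_from_slice(&rel.to_le_bytes());
}

/// Resolves the header back to a position in whichever copy of the
/// file is at hand: header position plus stored offset.
fn resolve(file: &[u8], header_at: usize) -> usize {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&file[header_at..header_at + 8]);
    header_at + u64::from_le_bytes(raw) as usize
}
```

Since nothing in the stored bytes depends on where the file is mapped, a copy of the buffer loaded elsewhere resolves to the same element.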
An advantage of this layout is that we do not need to pay the overhead of data copy on resize to maintain our invariant of the underlying buffer residing within a contiguous region of memory.
This layout, however, has implications on access times. Most importantly, accesses to the vector's
elements through an index (and, consequently, insertions and removals) are no longer truly constant
time O(1), but they still complete within a bounded number of steps.
Indexing proceeds by accessing each section sequentially, and checking if the requested index falls
within it. If so, the implementation fetches the entry from the contiguous block of the section's
memory. If not, the next section's address is computed by reading the offset information encoded in
SectionEnd, and the above check is repeated. This means that access times are O(log(n / C)),
where C is the vector's initial capacity, and n is the number of actual elements. This means
that the initial capacity users choose for the vector matters. A value that is too far off the
number of elements the vector ends up holding will lead to a larger number of relatively small
sections being allocated, and therefore more steps before the appropriate section is identified. For
this reason, the initial capacity should be (roughly) in the same order of magnitude as the
anticipated number of elements in the vector.
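To make the cost concrete, the sketch below walks section boundaries the way the lookup described above would, under the assumption that each resize adds a section as large as everything allocated so far (so that total capacity doubles). The function name and the returned tuple are illustrative; the real implementation follows SectionEnd offsets in the file rather than computing boundaries arithmetically.

```rust
/// Returns (section number, slot within that section) for `index`.
/// The section number equals the number of section hops needed.
fn locate(index: usize, initial_capacity: usize) -> (usize, usize) {
    assert!(initial_capacity > 0);
    let mut section = 0;
    let mut section_start = 0; // index of the first element in this section
    let mut section_len = initial_capacity;
    while index >= section_start + section_len {
        section_start += section_len;
        // Assumed growth rule: the new section is as large as all
        // previous sections combined, doubling the total capacity.
        section_len = section_start;
        section += 1;
    }
    (section, index - section_start)
}
```

With an initial capacity of 4, element 1000 sits eight hops away from the first section, while an initial capacity of 1024 finds it in the first section directly.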
API
The full list of supported operations can be found in the corresponding module-level docs. For the prototype version, the supported operations are the fundamental ones, namely:
- Inserting elements -- fn push(&mut self, elem: T)
- Removing elements -- fn pop(&mut self) -> Option<T>
- Querying the vector's length -- fn len(&self) -> usize
- Indexing into the vector through the [] operator.
Transaction Execution
This chapter contains resources describing the design of the various components that support transaction execution in magpie, as well as the execution semantics and user-provided guarantees for the different kinds of supported workloads.
Transaction Execution
- Nanotransactions
- Transactional functions that declare all the objects they will touch as arguments, and never spawn further downstream computations. This is the building block for compound operations, or "epics".
- Epics
- The runtime composition of nanotransactions that form a graph of dependent subcomputations (like a task graph in a traditional distributed dataflow system). These should not really be thought of as transactions, but rather as workflows composed of (nano)transactions, with user-configurable consistency semantics with regard to this composition.
Nanotransactions
A nanotransaction is any function operating on invariant, mobile data that does not include another transactional function invocation as part of its body.
One of the simplest nanotransactions one can write is a function that transfers a certain amount of money between users' bank accounts:
    let transfer = nando(|src: &mut Account, dst: &mut Account, amount: f64| {
        if src.balance < amount {
            abort(/* error code */);
        }
        // Both accounts are mutated, so both are taken by mutable reference.
        src.balance -= amount;
        dst.balance += amount;
    });
A nanotransaction is executed in an all-or-nothing fashion, with its updates made visible at commit
time. The application of an authored nanotransaction f on a set of input and output objects o_1, ..., o_n is referred to as an activation of nanotransaction f.
Importantly, a nanotransaction never invokes another nanotransaction -- nanotransactions containing nanotransaction invocations in their body will be decomposed into epics.
Iterations
Lifetime of a nanotransaction activation
A request for an activation originates on some machine, either because a user explicitly makes a call through an interactive shell, or programmatically, when a user program (running on top of Magpie) reaches a condition in its ephemeral context that invokes a nanotransaction with concrete object IDs.
The first step of execution involves the source machine writing down the activation in a local intent log. QUESTION should this be done before or after scheduling? If we do it before scheduling, it means that the most information we can store at this point is the "transaction id" (a content-based hash of the function to run and its input arguments). If we do it after scheduling, we can breadcrumb our way through the execution if it spills over to a different machine if we ever have to inquire about the status of the nanotransaction execution.
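One way to derive such a content-based transaction id is sketched below. The function name, the choice of inputs (a function name and a list of object IDs), and the use of the standard library's `DefaultHasher` are all assumptions for illustration; `DefaultHasher`'s algorithm is not guaranteed to stay the same across Rust releases, so a persisted id would need a fixed hash function instead.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hashes the function's name together with its object arguments.
/// Identical invocations map to the same id.
fn transaction_id(function_name: &str, object_ids: &[u64]) -> u64 {
    let mut hasher = DefaultHasher::new();
    function_name.hash(&mut hasher);
    object_ids.hash(&mut hasher);
    hasher.finish()
}
```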
Some scheduling needs to take place. We inspect the set of arguments, we consult the ownership module, and come up with an activation location. The second step is to orchestrate all the required data movement to the activation location, submit the nanotransaction to the activation location, and (potentially) await the result.
Logging
What we keep track of, and why
Intent logging: Each transaction should have an intent entry in the intent log specifying the system's intent to execute it. For each successful commit, there should also be a corresponding committed entry (QUESTION how does this work with compound transactions? multi-host transactions?).
Old-value logging: keep track of the values of the fields that the nanotransaction we are about to
run is going to overwrite (by assigning to them), as tracked through this
change in the nandoize macro. The reason
for tracking old values is so that, on abort, we can undo a transaction's (partial) changes to
provide failure atomicity.
New-value logging: keep track of the values that this transaction ended up writing (and to which object fields) so that, on recovery, we can re-execute them.
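The role of old-value logging can be sketched with a toy transfer that aborts after making a partial change. The names, the `i64` balances, and the decision to credit the destination before checking the source are all contrived for the example, the last one purely so that there is something to undo.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Account {
    balance: i64,
}

/// Old-value log: (account index, balance before the write).
type UndoLog = Vec<(usize, i64)>;

/// Runs the transfer and, on abort, restores every logged old value.
fn transfer(accounts: &mut [Account], src: usize, dst: usize, amount: i64) -> Result<(), &'static str> {
    let mut undo: UndoLog = Vec::new();
    let outcome = apply_transfer(accounts, &mut undo, src, dst, amount);
    if outcome.is_err() {
        // Abort: undo the partial changes in reverse order.
        for (idx, old) in undo.into_iter().rev() {
            accounts[idx].balance = old;
        }
    }
    outcome
}

fn apply_transfer(
    accounts: &mut [Account],
    undo: &mut UndoLog,
    src: usize,
    dst: usize,
    amount: i64,
) -> Result<(), &'static str> {
    // Log the old value before each overwrite.
    undo.push((dst, accounts[dst].balance));
    accounts[dst].balance += amount;
    if accounts[src].balance < amount {
        return Err("insufficient funds");
    }
    undo.push((src, accounts[src].balance));
    accounts[src].balance -= amount;
    Ok(())
}
```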
Nanotransactions - v0
Declaring a nanotransaction
The prototype implementation of "nanotransactions" in magpie currently leverages an operational
macro (nandoize) that users use to annotate functions that are meant to be run as
nanotransactions.
nandoize
The purpose of the nandoize macro is to wrap (almost) any arbitrary user function with a
"transactional context". The implementation documentation is located here.
As an example, say we have the following function set_key() operating
over arguments of type KeyValuePair:
    #[derive(PersistableDerive)]
    struct KeyValuePair {
        key: u64,
        value: u128,
    }

    #[nandoize]
    fn set_key(x: &mut KeyValuePair, k: u64) {
        x.key = k;
    }
The macro will result in the following meta-function being generated:
     1 | impl NandoManager {
     2 |     fn set_key_nando(object_tracker: &object_lib::ObjectTracker, iptr0: &object_lib::IPtr, k: u64) {
     3 |         let mut obj0 = match object_tracker.get(iptr0.object_id) {
     4 |             Some(o) => o.lock(),
     5 |             None => panic!("Object {} not found", iptr0.object_id),
     6 |         };
     7 |         obj0.advise();
     8 |         let mut v = unsafe { obj0.read_into::<KeyValuePair>(&iptr0).unwrap().as_mut() }.unwrap();
     9 |         /* pre-image logging */
    10 |         let res = set_key(v, k);
    11 |         /* post-image logging and flushing/fsync'ing here */
    12 |         res
    13 |     }
    14 | }
There are three parts to these generated functions:
- The first part is the "preamble" where
- invariant pointers are resolved to concrete in-memory pointers (in the sample above, lines 3-8), potentially also resolving for the appropriate version. Arguments that the caller expects to receive by value (scalars or more complex structures) are simply passed through to the wrapped user function.
- Pre-transaction state is captured for all (potentially) mutable fields.
- The second part is the call of the original user function (line 10).
- The last part is the logging and syncing phase, during which:
- post-transaction state of the touched data is captured
- flushes to disk are made
With this setup, we still get to write "normal" rust functions over "normal" rust data, but we can then invoke them through the runtime which is also supposed to manage objects.
Not all functions may be annotated with nandoize. The current restrictions are:
- The annotated function may not be async (this is subject to change, see epics)
- Any argument passed by reference must implement the Persistable trait.
If the macro is invoked on an async function, a panic with an appropriate error message is thrown.
If a non-Persistable argument is included in the function's arguments, the macro expansion will
succeed, but the final user program will not typecheck.
Persistable Trait
The Persistable trait should be implemented for any type that the user wants to store in an
object. Its basic implementation includes two methods to transparently convert between stored,
mmap-ed data and usable data structure instances:
    use std::{mem, slice};

    pub trait Persistable {
        // View this value's in-memory representation as raw bytes.
        fn as_bytes(&self) -> &[u8]
        where
            Self: Sized,
        {
            unsafe { slice::from_raw_parts((self as *const Self) as *const u8, mem::size_of::<Self>()) }
        }

        // Reinterpret a raw (e.g. mmap-ed) byte region as a pointer to Self.
        fn from_bytes(src: *mut [u8]) -> *mut Self
        where
            Self: Sized,
        {
            src as *mut Self
        }
    }
This is trivially derivable for "simple" types (hence the inclusion of a simple derive macro,
PersistableDerive), but really falls over for owned types (I won't even mention collections, they
have been covered elsewhere) -- this is in the
short-term list of things to address.
Execution
Currently, the only way to invoke a nanotransaction "from the outside" (as in, when not feeding a
workload file to the runtime or invoking rust functions that trigger nanotransactions, like tests)
is to make an RPC to a magpie instance, where the endpoint is (currently, unfortunately) handwritten
by the user. For the set_key() method, the user would also have to implement the following
function (some boilerplate has been removed for illustration):
    async fn set_key(&self, request: Request<RequestBody>) -> Result<Response<()>, Status> {
        let object_iptr = IPtr::from(request.get_ref());
        let new_key: u64 = request.get_ref().key.into();
        self.wait_for_object(&object_iptr).await;
        {
            // clone async Arc from the server instance
            let object_tracker = Arc::clone(&self.object_tracker);
            tokio::task::spawn_blocking(move || {
                let rt_handle = tokio::runtime::Handle::current();
                let object_tracker = rt_handle.block_on(object_tracker.read());
                // Run the generated wrapper on a blocking thread.
                NandoManagerBase::set_key_nando(&object_tracker, &object_iptr, new_key);
            })
            .await
            .unwrap();
        }
        Ok(Response::new(()))
    }
What works and what doesn't
Pretty much all of magpie is reliant on tokio, including nanotransaction scheduling and execution, and I want to get away from that, at the very least for some performance predictability and hopefully some better debugging facilities.
I think the approach of generating transactionally-boxed code from macro-annotated user functions is a good one, but the current implementation is very hacky. Additionally, as it stands now, I have no clear plan in my mind for extracting user application code outside the magpie binary, and providing a clean library, and some kind of integration with a magpie daemon or something along those lines.
The following page goes into more detail for the next target design for the prototype.
Nanotransactions - v1
Execution Engine Overview
Requests for nanotransaction activations arrive at the local instance of the Activation Router component -- for now, think of this as the global component of a two-tier scheduler. If the activation router determines that the given nanotransaction can execute locally (potentially having to wait for incoming data dependencies to arrive at the present host), it forwards the nanotransaction request to the Transaction Manager (1). The transaction manager logs the intent to run the given nanotransaction, and then forwards the request to the local Nando Scheduler (2), receiving back a future that it can await for the nanotransaction's termination. The rest of the nanotransaction's lifecycle is synchronous. The local scheduler is responsible for constructing a nanotransaction activation that can be enqueued for execution to the Nando Executor (3), which outputs the nanotransaction execution status along with any new log entries that need to be appended to the local log to the Log Manager (4), which is also responsible for informing the local scheduler of the outcome of the nanotransaction (5). At this point, the future returned initially over (2) is completed.
Component Breakdown
Activation Router
For incoming nanotransaction activation requests (say, from a local magpie REPL or from another magpie instance on the same cluster), it is responsible for ensuring that the nanotransaction can be scheduled locally (either by awaiting in-progress data dependency movements or initiating them itself), and either forwarding the request to a different host (eventually) or submitting it for local execution across interface (1) to the local Transaction Manager.
It is also possible for the Transaction Manager to notify the activation router of the need to schedule a nanotransaction on a remote host (although this will probably not need to happen until we want to support epics).
Finally, this component is responsible for notifying the request's source of its eventual status (success / failure / fwd / etc.).
Transaction Manager
Interface between the application execution environment and the nanotransaction execution infrastructure. Responsible for recording events to the local intent log (not pictured) on all incoming event edges, for transaction recovery and lineage tracking, as well as forwarding local execution requests to the local scheduler, and forwarding requests for remote transactions to the Activation Router.
Nando Scheduler
Local scheduler responsible for scheduling activations that are ready to be executed (in terms of dependencies etc.) to the appropriate task queue feeding into the local executor. The scheduler is somewhat critical to the correct execution of nanotransactions, as it also enforces mutual exclusion (by ensuring that any "active" object is exclusively owned by exactly one thread in the local executor at any given time). Some more detail around this is provided in the "Local Scheduling" section.
Nando Executor
A pool of N threads (configurable), each of which is pinned to a hardware core, consuming nanotransaction activations from its incoming queue (produced by the local scheduler), and outputting log records (and in the case of epics, continuations) to the local log manager. Inspired by various thread-per-core models, the goal here is to minimize latency on the nanotransaction execution path by executing many small units of work (the nanotransaction) in a strategy that maximizes locality and minimizes context switching and the need for synchronization between execution threads.
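The queue-per-thread shape of the executor can be sketched with standard library channels. This is a simplified model under several assumptions: activations are boxed closures returning a `u64` result code, log records are `(thread index, result)` pairs, and there is no core pinning, since the standard library does not offer it. All type and method names are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical activation type: a closure that returns a result code.
type Activation = Box<dyn FnOnce() -> u64 + Send>;
// Hypothetical log record: (executor thread index, result code).
type LogRecord = (usize, u64);

struct Executor {
    queues: Vec<mpsc::Sender<Activation>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl Executor {
    /// Spawns `threads` workers, each draining its own queue and
    /// reporting results to the shared `log` channel.
    fn spawn(threads: usize, log: mpsc::Sender<LogRecord>) -> Self {
        let mut queues = Vec::with_capacity(threads);
        let mut workers = Vec::with_capacity(threads);
        for id in 0..threads {
            let (tx, rx) = mpsc::channel::<Activation>();
            let log = log.clone();
            workers.push(thread::spawn(move || {
                // Runs activations one at a time, in arrival order,
                // until the queue is closed.
                for activation in rx {
                    let _ = log.send((id, activation()));
                }
            }));
            queues.push(tx);
        }
        Executor { queues, workers }
    }

    /// Enqueues an activation on the chosen worker's queue.
    fn submit(&self, worker: usize, activation: Activation) {
        self.queues[worker]
            .send(activation)
            .expect("executor thread has exited");
    }

    /// Closes all queues and waits for the workers to drain them.
    fn shutdown(self) {
        drop(self.queues);
        for worker in self.workers {
            worker.join().expect("executor thread panicked");
        }
    }
}
```

Because each worker only ever touches its own queue, no locking is needed between executor threads; the only shared endpoint is the log channel.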
Log Manager / Commit Engine
A component dedicated to flushing log entries to stable storage. This component also encapsulates the commit/abort logic of the transaction execution system. In the initial design, all nanotransactions should commit barring exceptional circumstances, so the commit logic is essentially a no-op (or rather a list of assertions to make sure we didn't mess anything up in the scheduling and execution layers) -- however, it becomes important for supporting epics, and hence fleshing it out is reserved for that design.
Design Discussion
Motivating this design
The above kind of comes out of frustration on my part around the lack of control in pure async runtimes like tokio. Scheduling is opaque, metrics are confusing, and at the end of the day it is a lot of technical baggage for a relatively narrow purpose (we are not trying to power the same kinds of use cases as things like tokio, so we can lose some of the generality).
The TPC-style setup of the execution engine was influenced by the design of "modern", low latency systems (e.g. LMAX, Tigerbeetle, ScyllaDB), and the rationale behind them, namely -- you will go fast if you avoid cross-thread synchronization as much as possible (hence the ownership of each active object by at most one thread), try to maximize the mileage you get out of your cache (with the thread-per-object model and the scheduling policy working together to leverage both spatial and temporal locality), and minimize context switching (in which we are aided by the constraints we place on individual nandos and what they can actually do at runtime; separating the logging functionality to another thread/component is also in service of this target).
Local Scheduling
The job of the local scheduler is to enforce single-threaded accesses to objects being operated on by nanotransactions at any given time. Its scheduling policy should strive to minimize (or eliminate) thrashing of local objects between cores, leveraging locality in a working set.
The scheduler can receive a request for execution from two different components: the local Transaction Manager, because of an "external" request for a nanotransaction activation on the given machine, or (in the case of epics) from the local "Commit Engine".
The commit engine is responsible for inspecting the output of "yielding" executions that have requested spawns of downstream subcomputations (in the form of nando activations), and for forwarding them to the local scheduler.
Object Versioning
Each object is associated with a version (starting at 0), which increases in strict monotonic order as updates are applied to it. Versions reside in an object's header, and are incremented by the owning executor thread after any nanotransaction invocation that modifies the object's contents. A transaction's log entry includes version information for both its read set and write set (which might include one or more output objects that this transaction created, again at version 0).
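The version bookkeeping described above can be sketched as follows. This is a minimal illustration rather than the runtime's actual code: `VersionTable` and `record` are hypothetical names, and the only assumptions carried over from the text are that objects start at version 0 and that each write bumps the version by one.

```rust
use std::collections::HashMap;

type ObjectId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Version {
    object_id: ObjectId,
    version: u64,
}

/// Hypothetical stand-in for the versions kept in object headers.
#[derive(Default)]
struct VersionTable {
    versions: HashMap<ObjectId, u64>,
}

impl VersionTable {
    /// Registers a newly created object at version 0.
    fn create(&mut self, id: ObjectId) {
        self.versions.insert(id, 0);
    }

    /// Computes the (read_set, write_set) of one nanotransaction.
    /// Reads report the version observed; writes bump the version by one.
    fn record(
        &mut self,
        reads: &[ObjectId],
        writes: &[ObjectId],
    ) -> (Vec<Version>, Vec<Version>) {
        let read_set: Vec<Version> = reads
            .iter()
            .map(|&id| Version { object_id: id, version: self.versions[&id] })
            .collect();
        let write_set: Vec<Version> = writes
            .iter()
            .map(|&id| {
                let v = self.versions.get_mut(&id).expect("unknown object");
                *v += 1;
                Version { object_id: id, version: *v }
            })
            .collect();
        (read_set, write_set)
    }
}
```

An object that is both read and written appears in the read set at its old version and in the write set at the old version plus one.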
Versions are available to all components of the system -- they can be specified explicitly in a nanotransaction's activation request as part of the nanotransaction's input, but only for read dependencies. These versions can be materialized if they are not already present locally (because of an object move, garbage collection, etc.).
Object versions are neither visible nor accessible to users, so the only case in which the system requests an object at a specific version is a read-only transaction that does not need the latest version of an object. In that case, the runtime selects a recent version during transaction submission, and the nanotransaction is permitted to proceed (potentially on a different executor thread than the one that currently owns the target object).
Absent a version specification, the runtime will use the latest version of an object. Depending on the access patterns for a given object (and the associated complexity and benefits), it might be appropriate to attempt to keep two live versions of an object around, a "back buffer" for writes and a "front buffer" that is one version behind the "head" of an object and is used to service reads (in order to avoid blocking write nanotransactions).
Technical Reference
Data Structures
Intent Log Entry
enum IntentKind {
    OWNERSHIP_CHANGE,
    USER_TRANSACTION_EXECUTION,
}

enum IntentLogEntryStatus {
    START,           // set before sending top-level transaction to nando scheduler
    SUCCESS,         // successful commit response from scheduler
    ABORTED(String), // abort notification from scheduler, with reason for abort
}

struct IntentLogEntry {
    kind: IntentKind,
    txn_id: TxnId,
    status: IntentLogEntryStatus,
    timestamp: PersistableTimestamp,
}
All intended actions on objects are first entered in the owning instance's local intent log. This includes both application transaction execution, as well as internal operations like ownership changes.
For each submitted transaction, there will always be an entry with status START. For each
completed transaction, there will also be an entry at a strictly greater timestamp with status
either SUCCESS or ABORTED, and a reason for the abort if any (which might be
application-specific error codes or system abort codes).
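The ordering invariant stated above (a START entry for every transaction, followed by a completion entry at a strictly greater timestamp) can be checked mechanically. The sketch below uses simplified, hypothetical types (`Entry`, `Status`, plain `u64` timestamps) in place of the real `IntentLogEntry`.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Start,
    Success,
    Aborted(String),
}

struct Entry {
    txn_id: u64,
    status: Status,
    timestamp: u64,
}

/// Returns true if every transaction has exactly one START entry and every
/// SUCCESS/ABORTED entry follows its START at a strictly greater timestamp.
fn log_is_well_formed(log: &[Entry]) -> bool {
    let mut starts: HashMap<u64, u64> = HashMap::new();
    for e in log {
        match e.status {
            Status::Start => {
                // A second START for the same transaction is malformed.
                if starts.insert(e.txn_id, e.timestamp).is_some() {
                    return false;
                }
            }
            Status::Success | Status::Aborted(_) => match starts.get(&e.txn_id) {
                Some(&started_at) if e.timestamp > started_at => {}
                _ => return false,
            },
        }
    }
    true
}
```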
Integrating "destructive" system operations like ownership changes into the transaction path seems like a good design choice, as it restricts the number of concurrent (i.e. unsafe) interactions we need to consider when building the system. Under this model, an ownership change is a "linear" process from the instance's activation router receiving the ownership change request, through the transaction system (which places a whomstone for the moved data in its local log for any lagging requests targeted at the object under ownership transfer), to the component that is responsible for the shuffling of data between hosts.
Transaction Log Entry
struct Image<T> {
    field: IPtr, // IPtr is a triple of (object_id, offset, size)
    pre_value: T,
    post_value: Option<T>,
}

struct Version {
    object_id: ObjectId,
    version: u64,
}

struct TransactionLogEntry {
    txn_id: TxnId,
    images: Vec<Image>,
    read_set: Vec<Version>,
    // Object IDs in write_set that also appear in read_set
    // will be accompanied by a version that
    // is the old version incremented by one
    write_set: Vec<Version>,
    timestamp: PersistableTimestamp,
}
Instances of TransactionLogEntry are constructed by worker threads in Nando Executor during
nanotransaction execution, and are then passed off to the Log Manager over (4) (most likely over a
queue) for persistence.
In case of an application-level abort() in the middle of executing a nanotransaction, it is the
executor thread's responsibility to re-apply all values from images to recover the state of the
target objects. This could be more elegant through shadow paging, but I am not sure what the
relative overheads are and how the two approaches compare to each other.
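As a rough illustration of the image-based recovery path (not the shadow paging alternative), the sketch below restores pre-images in reverse order of application. `ByteImage` is a hypothetical simplification of `Image<T>`: it addresses a byte range inside a single object buffer and keeps only the pre-value.

```rust
/// Hypothetical, simplified image: a byte range in one object buffer.
struct ByteImage {
    offset: usize,
    pre_value: Vec<u8>,
}

/// Restores the object to its state before the aborted nanotransaction.
/// Images are undone newest-first so that overlapping writes end up with
/// the oldest recorded value.
fn rollback(object: &mut [u8], images: &[ByteImage]) {
    for image in images.iter().rev() {
        let end = image.offset + image.pre_value.len();
        object[image.offset..end].copy_from_slice(&image.pre_value);
    }
}
```

Undoing in reverse matters whenever two writes in the same nanotransaction touch overlapping bytes, because the later image's pre-value is itself a value written by the transaction being aborted.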
An open question with regard to both kinds of log entries is whether or not we need a concept
like an LSN, both for easier (semantic) association of entries (which could also be accomplished
through the included txn_id) as well as managing the externalization of effects.
Interfaces
1. GC <-> TM
TODO
2. TM <-> Nando Scheduler
struct NandoHandle<T> {}

impl<T> Future for NandoHandle<T> {
    type Output = Result<T, TxnExecutionError>;
    /* ... */
}

struct ActivationIntent {
    txn_id: TxnId,
    /* ... */
}

struct Activation {
    txn_id: TxnId,
    /* ... */
}

fn schedule_activation<T>(activation_intent: ActivationIntent) -> NandoHandle<T> {
    /* ... */
}
ActivationIntent should contain the name of the nanotransaction we want to schedule, and a list of its
arguments, either as IPtr entries (for objects to be resolved), or values (for any pass-by-value
arguments). This should be translated by the Nando Scheduler into a concrete Activation that is
then executable by Nando Executor threads.
Although ActivationIntent and Activation might appear redundant at this point, their distinction
becomes clearer in the epics document. For now it should suffice to say that an
ActivationIntent can lead to the spawning of multiple connected Activations, but from the
caller's perspective (i.e. Transaction Manager), that is opaque.
Epics
An "epic" is a compound operation comprised of a set of nanotransactions. While each nanotransaction in an epic operates under the same atomicity guarantees as in the simple case, the epic abstraction permits users to specify the kinds of isolation etc. they desire from the composition of nanotransactions (in a way similar to a Saga).
To help illustrate how epics differ from nanotransactions, we will use the following example of a simple graph traversal algorithm:
#[nandoize]
fn aggregate(node: &Node, output: &mut Aggregator) {
    if output.visited.contains(&node) {
        return;
    }

    output.visited.insert(&node);
    output.sum += node.value;

    for neighbor in node.neighbors {
        aggregate(neighbor, output);
    }
}
and its associated data model:
struct Node {
    value: uint,
    neighbors: PVec<Node>,
}

struct Aggregator {
    sum: uint,
    visited: Set<Node>,
}
"Compilation"
The above nanotransaction code would first be re-written to something like the following:
fn aggregate(node: &Node, output: &mut Aggregator) {
    if output.visited.contains(&node) {
        return;
    }

    output.visited.insert(&node);
    output.sum += node.value;

    for neighbor in node.neighbors {
        nando_spawn!("aggregate", neighbor, output);
    }

    yield();
}
The job of the nando_spawn!() macro is to update the transaction log entry corresponding to a
given execution of aggregate (under a transactional context, called through the generated
aggregate_nando() variant, as covered in the nanotransaction section) with the information that
the present transaction intends to spawn a (data-dependent) number of activations of aggregate.
Note the use of the word "intends": a nando_spawn!() does not immediately schedule the specified
function, but rather ensures that the intent for it to be scheduled is durably logged; it is the
responsibility of downstream components to schedule the target activation.
The yield() function call has two effects:
- First, a continuation is generated, whose execution is dependent on the successful execution of all the spawned nanotransactions. The point of this continuation is to specify a natural "merge" point for the "forked" computations. In the case of what are in essence tail calls (like above), we can omit this step. This is functionally similar to the way CIEL manages its dynamic task graphs.
- Second, it drives the current nanotransaction activation to commit (the meaning of "commit" in this context depends on the user's choice of consistency mode). In addition to the information that is recorded to facilitate recovery (as in nanotransaction logging), the log entry also contains the list of spawns generated by the executed nanotransaction, which are then materialized into activations, either locally or remotely.
Execution
Let's assume that a user is interacting with a REPL, and already has an invariant pointer to a node,
start_node, in a graph of Nodes. A sample graph is provided on the left-hand side of the below
image. On the right-hand side we can see what the full "task graph" of invoking aggregate() on
start_node with a fresh Aggregator object as output looks like. This task graph is the epic.
Rectangles represent the points at which the runtime interposes on the execution of aggregate()
(henceforth referred to as control nodes), while squared boxes represent the execution of the
rewritten aggregate() nanotransaction (henceforth referred to as task nodes).
At each control node, the runtime schedules the execution of further downstream computations, and does all the necessary bookkeeping for the continuation of control, into either another task node / nanotransaction activation or a "commit" decision point for the epic. The advantage of the above approach is that control nodes lend themselves naturally to the process of discovery, and can facilitate both distributed prefetching and also continuation across host boundaries (think back to the graph traversal example from the HPTS talk).
For the sample graph, the execution of aggregate(&start_node, /* some output object */) (marked as
T1.1 in the figure above) will spawn two further transactions, one for each of its neighbors. More
specifically, T2.1 for the node with the value 14 and T2.2 for the node with the value 18.
Because both of them need mutable access to the given output object, they will be scheduled on the
same executor queue, one after the other (with the ordering shown in the numbered circles in the
figure). Once T2.2 "commits", a decision is made to commit or discard the entire epic. On commit,
the future that represented the result of the nanotransaction is resolved and the user can consume
the results of aggregate() through the output object they passed as an argument to the top-level
transaction.
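The interplay between task nodes and control nodes can be simulated in a few lines. This is only a single-threaded sketch under several assumptions: nodes are identified by index instead of invariant pointers, `run_epic` is a hypothetical stand-in for the runtime's control nodes, and logging, commit decisions, and remote execution are left out.

```rust
use std::collections::{HashSet, VecDeque};

struct Node {
    value: u64,
    neighbors: Vec<usize>,
}

#[derive(Default)]
struct Aggregator {
    sum: u64,
    visited: HashSet<usize>,
}

/// One task node: instead of recursing, it returns its spawn intents.
fn aggregate(graph: &[Node], node: usize, output: &mut Aggregator) -> Vec<usize> {
    if !output.visited.insert(node) {
        return Vec::new(); // already visited, nothing to spawn
    }
    output.sum += graph[node].value;
    graph[node].neighbors.clone()
}

/// Control nodes: materialize logged spawn intents into activations, one
/// after the other, because every activation needs mutable access to `output`.
fn run_epic(graph: &[Node], start: usize) -> Aggregator {
    let mut output = Aggregator::default();
    let mut pending = VecDeque::from([start]);
    while let Some(node) = pending.pop_front() {
        let spawns = aggregate(graph, node, &mut output);
        pending.extend(spawns);
    }
    output
}
```

The epic is complete once the queue of pending activations drains, which corresponds to the final commit decision point in the task graph.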
Non-tail calls
The actual usefulness of the above dynamic continuation scheme is more evident in non-tail calls. Imagine the below two functions:
#[nandoize]
fn bar(some_other_object: &StructuredObject) -> ... { /* */ }

#[nandoize]
fn foo(some_object: &StructuredObject) {
    let x = /* some value based on some_object's contents */;
    /* invoke bar nando on a reference */
    let intermediate = bar(some_object.left);
    /* compute some value combining x and intermediate */
}
The above would be rewritten as
#[nandoize]
fn bar(some_other_object: &StructuredObject) -> ... { /* */ }

#[nandoize]
fn foo(some_object: &StructuredObject) {
1 |     let x = /* some value based on some_object's contents */;
2 |     /* invoke bar nando on a reference */
3 |     let intermediate = nando_spawn!("bar", some_object.left);
4 |     yield();
5 |     /* compute some value combining x and intermediate */
}
After the corresponding activation of bar() commits, execution of foo() will resume immediately
after line 4, regardless of whether the activation of bar() was local or remote. All state up to
the yield() statement has been durably persisted, so that we don't have to replay the entire
epic if we need to recover from some fault.
The yield() construct allows us to introduce some hints as to the "maximum" permitted parallelism
of computation subtrees. Any set of spawns between two successive yield() statements in a function
(or the function's start and the first yield() statement) could be scheduled to run
concurrently at runtime, but might be prevented from doing so because of data dependencies, guiding
the runtime towards a sequential schedule. yield() calls represent barriers (or "merge points") that
need to be enforced at runtime.
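One way to picture the barrier semantics is to split a function body into stages at each yield(). In the hypothetical sketch below, `Step` and `stages` are illustrative names: all spawns within one stage may run concurrently, subject to data dependencies, and a stage must complete before the next one starts.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Step {
    Spawn(&'static str),
    Yield,
}

/// Groups the spawns of a function body into stages delimited by yields.
/// Spawns after the last yield, if any, form a trailing stage.
fn stages(body: &[Step]) -> Vec<Vec<&'static str>> {
    let mut out = Vec::new();
    let mut current = Vec::new();
    for step in body {
        match step {
            Step::Spawn(name) => current.push(*name),
            // A yield closes the current stage, even if it is empty.
            Step::Yield => out.push(std::mem::take(&mut current)),
        }
    }
    if !current.is_empty() {
        out.push(current);
    }
    out
}
```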
Motivation
The above mechanism for rewriting large transactions into compositions of continuations is motivated by our goals of ensuring non-interference properties for workflows that derive from compositions of nanotransactions. It allows us to leverage the existing execution model and consistency guarantees of local nanotransactions and build a system of workflows on top of them, where the workflow is established by controlling how the effects of nanotransactions in the same "transaction tree" are composed and made visible to the "outside" world, while also allowing us to solve the problem of (potentially) distributed transactions outside the core implementation of nanotransactions themselves.
After a nanotransaction / task is done executing and invokes yield(), it can be considered "done",
in the sense that it cannot block other nanotransactions that operate on the same data items it
does from making progress. TODO expand
Handling async code
The notion of a composition of tasks into an epic permits us to re-introduce async constructs into
our transactional context. An async block is equivalent to a nanotransaction that will be executed
as its own task, with the await in the calling code being replaced by an appropriate yield()
construct. These async tasks can be handed off to a special async executor so as not to interfere
with the execution of synchronous nanotransactions.
Execution Semantics
Tunable Consistency
By default, epics work in a mode that is functionally equivalent to Snapshot Isolation (SI). Before the invocation of the top-level transaction that kicks the epic off, the runtime establishes a causally consistent snapshot that all of an epic's reads are resolved through and all of its writes are directed to -- commits of inner nanotransactions are appended to a log that is part of the snapshot and will succeed by default. At the end of the epic, a decision is made on whether or not to commit its effects (the writes of all of the nanotransactions that were spawned as part of the epic), with the potential of an abort-and-retry in the case of a dependency conflict with transactions that committed since the epic's snapshot was established. If an epic commits, the entirety of its internal log is appended to the "real" instance log.
In reality, the runtime does not establish a complete snapshot at the beginning of the execution, but rather stores enough metadata to be able to establish a causally consistent snapshot lazily as the epic's data dependencies are discovered. The mechanism for this has not yet been established.
For users that don't want, and applications that don't need to pay the cost of snapshot isolation,
we can weaken the epic's consistency guarantees to those of a more traditional saga. Namely, the
runtime still provides isolation and sequential consistency semantics at the level of individual
nanotransactions and the objects they touch, but each inner nanotransaction immediately commits its
results. If the epic is forced to roll back for whatever reason (which under this consistency mode
would most likely be an application level rollback() statement), a series of compensating actions
will (potentially) need to be applied as part of the epic's rollback.
Compensating (trans)actions
A compensating action might be a simple no-op, or it might be a reset of an object's values as they were before the invocation of the function being compensated for, or it might be some other application-specific logic (although it should probably be restricted to "locally" computable effects, and not include, say, other nanotransaction invocations) -- the point is that we cannot predict this and it should be up to our users to know their domain enough to be able to supply these.
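A minimal sketch of the saga-style mode follows, assuming compensations are user-supplied closures that are applied in reverse commit order on rollback. The `Saga` type, its methods, and the single-integer `State` are hypothetical and stand in for whatever bookkeeping the runtime ends up using.

```rust
type State = i64;

/// Hypothetical record of the compensations registered by an epic so far.
#[derive(Default)]
struct Saga {
    compensations: Vec<Box<dyn FnOnce(&mut State)>>,
}

impl Saga {
    /// Runs one inner step, whose effects are immediately visible, and
    /// remembers how to compensate for it.
    fn step(
        &mut self,
        state: &mut State,
        action: impl FnOnce(&mut State),
        compensation: impl FnOnce(&mut State) + 'static,
    ) {
        action(&mut *state);
        self.compensations.push(Box::new(compensation));
    }

    /// Applies compensations newest-first.
    fn rollback(self, state: &mut State) {
        for compensation in self.compensations.into_iter().rev() {
            compensation(&mut *state);
        }
    }
}
```

The reverse order is significant: compensating `-30` then `*2` as `+30` then `/2` would not restore the original value, while `/2` then `+30` does.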
Ideally, we will be able to build some infrastructure that can warn users of potential anomalies that could be introduced by this lower isolation level, to help them in making the choice between the two.
Distributed Transactions
TODO
Epic Object Versioning
TODO
Ownership and Global Scheduling
- Problem Statement
- Prototype Overview
- Global Scheduler Functionality
- Functionality Description
- Open Questions
Problem Statement
For a magpie instance to execute a nanotransaction on a set of objects, it first needs to be the "owner" of the objects the nanotransaction depends on. Ownership directly translates to permission to perform writes on a given object.
This restriction of the execution model requires us to introduce a system to manage object ownership. As ownership management and the logic around compute and data rendezvous are tightly coupled, we combine the two functionalities into a discrete system layer responsible for cluster-wide scheduling. Magpie instances are still entirely responsible for the local scheduling and execution of nanotransactions; the scheduling layer facilitates discovery (so that instances can schedule nanotransactions over objects they know nothing about) and object-level ownership transfer between hosts to facilitate the execution of "cross-partition" nanotransactions.
Prototype Overview
For the first prototype, we will structure the system as a layer of workers (the existing magpie instances) whose work is orchestrated at a coarse granularity by a scheduling layer which for now only contains a single scheduler instance.
Workers publish newly allocated objects to the global scheduler over the w_s::publish()
interface (1). This establishes them as the owners of the objects being published, and also makes
the published objects visible to discovery, so that cluster-wide nanotransactions may be scheduled
over them.
Upon receiving a request to execute a nanotransaction (see the execution engine page), the receiving worker first attempts to schedule a nanotransaction locally. If it can't (because it does not own the nanotransaction's object dependencies, for example), it inspects its locally cached (and potentially stale) ownership information to determine if it knows of another worker that can handle the request (i.e. is the owner of all associated dependencies).
If such a host exists, the original worker forwards the nanotransaction activation request over the
w_w::schedule() interface (3), and execution proceeds normally. If such a host cannot be
determined, then the original worker forwards the activation request to the global scheduler over
the w_s::schedule() interface (1).
The scheduler inspects the current ownership state of the nanotransaction's dependencies. If it can
find a host that is the owner of all objects, it replies to the requesting worker with the host's
ID. Otherwise, it first attempts to consolidate the nanotransaction's dependencies on a single host
(through parallel calls over the s_w::move_ownership() interface (2)), before letting the original
worker know of the new owning host's identity so that it may forward its execution request.
Global Scheduler Functionality
The scheduler's core piece of functionality is to consolidate nanotransaction dependencies on a single owning host before scheduling a nanotransaction. Below is some pseudocode for the consolidation process.
fn consolidate(dependencies) -> HostID {
    // Run the cost model to compute an optimal activation location
    activation_site = compute_location(dependencies);

    parallel for (dependency, old_owner) in dependencies {
        // Trigger ownership change between workers, wait for ack
        success, whomstone_version = s_w::move_ownership(
            old_owner,
            dependency,
            activation_site,
        );
        if !success {
            signal failure;
        }
        ownership_state.update(/* */);
    }

    if any migration failed {
        signal partial failure.
    }

    return activation_site
}

fn compute_location(dependencies) -> HostID {
    /* cost model */
}
In order for consolidate() to make principled choices, it relies on the cost model to compute an
activation location that (at least) minimizes object moves and service disruption.
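The cost model itself is not specified here. As a placeholder, one simple policy that minimizes object moves is to pick the host that already owns the most dependencies. The sketch below is an assumption, not the intended model: it ignores object sizes and service disruption, breaks ties towards the lowest host ID, and `moves_needed` is a hypothetical helper.

```rust
use std::collections::BTreeMap;

type HostId = u32;
type ObjectId = u64;

/// Picks the host that currently owns the most dependencies.
/// Returns None when there are no dependencies.
fn compute_location(dependencies: &[(ObjectId, HostId)]) -> Option<HostId> {
    let mut owned: BTreeMap<HostId, usize> = BTreeMap::new();
    for &(_, owner) in dependencies {
        *owned.entry(owner).or_insert(0) += 1;
    }
    // max_by_key keeps the last maximum, so iterate in descending host
    // order to make the lowest host ID win ties.
    owned
        .into_iter()
        .rev()
        .max_by_key(|&(_, count)| count)
        .map(|(host, _)| host)
}

/// Lists the (object, old_owner) pairs that must move to reach `site`.
fn moves_needed(dependencies: &[(ObjectId, HostId)], site: HostId) -> Vec<(ObjectId, HostId)> {
    dependencies
        .iter()
        .copied()
        .filter(|&(_, owner)| owner != site)
        .collect()
}
```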
Ownership State
The scheduler maintains ownership state in two data structures:
- A registry of current owners, capable of answering questions of the form "which host owns object O?", to facilitate nanotransaction scheduling (as well as ownership changes). Along with object metadata, this constitutes the underlying state the cost model uses for its execution strategy ranking.
- A per-object record of the object's past ownership changes. This is used to serve read requests for past versions of an object (as well as epic scheduling).
Each worker maintains its own version of ownership state.
- First, each worker has, at any given point in time, complete knowledge of the set of objects for which it is the current owner (and hence can answer questions of this kind authoritatively).
- Each worker also has historical knowledge around the ownership lineage of the objects that it was an owner of in the past. Ownership changes are recorded in special locations in the ceding owner's transaction log, tentatively called "whomstones".
- Finally, it has a potentially stale cache of ownership info for objects that it has scheduled nanotransactions over, permitting each worker to engage in best-effort scheduling of nanotransactions without necessarily having to make a call to the scheduling layer.
An interesting point in this design is that none of the scheduler's state constitutes "hard state" that we need to persist to external storage, as it can be entirely reconstructed by aggregating ownership state from the worker layer.
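To make the reconstruction claim concrete, the sketch below rebuilds the registry of current owners from per-worker reports. The report format (a host ID paired with the objects it claims to own) and the function name `rebuild_registry` are assumptions. How the scheduler actually asks workers for this state is listed as an open question.

```rust
use std::collections::HashMap;

type HostId = u32;
type ObjectId = u64;

/// Rebuilds the "current owner" registry from the objects each worker
/// reports owning. Returns Err with the contested object if two different
/// workers claim the same one.
fn rebuild_registry(
    reports: &[(HostId, Vec<ObjectId>)],
) -> Result<HashMap<ObjectId, HostId>, ObjectId> {
    let mut registry = HashMap::new();
    for (host, objects) in reports {
        for &object in objects {
            match registry.insert(object, *host) {
                Some(previous) if previous != *host => return Err(object),
                _ => {}
            }
        }
    }
    Ok(registry)
}
```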
Functionality Description
Interface #1: Worker to Scheduler
publish() Scheduler Handler
Workers periodically publish objects that were allocated locally to the global scheduler, to make them discoverable across all workers in the worker layer. The scheduler checks its local state to determine if this object has already been published by a different host. If so, it signals the conflict to the originating worker. Otherwise, it updates its internal state and appropriately notifies the publishing worker. It is only after this point that the object's ownership may change.
fn w_s::publish(object, publisher: HostID) -> Result<(), Error> {
    if object is published {
        if publisher is not caller {
            return Error(AlreadyPublished);
        }
        return Ok;
    }

    /* update internal state with current ownership */
    return Ok;
}
schedule() Scheduler Handler
A worker that determines it cannot execute a nanotransaction locally and cannot determine an appropriate activation site locally (see worker's scheduling logic) will forward the nanotransaction intent to the global scheduler.
The global scheduler in turn will compute an activation location, potentially consolidating objects onto a single host if it has to, and will subsequently send that location back to the worker that invoked schedule(), so that the worker may forward the intent to the activation site.
fn w_s::schedule(intent) -> HostID {
    // returns a value if deps are co-located
    activation_site := location(intent.args);
    if !activation_site {
        activation_site = consolidate(intent.args);
    }

    return activation_site;
}
Interface #2: Scheduler to Worker
The scheduler requests object ownership changes as a result of calls to consolidate(). On
successful ownership transfer, the worker that owned the object before this request was made replies
with its local "whomstone version" of the object -- the last version it was an owner of. This
information is recorded in the scheduler's local state as part of consolidate(), and is also
propagated to the worker that triggered this consolidation. This will then permit the worker to
forward its request to the new owner and supply a "minimum acceptable version" for its dependency.
move_ownership Scheduler Client
fn s_w::move_ownership(current_owner, object, new_owner) -> (bool, Version) {
    // Send request to current owner
    resp := current_owner.move_ownership(object, new_owner);
    if !resp.ok() {
        return (false, 0);
    }

    return (true, resp.whomstone_version);
}
move_ownership Worker Handler
The worker that is asked to give up ownership of an object is responsible for first "flushing" the
workqueue of the object that needs to migrate. This is accomplished by submitting a special system
nanotransaction through the mark_object_whomstone() call on the transaction manager. The successful
completion of this nanotransaction results in the retrieval of an object's whomstone version, the
last version the current host was the owner of. The worker is then responsible for contacting its
peer (the new owner), asking it to assume ownership of the object past the whomstone version. Only
when that call returns successfully may the current worker acknowledge the ownership transfer to the
global scheduler.
fn s_w::move_ownership(object, new_owner) -> Version {
    // mark the object as under-migration, so that we don't
    // schedule any more local nanotransactions on it
    ownership.mark_under_migration(object, new_owner);

    // "seal" the object locally
    whomstone_version := tm.mark_object_whomstone(object);

    // make ownership change request to new owner
    resp := w_w::assume_ownership(new_owner, object, whomstone_version + 1);
    if !resp.ok() {
        // Assuming no worker can refuse ownership, errors here
        // mean the remote worker was unreachable etc. so
        // handle appropriately
    }

    ownership.mark_migrated(object, whomstone_version);

    // Start asynchronously shipping object bytes to the new owner.
    spawn(w_w::move_data(object, whomstone_version, new_owner));

    return whomstone_version;
}
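The per-object ownership state that the ceding worker steps through can be modeled as a small state machine. The sketch below is illustrative only: the `Ownership` enum and its methods are hypothetical names, and the sole rule taken from the text is that the new owner's first version is the whomstone version plus one.

```rust
type HostId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Ownership {
    Owned { first_owned_version: u64 },
    UnderMigration { new_owner: HostId },
    Migrated { whomstone_version: u64, new_owner: HostId },
}

impl Ownership {
    /// Only fully owned objects may receive new local nanotransactions.
    fn accepts_local_work(&self) -> bool {
        matches!(self, Ownership::Owned { .. })
    }

    /// Owned -> UnderMigration; any other state is returned as an error.
    fn begin_migration(self, new_owner: HostId) -> Result<Ownership, Ownership> {
        match self {
            Ownership::Owned { .. } => Ok(Ownership::UnderMigration { new_owner }),
            other => Err(other),
        }
    }

    /// UnderMigration -> Migrated, recording the whomstone version.
    fn finish_migration(self, whomstone_version: u64) -> Result<Ownership, Ownership> {
        match self {
            Ownership::UnderMigration { new_owner } => {
                Ok(Ownership::Migrated { whomstone_version, new_owner })
            }
            other => Err(other),
        }
    }
}

/// First version the new owner is responsible for.
fn first_owned_by_successor(whomstone_version: u64) -> u64 {
    whomstone_version + 1
}
```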
Interface #3: Worker to Worker
schedule Worker Handler
If a worker cannot execute a given nanotransaction locally, it first tries to use its cached ownership information to make a best-effort guess about another host that could potentially execute the given transaction. If no such host can be found, or if the computed host cannot actually execute the nanotransaction, the worker contacts the global scheduler to compute a valid activation location for the given nanotransaction.
fn schedule(intent) {
    if !can_execute_locally(intent) {
        // compute activation location based on cached ownership data,
        // might get redirected
        maybe_activation_location := compute_activation_location(intent);
        if maybe_activation_location {
            /* forward intent to computed location */
        }

        // got redirected or could not compute activation location based on cache,
        // try the scheduler
        activation_location := w_s::schedule(intent);
        /* forward intent to location we got from the scheduler */
    }

    /* existing execution logic */
}
assume_ownership() Worker Handler
The worker that is assuming ownership of a given object has to do relatively little. It needs to
update its internal ownership state to reflect the fact that it is the new owner of the given
object, starting from the version specified by first_owned_version. It could also potentially
submit a system nanotransaction to place the dual of a whomstone in its local transaction log --
the pair of this entry with a corresponding whomstone in a subsequent position in the log clearly
delineates the object's ownership period by the worker.
fn assume_ownership(old_owner, object, first_owned_version) {
    // mark the object as under migration towards this host, so
    // that we can start enqueueing operations on it
    ownership.mark_owned(object, first_owned_version);
}
Open Questions
- How do we reduce synchronization costs around hot data structures for the scheduler?
- How does the scheduler maintain semi-fungible ownership state (to tolerate worker failures)?
- How do we scale the scheduling layer?
- How do we project ownership caches on workers?
- How should the scheduler ask a worker for its ownership state?
Multiversioning
Summary
This page covers the design of the multiversioning subsystem of magpie, as well as how it integrates with nanotransaction execution. Details of how epics make use of versioning can be found in the epic design doc.
- Introduction
- Version Materialization and Storage
- Nanotransaction Scheduling
- Versioned Object Resolution
- Epics and Multiversioning
Introduction
Currently, worker-resident objects are maintained in a per-worker object tracker. The
execution subsystem uses the object tracker to resolve IPtr instances to concrete, in-memory
object references.
The object tracker only maintains "current" object versions, and because of our execution subsystem's structure this means that even read-only transactions end up being serialized behind transactions that have been scheduled before them and that modify any subset of the read-only transaction's dependencies. Additionally, even though we maintain version history for each object (in the form of pre- and post-images in transaction log entries), we currently have no way of materializing past versions, which means that we cannot support any isolation mode stronger than "read committed". (Note that in this context, "read committed" means that the effects of nanotransactions that are executed as part of an epic are immediately externalized to the system, as opposed to becoming visible only once the epic successfully commits.)
The goal of the present page is to formalize the design of the versioning subsystem, concretely describe how it interacts with the nanotransaction execution subsystem to provide higher concurrency for read-mostly workloads, and sketch how epics will make use of this subsystem.
Version Materialization and Storage
A new per-worker component, the version manager, is introduced to supplement the functionality of the object tracker. The version manager maintains a mapping from object IDs to a list of committed versions of the given object in a lock-free hashmap variant, each entry storing a circular array of pointers to heap-allocated materialized versions of the target object.
On object initialization, we store a copy of the object in the table. Any time the object is modified as part of a nanotransaction, we materialize a new read-only version of the object by applying the set of relevant post-images to the latest in-memory copy of the object, and writing the new version pointer to the tail of the array.
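A single-threaded sketch of one entry in the version manager follows. It assumes a fixed number of slots per object and uses `VecDeque` and `Arc` in place of the lock-free structure described above; `VersionCache` and its methods are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::Arc;

/// Bounded cache of the most recent materialized versions of one object.
struct VersionCache<T> {
    capacity: usize,
    first_version: u64, // version number held in slots[0]
    slots: VecDeque<Arc<T>>,
}

impl<T> VersionCache<T> {
    /// Stores the initial copy of the object as version 0.
    fn new(capacity: usize, initial: T) -> Self {
        assert!(capacity > 0, "cache needs at least one slot");
        let mut slots = VecDeque::with_capacity(capacity);
        slots.push_back(Arc::new(initial));
        VersionCache { capacity, first_version: 0, slots }
    }

    /// Appends a newly materialized version, evicting the oldest one if the
    /// cache is full. Returns the version number of the new entry.
    fn publish(&mut self, value: T) -> u64 {
        if self.slots.len() == self.capacity {
            self.slots.pop_front();
            self.first_version += 1;
        }
        self.slots.push_back(Arc::new(value));
        self.first_version + self.slots.len() as u64 - 1
    }

    /// Returns a shared handle to `version` if it is still cached.
    fn get(&self, version: u64) -> Option<Arc<T>> {
        let index = version.checked_sub(self.first_version)?;
        self.slots.get(index as usize).cloned()
    }
}
```

Readers holding an `Arc` keep an evicted version alive until they drop it, so eviction never invalidates an in-flight read.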
This process can either happen downstream from the log manager by a "versioning thread", or it can be done by the worker thread that made the changes. I am unsure which is going to perform better, so I want to experiment with both.
Storage
Our overall approach is close to the delta storage approach of existing MVCC databases; we maintain
the most recent version of all objects on disk and in-memory, and we maintain deltas generated from
effectful nanotransactions in the form of post-images in associated transaction log entries. If we
need to materialize a version that is not already cached, we need to "walk" the chain of deltas,
starting from the oldest version available to us through the version manager and repeatedly applying
the equivalent of logical undo() operations until we arrive at the desired version.
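The delta walk can be sketched as below, assuming exactly one delta per version and pre-images expressed as byte ranges. `Delta` and `materialize` are hypothetical names; in the real system the pre-images would come from transaction log entries.

```rust
/// Hypothetical delta: the bytes that `version` overwrote, i.e. the
/// contents of that range at `version - 1`.
struct Delta {
    version: u64,
    offset: usize,
    pre_image: Vec<u8>,
}

/// Materializes `target` by starting from a cached copy at `cached_version`
/// and undoing one delta per version until `target` is reached. Returns None
/// if `target` is newer than the cached copy or a delta is missing.
fn materialize(
    cached_version: u64,
    cached: &[u8],
    deltas: &[Delta],
    target: u64,
) -> Option<Vec<u8>> {
    if target > cached_version {
        return None;
    }
    let mut bytes = cached.to_vec();
    let mut current = cached_version;
    while current > target {
        let delta = deltas.iter().find(|d| d.version == current)?;
        let end = delta.offset + delta.pre_image.len();
        bytes[delta.offset..end].copy_from_slice(&delta.pre_image);
        current -= 1;
    }
    Some(bytes)
}
```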
The reason why our approach is "close" to delta storage is that we unfortunately have to store as many copies of the entire materialized object in memory as there are slots in each version cache. Ideally, we would be maintaining a page cache and doing direct I/O from disk; given this infrastructure, we could avoid the above storage amplification by enforcing page-level copy-on-write. With that said, switching over to direct I/O and implementing a performant cache seems like more trouble than it's worth at the present moment, so for now we will just accept the fact that we incur high storage amplification and rely on Rust's clone-on-write smart pointer types.
Nanotransaction Scheduling
In the presence of object versioning, the worker-local scheduler will have to be modified to accommodate the following scenarios:
- Read-only nanotransactions: This class of nanotransactions is unconstrained in terms of which executor thread they can run on, so the scheduler can apply any constant-time heuristic (e.g. least-in-flight) to schedule.
- Read-write nanotransactions: These are nanotransactions that have both mutable and non-mutable object parameters, or nanotransactions that allocate an object and initialize it with data extracted from a set of input, read-only objects. For this class of nanotransactions, the scheduler will apply its existing scheduling logic, i.e. they will be executed on a core that locally owns all the objects that are passed in as arguments, regardless of whether the nanotransaction mutates the target object or not. This is to maintain the serializability of individual nanotransaction histories.
Nanotransactions that just issue blind writes (i.e. write-only nanotransactions) to a set of input objects are treated the same as read-write nanotransactions.
As an example, assume objects 123 and 234, and three nanotransaction definitions,
ro_nando(&obj) (read-only nanotransaction), rw_nando(&obj_1, &mut obj_2) (read-write), and
wo_nando(&mut obj) (write-only). The below figure shows a snapshot of the job queues of a worker
with two executor threads, after a mix of the above nanotransactions have been scheduled.
All nanotransaction activations that intend to modify at least one of the two objects would get
scheduled on whatever executor thread owns the current object locally, while the read-only
nanotransactions on the same objects are free to be scheduled on any thread according to capacity.
Until the instances of ro_nando() start running in thread #2, we do not know which specific
version of the target objects they will read -- all the system guarantees is that they will read the
most recently committed version of the target object (see the object
resolution section for more). The two executor threads will proceed
independently with the execution of their respective queues without needing to synchronize at any point.
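The placement rules above can be sketched roughly as follows. This is only an illustration under simplifying assumptions: ownership is a static map from object ID to executor thread index, read-only activations use a least-in-flight heuristic, and activations whose arguments are owned by different threads are simply rejected (ownership consolidation is out of scope here). The names Scheduler, NandoClass and schedule are hypothetical.

```rust
use std::collections::HashMap;

type ObjectId = u64;

#[derive(Clone, Copy, Debug, PartialEq)]
enum NandoClass {
    ReadOnly,
    ReadWrite,
    WriteOnly,
}

struct Scheduler {
    /// Which executor thread locally owns each object (assumed static here).
    owner: HashMap<ObjectId, usize>,
    /// Number of queued activations per executor thread.
    in_flight: Vec<usize>,
}

impl Scheduler {
    /// Returns the index of the executor thread the activation is queued on.
    fn schedule(&mut self, class: NandoClass, args: &[ObjectId]) -> Option<usize> {
        let thread = match class {
            // Read-only: unconstrained, pick the least loaded thread.
            NandoClass::ReadOnly => {
                (0..self.in_flight.len()).min_by_key(|&t| self.in_flight[t])?
            }
            // Read-write and write-only: must run on the thread that owns
            // every object argument.
            NandoClass::ReadWrite | NandoClass::WriteOnly => {
                let first = *self.owner.get(args.first()?)?;
                if args.iter().any(|o| self.owner.get(o) != Some(&first)) {
                    return None; // owners differ; not handled in this sketch
                }
                first
            }
        };
        self.in_flight[thread] += 1;
        Some(thread)
    }
}
```

With objects 123 and 234 both owned by thread 0, scheduling wo_nando(123) and rw_nando(123, 234) queues both on thread 0, after which read-only activations over either object land on thread 1 (thread #2 in the example above) until the queues even out.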
Nanotransaction Classification at scheduling time
Since the only information about the nanotransaction activation available to the nanotransaction
scheduler is the target nanotransaction's name and its arguments (in the form of IPtr instances in
the case of object parameters), we have no way of determining the class a particular nanotransaction
belongs to. To address this, the nandoize macro will have to be modified to generate that metadata
as part of nandoization.
This metadata will be made available to the scheduler either programmatically (by generating
something like the resolve_function() resolver for each processed library, e.g. get_operation_class(name: str) -> NandoClassEnum) or by encoding that metadata in an auxiliary, well-known, per-library binary file
that is loaded by the scheduler during library loading.
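For the programmatic option, the generated resolver might look something like the sketch below. The variant names of NandoClassEnum, the Option return type, and the use of &str for the name are assumptions; the three nando names are taken from the scheduling example.

```rust
/// Hypothetical classification emitted by nandoize for each nando.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum NandoClassEnum {
    ReadOnly,
    ReadWrite,
    WriteOnly,
}

/// Stand-in for a per-library resolver that nandoize could generate.
/// Returns None for names that the library does not define.
fn get_operation_class(name: &str) -> Option<NandoClassEnum> {
    match name {
        "ro_nando" => Some(NandoClassEnum::ReadOnly),
        "rw_nando" => Some(NandoClassEnum::ReadWrite),
        "wo_nando" => Some(NandoClassEnum::WriteOnly),
        _ => None,
    }
}
```

The macro can derive the class from the parameter list it already parses: no &mut parameters means read-only, a mix of & and &mut means read-write, and only &mut means write-only (the last case may also need to inspect whether the body reads the object).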
Versioned Object Resolution
The nandoize macro will have to be further modified to generate the correct resolution code for
versioned objects.
Currently our procedural macro emits calls to the resolve_object!() macro for all of an
activation's object arguments. The resolver code is then executed by the executor
thread that the activation is scheduled on as part of the nanotransaction's "preamble".
A new macro, resolve_versioned_object!(), will resolve the target object from the worker's version
manager -- if a version is not specified explicitly, the result will be the most recently committed
version of the object; if a version is specified, the version manager will return a cached version
(if one still exists) or attempt to re-materialize the specified object version. Explicit versioning
will probably only be used in epics for now, so we can take this materialization outside of the
executor threads' hot path.
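A possible shape for the new macro is sketched below. The macro's arity, the VersionCache type and its latest()/at() methods are all assumptions made for illustration; the actual macro will depend on how the version manager is exposed to the executor threads.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Hypothetical per-object version cache; objects are modeled as Strings.
struct VersionCache {
    versions: BTreeMap<u64, Arc<String>>,
}

impl VersionCache {
    /// Most recently committed version still in the cache.
    fn latest(&self) -> Option<Arc<String>> {
        self.versions.values().next_back().cloned()
    }

    /// A specific version; None means the caller must re-materialize it.
    fn at(&self, version: u64) -> Option<Arc<String>> {
        self.versions.get(&version).cloned()
    }
}

/// One-argument form: latest committed version.
/// Two-argument form: an explicitly requested version.
macro_rules! resolve_versioned_object {
    ($cache:expr) => {
        $cache.latest()
    };
    ($cache:expr, $version:expr) => {
        $cache.at($version)
    };
}
```

A None result from the two-argument form is where re-materialization would be triggered, ideally off the executor thread's hot path as noted above.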
Epics and Multiversioning
This section provides a sketch for how epics will interact with the versioning subsystem; a detailed integration design is beyond the scope of this page.
Epics will run under at least two different isolation modes -- a weak one that immediately externalizes the effects of the individual nanotransactions that are running as part of the epic, and a stronger, SI-like mode that establishes a snapshot at the start of the epic entry point and externalizes the epic's effects at the end of the epic. As mentioned in the introduction, the first mode is relatively straightforward, so we will focus on snapshot-based epics.
There are two issues that need to be addressed:
- Maintaining per-object version constraints, so that as the epic unfolds and new objects are added to the epic's read set we refine a constraint set over the object versions that the epic may access, based on the object versions it has already accessed.
- Merging the effects of an epic (buffered while the epic is in progress in e.g. shadow objects) into the primary object copy (mmap-backed version that independent nanotransactions access).
The solution to the first issue will probably be independent of the per-instance version manager. We want versioning constraints to be persisted and shareable, so ideally we will store constraints on a per-object basis, in the object's metadata. These constraints will be modified any time a transaction executes over multiple objects, as part of the nanotransaction itself. The execution engine will then be responsible for reading the version constraints in the metadata of the target objects and supplying versions that satisfy the established constraints when requesting an object version from the version manager.
To merge the effects of an epic at the time when the epic "commits", we will create a new system
nanotransaction, apply(). This transaction will be passed the epic's complete write set, together
with a set of transaction log entries (generated by the nanotransactions in the epic and buffered by
the system) that contain the epic's writes to the objects in the write set. apply() needs to
perform a version check over all the objects in the write set -- if the versions of the current
in-memory objects differ from the versions of the objects as included in the epic's read set,
then apply() exits and the epic is aborted/rolled back. If on the other hand all version checks
succeed, the epic will commit, and so apply() is free to fast-forward the target objects by
effectively running the redo logic from recovery, and then to pass the set of transaction log
entries to the worker's log manager to append to the log.
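The commit path of apply() could be sketched as follows. This is a simplified model, not the intended implementation: objects hold a single integer value, each log entry carries a full post-image for one object, every applied entry bumps the object's version by one, and the log manager is a plain Vec. All type and field names are hypothetical.

```rust
use std::collections::HashMap;

type ObjectId = u64;
type Version = u64;

/// Hypothetical log entry: the post-image of one object.
struct LogEntry {
    object: ObjectId,
    new_value: i64,
}

struct Object {
    version: Version,
    value: i64,
}

#[derive(Debug, PartialEq)]
enum ApplyOutcome {
    Committed,
    Aborted,
}

/// `observed` maps each object in the epic's write set to the version the
/// epic saw for it. `log` stands in for the worker's log manager.
fn apply(
    store: &mut HashMap<ObjectId, Object>,
    observed: &HashMap<ObjectId, Version>,
    entries: Vec<LogEntry>,
    log: &mut Vec<LogEntry>,
) -> ApplyOutcome {
    // 1. Version check over the whole write set; any mismatch aborts the
    //    epic before anything is modified.
    for (id, seen) in observed {
        match store.get(id) {
            Some(obj) if obj.version == *seen => {}
            _ => return ApplyOutcome::Aborted,
        }
    }
    // 2. Fast-forward the target objects (the "redo" step).
    for entry in &entries {
        if let Some(obj) = store.get_mut(&entry.object) {
            obj.value = entry.new_value;
            obj.version += 1; // assumption: one version bump per entry
        }
    }
    // 3. Hand the buffered entries to the log.
    log.extend(entries);
    ApplyOutcome::Committed
}
```

Because the check runs to completion before any object is touched, an aborted epic leaves both the in-memory objects and the log unchanged.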
Set Difference
Inspired by the HotNets paper.
Setup
Parameterization
There are two parameters to be configured when generating a set difference workload: set size and operation chain length, which we describe below.
Set Size
The number of elements (unsigned integers) each generated set will contain. The reason why we want to parameterize over the size of the sets is that we want to see the "break-even" point between the legacy and nanotransaction-based variants, with the assumption being that traditional RPC-based approaches are not too bad when the size of the data being exchanged by value is small enough, but as the size of the operands in flight increases, so does the per-request latency.
Operation Chain Length
This represents the number of sequential operations that accept a previously computed result as an operand. For instance, in the workload
ACTION ; HOST; OPERAND 1; OPERAND 2; OUTPUT
get_difference; 1; 4; 106; 4106
get_difference; 1; 4; 146; 4146
get_difference; 1; 4; 103; 4103
the chain length is 1, since none of the calls to get_difference rely on the result of a previous
call to get_difference, while in the workload
ACTION ; HOST; OPERAND 1; OPERAND 2; OUTPUT
get_difference; 1; 4; 106; 4106
get_difference; 1; 4106; 146; 4146
get_difference; 1; 4146; 103; 4103
the chain length is 3, since we are passing the outputs of operations as inputs to subsequent calls
to get_difference.
The motivation behind introducing this parameter is to investigate if some access patterns are inherently better suited to the magpie approach.
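One way to compute the chain length of a generated workload is to track, for every output, how many dependent operations led to it. The sketch below assumes that operations are listed in execution order and that an operand refers to an earlier result exactly when its ID matches a previous OUTPUT; the Op struct and chain_length function are hypothetical.

```rust
use std::collections::HashMap;

/// One workload row: OPERAND 1, OPERAND 2 and OUTPUT object IDs.
struct Op {
    operand_1: u64,
    operand_2: u64,
    output: u64,
}

/// Length of the longest sequence of operations in which each operation
/// consumes the output of the previous one.
fn chain_length(workload: &[Op]) -> usize {
    // Output ID -> depth of the chain that produced it.
    let mut depth: HashMap<u64, usize> = HashMap::new();
    let mut longest: usize = 0;
    for op in workload {
        // Operands that are not earlier outputs contribute depth 0.
        let d1 = depth.get(&op.operand_1).copied().unwrap_or(0);
        let d2 = depth.get(&op.operand_2).copied().unwrap_or(0);
        let d = 1 + d1.max(d2);
        depth.insert(op.output, d);
        longest = longest.max(d);
    }
    longest
}
```

Applied to the two workloads above, this yields 1 for the first (no operand is a previous output) and 3 for the second (4106 feeds 4146, which feeds 4103).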
TODO
TODO
Overview
One of the use cases used to evaluate the [Argus] programming language is that of a banking system.
A bank consists of a number of branches, each with its own branch id. Servers correspond to branches, and bank accounts are logically partitioned based on their branch id. The system supports a number of operations, but the most critical for the purposes of this discussion are the following two:
- audit(branches: [BranchId]) -> int, which, given a set of branch IDs as input, should return the branches' balance, and
- transfer(from: Account, to: Account, amount: int) -> (), which transfers the specified amount between the from and to accounts.
There are also a number of "frontends" that connect to the servers, which may either be ATMs submitting deposits and withdrawals, or terminals used by clerks to submit bank operations.
At the very least, the information stored per account needs to include:
- The account's number
- The branch ID this account belongs to
- The account's balance
The problem
Let's assume we have an instance of a bank with three branches, 1-3. Let's also imagine a setup where
hosts B and C contain the following sets of accounts:
Host B:
object id | branch_id | balance
o1 | 1 | 100
o2 | 1 | 30
o3 | 2 | 100
o4 | 3 | 100
Host C:
object id | branch_id | balance
o5 | 2 | 200
o6 | 1 | 42
o7 | 2 | 100
o8 | 1 | 17
Now suppose that, as part of some audit call, [ audit(o1, output) ] and [ audit(o2, output) ] both need to be scheduled,
while [ debit(o1, o2, 50) ] is also under scheduling consideration. Both audit calls need to be scheduled either before
or after the debit() call; otherwise we compute an invalid result, because we capture partial states at different
points in time.
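The anomaly can be reproduced with a few lines of code. The sketch below assumes that debit(o1, o2, 50) moves 50 from o1 to o2 and that audit() adds one account's balance to a running total; both signatures are simplified stand-ins, with accounts modeled as slots in a vector.

```rust
/// Add one account's balance to the running audit total.
fn audit(balances: &[i64], account: usize, output: &mut i64) {
    *output += balances[account];
}

/// Move `amount` between two accounts (assumed meaning of debit()).
fn debit(balances: &mut [i64], from: usize, to: usize, amount: i64) {
    balances[from] -= amount;
    balances[to] += amount;
}

/// Runs the problematic interleaving over o1 and o2 from host B and
/// returns (audited total, actual total).
fn interleaved_audit() -> (i64, i64) {
    let mut balances: Vec<i64> = vec![100, 30]; // o1, o2
    let mut output = 0;
    audit(&balances, 0, &mut output); // sees o1 before the debit
    debit(&mut balances, 0, 1, 50);
    audit(&balances, 1, &mut output); // sees o2 after the debit
    (output, balances.iter().sum())
}
```

The audit reports 180 even though the two accounts hold 130 in total both before and after the debit, because the 50 in transit is counted twice.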
Possible approaches
The above should illustrate the importance of introducing some kind of versioning/logical timestamps so that we can enforce a uniform serial ordering, especially in cases where we need some kind of consistency guarantee for nanotransactions that span multiple hosts or objects.
Open questions to be answered include:
- How should (multi)versioning actually be implemented in this context? Should there be a "global" transaction log (one transaction log per host for all objects resident on that host), or should we do finer-grained version tracking?
- Assuming we answer the above, how does a nanotransaction "declare" the logical version(s) it needs to operate on?
One possible way of solving the second problem is to introduce a special kind of object, tentatively called a "directory". This object is essentially a collection of references to the objects that the nanotransaction we are interested in executing is meant to operate over, but we can also attach additional information to it, like the target version(s) it (and the nanotransactions that operate on it, as well as the nanotransactions that this "parent" nanotransaction spawns) targets.