Derecho: Group communication at the speed of light

Version 0.9.1

Derecho is a new distributed system out of Cornell University

Function tier functionality

Let’s now turn our eyes towards the function tier implementation itself. This is found in the src/derecho_component/function_tier.cpp file.

The purpose of the function_tier is to receive a request via gRPC, identify the categorizer-tier shard that should handle it, forward the request to that shard, and relay the reply back over gRPC. As we saw in the header file, the function tier can handle three kinds of request: Whatsthis, InstallModel, and RemoveModel. Because the logic is very similar across all three, we’ll focus on Whatsthis alone. Here’s the relevant code (comments and debugging elided):

Status FunctionTier::Whatsthis(ServerContext* context,
                               grpc::ServerReader<PhotoRequest>* reader,
                               PhotoReply* reply) {
    ParsedWhatsThisArguments parsed_args;
    try {
        parsed_args = parse_grpc_whatsthis_args(context, reader, reply);
    } catch(const RequestCancel&) {
        return Status::CANCELLED;
    } catch(const StatusOK&) {
        return Status::OK;
    }
    derecho::ExternalCaller<CategorizerTier>& categorizer_tier_handler
            = group->get_nonmember_subgroup<CategorizerTier>();
    auto shards = group->get_subgroup_members<CategorizerTier>();
    
    using response_pair = std::pair<derecho::rpc::QueryResults<Guess>, node_id_t>;
    std::vector<response_pair> responses;
    if(parsed_args.tags.size() > 1) {
        reply->set_desc("Multiple tags support to be implemented.");
        return Status::OK;
    } else {
        std::size_t tag_index = 0;
        node_id_t target = shards[parsed_args.tags[tag_index] % shards.size()][0];
        responses.emplace_back(
                categorizer_tier_handler.p2p_send<RPC_NAME(inference)>(
                        target, Photo{parsed_args.tags[tag_index],
                                      parsed_args.photo_data,
                                      parsed_args.photo_size}),
                target);
    }

    std::vector<Guess> guesses;
    for(auto& result : responses) {
        guesses.emplace_back(result.first.get().get(result.second));
    }

    std::string reply_string = guesses.at(0).guess;
    for(uint i = 1; i < guesses.size(); ++i) {
        reply_string += " or " + guesses.at(i).guess;
    }

    reply->set_desc(reply_string);

    return Status::OK;
}

As before, we’ll go over this code section by section.

gRPC parsing and errors

This first bit of code handles gRPC parsing:

    ParsedWhatsThisArguments parsed_args;
    try {
        parsed_args = parse_grpc_whatsthis_args(context, reader, reply);
    } catch(const RequestCancel&) {
        return Status::CANCELLED;
    } catch(const StatusOK&) {
        return Status::OK;
    }

The parse_grpc_whatsthis_args function hides most of the gRPC-related complexity and can be found in src/grpc-component/function_tier-grpc.cpp. Its job is to convert the gRPC arguments into a simple struct containing all the parsed arguments. If it encounters an error or an invalid request, it throws an exception, which we catch here and translate into the corresponding gRPC status: RequestCancel becomes Status::CANCELLED, while StatusOK ends the call early with Status::OK.
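The exception-to-status translation can be sketched in isolation. Everything in this sketch is hypothetical: MockStatus stands in for grpc::Status, and RequestCancel, StatusOK, and mock_parse only mirror the roles of the project’s real exception types and parser. The conditions under which mock_parse throws are invented for illustration; the real parser’s rules are not shown in this walkthrough.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins; they only mirror the roles of the real types.
enum class MockStatus { OK, CANCELLED };

struct RequestCancel {};  // thrown when the request should be reported as cancelled
struct StatusOK {};       // thrown when the request should end early with OK

struct ParsedArgs {
    std::string payload;
};

// Hypothetical parser: signals early exits by throwing instead of returning codes.
ParsedArgs mock_parse(const std::string& raw) {
    if(raw == "cancel") throw RequestCancel{};  // invented cancel condition
    if(raw.empty()) throw StatusOK{};           // invented early-OK condition
    return ParsedArgs{raw};
}

// Mirrors the try/catch at the top of Whatsthis: each exception maps to one status.
MockStatus handle(const std::string& raw, std::string& out) {
    ParsedArgs args;
    try {
        args = mock_parse(raw);
    } catch(const RequestCancel&) {
        return MockStatus::CANCELLED;
    } catch(const StatusOK&) {
        return MockStatus::OK;
    }
    out = args.payload;  // normal path: continue with the parsed arguments
    return MockStatus::OK;
}
```

The design keeps the handler’s happy path free of error checks: any early exit is decided inside the parser, and the handler only needs one catch clause per outcome.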

Accessing Derecho internals

The next code block gets us our handles for accessing Derecho itself, and sets up objects to contain the responses to our Derecho RPC requests:

    derecho::ExternalCaller<CategorizerTier>& categorizer_tier_handler
            = group->get_nonmember_subgroup<CategorizerTier>();
    auto shards = group->get_subgroup_members<CategorizerTier>();

    using response_pair = std::pair<derecho::rpc::QueryResults<Guess>, node_id_t>;
    std::vector<response_pair> responses;

The first line gets us our categorizer_tier_handler, which we can use to invoke p2p_send requests on members of the specified subgroup. Here, we’ve explicitly requested a handler for the CategorizerTier, the ultimate destination of this FunctionTier’s Whatsthis message. We next retrieve the list of shards in the CategorizerTier; each entry is itself a list of node_ids naming the exact members of that shard. Both calls return objects that reflect the group as it is right now. Because Derecho is an elastic system, the set of nodes in each shard, the set of shards, or even the set of active subgroups may change over time. For this reason, we recommend fetching these handles every time you need them, rather than caching them within your subgroup object. We’ve made the lookup functions very fast in anticipation of this pattern!
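To make the staleness concern concrete, here is a small sketch built around a hypothetical MockGroup class (not Derecho’s API). Its get_subgroup_members returns a snapshot laid out like the shards variable, with one inner vector of node ids per shard; the node_id_t alias and the install_view method are assumptions made for this sketch. A copy taken before a simulated view change keeps describing the old layout, while a fresh lookup sees the new one.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using node_id_t = std::uint32_t;  // assumed alias for this sketch
// One inner vector per shard, listing that shard's members.
using ShardLayout = std::vector<std::vector<node_id_t>>;

// Hypothetical stand-in for a group whose membership changes between views.
class MockGroup {
public:
    explicit MockGroup(ShardLayout layout) : layout_(std::move(layout)) {}

    // Returns a snapshot (by value) of the current shard layout.
    ShardLayout get_subgroup_members() const { return layout_; }

    // Simulates installing a new view, e.g. after nodes join or leave.
    void install_view(ShardLayout next) { layout_ = std::move(next); }

private:
    ShardLayout layout_;
};
```

In this model, any layout cached before install_view silently becomes outdated, which is why looking the layout up again on each request is the safer habit.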

The next two lines define a type alias, response_pair, which pairs each QueryResults<Guess> with the node_id it was sent to, and a vector that will hold the responses to all queries issued as part of this Whatsthis invocation. Objects of type QueryResults<T> are returned by ordered_send and p2p_send and represent the eventual return value of the remotely invoked method. The ordered_send and p2p_send calls do not block; instead, the QueryResults<T> object contains futures that can be waited on.
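The non-blocking pattern can be illustrated with standard C++ futures. In this sketch, mock_p2p_send is a hypothetical stand-in for p2p_send: it uses std::async rather than Derecho’s networking and returns a plain std::future<Guess> instead of a QueryResults<Guess>. The Guess struct is a minimal mirror that keeps only a guess string. As in the handler, every request is issued first and paired with its target node, and only afterwards do we wait.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <string>
#include <utility>
#include <vector>

using node_id_t = std::uint32_t;  // assumed alias for this sketch

struct Guess {
    std::string guess;
};

// Hypothetical stand-in for p2p_send: returns a future immediately while
// the "remote" work runs on another thread.
std::future<Guess> mock_p2p_send(node_id_t target, std::string photo) {
    return std::async(std::launch::async, [target, photo = std::move(photo)] {
        return Guess{"guess from node " + std::to_string(target) + " for " + photo};
    });
}

// Issue every request first, then wait on each future in turn.
std::vector<Guess> send_all_then_wait(const std::vector<node_id_t>& targets) {
    using response_pair = std::pair<std::future<Guess>, node_id_t>;
    std::vector<response_pair> responses;
    for(node_id_t t : targets) {
        responses.emplace_back(mock_p2p_send(t, "photo.jpg"), t);  // does not block
    }
    std::vector<Guess> guesses;
    for(auto& r : responses) {
        guesses.push_back(r.first.get());  // blocks until this reply is ready
    }
    return guesses;
}
```

Separating the send loop from the wait loop lets all requests be in flight at once, rather than paying one round trip per request in sequence.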

Calling the categorizer tier

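Next, we’ll jump inside the else branch of the if statement that checks how many tags the request carried (a request with more than one tag receives a “to be implemented” reply) and examine the code that actually forwards our request to the categorizer tier.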

        std::size_t tag_index = 0;
        node_id_t target = shards[parsed_args.tags[tag_index] % shards.size()][0];
        responses.emplace_back(
                categorizer_tier_handler.p2p_send<RPC_NAME(inference)>(
                        target, Photo{parsed_args.tags[tag_index],
                                      parsed_args.photo_data,
                                      parsed_args.photo_size}),
                target);

The first two lines of this block inspect the shards vector, selecting the shard responsible for our received tag (which identifies a model installed via InstallModel) by taking the tag modulo the number of shards. We aren’t doing anything intelligent to load-balance within the shard; instead, we always take the 0th node. We also don’t support searching multiple tagged models: while the code was written with this feature in mind, “for now” we assume we’ve been passed only one model tag.

The remaining lines perform the actual invocation of the categorizer tier. We use the categorizer_tier_handler to invoke a p2p_send, calling the method inference on node target with a single Photo argument constructed from parsed_args.tags[tag_index], parsed_args.photo_data, and parsed_args.photo_size. These constructor arguments are simply the values we ourselves received via gRPC. Finally, we place the QueryResults returned by this p2p_send, paired with the node_id target, into the responses vector defined above.
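The shard-selection rule can be isolated into a small helper. Here select_target is a hypothetical name, and the tag type (std::uint64_t) and node_id_t alias are assumptions for this sketch; the rule itself (tag modulo shard count, then the shard’s first member) is the one used in the handler. Unlike the original, the helper throws if there are no shards or the chosen shard is empty, since indexing would otherwise be undefined behavior.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <vector>

using node_id_t = std::uint32_t;  // assumed alias for this sketch

// Same rule as the handler: tag % shard count picks the shard, and the
// shard's 0th member is always the target (no load balancing).
// The empty checks are additions not present in the original code.
node_id_t select_target(std::uint64_t tag,
                        const std::vector<std::vector<node_id_t>>& shards) {
    if(shards.empty()) throw std::runtime_error("no categorizer shards");
    const auto& shard = shards[tag % shards.size()];
    if(shard.empty()) throw std::runtime_error("selected shard has no members");
    return shard[0];
}
```

Because the modulo depends on shards.size(), the same tag can map to a different shard after a view change adds or removes shards, which is one more reason to read the layout fresh on every request.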

Processing the results

The last two steps are inspecting the responses and using them to construct a single reply for the client. First, the code for waiting on and unpacking the responses (again with debugging and comments elided):

    std::vector<Guess> guesses;
    for(auto& result : responses) {
        guesses.emplace_back(result.first.get().get(result.second));
    }

This for-loop iterates over the response to each query we’ve issued (…just one query, for now), explicitly waiting until each result is ready and placing it into the guesses vector. Unpacking further: result.first is the QueryResults we received, while result.second is the node_id of the node that sends us that result (our target). result.first.get() waits until Derecho has finished sending the query and returns the set of replies keyed by node, while ...get(result.second) retrieves the reply sent to us by our target node. For p2p_send, this second lookup is trivial, since we invoked the request on a single node. For ordered_send, it matters more: an ordered_send is delivered to an entire shard, so it makes sense to inspect the response from each member of that shard independently.
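The two-stage wait can be modeled with standard futures. This is a sketch of the idea, not Derecho’s QueryResults or reply-map classes: an outer std::future resolves to a map from node id to a std::shared_future holding that node’s reply, and get_reply (a hypothetical helper) waits on each level in turn. For a p2p_send-style query the map would hold one entry; for an ordered_send-style query it would hold one entry per shard member.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <map>
#include <string>
#include <utility>

using node_id_t = std::uint32_t;  // assumed alias for this sketch

// Hypothetical model: node id -> that node's (possibly still pending) reply.
template <typename T>
using MockReplyMap = std::map<node_id_t, std::shared_future<T>>;

// Level 1: wait for the map of repliers. Level 2: wait for one node's reply.
template <typename T>
T get_reply(std::future<MockReplyMap<T>>& results, node_id_t node) {
    MockReplyMap<T> replies = results.get();  // outer wait (single use)
    return replies.at(node).get();            // inner wait for this node
}
```

Splitting the wait this way lets a caller learn who will reply before any individual reply has arrived, and then pick out the node it cares about.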

Constructing the client reply

Finally we’ll construct our response to the client:

    std::string reply_string = guesses.at(0).guess;
    for(uint i = 1; i < guesses.size(); ++i) {
        reply_string += " or " + guesses.at(i).guess;
    }

    reply->set_desc(reply_string);

    return Status::OK;

We’ve declared that our response to the client must be a single string, so this code loops through all the returned guesses and joins them into one string, separated by " or ".
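The joining logic is easy to factor out and test on its own. join_guesses is a hypothetical helper, and Guess here is a minimal mirror of the real type that keeps only the guess field used above. It produces the same output as the handler’s loop for non-empty input, and returns an empty string instead of throwing from at(0) when there are no guesses.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct Guess {
    std::string guess;  // minimal mirror of the field used in the handler
};

// Joins guesses as "a or b or c"; an empty input yields "".
std::string join_guesses(const std::vector<Guess>& guesses) {
    std::string reply;
    for(std::size_t i = 0; i < guesses.size(); ++i) {
        if(i > 0) reply += " or ";  // separator only between entries
        reply += guesses[i].guess;
    }
    return reply;
}
```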

…And that’s it! Next let’s take a look at the categorizer tier.

Last updated on 22 Oct 2019
Published on 22 Oct 2019