Function tier functionality
Let’s now turn our attention to the function tier implementation itself, which lives in the src/derecho_component/function_tier.cpp file.
The purpose of the function tier is to receive a request via gRPC, identify the shard of the categorizer tier that should handle the request, forward the request along, and then relay the reply back over gRPC. As we saw in the header file, the function tier can handle three requests: Whatsthis, InstallModel, and RemoveModel. Since the logic is very similar across all three features, we’ll focus just on the Whatsthis component. Here’s the relevant code (comments and debugging elided):
Status FunctionTier::Whatsthis(ServerContext* context,
                               grpc::ServerReader<PhotoRequest>* reader,
                               PhotoReply* reply) {
    ParsedWhatsThisArguments parsed_args;
    try {
        parsed_args = parse_grpc_whatsthis_args(context, reader, reply);
    } catch(const RequestCancel&) {
        return Status::CANCELLED;
    } catch(const StatusOK&) {
        return Status::OK;
    }
    derecho::ExternalCaller<CategorizerTier>& categorizer_tier_handler
            = group->get_nonmember_subgroup<CategorizerTier>();
    auto shards = group->get_subgroup_members<CategorizerTier>();
    using response_pair = std::pair<derecho::rpc::QueryResults<Guess>, node_id_t>;
    std::vector<response_pair> responses;
    if(parsed_args.tags.size() > 1) {
        reply->set_desc("Multiple tags support to be implemented.");
        return Status::OK;
    } else {
        std::size_t tag_index = 0;
        node_id_t target = shards[parsed_args.tags[tag_index] % shards.size()][0];
        responses.emplace_back(
                categorizer_tier_handler.p2p_send<RPC_NAME(inference)>(
                        target, Photo{parsed_args.tags[tag_index],
                                      parsed_args.photo_data,
                                      parsed_args.photo_size}),
                target);
    }
    std::vector<Guess> guesses;
    for(auto& result : responses) {
        guesses.emplace_back(result.first.get().get(result.second));
    }
    std::string reply_string = guesses.at(0).guess;
    for(uint i = 1; i < guesses.size(); ++i) {
        reply_string += " or " + guesses.at(i).guess;
    }
    reply->set_desc(reply_string);
    return Status::OK;
}
As before, we’ll go over this code section by section.
gRPC parsing and errors
This first bit of code handles gRPC parsing:
ParsedWhatsThisArguments parsed_args;
try {
    parsed_args = parse_grpc_whatsthis_args(context, reader, reply);
} catch(const RequestCancel&) {
    return Status::CANCELLED;
} catch(const StatusOK&) {
    return Status::OK;
}
The parse_grpc_whatsthis_args function hides most of the gRPC-related complexity, and can be found in the file src/grpc-component/function_tier-grpc.cpp. Its job is to convert the gRPC arguments into a simple struct containing all the parsed arguments. If it encounters an error (or an invalid request), it throws an exception, which we catch here and translate back into the behavior gRPC expects.
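As a rough sketch of the shape such a parser might take (this is not the project’s actual implementation; the message accessors and validation logic are assumptions), consider:

// Hypothetical sketch only: stream in the client's messages, accumulate them
// into the parsed-arguments struct, and signal early exits via the two
// exception types the caller catches above.
ParsedWhatsThisArguments parse_grpc_whatsthis_args(ServerContext* context,
                                                   grpc::ServerReader<PhotoRequest>* reader,
                                                   PhotoReply* reply) {
    ParsedWhatsThisArguments args;
    PhotoRequest request;
    while(reader->Read(&request)) {
        if(context->IsCancelled()) {
            throw RequestCancel{};  // translated to Status::CANCELLED by the caller
        }
        // ...append this message's tag(s) and photo bytes to args (application-specific)...
    }
    // On an invalid request, set an explanatory description on `reply` and
    // throw StatusOK{} so the caller returns Status::OK carrying that message.
    return args;
}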
Accessing Derecho internals
The next code block gets us our handles for accessing Derecho itself, and sets up objects to contain the responses to our Derecho RPC requests:
derecho::ExternalCaller<CategorizerTier>& categorizer_tier_handler
        = group->get_nonmember_subgroup<CategorizerTier>();
auto shards = group->get_subgroup_members<CategorizerTier>();
using response_pair = std::pair<derecho::rpc::QueryResults<Guess>, node_id_t>;
std::vector<response_pair> responses;
The first line gets us our categorizer_tier_handler, which we can use to issue p2p_send requests to the specified subgroup. Here, we’ve chosen to explicitly request a handler for the CategorizerTier, the ultimate destination for this FunctionTier’s Whatsthis message. We next retrieve a list of the shards available within the CategorizerTier, each entry of which contains the node_ids of the exact members of that shard. Both of these calls provide objects valid for the group as it is right now; because Derecho is an elastic system, it’s entirely possible that the set of nodes in each shard, the set of shards, or even the set of active subgroups will change over time. For this reason, we recommend explicitly fetching these handles every time you need them, rather than caching them within your subgroup object. We’ve made the lookup functions very fast in anticipation of this pattern!
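To make the shape of that membership data concrete, here is a small illustrative snippet (the variable names are ours; it assumes, as the code above does, that get_subgroup_members returns one vector of node_ids per shard):

// Illustrative only: print the current membership of each CategorizerTier shard.
std::vector<std::vector<node_id_t>> shards =
        group->get_subgroup_members<CategorizerTier>();
for(std::size_t shard_num = 0; shard_num < shards.size(); ++shard_num) {
    std::cout << "Shard " << shard_num << " members:";
    for(node_id_t member : shards[shard_num]) {
        std::cout << " " << member;
    }
    std::cout << std::endl;
}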
The final two lines of that code block define a vector which will contain the responses to all queries issued as part of this Whatsthis invocation. Objects of type QueryResults<T> are returned by ordered_send and p2p_send, representing the return value of the remotely-invoked method. The ordered_send and p2p_send calls do not block; instead, the QueryResults<T> object contains futures which can be waited on.
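For example, a caller can issue a query, keep doing useful work, and only block once it actually needs the answer. In this minimal sketch (our own, not the project’s code; photo stands in for a Photo constructed as in the full method above):

// Sketch: p2p_send returns immediately with a QueryResults object.
derecho::rpc::QueryResults<Guess> pending =
        categorizer_tier_handler.p2p_send<RPC_NAME(inference)>(target, photo);
// ...other work can happen here while the categorizer tier runs inference...
// We only block when we finally ask for the reply from `target`.
Guess answer = pending.get().get(target);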
Calling the categorizer tier
We’ll next jump to the inside of the else branch of that if statement and examine the code which actually forwards our request to the categorizer tier.
std::size_t tag_index = 0;
node_id_t target = shards[parsed_args.tags[tag_index] % shards.size()][0];
responses.emplace_back(
        categorizer_tier_handler.p2p_send<RPC_NAME(inference)>(
                target, Photo{parsed_args.tags[tag_index],
                              parsed_args.photo_data,
                              parsed_args.photo_size}),
        target);
The first two lines of this block index into the shards structure, selecting the appropriate shard for our received tag (corresponding to a tagged model installed via InstallModel). We aren’t doing anything intelligent to load-balance here; instead, we’ll just always take the 0th node in the shard (a simple alternative is sketched below). We’re also not supporting searching multiple different tagged models; while the code was written with this feature in mind, “for now” we’ll just assume we’ve only been passed one model tag.
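Purely as a hypothetical illustration (the pick_member helper is not part of the project), the target could instead be chosen at random from the shard’s current members:

#include <random>

// Hypothetical helper: pick a random member of a shard instead of always member 0.
node_id_t pick_member(const std::vector<node_id_t>& shard_members) {
    static thread_local std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<std::size_t> pick(0, shard_members.size() - 1);
    return shard_members[pick(rng)];
}

// This would replace the hard-coded [0] above:
// node_id_t target = pick_member(shards[parsed_args.tags[tag_index] % shards.size()]);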
The remaining lines of the block above make up the actual invocation of the categorizer tier. We’re using the categorizer_tier_handler to invoke a p2p_send, calling the method inference on node target with a single Photo argument, constructed out of parsed_args.tags[tag_index], parsed_args.photo_data, and parsed_args.photo_size. These constructor arguments are simply the arguments we ourselves received via gRPC. Finally, we put the returned result of this p2p_send, paired with the node_id target, into the responses vector we defined above.
Processing the results
The last two steps we’ll take are inspecting the responses, and using them to construct a single response for the client. First, the code for waiting on and unpacking the responses (again with debugging and comments elided):
std::vector<Guess> guesses;
for(auto& result : responses) {
    guesses.emplace_back(result.first.get().get(result.second));
}
This for-loop iterates over the response to each query we’ve executed (…just one query here), explicitly waits until the result is ready, and places it into the guesses vector. Unpacking further, result.first is the QueryResults we received, while result.second is the node_id of the node which will send us that result. result.first.get() waits for Derecho to have finished sending the query, while ...get(result.second) asks for the component of the reply sent to us by our target node. For p2p_send, this argument is always trivial, as we only invoked the request on a single node. For ordered_send, it is more relevant: an ordered_send goes to an entire shard, so it makes sense to inspect the response from each member of that shard independently.
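As a sketch of what that might look like (our own illustration, assuming the calling node is itself a member of the CategorizerTier subgroup so it can obtain a Replicated<> handle, and that the reply set iterates as node-id/future pairs as in Derecho’s sample code):

// Sketch only: invoke inference on every member of our shard and inspect each reply.
derecho::Replicated<CategorizerTier>& categorizer_handle =
        group->get_subgroup<CategorizerTier>();
auto results = categorizer_handle.ordered_send<RPC_NAME(inference)>(photo);
auto& replies = results.get();                     // one pending reply per shard member
for(auto& reply_pair : replies) {
    Guess member_guess = reply_pair.second.get();  // block for this member's reply
    std::cout << "Node " << reply_pair.first
              << " guessed: " << member_guess.guess << std::endl;
}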
Constructing the client reply
Finally we’ll construct our response to the client:
std::string reply_string = guesses.at(0).guess;
for(uint i = 1; i < guesses.size(); ++i) {
    reply_string += " or " + guesses.at(i).guess;
}
reply->set_desc(reply_string);
return Status::OK;
We’ve declared that our response to the client must be a string, so this code will loop through all the returned guesses and concatenate them into a single big string.
…And that’s it! Next let’s take a look at the categorizer tier.