Example Implementation
The example implementations below are based on code available at https://github.com/Trustworthy-AI/test-environments.
Broker
A broker object distributes simulations amongst multiple workers and manages the evaluation job.
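To make the idea of distributing simulations concrete, the sketch below splits a list of jobs into batches and hands them to workers in turn. It is an illustration only: the helper name assign_batches, the round-robin policy, and the assumption that workers listen on consecutive ports are all hypothetical, and the actual scheduling inside BrokerRunner is not shown in this document.

```python
from itertools import cycle


def assign_batches(jobs, worker_port_start, num_workers, batch_size):
    """Split jobs into batches and pair each batch with a worker port.

    Hypothetical helper: assumes workers listen on consecutive ports
    starting at worker_port_start and that batches are handed out
    round-robin. This is not the BrokerRunner implementation.
    """
    ports = [worker_port_start + i for i in range(num_workers)]
    batches = [jobs[i:i + batch_size] for i in range(0, len(jobs), batch_size)]
    # zip stops when the (finite) list of batches is exhausted
    return list(zip(cycle(ports), batches))


# Seven jobs, two workers, at most three jobs per batch
assignments = assign_batches(list(range(7)), 6000, 2, 3)
for port, batch in assignments:
    print(port, batch)
```

Here the first worker receives two batches and the second receives one, since the third batch wraps around to the first port.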
Attributes
- stub (stub) - a local object that implements the same methods as the service
- port (string port) - the unique port assigned to the broker object
Methods
- PushResult (PushResult(self, request, context)) - pushes a simulation result from the simulation worker to the search server
Definition
# Implementation of Broker service from trustworthy_search.proto
class Broker(rpc.BrokerServicer):
    # Upon initialization of the broker, connect the search stub (the
    # connection to the server) as an instance variable.
    # This broker is identified by its (unique) port.
    def __init__(self, stub, port):
        self.searchstub = stub
        self.port = port
        np.random.seed(port)

    # Push the result from the simulation worker to the search server
    def PushResult(self, request, context):
        request = self.searchstub.UploadSimResult(request)
        return ts.Empty()
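The forwarding behaviour of PushResult can be tried without a server by substituting a recording stub. The sketch below is self-contained and uses stand-ins throughout: Empty, RecordingStub, and SketchBroker are hypothetical names that mimic ts.Empty, the search stub, and the Broker class, and a plain dict stands in for the result message.

```python
class Empty:
    """Stand-in for ts.Empty (the real type is generated from the .proto)."""


class RecordingStub:
    """Hypothetical search stub that records uploads instead of sending them."""

    def __init__(self):
        self.uploaded = []

    def UploadSimResult(self, request):
        self.uploaded.append(request)
        return Empty()


class SketchBroker:
    """Mirrors the Broker logic without the gRPC base class."""

    def __init__(self, stub, port):
        self.searchstub = stub
        self.port = port

    def PushResult(self, request, context):
        # Forward the worker's result to the search server unchanged
        self.searchstub.UploadSimResult(request)
        return Empty()


stub = RecordingStub()
broker = SketchBroker(stub, port=5000)
result = {"jobid": 1, "simid": 7, "objective": 0.25}  # stand-in for a result message
reply = broker.PushResult(result, context=None)
```

After the call, the stub holds exactly the result that the worker pushed, and the worker receives an empty reply.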
Example
The BrokerRunner class encapsulates the details of starting a job and distributing simulation parameters to workers. The script runBrokerscript.py (available at https://github.com/Trustworthy-AI/test-environments) demonstrates its use:
import argparse

from tsclient import BrokerRunner
from tsclient import ts

param_dict_mcar = {'dist_types': [ts.Distribution.GAUSSIAN] * 2,
                   'job_mode': ts.JobStyle.Mode.MINIMIZE,
                   'dimension': 2}

param_dict_rocket = {'dist_types': [ts.Distribution.GAUSSIAN] * 100,
                     'job_mode': ts.JobStyle.Mode.MAXIMIZE,
                     'dimension': 100}

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--port', type=int, required=True,
                        help='Pick a unique port not used by any other process (including other brokers). '
                             '5000 is usually a good choice.')
    parser.add_argument('--workerportstart', type=int, default=6000, help='default = 6000')
    parser.add_argument('--num_workers', type=int, default=10, help='default = 10')
    parser.add_argument('--SSLcert', default='tsclient/beta_server.crt', help='default = tsclient/beta_server.crt')
    parser.add_argument('--serverURL', default='beta.trustworthy.ai', help='default = beta.trustworthy.ai')
    parser.add_argument('--serverport', type=int, default=443, help='default = 443')
    parser.add_argument('--threshold', type=float, default=90,
                        help='Threshold level (gamma) for event search. Default = 90')
    parser.add_argument('--num_evals', type=int, default=100,
                        help='Number of simulations to run. Default = 100. Not used for RISK job_types '
                             '(see stop_prob and stop_prob_confidence)')
    parser.add_argument('--grid_density', type=int, nargs='+', default=[10, 10],
                        help='Grid density for GRID job style. Default 10 10')
    parser.add_argument('--job_type', type=str, choices=['MONTECARLO', 'GRID', 'STRESSTEST', 'RISK'],
                        default='MONTECARLO',
                        help='Options are MONTECARLO (default), GRID, STRESSTEST, RISK')
    parser.add_argument('--stop_prob', type=float, default=0.0001,
                        help='A RISK job will terminate once it reaches events with "stop_prob" rate of occurrence '
                             'or events past the "threshold", whichever comes first. '
                             'This value and "stop_prob_confidence" determine the number of simulations that a '
                             'RISK job will run. This can be calculated by calling '
                             'EstimateRiskBudget (see lines 120-125). Default = 0.0001')
    parser.add_argument('--stop_prob_confidence', type=float, default=0.9,
                        help='The confidence with which to estimate events with likelihood "stop_prob". '
                             'Confidence of x implies coefficient of variation of 1-x. '
                             'Higher confidence requires more simulations. The exact number of simulations can be '
                             'calculated by calling EstimateRiskBudget (see lines 120-125 for an example). '
                             'Default = 0.9')
    parser.add_argument('--batch', type=str, choices=['True', 'False'], default='True',
                        help='Give each worker a batch of jobs or not. Default True')
    parser.add_argument('--batch_size', type=int, default=1000,
                        help='How many jobs to be sent to a worker at a time')
    parser.add_argument('--q', action='store_true',
                        help='Suppress the printing of parameters and corresponding objectives. '
                             'This will also suppress the additional outputs of RISK jobs.')
    parser.add_argument('--env', type=str, choices=['mountaincar', 'rocket'], default='rocket',
                        help='Which env to run. Default rocket')
    args = parser.parse_args()

    # Map the command-line job type names to the protobuf enum values
    type_dict = {'RISK': ts.JobStyle.Type.RISK,
                 'GRID': ts.JobStyle.Type.GRID,
                 'MONTECARLO': ts.JobStyle.Type.MONTECARLO,
                 'STRESSTEST': ts.JobStyle.Type.STRESSTEST}

    # Here we are setting the environment specific parameters
    if args.env == 'mountaincar':
        ts_params = param_dict_mcar
    else:
        ts_params = param_dict_rocket

    ts_params["threshold"] = args.threshold
    ts_params["job_type"] = type_dict[args.job_type]
    ts_params["num_evals"] = args.num_evals
    ts_params["grid_density"] = args.grid_density
    ts_params["stop_prob"] = args.stop_prob
    ts_params["stop_prob_confidence"] = args.stop_prob_confidence

    worker_params = {'WORKERPORTSTART': args.workerportstart, 'num_workers': args.num_workers}
    broker_params = {'BROKER_PORT': args.port, 'batch_size': args.batch_size,
                     'quiet': args.q, 'batch': args.batch}
    server_params = {'SSLcertfile': args.SSLcert, 'serverURL': args.serverURL, 'serverport': args.serverport}

    runner = BrokerRunner(broker_params, worker_params, server_params, ts_params, args.env)
    runner.run()
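The help text for --stop_prob_confidence relates a confidence of x to a coefficient of variation of 1-x. For intuition about why higher confidence requires more simulations, the sketch below computes the sample count that naive Monte Carlo would need to estimate a probability p at a given coefficient of variation, using the standard relation CV = sqrt((1 - p) / (n p)). The function naive_mc_budget is a hypothetical name and is not EstimateRiskBudget; the number of simulations a RISK job actually runs is determined by the service and is not derived here.

```python
import math


def naive_mc_budget(stop_prob, confidence):
    """Samples needed by naive Monte Carlo to estimate stop_prob.

    Assumes the relation quoted in the help text: a confidence of x
    corresponds to a coefficient of variation (CV) of 1 - x. For n
    independent samples, the CV of the estimated probability p is
    sqrt((1 - p) / (n * p)), which gives n = (1 - p) / (p * CV**2).
    """
    if not 0.0 < stop_prob < 1.0:
        raise ValueError("stop_prob must lie strictly between 0 and 1")
    if not 0.0 < confidence < 1.0:
        raise ValueError("confidence must lie strictly between 0 and 1")
    cv = 1.0 - confidence
    return math.ceil((1.0 - stop_prob) / (stop_prob * cv ** 2))


budget_default = naive_mc_budget(0.0001, 0.9)   # the script's default values
budget_tighter = naive_mc_budget(0.0001, 0.95)  # higher confidence
print(budget_default, budget_tighter)
```

With the script's defaults, the naive budget is on the order of one million samples, and raising the confidence to 0.95 roughly quadruples it.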
Worker
When a simulation worker receives a set of parameters, it evaluates the system-under-test in closed loop with a model of the environment.
The details of each simulation remain opaque to the search mechanism; only the evaluation of the safety metric is returned.
Attributes
- brokerchannels (brokerchannels) - a dict, keyed by broker port, of connections to the broker on a specified host and port
- brokerstubs (brokerstubs) - a dict, keyed by broker port, of local objects that implement the same methods as the service
Methods
- Simulate (Simulate(self, request, context)) - runs a simulation; this method may be extended to incorporate custom applications (see the Examples section for more details)
- BatchSimulate (BatchSimulate(self, request, context)) - runs a batch of simulations with one worker
- RegisterBroker (RegisterBroker(self, request, context)) - registers a new broker
- DeregisterBroker (DeregisterBroker(self, request, context)) - deregisters the broker when the job has been completed
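The register and deregister methods amount to bookkeeping in a dict keyed by broker port. The sketch below shows that lifecycle in isolation. FakeChannel and BrokerRegistry are hypothetical stand-ins; a real worker would open a gRPC channel rather than create a FakeChannel.

```python
class FakeChannel:
    """Hypothetical stand-in for a gRPC channel; only tracks open/closed state."""

    def __init__(self, target):
        self.target = target
        self.closed = False

    def close(self):
        self.closed = True


class BrokerRegistry:
    """Tracks one channel per broker port, as the worker does."""

    def __init__(self):
        self.brokerchannels = {}

    def register(self, port):
        # Assumes the broker runs on the same machine as the worker
        channel = FakeChannel('localhost:' + str(port))
        self.brokerchannels[port] = channel
        return channel

    def deregister(self, port):
        # Remove the entry first, then release the connection
        channel = self.brokerchannels.pop(port)
        channel.close()
        return channel


registry = BrokerRegistry()
channel = registry.register(5000)
registered_ports = list(registry.brokerchannels)
registry.deregister(5000)
```

Once the broker is deregistered, its channel is closed and the port no longer appears in the registry, so a late result for that port would fail with a KeyError instead of being sent over a stale connection.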
Definition
class Worker(rpc.SimulatorServicer):
    # The worker keeps track of the brokers that are communicating with it.
    def __init__(self, seed, simulate, batchSimulate):
        self.brokerchannels = {}
        self.brokerstubs = {}
        self.simulate = simulate
        self.batchSimulate = batchSimulate
        np.random.seed(seed)

    # Run a 'simulation' with one set of parameters.
    def Simulate(self, request, context):
        # random params from the server
        simparams = request.simparams
        objective = self.simulate(simparams.params)
        result = ts.SimResult(jobid=simparams.jobid,
                              simid=simparams.simid,
                              objective=objective[0])
        result = self.brokerstubs[request.port].PushResult(result)
        return ts.Empty()

    # Run a batch 'simulation' which just takes in multiple sets of parameters and
    # runs the simulation on them.
    def BatchSimulate(self, request, context):
        objectives = self.batchSimulate(request.param_list)
        for i, param in enumerate(request.param_list):
            result = ts.SimResult(jobid=param.simparams.jobid,
                                  simid=param.simparams.simid,
                                  objective=objectives[i])
            result = self.brokerstubs[param.port].PushResult(result)
        return ts.Empty()

    # Register the new broker
    # Note: this assumes that workers are on a single machine.
    # Change 'localhost' to appropriate broker IP address if
    # workers are spread across multiple machines.
    def RegisterBroker(self, request, context):
        channel = grpc.insecure_channel('localhost:' + str(request.port))
        self.brokerchannels[request.port] = channel
        self.brokerstubs[request.port] = rpc.BrokerStub(channel)
        return ts.Empty()

    # The broker has finished its job so deregister it.
    def DeregisterBroker(self, request, context):
        self.brokerchannels[request.port].close()
        self.brokerchannels.pop(request.port)
        self.brokerstubs.pop(request.port)
        return ts.Empty()
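The Worker constructor takes two callables, and the definition above implies their shapes: simulate receives one parameter vector and returns a sequence whose first element is the objective, while batchSimulate receives the request's param_list and returns one objective per entry. The sketch below shows a pair of callables that fit those shapes. The objective (a sum of squares) is purely illustrative, SimpleNamespace objects stand in for the protobuf messages, and the field layout of each batch entry is inferred from how the worker reads param.simparams.

```python
from types import SimpleNamespace


def simulate(params):
    """Hypothetical objective: sum of squares of the parameters.

    Returns a one-element list because the worker reads objective[0].
    """
    return [sum(x * x for x in params)]


def batch_simulate(param_list):
    """Evaluate every entry of a batch; returns one objective per entry.

    Assumes each entry exposes its parameters as entry.simparams.params.
    """
    return [simulate(entry.simparams.params)[0] for entry in param_list]


# Stand-ins for the messages in request.param_list
batch = [
    SimpleNamespace(port=5000,
                    simparams=SimpleNamespace(jobid=1, simid=i, params=[float(i), 1.0]))
    for i in range(3)
]
objectives = batch_simulate(batch)
print(objectives)
```

Callables of this shape could then be passed as the simulate and batchSimulate arguments when constructing a Worker.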
Example
See runWorkerScript.py (available at https://github.com/Trustworthy-AI/test-environments) and the following section, Examples, for more details.