Skip to content

Example Implementation

The example implementations below are based on code available at https://github.com/Trustworthy-AI/test-environments.

Broker

A broker object distributes simulations amongst multiple workers and manages the evaluation job.

Simulation parameters are received by a broker and distributed to worker nodes for evaluation

Attributes

  • searchstub (stub) - a local object that implements the same methods as the service (the connection to the search server)
  • port (int) - the unique port assigned to the broker object

Methods

  • PushResult (PushResult(self, request, context)) - pushes a simulation result from the simulation worker to the search server

Definition

# Implementation of Broker service from trustworthy_search.proto
class Broker(rpc.BrokerServicer):
    """Broker service that relays simulation results to the search server.

    Implements the Broker service from trustworthy_search.proto. Each broker
    is identified by its (unique) port.
    """

    def __init__(self, stub, port):
        """Store the search-server stub and this broker's port.

        Args:
            stub: Search stub (the connection to the server); it must
                provide ``UploadSimResult``.
            port: Unique port identifying this broker. It is also used to
                seed NumPy's global random generator.
        """
        self.searchstub = stub
        self.port = port
        np.random.seed(port)

    def PushResult(self, request, context):
        """Push a result from the simulation worker to the search server.

        Args:
            request: Simulation result message; forwarded unchanged to
                ``UploadSimResult`` on the search stub.
            context: Call context supplied by the RPC framework (unused).

        Returns:
            A ``ts.Empty`` message.
        """
        # The server's reply is not used, so it is deliberately not bound
        # to a name (the original rebinding of ``request`` was dead code).
        self.searchstub.UploadSimResult(request)
        return ts.Empty()

Example

The BrokerRunner class encapsulates the details of starting a job and distributing simulation parameters to workers. The script runBrokerscript.py (available at https://github.com/Trustworthy-AI/test-environments) demonstrates its use:

from tsclient import BrokerRunner
from tsclient import ts
import argparse

# Environment-specific search settings for 'mountaincar': two parameters,
# each with a Gaussian distribution, run in MINIMIZE mode.
param_dict_mcar = {
    'dist_types': [ts.Distribution.GAUSSIAN] * 2,
    'job_mode': ts.JobStyle.Mode.MINIMIZE,
    'dimension': 2,
}

# Environment-specific search settings for 'rocket': one hundred parameters,
# each with a Gaussian distribution, run in MAXIMIZE mode.
param_dict_rocket = {
    'dist_types': [ts.Distribution.GAUSSIAN] * 100,
    'job_mode': ts.JobStyle.Mode.MAXIMIZE,
    'dimension': 100,
}


if __name__ == '__main__':
    # Command-line entry point: parse the job configuration, assemble the
    # four parameter dictionaries that BrokerRunner takes, and run the job.
    parser = argparse.ArgumentParser()

    # Broker / worker / server connection settings
    parser.add_argument('--port', type=int, required=True, help='Pick a unique port not used by any other process (including other brokers). 5000 is usually a good choice.')
    parser.add_argument('--workerportstart', type=int, default=6000, help='default = 6000')
    parser.add_argument('--num_workers', type=int, default=10, help='default = 10')
    parser.add_argument('--SSLcert', default='tsclient/beta_server.crt', help='default = beta_server.crt')
    parser.add_argument('--serverURL', default='beta.trustworthy.ai', help='default = beta.trustworthy.ai')
    parser.add_argument('--serverport', type=int, default=443, help='default = 443')

    # Search job settings
    # The help text previously claimed "Default = 2" although the default is 90.
    parser.add_argument('--threshold', type=float, default=90, help='Threshold level (gamma) for event search. Default = 90')
    parser.add_argument('--num_evals', type=int, default=100,
                        help='Number of simulatons to run. Default = 100. Not used for RISK job_types (see stop_prob and stop_prob_confidence)')
    parser.add_argument('--grid_density', type=int, nargs='+', default=[10, 10], help='Grid density for GRID job style. Default 10 10')
    parser.add_argument('--job_type', type=str, choices=['MONTECARLO', 'GRID', 'STRESSTEST', 'RISK'], default='MONTECARLO',
                        help='Options are MONTECARLO (default), GRID, STRESSTEST, RISK')
    parser.add_argument('--stop_prob', type=float, default=0.0001,
                        help='A RISK job will terminate onces it reaches events with \"stop_prob\" rate of occurrence or events past the \"threshold\", whichever comes first.\
                             This value and \"stop_prob_confidence\" determine the number of simulations that a RISK job will run. This can be calculated by calling\
                             EstimateRiskBudget (see lines 120-125). Default = 0.0001')
    parser.add_argument('--stop_prob_confidence', type=float, default=0.9,
                        help='The confidence with which to estimate events with likelihood \"stop_prob\". Confidence of x implies coefficient of variation of 1-x.\
                             Higher confidence requires more simulations. The exact number of simulations can be calculated by calling EstimateRiskBudget (see lines 120-125 for an example)\
                             Default = 0.9')

    # Dispatch / output settings
    # NOTE(review): --batch is parsed as the *string* 'True' or 'False' and is
    # passed on unchanged below; confirm that BrokerRunner expects a string
    # here, since the string 'False' is truthy in a plain boolean test.
    parser.add_argument('--batch', type=str, choices=['True', 'False'], default='True', help='Give each worker a batch of jobs or not. Default True')
    parser.add_argument('--batch_size', type=int, default=1000, help='How many jobs to be sent to a worker at a time')
    parser.add_argument("--q", action="store_true", help='Supress the printing of parameters and corresponding objectives. This will also supress the additional outputs of RISK jobs.')
    parser.add_argument("--env", type=str, choices=['mountaincar', 'rocket'], default='rocket', help='Which env to run. Default rocket')
    args = parser.parse_args()

    # Map the command-line job type name onto the corresponding enum value
    type_dict = {'RISK': ts.JobStyle.Type.RISK,
                 'GRID': ts.JobStyle.Type.GRID,
                 'MONTECARLO': ts.JobStyle.Type.MONTECARLO,
                 'STRESSTEST': ts.JobStyle.Type.STRESSTEST}

    # Here we are setting the environment specific parameters. Work on a
    # copy so the module-level template dictionaries are not mutated.
    env_defaults = param_dict_mcar if args.env == 'mountaincar' else param_dict_rocket
    ts_params = dict(env_defaults)
    ts_params.update({
        "threshold": args.threshold,
        "job_type": type_dict[args.job_type],
        "num_evals": args.num_evals,
        "grid_density": args.grid_density,
        "stop_prob": args.stop_prob,
        "stop_prob_confidence": args.stop_prob_confidence,
    })

    worker_params = {'WORKERPORTSTART': args.workerportstart, 'num_workers': args.num_workers}
    broker_params = {'BROKER_PORT': args.port, 'batch_size': args.batch_size,
                     'quiet': args.q, 'batch': args.batch}
    server_params = {'SSLcertfile': args.SSLcert, 'serverURL': args.serverURL, 'serverport': args.serverport}

    runner = BrokerRunner(broker_params, worker_params, server_params, ts_params, args.env)
    runner.run()
The following section, Examples, details its use.

Worker

When a simulation worker receives a set of parameters, it evaluates the system-under-test in closed loop with a model of the environment.

Workers simulate your system

The details of each simulation remain opaque to the search mechanism; only the evaluation of the safety metric is returned.

The search server only sees the evaluation of the safety metric

Attributes

  • brokerchannels (brokerchannels) - a dict of connections to the broker on a specified host and port
  • brokerstubs (brokerstubs) - a dict of local objects that implement the same methods as the service

Methods

  • Simulate (Simulate(self, request, context)) - run a simulation; this method may be extended to incorporate custom applications (see the Examples section for more details)
  • BatchSimulate (BatchSimulate(self, request, context)) - run a batch of simulations with one worker
  • RegisterBroker (RegisterBroker(self, request, context)) - registers a new broker
  • DeregisterBroker (DeregisterBroker(self, request, context)) - deregister the broker when the job has been completed

Definition

class Worker(rpc.SimulatorServicer):
    """Simulator service that evaluates parameters and reports to brokers.

    The worker keeps track of the brokers that are communicating with it,
    keyed by the broker's port.
    """

    def __init__(self, seed, simulate, batchSimulate):
        """Set up empty broker registries and store the simulation callables.

        Args:
            seed: Seed for NumPy's global random generator.
            simulate: Callable invoked with one set of parameters; element 0
                of its return value is reported as the objective.
            batchSimulate: Callable invoked with a request's ``param_list``;
                its return value is indexed by position to obtain the
                objective for each entry.
        """
        self.brokerchannels = {}  # port -> channel to the broker on that port
        self.brokerstubs = {}  # port -> BrokerStub bound to that channel
        self.simulate = simulate
        self.batchSimulate = batchSimulate
        np.random.seed(seed)

    def Simulate(self, request, context):
        """Run a 'simulation' with one set of parameters.

        The result is pushed to the broker registered under ``request.port``.

        Args:
            request: Message carrying ``simparams`` (with ``params``,
                ``jobid`` and ``simid``) and the originating broker ``port``.
            context: Call context supplied by the RPC framework (unused).

        Returns:
            A ``ts.Empty`` message.

        Raises:
            KeyError: If no broker is registered under ``request.port``.
        """
        # random params from the server
        simparams = request.simparams
        objective = self.simulate(simparams.params)
        result = ts.SimResult(jobid=simparams.jobid,
                              simid=simparams.simid,
                              objective=objective[0])
        # The broker's reply is not used, so it is not bound to a name.
        self.brokerstubs[request.port].PushResult(result)
        return ts.Empty()

    def BatchSimulate(self, request, context):
        """Run a batch 'simulation' over multiple sets of parameters.

        ``batchSimulate`` is called once with the whole ``param_list``; one
        result per entry is then pushed to the broker registered under that
        entry's ``port``.

        Args:
            request: Message carrying ``param_list``; each entry has
                ``simparams`` (with ``jobid`` and ``simid``) and ``port``.
            context: Call context supplied by the RPC framework (unused).

        Returns:
            A ``ts.Empty`` message.

        Raises:
            KeyError: If an entry's ``port`` has no registered broker.
        """
        objectives = self.batchSimulate(request.param_list)

        # Index into ``objectives`` (rather than zip) so that a too-short
        # result sequence fails loudly instead of silently dropping results.
        for i, param in enumerate(request.param_list):
            result = ts.SimResult(jobid=param.simparams.jobid,
                                  simid=param.simparams.simid,
                                  objective=objectives[i])
            self.brokerstubs[param.port].PushResult(result)
        return ts.Empty()

    def RegisterBroker(self, request, context):
        """Register the new broker.

        Note: this assumes that workers are on a single machine. Change
        'localhost' to the appropriate broker IP address if workers are
        spread across multiple machines.

        Args:
            request: Message carrying the broker's ``port``.
            context: Call context supplied by the RPC framework (unused).

        Returns:
            A ``ts.Empty`` message.
        """
        channel = grpc.insecure_channel('localhost:' + str(request.port))
        # If this port was already registered, its previous channel is about
        # to be replaced; remember it so it can be closed instead of leaked.
        replaced = self.brokerchannels.get(request.port)
        self.brokerchannels[request.port] = channel
        self.brokerstubs[request.port] = rpc.BrokerStub(channel)
        if replaced is not None:
            replaced.close()
        return ts.Empty()

    def DeregisterBroker(self, request, context):
        """Deregister a broker that has finished its job.

        Closes the broker's channel and forgets both the channel and stub.

        Args:
            request: Message carrying the broker's ``port``.
            context: Call context supplied by the RPC framework (unused).

        Returns:
            A ``ts.Empty`` message.

        Raises:
            KeyError: If no broker is registered under ``request.port``.
        """
        self.brokerchannels[request.port].close()
        self.brokerchannels.pop(request.port)
        self.brokerstubs.pop(request.port)
        return ts.Empty()

Example

See runWorkerScript.py (available at https://github.com/Trustworthy-AI/test-environments) and the following section, Examples, for more details.