"""Tests for Ray Tune (reconstructed from a blog-scraped dump, 2018-06).

This module was recovered from an HTML scrape that destroyed all newlines and
indentation and interleaved blog chrome with the code; it has been rebuilt
into valid Python. It exercises the (circa-2018) Tune APIs:

* ``TrainableFunctionApiTest`` -- registering/running function trainables,
  resource requests, logdir handling, and parameter validation errors.
* ``RunExperimentTest`` -- ``run_experiments`` with dict / ``Experiment`` /
  list-of-``Experiment`` inputs.
* ``VariantGeneratorTest`` -- ``generate_trials`` config expansion
  (grid search, eval, dependent lambdas, recursive-dependency detection).
* ``TrialRunnerTest`` -- ``TrialRunner`` scheduling, failure recovery,
  checkpointing, pause/resume, and stopping trials.

NOTE(review): API names (``TrainingResult``, ``generate_trials``,
``ray.worker.cleanup``) date this to an old Ray release -- confirm the target
Ray version before running.

Usage example from the original blog post (requires a ``train_func``):

    tune.register_trainable("train_func", train_func)
    tune.run_experiments({
        "my_experiment": {
            "run": "train_func",
            "stop": {"mean_accuracy": 99},
            "local_dir": "/home/ray_results/tmp2",
            "trial_resources": {"cpu": 1, "gpu": 1},
            "config": {
                "batch_size": tune.grid_search([10, 20, 30]),
            },
        }
    })
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import time
import unittest

import ray
from ray.rllib import _register_all

from ray.tune import Trainable, TuneError
from ray.tune import register_env, register_trainable, run_experiments
from ray.tune.registry import _default_registry, TRAINABLE_CLASS
from ray.tune.result import DEFAULT_RESULTS_DIR, TrainingResult
from ray.tune.util import pin_in_object_store, get_pinned_object
from ray.tune.experiment import Experiment
from ray.tune.trial import Trial, Resources
from ray.tune.trial_runner import TrialRunner
from ray.tune.variant_generator import generate_trials, grid_search, \
    RecursiveDependencyError


class TrainableFunctionApiTest(unittest.TestCase):
    """Tests the function-based trainable API and experiment validation."""

    def setUp(self):
        ray.init(num_cpus=4, num_gpus=0)

    def tearDown(self):
        ray.worker.cleanup()
        _register_all()  # re-register the evicted objects

    def testPinObject(self):
        X = pin_in_object_store("hello")

        @ray.remote
        def f():
            return get_pinned_object(X)

        self.assertEqual(ray.get(f.remote()), "hello")

    def testFetchPinned(self):
        X = pin_in_object_store("hello")

        def train(config, reporter):
            get_pinned_object(X)
            reporter(timesteps_total=100, done=True)

        register_trainable("f1", train)
        [trial] = run_experiments({
            "foo": {
                "run": "f1",
                "config": {
                    "script_min_iter_time_s": 0,
                },
            }
        })
        self.assertEqual(trial.status, Trial.TERMINATED)
        self.assertEqual(trial.last_result.timesteps_total, 100)

    def testRegisterEnv(self):
        register_env("foo", lambda: None)
        self.assertRaises(TypeError, lambda: register_env("foo", 2))

    def testRegisterTrainable(self):
        def train(config, reporter):
            pass

        class A(object):
            pass

        class B(Trainable):
            pass

        register_trainable("foo", train)
        register_trainable("foo", B)
        self.assertRaises(TypeError, lambda: register_trainable("foo", B()))
        self.assertRaises(TypeError, lambda: register_trainable("foo", A))

    def testBuiltInTrainableResources(self):
        class B(Trainable):
            @classmethod
            def default_resource_request(cls, config):
                return Resources(cpu=config["cpu"], gpu=config["gpu"])

            def _train(self):
                return TrainingResult(timesteps_this_iter=1, done=True)

        register_trainable("B", B)

        def f(cpus, gpus, queue_trials):
            return run_experiments(
                {
                    "foo": {
                        "run": "B",
                        "config": {
                            "cpu": cpus,
                            "gpu": gpus,
                        },
                    }
                },
                queue_trials=queue_trials)[0]

        # Should all succeed
        self.assertEqual(f(0, 0, False).status, Trial.TERMINATED)
        self.assertEqual(f(1, 0, True).status, Trial.TERMINATED)
        self.assertEqual(f(1, 0, True).status, Trial.TERMINATED)

        # Infeasible even with queueing enabled (no gpus)
        self.assertRaises(TuneError, lambda: f(1, 1, True))

        # Too large resource request
        self.assertRaises(TuneError, lambda: f(100, 100, False))
        self.assertRaises(TuneError, lambda: f(0, 100, False))
        self.assertRaises(TuneError, lambda: f(100, 0, False))

        # TODO(ekl) how can we test this is queued (hangs)?
        # f(100, 0, True)

    def testRewriteEnv(self):
        def train(config, reporter):
            reporter(timesteps_total=1)

        register_trainable("f1", train)

        [trial] = run_experiments({
            "foo": {
                "run": "f1",
                "env": "CartPole-v0",
            }
        })
        self.assertEqual(trial.config["env"], "CartPole-v0")

    def testConfigPurity(self):
        def train(config, reporter):
            assert config == {"a": "b"}, config
            reporter(timesteps_total=1)

        register_trainable("f1", train)
        run_experiments({
            "foo": {
                "run": "f1",
                "config": {
                    "a": "b"
                },
            }
        })

    def testLogdir(self):
        def train(config, reporter):
            assert "/tmp/logdir/foo" in os.getcwd(), os.getcwd()
            reporter(timesteps_total=1)

        register_trainable("f1", train)
        run_experiments({
            "foo": {
                "run": "f1",
                "local_dir": "/tmp/logdir",
                "config": {
                    "a": "b"
                },
            }
        })

    def testLogdirStartingWithTilde(self):
        local_dir = '~/ray_results/local_dir'

        def train(config, reporter):
            cwd = os.getcwd()
            assert cwd.startswith(os.path.expanduser(local_dir)), cwd
            assert not cwd.startswith('~'), cwd
            reporter(timesteps_total=1)

        register_trainable('f1', train)
        run_experiments({
            'foo': {
                'run': 'f1',
                'local_dir': local_dir,
                'config': {
                    'a': 'b'
                },
            }
        })

    def testLongFilename(self):
        def train(config, reporter):
            assert "/tmp/logdir/foo" in os.getcwd(), os.getcwd()
            reporter(timesteps_total=1)

        register_trainable("f1", train)
        run_experiments({
            "foo": {
                "run": "f1",
                "local_dir": "/tmp/logdir",
                "config": {
                    "a" * 50: lambda spec: 5.0 / 7,
                    "b" * 50: lambda spec: "long" * 40
                },
            }
        })

    def testBadParams(self):
        def f():
            run_experiments({"foo": {}})

        self.assertRaises(TuneError, f)

    def testBadParams2(self):
        def f():
            run_experiments({
                "foo": {
                    "run": "asdf",
                    "bah": "this param is not allowed",
                }
            })

        self.assertRaises(TuneError, f)

    def testBadParams3(self):
        def f():
            run_experiments({
                "foo": {
                    "run": grid_search("invalid grid search"),
                }
            })

        self.assertRaises(TuneError, f)

    def testBadParams4(self):
        def f():
            run_experiments({
                "foo": {
                    "run": "asdf",
                }
            })

        self.assertRaises(TuneError, f)

    def testBadParams5(self):
        def f():
            run_experiments({"foo": {"run": "PPO", "stop": {"asdf": 1}}})

        self.assertRaises(TuneError, f)

    def testBadParams6(self):
        def f():
            run_experiments({
                "foo": {
                    "run": "PPO",
                    "trial_resources": {
                        "asdf": 1
                    }
                }
            })

        self.assertRaises(TuneError, f)

    def testBadReturn(self):
        def train(config, reporter):
            reporter()

        register_trainable("f1", train)

        def f():
            run_experiments({
                "foo": {
                    "run": "f1",
                    "config": {
                        "script_min_iter_time_s": 0,
                    },
                }
            })

        self.assertRaises(TuneError, f)

    def testEarlyReturn(self):
        def train(config, reporter):
            reporter(timesteps_total=100, done=True)
            time.sleep(99999)

        register_trainable("f1", train)
        [trial] = run_experiments({
            "foo": {
                "run": "f1",
                "config": {
                    "script_min_iter_time_s": 0,
                },
            }
        })
        self.assertEqual(trial.status, Trial.TERMINATED)
        self.assertEqual(trial.last_result.timesteps_total, 100)

    def testAbruptReturn(self):
        def train(config, reporter):
            reporter(timesteps_total=100)

        register_trainable("f1", train)
        [trial] = run_experiments({
            "foo": {
                "run": "f1",
                "config": {
                    "script_min_iter_time_s": 0,
                },
            }
        })
        self.assertEqual(trial.status, Trial.TERMINATED)
        self.assertEqual(trial.last_result.timesteps_total, 100)

    def testErrorReturn(self):
        def train(config, reporter):
            raise Exception("uh oh")

        register_trainable("f1", train)

        def f():
            run_experiments({
                "foo": {
                    "run": "f1",
                    "config": {
                        "script_min_iter_time_s": 0,
                    },
                }
            })

        self.assertRaises(TuneError, f)

    def testSuccess(self):
        def train(config, reporter):
            for i in range(100):
                reporter(timesteps_total=i)

        register_trainable("f1", train)
        [trial] = run_experiments({
            "foo": {
                "run": "f1",
                "config": {
                    "script_min_iter_time_s": 0,
                },
            }
        })
        self.assertEqual(trial.status, Trial.TERMINATED)
        self.assertEqual(trial.last_result.timesteps_total, 99)


class RunExperimentTest(unittest.TestCase):
    """Tests the different input forms accepted by ``run_experiments``."""

    def setUp(self):
        ray.init()

    def tearDown(self):
        ray.worker.cleanup()
        _register_all()  # re-register the evicted objects

    def testDict(self):
        def train(config, reporter):
            for i in range(100):
                reporter(timesteps_total=i)

        register_trainable("f1", train)
        trials = run_experiments({
            "foo": {
                "run": "f1",
                "config": {
                    "script_min_iter_time_s": 0
                }
            },
            "bar": {
                "run": "f1",
                "config": {
                    "script_min_iter_time_s": 0
                }
            }
        })
        for trial in trials:
            self.assertEqual(trial.status, Trial.TERMINATED)
            self.assertEqual(trial.last_result.timesteps_total, 99)

    def testExperiment(self):
        def train(config, reporter):
            for i in range(100):
                reporter(timesteps_total=i)

        register_trainable("f1", train)
        exp1 = Experiment(**{
            "name": "foo",
            "run": "f1",
            "config": {
                "script_min_iter_time_s": 0
            }
        })
        [trial] = run_experiments(exp1)
        self.assertEqual(trial.status, Trial.TERMINATED)
        self.assertEqual(trial.last_result.timesteps_total, 99)

    def testExperimentList(self):
        def train(config, reporter):
            for i in range(100):
                reporter(timesteps_total=i)

        register_trainable("f1", train)
        exp1 = Experiment(**{
            "name": "foo",
            "run": "f1",
            "config": {
                "script_min_iter_time_s": 0
            }
        })
        exp2 = Experiment(**{
            "name": "bar",
            "run": "f1",
            "config": {
                "script_min_iter_time_s": 0
            }
        })
        trials = run_experiments([exp1, exp2])
        for trial in trials:
            self.assertEqual(trial.status, Trial.TERMINATED)
            self.assertEqual(trial.last_result.timesteps_total, 99)


class VariantGeneratorTest(unittest.TestCase):
    """Tests config expansion in ``generate_trials`` (grid/eval/lambdas)."""

    def setUp(self):
        ray.init()

    def tearDown(self):
        ray.worker.cleanup()
        _register_all()  # re-register the evicted objects

    def testParseToTrials(self):
        trials = generate_trials({
            "run": "PPO",
            "repeat": 2,
            "max_failures": 5,
            "config": {
                "env": "Pong-v0",
                "foo": "bar"
            },
        }, "tune-pong")
        trials = list(trials)
        self.assertEqual(len(trials), 2)
        self.assertEqual(str(trials[0]), "PPO_Pong-v0_0")
        self.assertEqual(trials[0].config, {"foo": "bar", "env": "Pong-v0"})
        self.assertEqual(trials[0].trainable_name, "PPO")
        self.assertEqual(trials[0].experiment_tag, "0")
        self.assertEqual(trials[0].max_failures, 5)
        self.assertEqual(trials[0].local_dir,
                         os.path.join(DEFAULT_RESULTS_DIR, "tune-pong"))
        self.assertEqual(trials[1].experiment_tag, "1")

    def testEval(self):
        trials = generate_trials({
            "run": "PPO",
            "config": {
                "foo": {
                    "eval": "2 + 2"
                },
            },
        })
        trials = list(trials)
        self.assertEqual(len(trials), 1)
        self.assertEqual(trials[0].config, {"foo": 4})
        self.assertEqual(trials[0].experiment_tag, "0_foo=4")

    def testGridSearch(self):
        trials = generate_trials({
            "run": "PPO",
            "config": {
                "bar": {
                    "grid_search": [True, False]
                },
                "foo": {
                    "grid_search": [1, 2, 3]
                },
            },
        })
        trials = list(trials)
        self.assertEqual(len(trials), 6)
        self.assertEqual(trials[0].config, {"bar": True, "foo": 1})
        self.assertEqual(trials[0].experiment_tag, "0_bar=True,foo=1")
        self.assertEqual(trials[1].config, {"bar": False, "foo": 1})
        self.assertEqual(trials[1].experiment_tag, "1_bar=False,foo=1")
        self.assertEqual(trials[2].config, {"bar": True, "foo": 2})
        self.assertEqual(trials[3].config, {"bar": False, "foo": 2})
        self.assertEqual(trials[4].config, {"bar": True, "foo": 3})
        self.assertEqual(trials[5].config, {"bar": False, "foo": 3})

    def testGridSearchAndEval(self):
        trials = generate_trials({
            "run": "PPO",
            "config": {
                "qux": lambda spec: 2 + 2,
                "bar": grid_search([True, False]),
                "foo": grid_search([1, 2, 3]),
            },
        })
        trials = list(trials)
        self.assertEqual(len(trials), 6)
        self.assertEqual(trials[0].config, {"bar": True, "foo": 1, "qux": 4})
        self.assertEqual(trials[0].experiment_tag, "0_bar=True,foo=1,qux=4")

    def testConditionResolution(self):
        trials = generate_trials({
            "run": "PPO",
            "config": {
                "x": 1,
                "y": lambda spec: spec.config.x + 1,
                "z": lambda spec: spec.config.y + 1,
            },
        })
        trials = list(trials)
        self.assertEqual(len(trials), 1)
        self.assertEqual(trials[0].config, {"x": 1, "y": 2, "z": 3})

    def testDependentLambda(self):
        trials = generate_trials({
            "run": "PPO",
            "config": {
                "x": grid_search([1, 2]),
                "y": lambda spec: spec.config.x * 100,
            },
        })
        trials = list(trials)
        self.assertEqual(len(trials), 2)
        self.assertEqual(trials[0].config, {"x": 1, "y": 100})
        self.assertEqual(trials[1].config, {"x": 2, "y": 200})

    def testDependentGridSearch(self):
        trials = generate_trials({
            "run": "PPO",
            "config": {
                "x": grid_search([
                    lambda spec: spec.config.y * 100,
                    lambda spec: spec.config.y * 200
                ]),
                "y": lambda spec: 1,
            },
        })
        trials = list(trials)
        self.assertEqual(len(trials), 2)
        self.assertEqual(trials[0].config, {"x": 100, "y": 1})
        self.assertEqual(trials[1].config, {"x": 200, "y": 1})

    def testRecursiveDep(self):
        try:
            list(
                generate_trials({
                    "run": "PPO",
                    "config": {
                        "foo": lambda spec: spec.config.foo,
                    },
                }))
        except RecursiveDependencyError as e:
            assert "`foo` recursively depends on" in str(e), e
        else:
            assert False


class TrialRunnerTest(unittest.TestCase):
    """Tests ``TrialRunner`` scheduling, recovery, checkpointing, stopping."""

    def tearDown(self):
        ray.worker.cleanup()
        _register_all()  # re-register the evicted objects

    def testTrialStatus(self):
        ray.init()
        trial = Trial("__fake")
        self.assertEqual(trial.status, Trial.PENDING)
        trial.start()
        self.assertEqual(trial.status, Trial.RUNNING)
        trial.stop()
        self.assertEqual(trial.status, Trial.TERMINATED)
        trial.stop(error=True)
        self.assertEqual(trial.status, Trial.ERROR)

    def testExperimentTagTruncation(self):
        ray.init()

        def train(config, reporter):
            reporter(timesteps_total=1)

        register_trainable("f1", train)

        experiments = {
            "foo": {
                "run": "f1",
                "config": {
                    "a" * 50: lambda spec: 5.0 / 7,
                    "b" * 50: lambda spec: "long" * 40
                },
            }
        }

        for name, spec in experiments.items():
            for trial in generate_trials(spec, name):
                trial.start()
                self.assertLessEqual(len(trial.logdir), 200)
                trial.stop()

    def testTrialErrorOnStart(self):
        ray.init()
        _default_registry.register(TRAINABLE_CLASS, "asdf", None)
        trial = Trial("asdf", resources=Resources(1, 0))
        try:
            trial.start()
        except Exception as e:
            self.assertIn("a class", str(e))

    def testExtraResources(self):
        ray.init(num_cpus=4, num_gpus=2)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 1
            },
            "resources": Resources(cpu=1, gpu=0, extra_cpu=3, extra_gpu=1),
        }
        trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
        for t in trials:
            runner.add_trial(t)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.PENDING)

    def testResourceScheduler(self):
        ray.init(num_cpus=4, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 1
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
        for t in trials:
            runner.add_trial(t)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.RUNNING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.TERMINATED)

    def testMultiStepRun(self):
        ray.init(num_cpus=4, num_gpus=2)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 5
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
        for t in trials:
            runner.add_trial(t)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.RUNNING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.RUNNING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.RUNNING)

    def testErrorHandling(self):
        ray.init(num_cpus=4, num_gpus=2)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 1
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        _default_registry.register(TRAINABLE_CLASS, "asdf", None)
        trials = [Trial("asdf", **kwargs), Trial("__fake", **kwargs)]
        for t in trials:
            runner.add_trial(t)

        runner.step()
        self.assertEqual(trials[0].status, Trial.ERROR)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.ERROR)
        self.assertEqual(trials[1].status, Trial.RUNNING)

    def testFailureRecoveryDisabled(self):
        ray.init(num_cpus=1, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "resources": Resources(cpu=1, gpu=1),
            "checkpoint_freq": 1,
            "max_failures": 0,
            "config": {
                "mock_error": True,
            },
        }
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertEqual(trials[0].status, Trial.ERROR)
        self.assertEqual(trials[0].num_failures, 1)

    def testFailureRecoveryEnabled(self):
        ray.init(num_cpus=1, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "resources": Resources(cpu=1, gpu=1),
            "checkpoint_freq": 1,
            "max_failures": 1,
            "config": {
                "mock_error": True,
            },
        }
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[0].num_failures, 1)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)

    def testFailureRecoveryMaxFailures(self):
        ray.init(num_cpus=1, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "resources": Resources(cpu=1, gpu=1),
            "checkpoint_freq": 1,
            "max_failures": 2,
            "config": {
                "mock_error": True,
                "persistent_error": True,
            },
        }
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[0].num_failures, 1)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[0].num_failures, 2)
        runner.step()
        self.assertEqual(trials[0].status, Trial.ERROR)
        self.assertEqual(trials[0].num_failures, 3)

    def testCheckpointing(self):
        ray.init(num_cpus=1, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 1
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
        path = trials[0].checkpoint()

        kwargs["restore_path"] = path
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.RUNNING)
        self.assertEqual(ray.get(trials[1].runner.get_info.remote()), 1)
        self.addCleanup(os.remove, path)

    def testResultDone(self):
        """Tests that last_result is marked `done` after trial is complete."""
        ray.init(num_cpus=1, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 2
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        runner.step()
        self.assertNotEqual(trials[0].last_result.done, True)
        runner.step()
        self.assertEqual(trials[0].last_result.done, True)

    def testPauseThenResume(self):
        ray.init(num_cpus=1, num_gpus=1)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 2
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        runner.add_trial(Trial("__fake", **kwargs))
        trials = runner.get_trials()

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(ray.get(trials[0].runner.get_info.remote()), None)
        self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)

        trials[0].pause()
        self.assertEqual(trials[0].status, Trial.PAUSED)

        trials[0].resume()
        self.assertEqual(trials[0].status, Trial.RUNNING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(ray.get(trials[0].runner.get_info.remote()), 1)
        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)

    def testStopTrial(self):
        ray.init(num_cpus=4, num_gpus=2)
        runner = TrialRunner()
        kwargs = {
            "stopping_criterion": {
                "training_iteration": 5
            },
            "resources": Resources(cpu=1, gpu=1),
        }
        trials = [
            Trial("__fake", **kwargs),
            Trial("__fake", **kwargs),
            Trial("__fake", **kwargs),
            Trial("__fake", **kwargs)
        ]
        for t in trials:
            runner.add_trial(t)
        runner.step()
        self.assertEqual(trials[0].status, Trial.RUNNING)
        self.assertEqual(trials[1].status, Trial.PENDING)

        # Stop trial while running
        runner.stop_trial(trials[0])
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.PENDING)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.RUNNING)
        self.assertEqual(trials[-1].status, Trial.PENDING)

        # Stop trial while pending
        runner.stop_trial(trials[-1])
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.RUNNING)
        self.assertEqual(trials[-1].status, Trial.TERMINATED)

        runner.step()
        self.assertEqual(trials[0].status, Trial.TERMINATED)
        self.assertEqual(trials[1].status, Trial.RUNNING)
        self.assertEqual(trials[2].status, Trial.RUNNING)
        self.assertEqual(trials[-1].status, Trial.TERMINATED)


if __name__ == "__main__":
    unittest.main(verbosity=2)