diff --git a/experiments/evaluator/evaluator.py b/experiments/evaluator/evaluator.py
index 53d2733..ab961d1 100644
--- a/experiments/evaluator/evaluator.py
+++ b/experiments/evaluator/evaluator.py
@@ -17,6 +17,8 @@
 from experiments.evaluator.datasets.base_dataset import BaseDataset
 from experiments.evaluator.accuracy import Accuracy
 
+from swarm.utils.globals import Time, Cost, CompletionTokens, PromptTokens
+
 
 class Evaluator():
     def __init__(
@@ -50,10 +52,14 @@ def __init__(
         else:
             self._logger = None
 
+        self._optimization_cost: Optional[Dict[str, float]] = None
+
     async def evaluate_direct_answer(self,
             limit_questions: Optional[int] = None,
             ) -> float:
 
+        self._reset_cost()
+
         dataset = self._val_dataset
 
         print(f"Evaluating DirectAnswer on {dataset.get_domain()} split {dataset.split}")
@@ -62,7 +68,9 @@ async def evaluate_direct_answer(self,
 
         accuracy = Accuracy()
 
-        for i_question, record in tqdm(enumerate(dataset)):
+        data_len = min(len(dataset), limit_questions) if limit_questions is not None else len(dataset)
+
+        for i_question, record in tqdm(enumerate(dataset), total=data_len):
             print(80*'-')
             if limit_questions is not None:
                 if i_question >= limit_questions:
@@ -85,7 +93,9 @@ async def evaluate_direct_answer(self,
 
         self._dump_eval_results(dict(
             accuracy=accuracy.get(),
-            limit_questions=limit_questions))
+            limit_questions=limit_questions,
+            eval_cost=self._get_cost(),
+            ))
 
         print("Done!")
         return accuracy.get()
@@ -100,10 +110,13 @@ async def evaluate_swarm(
             edge_probs: Optional[torch.Tensor] = None,
             limit_questions: Optional[int] = None,
             eval_batch_size: int = 4,
+            is_async: bool = True,
             ) -> float:
 
         assert self._swarm is not None
 
+        self._reset_cost()
+
         dataset = self._val_dataset
 
         print(f"Evaluating swarm on {dataset.__class__.__name__} split {dataset.split}")
@@ -143,7 +156,7 @@ def eval_loader(batch_size: int) -> Iterator[List[Any]]:
 
             start_ts = time.time()
 
-            future_answers = []
+            maybe_future_answers = []
             for record in record_batch:
                 if mode == 'randomly_connected_swarm':
                     realized_graph, _ = self._swarm.connection_dist.realize(self._swarm.composite_graph)
@@ -153,9 +166,16 @@ def eval_loader(batch_size: int) -> Iterator[List[Any]]:
                 print(input_dict)
 
                 future_answer = self._swarm.arun(input_dict, realized_graph)
-                future_answers.append(future_answer)
+                if is_async:
+                    maybe_future_answer = future_answer
+                else:
+                    maybe_future_answer = await future_answer
+                maybe_future_answers.append(maybe_future_answer)
 
-            raw_answers = await asyncio.gather(*future_answers)
+            if is_async:
+                raw_answers = await asyncio.gather(*maybe_future_answers)
+            else:
+                raw_answers = maybe_future_answers
 
             print(f"Batch time {time.time() - start_ts:.3f}")
@@ -170,10 +190,15 @@ def eval_loader(batch_size: int) -> Iterator[List[Any]]:
 
         accuracy.print()
         print("Done!")
-
-        self._dump_eval_results(dict(
+
+        result_dict = dict(
             accuracy=accuracy.get(),
-            limit_questions=limit_questions))
+            limit_questions=limit_questions,
+            eval_cost=self._get_cost(),
+            )
+        if self._optimization_cost is not None:
+            result_dict['train_cost'] = self._optimization_cost
+        self._dump_eval_results(result_dict)
 
         return accuracy.get()
@@ -201,15 +226,32 @@ def _print_conns(self, edge_probs: torch.Tensor, save_to_file: bool = False):
             with open(txt_name, "w") as f:
                 f.writelines(msgs)
 
+    @staticmethod
+    def _reset_cost():
+        Cost.instance().reset()
+        PromptTokens.instance().reset()
+        CompletionTokens.instance().reset()
+
+    @staticmethod
+    def _get_cost() -> Dict[str, float]:
+        return dict(
+            Cost=Cost.instance().value,
+            PromptTokens=PromptTokens.instance().value,
+            CompletionTokens=CompletionTokens.instance().value,
+        )
+
     async def optimize_swarm(
             self,
             num_iters: int,
             lr: float,
-            batch_size: int = 4,
+            batch_size: int = 4,  # 32
+            is_async: bool = True,
             ) -> torch.Tensor:
 
         assert self._swarm is not None
 
+        self._reset_cost()
+
         dataset = self._train_dataset
 
         print(f"Optimizing swarm on {dataset.__class__.__name__} split {dataset.split}")
@@ -240,7 +282,7 @@ def infinite_data_loader() -> Iterator[pd.DataFrame]:
 
             start_ts = time.time()
 
-            future_answers = []
+            maybe_future_answers = []
             log_probs = []
             correct_answers = []
             for i_record, record in zip(range(batch_size), loader):
@@ -251,13 +293,20 @@ def infinite_data_loader() -> Iterator[pd.DataFrame]:
                     )
                 input_dict = dataset.record_to_swarm_input(record)
 
-                answer = self._swarm.arun(input_dict, realized_graph)
-                future_answers.append(answer)
+                future_answer = self._swarm.arun(input_dict, realized_graph)
+                if is_async:
+                    maybe_future_answer = future_answer
+                else:
+                    maybe_future_answer = await future_answer
+                maybe_future_answers.append(maybe_future_answer)
                 log_probs.append(log_prob)
 
                 correct_answer = dataset.record_to_target_answer(record)
                 correct_answers.append(correct_answer)
 
-            raw_answers = await asyncio.gather(*future_answers)
+            if is_async:
+                raw_answers = await asyncio.gather(*maybe_future_answers)
+            else:
+                raw_answers = maybe_future_answers
 
             print(f"Batch time {time.time() - start_ts:.3f}")
@@ -303,6 +352,8 @@ def infinite_data_loader() -> Iterator[pd.DataFrame]:
         if edge_probs is not None:
             self._print_conns(edge_probs, save_to_file=True)
 
+        self._optimization_cost = self._get_cost()
+
         print("Done!")
         edge_probs = torch.sigmoid(self._swarm.connection_dist.edge_logits)
         return edge_probs
diff --git a/experiments/run_mmlu.py b/experiments/run_mmlu.py
index e105c75..3e3174a 100644
--- a/experiments/run_mmlu.py
+++ b/experiments/run_mmlu.py
@@ -16,13 +16,13 @@ def parse_args():
                         choices=['DirectAnswer', 'FullConnectedSwarm', 'RandomSwarm', 'OptimizedSwarm'],
                         help="Mode of operation. Default is 'OptimizedSwarm'.")
 
-    parser.add_argument('--num-truthful-agents', type=int, default=1,
+    parser.add_argument('--num-truthful-agents', type=int, default=7,
                         help="Number of truthful agents. The total will be N truthful and N adversarial.")
 
-    parser.add_argument('--num-iterations', type=int, default=200,
+    parser.add_argument('--num-iterations', type=int, default=50,  # 200
Default 200.") - parser.add_argument('--model_name', type=str, default=None, + parser.add_argument('--model_name', type=str, default=None, # None, 'gpt-35-turbo-0301' 'gpt-3.5-turbo-1106' help="Model name, None runs the default ChatGPT4.") parser.add_argument('--domain', type=str, default="mmlu", @@ -31,6 +31,9 @@ def parse_args(): parser.add_argument('--debug', action='store_true', default=False, help="Set for a quick debug cycle") + parser.add_argument('--lr', type=float, default=0.1, + help="Learning rate") + args = parser.parse_args() return args @@ -60,9 +63,15 @@ async def main(): else: N = args.num_truthful_agents M = N - agent_name_list = N * ["IO"] + M * ["AdversarialAgent"] + # agent_name_list = N * ["IO"] + M * ["AdversarialAgent"] + # agent_name_list = N * ["IO"] + agent_name_list = N * ["SpecialistAgent"] + # agent_name_list = N * ["SpecialistAgent"] + M * ["AdversarialAgent"] - swarm_name = f"{N}true_{M}adv" + # swarm_name = f"{N}true_{M}adv" + # swarm_name = f"{N}io" + swarm_name = f"{N}specialist" + # swarm_name = f"{N}S{M}A" swarm = Swarm( agent_name_list, @@ -79,6 +88,7 @@ async def main(): dataset_train = MMLUDataset('dev') dataset_val = MMLUDataset('val') + # dataset_val = MMLUDataset('test') evaluator = Evaluator( swarm, @@ -89,7 +99,7 @@ async def main(): enable_artifacts=True, tensorboard_tag=tag) - limit_questions = 5 if debug else 153 + limit_questions = 5 if debug else 153 # 14042*20%=2808 # 153 is 10% of val if mode == 'DirectAnswer': score = await evaluator.evaluate_direct_answer( @@ -104,11 +114,9 @@ async def main(): limit_questions=limit_questions) elif mode == 'OptimizedSwarm': - num_iters = 5 if debug else args.num_iterations - - lr = 0.1 + num_iters = 2 if debug else args.num_iterations - edge_probs = await evaluator.optimize_swarm(num_iters=num_iters, lr=lr) + edge_probs = await evaluator.optimize_swarm(num_iters=num_iters, lr=args.lr) score = await evaluator.evaluate_swarm( mode='external_edge_probs', diff --git a/swarm/environment/agents/__init__.py b/swarm/environment/agents/__init__.py index cdd00c4..e577525 100644 --- a/swarm/environment/agents/__init__.py +++ b/swarm/environment/agents/__init__.py @@ -12,6 +12,7 @@ from swarm.environment.agents.gaia.normal_io import NormalIO from swarm.environment.agents.humaneval.code_io import CodeIO # from swarm.environment.agents.humaneval.code_reflection import CodeReflection +from swarm.environment.agents.mmlu.specialist_agent import SpecialistAgent __all__ = [ "IO", @@ -27,4 +28,5 @@ "NormalIO", "WebIO", "CodeIO", + "SpecialistAgent", ] \ No newline at end of file diff --git a/swarm/environment/agents/mmlu/specialist_agent.py b/swarm/environment/agents/mmlu/specialist_agent.py new file mode 100644 index 0000000..287e8e7 --- /dev/null +++ b/swarm/environment/agents/mmlu/specialist_agent.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from swarm.graph import Graph +from swarm.environment.operations import SpecialistAnswer +from swarm.environment.agents.agent_registry import AgentRegistry + + +@AgentRegistry.register('SpecialistAgent') +class SpecialistAgent(Graph): + def build_graph(self): + sa = SpecialistAnswer(self.domain, self.model_name) + self.add_node(sa) + self.input_nodes = [sa] + self.output_nodes = [sa] diff --git a/swarm/environment/operations/__init__.py b/swarm/environment/operations/__init__.py index 38ef351..796015b 100644 --- a/swarm/environment/operations/__init__.py +++ b/swarm/environment/operations/__init__.py @@ -1,6 +1,7 @@ from 
 from swarm.environment.operations.combine_answer import CombineAnswer
 from swarm.environment.operations.generate_query import GenerateQuery
 from swarm.environment.operations.direct_answer import DirectAnswer
+from swarm.environment.operations.specialist_answer import SpecialistAnswer
 from swarm.environment.operations.file_analyse import FileAnalyse
 from swarm.environment.operations.web_search import WebSearch
 from swarm.environment.operations.reflect import Reflect
@@ -13,6 +14,7 @@
     "CombineAnswer",
     "GenerateQuery",
     "DirectAnswer",
+    "SpecialistAnswer",
     "FileAnalyse",
     "WebSearch",
     "Reflect",
diff --git a/swarm/environment/operations/combine_answer.py b/swarm/environment/operations/combine_answer.py
index b7c2e1a..794429b 100644
--- a/swarm/environment/operations/combine_answer.py
+++ b/swarm/environment/operations/combine_answer.py
@@ -77,7 +77,7 @@ async def _execute(self, inputs: List[Any] = [], **kwargs):
 
         self.memory.add(self.id, executions)
 
-        self.log()
+        # self.log()
         return [executions]
         #return executions
diff --git a/swarm/environment/operations/direct_answer.py b/swarm/environment/operations/direct_answer.py
index 9c51adb..e08dff8 100644
--- a/swarm/environment/operations/direct_answer.py
+++ b/swarm/environment/operations/direct_answer.py
@@ -56,28 +56,47 @@ async def _execute(self, inputs: List[Any] = [], **kwargs):
 
         node_inputs = self.process_input(inputs)
         outputs = []
+        task: Optional[str] = None
+        additional_knowledge: List[str] = []
         for input in node_inputs:
-            task = input["task"]
-            role, constraint = await self.node_optimize(input, meta_optmize=False)
-            prompt = self.prompt_set.get_answer_prompt(question=task)
-            message = [Message(role="system", content=f"You are a {role}. {constraint}"),
-                       Message(role="user", content=prompt)]
-            response = await self.llm.agen(message, max_tokens=self.max_token)
-
-            execution = {
-                "operation": self.node_name,
-                "task": task,
-                "files": input.get("files", []),
-                "input": task,
-                "role": role,
-                "constraint": constraint,
-                "prompt": prompt,
-                "output": response,
-                "ground_truth": input.get("GT", []),
-                "format": "natural language"
-            }
-            outputs.append(execution)
-            self.memory.add(self.id, execution)
+            if len(input) == 1 and 'task' in input:  # Swarm input
+                task = input['task']
+            else:  # All other incoming edges
+                extra_knowledge = f"Opinion of {input['operation']} is \"{input['output']}\"."
+                additional_knowledge.append(extra_knowledge)
+
+        if task is None:
+            raise ValueError(f"{self.__class__.__name__} expects swarm input among inputs")
+
+        user_message = "\n\n"
+        if len(additional_knowledge) > 0:
+            for extra_knowledge in additional_knowledge:
+                user_message = user_message + extra_knowledge + "\n\n"
+
+        prompt = self.prompt_set.get_answer_prompt(question=task)
+        user_message = user_message + prompt
+
+        role, constraint = await self.node_optimize(input, meta_optmize=False)
+        system_message = f"You are a {role}. {constraint}"
+
+        message = [Message(role="system", content=system_message),
+                   Message(role="user", content=user_message)]
+        response = await self.llm.agen(message, max_tokens=self.max_token)
+
+        execution = {
+            "operation": self.node_name,
+            "task": task,
+            "files": input.get("files", []),
+            "input": task,
+            "role": role,
+            "constraint": constraint,
+            "prompt": prompt,
+            "output": response,
+            "ground_truth": input.get("GT", []),
+            "format": "natural language"
+        }
+        outputs.append(execution)
+        self.memory.add(self.id, execution)
 
         # self.log()
         return outputs
\ No newline at end of file
diff --git a/swarm/environment/operations/file_analyse.py b/swarm/environment/operations/file_analyse.py
index 030eaf4..cf242e1 100644
--- a/swarm/environment/operations/file_analyse.py
+++ b/swarm/environment/operations/file_analyse.py
@@ -57,7 +57,7 @@ async def _execute(self, inputs: List[Any] = [], **kwargs):
             outputs.append(executions)
             self.memory.add(self.id, executions)
 
-        self.log()
+        # self.log()
         return outputs
 
diff --git a/swarm/environment/operations/final_decision.py b/swarm/environment/operations/final_decision.py
index 6ee63a4..ffde8e9 100644
--- a/swarm/environment/operations/final_decision.py
+++ b/swarm/environment/operations/final_decision.py
@@ -87,6 +87,10 @@ async def _execute(self, inputs: List[Any] = [],
         if len(inputs) == 0:
             raise Exception("No inputs is not supported for MajorityVote")
         answers = [input.get("output") for input in inputs]
+
+        # Quick hack: vote only on the first letter of each answer (the A/B/C/D choice), uppercased.
+        answers = [s[0].upper() for s in answers]
+
         counter = Counter(answers)
         sorted_counter = counter.most_common()
         max_freq = sorted_counter[0][1]
@@ -142,7 +146,7 @@ async def _execute(self, inputs: List[Any] = [],
                       "format": "natural language"}
 
         self.memory.add(self.id, executions)
-        self.log()
+        # self.log()
 
         return executions
 
diff --git a/swarm/environment/operations/specialist_answer.py b/swarm/environment/operations/specialist_answer.py
new file mode 100644
index 0000000..6a0df66
--- /dev/null
+++ b/swarm/environment/operations/specialist_answer.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from copy import deepcopy
+from collections import defaultdict
+from swarm.llm.format import Message
+from swarm.graph import Node
+from swarm.memory.memory import GlobalMemory
+from typing import List, Any, Optional
+from swarm.utils.log import logger, swarmlog
+from swarm.utils.globals import Cost
+from swarm.environment.prompt.prompt_set_registry import PromptSetRegistry
+from swarm.llm.format import Message
+from swarm.llm import LLMRegistry
+from swarm.optimizer.node_optimizer import MetaPromptOptimizer
+
+
+"""
+Imagine someone who has to answer questions.
+They can be any person.
+Make a list of their possible specializations or social roles.
+Make the list as diverse as possible so that you expect them to answer the same question differently.
+Make a list of 20, list items only, no need for a description.
+"""
+
+class SpecialistAnswer(Node):
+    role_list = [
+        "Botanist",
+        "Data Scientist",
+        "Social Worker",
+        "Journalist",
+        "Pilot",
+        "Anthropologist",
+        "Fitness Coach",
+        "Politician",
+        "Artist",
+        "Marine Biologist",
+        "Ethicist",
+        "Entrepreneur",
+        "Linguist",
+        "Archaeologist",
+        "Nurse",
+        "Graphic Designer",
+        "Philanthropist",
+        "Meteorologist",
+        "Sommelier",
+        "Cybersecurity Expert"
+    ]
+
+    def __init__(self,
+                 domain: str,
+                 model_name: Optional[str],
+                 operation_description: str = "Answer as if you were a specialist in .",
+                 max_token: int = 50,
+                 id=None):
+        super().__init__(operation_description, id, True)
+        self.domain = domain
+        self.model_name = model_name
+        self.llm = LLMRegistry.get(model_name)
+        self.max_token = max_token
+        self.prompt_set = PromptSetRegistry.get(domain)
+
+        # Override role with a specialist role.
+        idx_role = hash(self.id) % len(self.role_list)
+        self.role = self.role_list[idx_role]
+        print(f"Creating a node with specialization {self.role}")
+
+    @property
+    def node_name(self):
+        return f"{self.__class__.__name__} {self.role}"
+
+    async def node_optimize(self, input, meta_optmize=False):
+        task = input["task"]
+        self.prompt_set = PromptSetRegistry.get(self.domain)
+        role = self.prompt_set.get_role()
+        constraint = self.prompt_set.get_constraint()
+
+        if meta_optmize:
+            update_role = role
+            node_optmizer = MetaPromptOptimizer(self.model_name, self.node_name)
+            update_constraint = await node_optmizer.generate(constraint, task)
+            return update_role, update_constraint
+
+        return role, constraint
+
+    async def _execute(self, inputs: List[Any] = [], **kwargs):
+
+        node_inputs = self.process_input(inputs)
+        outputs = []
+
+        task: Optional[str] = None
+        additional_knowledge: List[str] = []
+        for input in node_inputs:
+            if len(input) == 1 and 'task' in input:  # Swarm input
+                task = input['task']
+            else:  # All other incoming edges
+                extra_knowledge = f"Opinion of {input['operation']} is {input['output']}."
+                additional_knowledge.append(extra_knowledge)
+
+        if task is None:
+            raise ValueError(f"{self.__class__.__name__} expects swarm input among inputs")
+
+        opinions = ""
+        if len(additional_knowledge) > 0:
+            for extra_knowledge in additional_knowledge:
+                opinions = opinions + extra_knowledge + "\n\n"
+
+        question = self.prompt_set.get_answer_prompt(question=task)
+        user_message = question
+        if len(opinions) > 0:
+            user_message = f"""{user_message}
+
+Take into account the following opinions which may or may not be true:
+
+{opinions}"""
+
+        _, constraint = await self.node_optimize(input, meta_optmize=False)
+        system_message = f"You are a {self.role}. {constraint}"
+
+        message = [Message(role="system", content=system_message),
+                   Message(role="user", content=user_message)]
+        response = await self.llm.agen(message, max_tokens=self.max_token)
+
+        execution = {
+            "operation": self.node_name,
+            "task": task,
+            "files": input.get("files", []),
+            "input": task,
+            "role": self.role,
+            "constraint": constraint,
+            "prompt": user_message,
+            "output": response,
+            "ground_truth": input.get("GT", []),
+            "format": "natural language"
+        }
+        outputs.append(execution)
+        self.memory.add(self.id, execution)
+
+        # self.log()
+        return outputs
diff --git a/swarm/environment/operations/web_search.py b/swarm/environment/operations/web_search.py
index 126cf70..4a81b58 100644
--- a/swarm/environment/operations/web_search.py
+++ b/swarm/environment/operations/web_search.py
@@ -74,7 +74,7 @@ async def _execute(self, inputs: List[Any] = [], max_keywords: int = 5, **kwargs
             self.memory.add(self.id, executions)
             outputs.append(executions)
 
-        self.log()
+        # self.log()
         return outputs
 
     def web_search(self, query):
diff --git a/swarm/environment/prompt/mmlu_prompt_set.py b/swarm/environment/prompt/mmlu_prompt_set.py
index a9a735f..15b5180 100644
--- a/swarm/environment/prompt/mmlu_prompt_set.py
+++ b/swarm/environment/prompt/mmlu_prompt_set.py
@@ -20,14 +20,14 @@ def get_role():
     @staticmethod
     def get_constraint():
         return """
-        I will ask you a question.
-        I will also give you 4 answers enumerated as A, B, C and D.
-        Only one answer out of the offered 4 is correct.
-        You must choose the correct answer to the question.
-        Your response must be one of the 4 letters: A, B, C or D,
-        corresponding to the correct answer.
-        Only one letter (A, B, C or D) is allowed in your answer.
-        """
+I will ask you a question.
+I will also give you 4 answers enumerated as A, B, C and D.
+Only one answer out of the offered 4 is correct.
+You must choose the correct answer to the question.
+Your response must start with one of the 4 letters: A, B, C or D,
+corresponding to the correct answer.
+After the single-letter answer, add a short explanation of why you gave this answer.
+"""
 
     @staticmethod
     def get_format():
@@ -51,7 +51,7 @@ def get_websearch_prompt(query):
 
     @staticmethod
     def get_adversarial_answer_prompt(question):
-        return f"""Answer a lie to the following question: {question}. """
" @staticmethod def get_distill_websearch_prompt(query, results): diff --git a/swarm/graph/graph.py b/swarm/graph/graph.py index e5ea61d..c666b02 100644 --- a/swarm/graph/graph.py +++ b/swarm/graph/graph.py @@ -52,7 +52,7 @@ def __init__(self, self.domain = domain self.model_name = model_name self.meta_prompt = meta_prompt - self.nodes = {} + self.nodes: Dict[str, Node] = {} self.memory = GlobalMemory.instance() self.is_aggregate = False self.input_nodes: List[Node] = [] @@ -167,7 +167,7 @@ def is_node_useful(node): final_answers.append("No answer since there are no inputs provided") return final_answers - def find_node(self, id: str): + def find_node(self, id: str) -> Node: for node in self.nodes.values(): if node.id == id: return node diff --git a/swarm/graph/node.py b/swarm/graph/node.py index e2a14cd..0f5d257 100644 --- a/swarm/graph/node.py +++ b/swarm/graph/node.py @@ -110,25 +110,27 @@ async def execute(self, **kwargs): self.outputs = [] tasks = [] - if not self.inputs and self.predecessors: + # if not self.inputs and self.predecessors: + # if len(self.inputs) == 0 and len(self.predecessors) > 0: + if True: if self.combine_inputs_as_one: - combined_inputs = [] + combined_inputs = self.inputs for predecessor in self.predecessors: predecessor_outputs = predecessor.outputs if predecessor_outputs is not None and isinstance(predecessor_outputs, list): combined_inputs.extend(predecessor_outputs) tasks.append(asyncio.create_task(self._execute(combined_inputs, **kwargs))) else: - for predecessor in self.predecessors: + for predecessor in self.predecessors: # TODO fix this branch as well, IT IS BROKEN predecessor_outputs = predecessor.outputs if isinstance(predecessor_outputs, list) and predecessor_outputs: for predecessor_output in predecessor_outputs: tasks.append(asyncio.create_task(self._execute(predecessor_output, **kwargs))) - elif self.inputs: - tasks = [asyncio.create_task(self._execute(input, **kwargs)) for input in self.inputs] - else: - warnings.warn("No input received.") - return + # elif self.inputs: + # tasks = [asyncio.create_task(self._execute(input, **kwargs)) for input in self.inputs] + # else: + # warnings.warn("No input received.") + # return if tasks: results = await asyncio.gather(*tasks, return_exceptions=True) diff --git a/test/swarm/environment/agents/test_specialist_agent.py b/test/swarm/environment/agents/test_specialist_agent.py new file mode 100644 index 0000000..94dc625 --- /dev/null +++ b/test/swarm/environment/agents/test_specialist_agent.py @@ -0,0 +1,34 @@ +import pytest + +from swarm.environment.agents.mmlu.specialist_agent import SpecialistAgent + + +@pytest.mark.parametrize("model_name", [ + pytest.param('mock', marks=pytest.mark.mock_llm), + pytest.param(None), +]) +@pytest.mark.asyncio +async def test_io(model_name): + task = """ +What is love? +A: a feeling +B: all you need +C: a chemical process +D: baby don't hurt me no more +""" + responses = [] + for _ in range(10): + io = SpecialistAgent("mmlu", model_name) + response = await io.run({"task": task}) + print(response[0]) + responses.append(response[0]) + + print(responses) + print() + + print([r[0] for r in responses]) + print() + + +if __name__ == "__main__": + pytest.main([__file__, "-s", "-m", "not mock_llm"])