Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 64 additions & 13 deletions experiments/evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from experiments.evaluator.datasets.base_dataset import BaseDataset
from experiments.evaluator.accuracy import Accuracy

from swarm.utils.globals import Time, Cost, CompletionTokens, PromptTokens


class Evaluator():
def __init__(
Expand Down Expand Up @@ -50,10 +52,14 @@ def __init__(
else:
self._logger = None

self._optimization_cost: Optional[Dict[str, float]] = None

async def evaluate_direct_answer(self,
limit_questions: Optional[int] = None,
) -> float:

self._reset_cost()

dataset = self._val_dataset

print(f"Evaluating DirectAnswer on {dataset.get_domain()} split {dataset.split}")
Expand All @@ -62,7 +68,9 @@ async def evaluate_direct_answer(self,

accuracy = Accuracy()

for i_question, record in tqdm(enumerate(dataset)):
data_len = min(len(dataset), limit_questions) if limit_questions is not None else len(dataset)

for i_question, record in tqdm(enumerate(dataset), total=data_len):
print(80*'-')
if limit_questions is not None:
if i_question >= limit_questions:
Expand All @@ -85,7 +93,9 @@ async def evaluate_direct_answer(self,

self._dump_eval_results(dict(
accuracy=accuracy.get(),
limit_questions=limit_questions))
limit_questions=limit_questions,
eval_cost=self._get_cost(),
))

print("Done!")
return accuracy.get()
Expand All @@ -100,10 +110,13 @@ async def evaluate_swarm(
edge_probs: Optional[torch.Tensor] = None,
limit_questions: Optional[int] = None,
eval_batch_size: int = 4,
is_async: bool = True,
) -> float:

assert self._swarm is not None

self._reset_cost()

dataset = self._val_dataset

print(f"Evaluating swarm on {dataset.__class__.__name__} split {dataset.split}")
Expand Down Expand Up @@ -143,7 +156,7 @@ def eval_loader(batch_size: int) -> Iterator[List[Any]]:

start_ts = time.time()

future_answers = []
maybe_future_answers = []
for record in record_batch:
if mode == 'randomly_connected_swarm':
realized_graph, _ = self._swarm.connection_dist.realize(self._swarm.composite_graph)
Expand All @@ -153,9 +166,16 @@ def eval_loader(batch_size: int) -> Iterator[List[Any]]:
print(input_dict)

future_answer = self._swarm.arun(input_dict, realized_graph)
future_answers.append(future_answer)
if is_async:
maybe_future_answer = future_answer
else:
maybe_future_answer = await future_answer
maybe_future_answers.append(maybe_future_answer)

raw_answers = await asyncio.gather(*future_answers)
if is_async:
raw_answers = await asyncio.gather(*maybe_future_answers)
else:
raw_answers = maybe_future_answers

print(f"Batch time {time.time() - start_ts:.3f}")

Expand All @@ -170,10 +190,15 @@ def eval_loader(batch_size: int) -> Iterator[List[Any]]:

accuracy.print()
print("Done!")
self._dump_eval_results(dict(

result_dict = dict(
accuracy=accuracy.get(),
limit_questions=limit_questions))
limit_questions=limit_questions,
eval_cost=self._get_cost(),
)
if self._optimization_cost is not None:
result_dict['train_cost'] = self._optimization_cost
self._dump_eval_results(result_dict)

return accuracy.get()

Expand Down Expand Up @@ -201,15 +226,32 @@ def _print_conns(self, edge_probs: torch.Tensor, save_to_file: bool = False):
with open(txt_name, "w") as f:
f.writelines(msgs)

@staticmethod
def _reset_cost():
    """Reset the cost and token counters before a new run.

    Calls ``reset()`` on the object returned by ``instance()`` for each of
    ``Cost``, ``PromptTokens`` and ``CompletionTokens``, in that order.
    """
    for counter_cls in (Cost, PromptTokens, CompletionTokens):
        counter_cls.instance().reset()

@staticmethod
def _get_cost() -> Dict[str, float]:
    """Return a snapshot of the current cost and token counters.

    Returns:
        A dict with the keys ``Cost``, ``PromptTokens`` and
        ``CompletionTokens``, each mapped to the ``value`` attribute of the
        object returned by the corresponding class's ``instance()``.
    """
    total_cost = Cost.instance().value
    prompt_tokens = PromptTokens.instance().value
    completion_tokens = CompletionTokens.instance().value
    return {
        "Cost": total_cost,
        "PromptTokens": prompt_tokens,
        "CompletionTokens": completion_tokens,
    }

async def optimize_swarm(
self,
num_iters: int,
lr: float,
batch_size: int = 4,
batch_size: int = 4, # 32
is_async: bool = True,
) -> torch.Tensor:

assert self._swarm is not None

self._reset_cost()

dataset = self._train_dataset

print(f"Optimizing swarm on {dataset.__class__.__name__} split {dataset.split}")
Expand Down Expand Up @@ -240,7 +282,7 @@ def infinite_data_loader() -> Iterator[pd.DataFrame]:

start_ts = time.time()

future_answers = []
maybe_future_answers = []
log_probs = []
correct_answers = []
for i_record, record in zip(range(batch_size), loader):
Expand All @@ -251,13 +293,20 @@ def infinite_data_loader() -> Iterator[pd.DataFrame]:
)

input_dict = dataset.record_to_swarm_input(record)
answer = self._swarm.arun(input_dict, realized_graph)
future_answers.append(answer)
future_answer = self._swarm.arun(input_dict, realized_graph)
if is_async:
maybe_future_answer = future_answer
else:
maybe_future_answer = await future_answer
maybe_future_answers.append(maybe_future_answer)
log_probs.append(log_prob)
correct_answer = dataset.record_to_target_answer(record)
correct_answers.append(correct_answer)

raw_answers = await asyncio.gather(*future_answers)
if is_async:
raw_answers = await asyncio.gather(*maybe_future_answers)
else:
raw_answers = maybe_future_answers

print(f"Batch time {time.time() - start_ts:.3f}")

Expand Down Expand Up @@ -303,6 +352,8 @@ def infinite_data_loader() -> Iterator[pd.DataFrame]:
if edge_probs is not None:
self._print_conns(edge_probs, save_to_file=True)

self._optimization_cost = self._get_cost()

print("Done!")
edge_probs = torch.sigmoid(self._swarm.connection_dist.edge_logits)
return edge_probs
28 changes: 18 additions & 10 deletions experiments/run_mmlu.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ def parse_args():
choices=['DirectAnswer', 'FullConnectedSwarm', 'RandomSwarm', 'OptimizedSwarm'],
help="Mode of operation. Default is 'OptimizedSwarm'.")

parser.add_argument('--num-truthful-agents', type=int, default=1,
parser.add_argument('--num-truthful-agents', type=int, default=7,
help="Number of truthful agents. The total will be N truthful and N adversarial.")

parser.add_argument('--num-iterations', type=int, default=200,
parser.add_argument('--num-iterations', type=int, default=50, # 200
help="Number of optimization iterations. Default 200.")

parser.add_argument('--model_name', type=str, default=None,
parser.add_argument('--model_name', type=str, default=None, # None, 'gpt-35-turbo-0301' 'gpt-3.5-turbo-1106'
help="Model name, None runs the default ChatGPT4.")

parser.add_argument('--domain', type=str, default="mmlu",
Expand All @@ -31,6 +31,9 @@ def parse_args():
parser.add_argument('--debug', action='store_true', default=False,
help="Set for a quick debug cycle")

parser.add_argument('--lr', type=float, default=0.1,
help="Learning rate")

args = parser.parse_args()
return args

Expand Down Expand Up @@ -60,9 +63,15 @@ async def main():
else:
N = args.num_truthful_agents
M = N
agent_name_list = N * ["IO"] + M * ["AdversarialAgent"]
# agent_name_list = N * ["IO"] + M * ["AdversarialAgent"]
# agent_name_list = N * ["IO"]
agent_name_list = N * ["SpecialistAgent"]
# agent_name_list = N * ["SpecialistAgent"] + M * ["AdversarialAgent"]

swarm_name = f"{N}true_{M}adv"
# swarm_name = f"{N}true_{M}adv"
# swarm_name = f"{N}io"
swarm_name = f"{N}specialist"
# swarm_name = f"{N}S{M}A"

swarm = Swarm(
agent_name_list,
Expand All @@ -79,6 +88,7 @@ async def main():

dataset_train = MMLUDataset('dev')
dataset_val = MMLUDataset('val')
# dataset_val = MMLUDataset('test')

evaluator = Evaluator(
swarm,
Expand All @@ -89,7 +99,7 @@ async def main():
enable_artifacts=True,
tensorboard_tag=tag)

limit_questions = 5 if debug else 153
limit_questions = 5 if debug else 153 # 14042*20%=2808 # 153 is 10% of val

if mode == 'DirectAnswer':
score = await evaluator.evaluate_direct_answer(
Expand All @@ -104,11 +114,9 @@ async def main():
limit_questions=limit_questions)
elif mode == 'OptimizedSwarm':

num_iters = 5 if debug else args.num_iterations

lr = 0.1
num_iters = 2 if debug else args.num_iterations

edge_probs = await evaluator.optimize_swarm(num_iters=num_iters, lr=lr)
edge_probs = await evaluator.optimize_swarm(num_iters=num_iters, lr=args.lr)

score = await evaluator.evaluate_swarm(
mode='external_edge_probs',
Expand Down
2 changes: 2 additions & 0 deletions swarm/environment/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from swarm.environment.agents.gaia.normal_io import NormalIO
from swarm.environment.agents.humaneval.code_io import CodeIO
# from swarm.environment.agents.humaneval.code_reflection import CodeReflection
from swarm.environment.agents.mmlu.specialist_agent import SpecialistAgent

__all__ = [
"IO",
Expand All @@ -27,4 +28,5 @@
"NormalIO",
"WebIO",
"CodeIO",
"SpecialistAgent",
]
15 changes: 15 additions & 0 deletions swarm/environment/agents/mmlu/specialist_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from swarm.graph import Graph
from swarm.environment.operations import SpecialistAnswer
from swarm.environment.agents.agent_registry import AgentRegistry


@AgentRegistry.register('SpecialistAgent')
class SpecialistAgent(Graph):
    """Agent graph made of a single ``SpecialistAnswer`` node.

    Registered with ``AgentRegistry`` under the name ``'SpecialistAgent'``.
    """

    def build_graph(self):
        """Create one ``SpecialistAnswer`` node and wire it as input and output.

        The node is constructed from ``self.domain`` and ``self.model_name``,
        added to the graph, and then set as the only entry in both
        ``input_nodes`` and ``output_nodes``.
        """
        specialist_node = SpecialistAnswer(self.domain, self.model_name)
        self.add_node(specialist_node)
        # Same node on both ends; each attribute gets its own list object.
        self.input_nodes = [specialist_node]
        self.output_nodes = [specialist_node]
2 changes: 2 additions & 0 deletions swarm/environment/operations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from swarm.environment.operations.combine_answer import CombineAnswer
from swarm.environment.operations.generate_query import GenerateQuery
from swarm.environment.operations.direct_answer import DirectAnswer
from swarm.environment.operations.specialist_answer import SpecialistAnswer
from swarm.environment.operations.file_analyse import FileAnalyse
from swarm.environment.operations.web_search import WebSearch
from swarm.environment.operations.reflect import Reflect
Expand All @@ -13,6 +14,7 @@
"CombineAnswer",
"GenerateQuery",
"DirectAnswer",
"SpecialistAnswer",
"FileAnalyse",
"WebSearch",
"Reflect",
Expand Down
2 changes: 1 addition & 1 deletion swarm/environment/operations/combine_answer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def _execute(self, inputs: List[Any] = [], **kwargs):

self.memory.add(self.id, executions)

self.log()
# self.log()
return [executions]
#return executions

Expand Down
61 changes: 40 additions & 21 deletions swarm/environment/operations/direct_answer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,47 @@ async def _execute(self, inputs: List[Any] = [], **kwargs):
node_inputs = self.process_input(inputs)
outputs = []

task: Optional[str] = None
additional_knowledge: List[str] = []
for input in node_inputs:
task = input["task"]
role, constraint = await self.node_optimize(input, meta_optmize=False)
prompt = self.prompt_set.get_answer_prompt(question=task)
message = [Message(role="system", content=f"You are a {role}. {constraint}"),
Message(role="user", content=prompt)]
response = await self.llm.agen(message, max_tokens=self.max_token)

execution = {
"operation": self.node_name,
"task": task,
"files": input.get("files", []),
"input": task,
"role": role,
"constraint": constraint,
"prompt": prompt,
"output": response,
"ground_truth": input.get("GT", []),
"format": "natural language"
}
outputs.append(execution)
self.memory.add(self.id, execution)
if len(input) == 1 and 'task' in input: # Swarm input
task = input['task']
else: # All other incoming edges
extra_knowledge = f"Opinion of {input['operation']} is \"{input['output']}\"."
additional_knowledge.append(extra_knowledge)

if task is None:
raise ValueError(f"{self.__class__.__name__} expects swarm input among inputs")

user_message = "\n\n"
if len(additional_knowledge) > 0:
for extra_knowledge in additional_knowledge:
user_message = user_message + extra_knowledge + "\n\n"

prompt = self.prompt_set.get_answer_prompt(question=task)
user_message = user_message + prompt

role, constraint = await self.node_optimize(input, meta_optmize=False)
system_message = f"You are a {role}. {constraint}"

message = [Message(role="system", content=system_message),
Message(role="user", content=user_message)]
response = await self.llm.agen(message, max_tokens=self.max_token)

execution = {
"operation": self.node_name,
"task": task,
"files": input.get("files", []),
"input": task,
"role": role,
"constraint": constraint,
"prompt": prompt,
"output": response,
"ground_truth": input.get("GT", []),
"format": "natural language"
}
outputs.append(execution)
self.memory.add(self.id, execution)

# self.log()
return outputs
2 changes: 1 addition & 1 deletion swarm/environment/operations/file_analyse.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def _execute(self, inputs: List[Any] = [], **kwargs):
outputs.append(executions)
self.memory.add(self.id, executions)

self.log()
# self.log()
return outputs


Expand Down
Loading