diff --git a/lib/op-attrs/src/op-attrs/ops/element_unary.cc b/lib/op-attrs/src/op-attrs/ops/element_unary.cc
index 9d02923689..ca7e417814 100644
--- a/lib/op-attrs/src/op-attrs/ops/element_unary.cc
+++ b/lib/op-attrs/src/op-attrs/ops/element_unary.cc
@@ -35,7 +35,6 @@ ParallelTensorDimDegrees get_output_parallel_dim_degrees(
     ElementUnaryAttrs const &attrs,
     ParallelTensorDimDegrees const &input_degrees) {
   ASSERT(input_degrees.sum_degree.value == 1);
-  ASSERT(input_degrees.discard_copy_degree.value == 1);
 
   return input_degrees;
 }
diff --git a/lib/op-attrs/test/src/op-attrs/ops/element_unary.cc b/lib/op-attrs/test/src/op-attrs/ops/element_unary.cc
index 672b160cbd..43b4be06d8 100644
--- a/lib/op-attrs/test/src/op-attrs/ops/element_unary.cc
+++ b/lib/op-attrs/test/src/op-attrs/ops/element_unary.cc
@@ -62,13 +62,5 @@ TEST_SUITE(FF_TEST_SUITE) {
                  SumDegree{degree}, DiscardCopyDegree{1_p}, 1_p, 1_p, 1_p)));
     }
-
-    SUBCASE("discard copy degree > 1") {
-      positive_int degree = 2_p;
-
-      CHECK_THROWS(get_output_shape(
-          attrs,
-          make_input(
-              SumDegree{1_p}, DiscardCopyDegree{degree}, 1_p, 1_p, 1_p)));
-    }
   }
 }
diff --git a/lib/realm-execution/include/realm-execution/realm_context.h b/lib/realm-execution/include/realm-execution/realm_context.h
index ab89e916c0..eab42d0d79 100644
--- a/lib/realm-execution/include/realm-execution/realm_context.h
+++ b/lib/realm-execution/include/realm-execution/realm_context.h
@@ -63,15 +63,18 @@ struct RealmContext {
                             int priority = 0);
   ///\}
 
-  /** \name Data movement */
+  /** \name Data movement and reduction */
   ///\{
-  Realm::Event issue_copy(ParallelTensorShape const &src_shape,
-                          Realm::RegionInstance src_inst,
-                          ParallelTensorShape const &dst_shape,
-                          Realm::RegionInstance dst_inst,
-                          Realm::ProfilingRequestSet const &requests,
-                          Realm::Event wait_on = Realm::Event::NO_EVENT,
-                          int priority = 0);
+  Realm::Event
+      issue_copy(ParallelTensorShape const &src_shape,
+                 Realm::RegionInstance src_inst,
+                 ParallelTensorShape const &dst_shape,
+                 Realm::RegionInstance dst_inst,
+                 Realm::ProfilingRequestSet const &requests,
+                 Realm::Event wait_on = Realm::Event::NO_EVENT,
+                 int priority = 0,
+                 std::optional<Realm::ReductionOpID> redop_id = std::nullopt,
+                 bool exclusive = false);
   ///\}
 
   /** \name Instance management */
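A call site for the extended issue_copy might look like the sketch below. The shape and instance variables are placeholders; only issue_copy's signature and the REDOP_SUM_FLOAT id (introduced in realm_reduction.h, next section) come from this patch.

    // Reduce src into dst instead of overwriting it (sketch):
    Realm::Event done = ctx.issue_copy(shape,
                                       src_inst,
                                       shape,
                                       dst_inst,
                                       Realm::ProfilingRequestSet{},
                                       /*wait_on=*/Realm::Event::NO_EVENT,
                                       /*priority=*/0,
                                       /*redop_id=*/REDOP_SUM_FLOAT,
                                       /*exclusive=*/false);
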
diff --git a/lib/realm-execution/include/realm-execution/tasks/realm_reduction.h b/lib/realm-execution/include/realm-execution/tasks/realm_reduction.h
new file mode 100644
index 0000000000..d9cf00441b
--- /dev/null
+++ b/lib/realm-execution/include/realm-execution/tasks/realm_reduction.h
@@ -0,0 +1,111 @@
+#pragma once
+
+#include "op-attrs/datatype.dtg.h"
+#include "utils/exception.h"
+#include <realm.h>
+
+namespace FlexFlow {
+
+// Sum reduction for float
+struct SumReductionFloat {
+  using LHS = float;
+  using RHS = float;
+  // identity element required by Realm's reduction-op interface
+  static constexpr RHS identity = 0.0f;
+
+  template <bool EXCLUSIVE>
+  static void apply(LHS &lhs, RHS rhs) {
+    if (EXCLUSIVE) {
+      lhs += rhs;
+    } else {
+      // Atomic float add: CAS on the bit pattern via a union.
+      union {
+        float f;
+        int i;
+      } old_val, new_val;
+      do {
+        old_val.f = lhs;
+        new_val.f = old_val.f + rhs;
+      } while (
+          !__sync_bool_compare_and_swap((int *)&lhs, old_val.i, new_val.i));
+    }
+  }
+
+  template <bool EXCLUSIVE>
+  static void fold(RHS &rhs1, RHS rhs2) {
+    if (EXCLUSIVE) {
+      rhs1 += rhs2;
+    } else {
+      union {
+        float f;
+        int i;
+      } old_val, new_val;
+      do {
+        old_val.f = rhs1;
+        new_val.f = old_val.f + rhs2;
+      } while (
+          !__sync_bool_compare_and_swap((int *)&rhs1, old_val.i, new_val.i));
+    }
+  }
+};
+
+// Sum reduction for double
+struct SumReductionDouble {
+  using LHS = double;
+  using RHS = double;
+  // identity element required by Realm's reduction-op interface
+  static constexpr RHS identity = 0.0;
+
+  template <bool EXCLUSIVE>
+  static void apply(LHS &lhs, RHS rhs) {
+    if (EXCLUSIVE) {
+      lhs += rhs;
+    } else {
+      union {
+        double d;
+        long long i;
+      } old_val, new_val;
+      do {
+        old_val.d = lhs;
+        new_val.d = old_val.d + rhs;
+      } while (!__sync_bool_compare_and_swap(
+          (long long *)&lhs, old_val.i, new_val.i));
+    }
+  }
+
+  template <bool EXCLUSIVE>
+  static void fold(RHS &rhs1, RHS rhs2) {
+    if (EXCLUSIVE) {
+      rhs1 += rhs2;
+    } else {
+      union {
+        double d;
+        long long i;
+      } old_val, new_val;
+      do {
+        old_val.d = rhs1;
+        new_val.d = old_val.d + rhs2;
+      } while (!__sync_bool_compare_and_swap(
+          (long long *)&rhs1, old_val.i, new_val.i));
+    }
+  }
+};
+
+// Reduction op IDs; these must not conflict with other registered redops.
+enum SumReductionOpIDs {
+  REDOP_SUM_FLOAT = 1,
+  REDOP_SUM_DOUBLE = 2,
+};
+
+inline Realm::ReductionOpID get_sum_reduction_op_id(DataType dtype) {
+  switch (dtype) {
+    case DataType::FLOAT:
+      return REDOP_SUM_FLOAT;
+    case DataType::DOUBLE:
+      return REDOP_SUM_DOUBLE;
+    default:
+      PANIC("no sum reduction registered for datatype {}", dtype);
+  }
+}
+
+} // namespace FlexFlow
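Realm invokes apply to combine an incoming value into instance data and fold to combine two pending right-hand-side values; the EXCLUSIVE template parameter tells the op whether Realm guarantees no concurrent access. A minimal illustration of the contract these structs satisfy:

    float acc = SumReductionFloat::identity;    // 0.0f
    SumReductionFloat::apply<true>(acc, 1.5f);  // exclusive: plain +=
    SumReductionFloat::apply<false>(acc, 2.5f); // shared: CAS loop
    // acc == 4.0f; fold<EXCLUSIVE> combines two RHS values the same way.
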
diff --git a/lib/realm-execution/src/realm-execution/distributed_per_device_op_state_initialization.cc b/lib/realm-execution/src/realm-execution/distributed_per_device_op_state_initialization.cc
index 1d517a8fe4..e7d8647b12 100644
--- a/lib/realm-execution/src/realm-execution/distributed_per_device_op_state_initialization.cc
+++ b/lib/realm-execution/src/realm-execution/distributed_per_device_op_state_initialization.cc
@@ -31,6 +31,7 @@ PerDeviceOpStateBacking perform_distributed_per_device_op_state_initialization(
   std::unordered_map<DynamicNodeInvocation, DeviceSpecificPtr *>
       device_state_map;
+  std::vector<Realm::Event> completion_events;
   for (DynamicNodeInvocation const &invocation : dg.invocations) {
     Realm::Processor target_proc = ctx.map_device_coord_to_processor(
         assert_unwrap(invocation.node_attrs.device_coord));
@@ -56,6 +57,7 @@ PerDeviceOpStateBacking perform_distributed_per_device_op_state_initialization(
         precondition);
 
     if (completion_event.has_value()) {
+      completion_events.push_back(completion_event.value());
       device_state_map.insert(std::pair{invocation, device_state_ptr});
     } else {
       // Task doesn't require initialization, clean up and don't store result
@@ -63,7 +65,9 @@ PerDeviceOpStateBacking perform_distributed_per_device_op_state_initialization(
     }
   }
 
-  ctx.get_outstanding_events().wait();
+  // Wait for all init tasks. Each init task writes directly to *result_ptr
+  // before its completion event fires, so every result is ready after this.
+  Realm::Event::merge_events(completion_events).wait();
 
   auto deref = [](DeviceSpecificPtr *const &p) { return *p; };
   std::unordered_map<DynamicNodeInvocation, DeviceSpecificPtr>
diff --git a/lib/realm-execution/src/realm-execution/pcg_instance.cc b/lib/realm-execution/src/realm-execution/pcg_instance.cc
index 0ecd02143e..a0653c3c37 100644
--- a/lib/realm-execution/src/realm-execution/pcg_instance.cc
+++ b/lib/realm-execution/src/realm-execution/pcg_instance.cc
@@ -6,6 +6,7 @@
 #include "realm-execution/instance_allocation.h"
 #include "realm-execution/realm_context.h"
 #include "realm-execution/tasks/impl/op_task.h"
+#include "realm-execution/tasks/realm_reduction.h"
 #include "realm-execution/tensor_instance_backing.h"
 #include "task-spec/dynamic_graph/copy_insertion.h"
 #include "task-spec/dynamic_graph/dynamic_node_invocation.dtg.h"
@@ -215,6 +216,46 @@ static Realm::Event spawn_dynamic_node_invocation(
                      precondition);
   };
 
+  // Backward replicate: reduce every replica's output gradient into the
+  // single input-gradient instance.
+  auto issue_replicate_bwd = [&]() {
+    std::optional<DynamicValueAttrs> output_grad_opt;
+    for (auto const &[slot, value] : invocation.inputs) {
+      if (slot.slot_tensor_role == DynamicTensorRole{FwbTensorType::GRADIENT}) {
+        output_grad_opt = value;
+      }
+    }
+    DynamicValueAttrs output_grad = assert_unwrap(output_grad_opt);
+    DynamicValueAttrs input_grad = get_only(invocation.outputs).second;
+    Realm::RegionInstance dst_inst =
+        tensor_instance_backing.backing.at(input_grad).first;
+
+    Realm::ReductionOpID redop_id = get_sum_reduction_op_id(
+        assert_unwrap(output_grad.parallel_tensor_shape).data_type);
+
+    // chain reductions sequentially to avoid write races on dst
+    Realm::Event e = precondition;
+    for (auto const &[p, m] : assert_unwrap(output_grad.mapping)) {
+      DynamicValueAttrs replica_key = output_grad;
+      replica_key.mapping =
+          bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>{
+              {p, m}};
+      replica_key.shard_coord = p;
+
+      Realm::RegionInstance src_inst =
+          tensor_instance_backing.backing.at(replica_key).first;
+
+      e = ctx.issue_copy(assert_unwrap(output_grad.parallel_tensor_shape),
+                         src_inst,
+                         assert_unwrap(input_grad.parallel_tensor_shape),
+                         dst_inst,
+                         Realm::ProfilingRequestSet{},
+                         e,
+                         /*priority=*/0,
+                         redop_id,
+                         /*exclusive=*/false);
+    }
+    return e;
+  };
+
   TrainingOperationAttrs op_attrs =
       assert_unwrap(invocation.node_attrs.op_attrs);
   return op_attrs.visit(overload{
@@ -222,11 +263,24 @@ static Realm::Event spawn_dynamic_node_invocation(
       return pcg_op_attrs.visit(overload{
           [&](InputAttrs const &) { return Realm::Event::NO_EVENT; },
           [&](WeightAttrs const &) { return Realm::Event::NO_EVENT; },
+          [&](ReplicateAttrs const &) {
+            // Unreachable: replicate is dispatched through the
+            // TrainingOperationAttrs::ReplicateAttrs alternative below.
+            PANIC("unexpected replicate in PCGOperatorAttrs path");
+            return Realm::Event::NO_EVENT;
+          },
           [&](auto const &) { return spawn_task(); },
       });
     },
     [&](LossAttrs const &) { return spawn_task(); },
     [&](CopyAttrs const &) { return issue_copy(); },
+    [&](ReplicateAttrs const &) {
+      if (invocation.node_attrs.task_type.has_value() &&
+          invocation.node_attrs.task_type.value() == DynamicTaskType::BWD) {
+        return issue_replicate_bwd();
+      }
+      return issue_copy();
+    },
   });
 }
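issue_replicate_bwd above serializes the reduction copies by threading the event e through the loop. A fan-in alternative is sketched below, using only names from the lambda itself; since the redop's non-exclusive apply is atomic, the copies could instead share one precondition and have their events merged:

    std::vector<Realm::Event> copy_events;
    for (auto const &[p, m] : assert_unwrap(output_grad.mapping)) {
      DynamicValueAttrs replica_key = output_grad;
      replica_key.mapping =
          bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>{{p, m}};
      replica_key.shard_coord = p;
      Realm::RegionInstance src_inst =
          tensor_instance_backing.backing.at(replica_key).first;
      copy_events.push_back(
          ctx.issue_copy(assert_unwrap(output_grad.parallel_tensor_shape),
                         src_inst,
                         assert_unwrap(input_grad.parallel_tensor_shape),
                         dst_inst,
                         Realm::ProfilingRequestSet{},
                         precondition,
                         /*priority=*/0,
                         redop_id,
                         /*exclusive=*/false));
    }
    return Realm::Event::merge_events(copy_events);
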
diff --git a/lib/realm-execution/src/realm-execution/realm_context.cc b/lib/realm-execution/src/realm-execution/realm_context.cc
index 790c1bd613..a4669bf43e 100644
--- a/lib/realm-execution/src/realm-execution/realm_context.cc
+++ b/lib/realm-execution/src/realm-execution/realm_context.cc
@@ -161,7 +161,9 @@ Realm::Event
                    Realm::RegionInstance dst_inst,
                    Realm::ProfilingRequestSet const &requests,
                    Realm::Event wait_on,
-                   int priority) {
+                   int priority,
+                   std::optional<Realm::ReductionOpID> redop_id,
+                   bool exclusive) {
   TensorShape src_piece_shape = get_piece_shape(src_shape);
   TensorShape dst_piece_shape = get_piece_shape(dst_shape);
   ASSERT(src_piece_shape == dst_piece_shape); // For now, assume they match
@@ -183,6 +185,11 @@ Realm::Event
           size_of_datatype(src_piece_shape.data_type).int_from_positive_int()),
       /*subfield_offset=*/0);
 
+  // Reduce into dst instead of overwriting it when a redop is provided.
+  if (redop_id.has_value()) {
+    dst_field.set_redop(redop_id.value(), /*is_fold=*/false, exclusive);
+  }
+
   Realm::Event result;
   switch (src_piece_shape.dims.ff_ordered.num_dims()) {
 #if REALM_MAX_DIM >= 1
diff --git a/lib/realm-execution/src/realm-execution/tasks/impl/per_device_op_state_init_task.cc b/lib/realm-execution/src/realm-execution/tasks/impl/per_device_op_state_init_task.cc
index 753fccf74b..0ea51810e4 100644
--- a/lib/realm-execution/src/realm-execution/tasks/impl/per_device_op_state_init_task.cc
+++ b/lib/realm-execution/src/realm-execution/tasks/impl/per_device_op_state_init_task.cc
@@ -66,11 +66,10 @@ void per_device_op_state_init_task_body(void const *args,
                                      result_state, ctx.get_current_device_idx())};
   DeviceSpecificPtr result_device_specific{
       ctx.get_current_device_idx(), result_state_ptr};
-  spawn_per_device_op_state_init_return_task(ctx,
-                                             task_args.origin_proc,
-                                             result_device_specific,
-                                             task_args.origin_result_ptr,
-                                             Realm::Event::NO_EVENT);
+
+  // TODO(SM): this direct write assumes a single-node shared address space.
+  // For multi-node execution, replace it with a UserEvent trigger pattern.
+  *task_args.origin_result_ptr = result_device_specific;
 }
 
 std::optional<Realm::Event> spawn_per_device_op_state_init_task(
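The TODO above refers to the standard Realm pattern for returning a value across nodes. A sketch follows, in which the result_ready field added to the task args is hypothetical:

    // origin side: create the event and wait on it instead of relying on
    // the direct pointer write being visible
    Realm::UserEvent result_ready = Realm::UserEvent::create_user_event();
    // remote side: spawn the return task back on task_args.origin_proc; in
    // origin's address space it performs
    //     *task_args.origin_result_ptr = result_device_specific;
    //     task_args.result_ready.trigger();
    // and the origin's wait on result_ready then observes the write.
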
"pcg/parallel_computation_graph/parallel_tensor_guid_t.dtg.h" +#include "realm-execution/distributed_ff_handle.h" +#include "realm-execution/dynamic_tensor_accessor_from_instance.h" +#include "realm-execution/pcg_instance.h" +#include "realm-execution/realm_context.h" +#include "realm-execution/realm_manager.h" +#include "task-spec/permissions.h" +#include "test/utils/doctest/check_kv.h" +#include "utils/containers/require_only_key.h" +#include + +namespace test { + +using namespace ::FlexFlow; +namespace Realm = ::FlexFlow::Realm; + +template +static ParallelLayerAttrs make_layer_attrs(T const &op_attrs) { + return ParallelLayerAttrs{ + /*op_attrs=*/PCGOperatorAttrs{op_attrs}, + /*name=*/std::nullopt, + }; +}; + +static bool did_loss_decrease(GenericTensorAccessorR const &first_epoch, + GenericTensorAccessorR const &last_epoch, + Allocator &allocator) { + return tensor_accessor_all( + compare_tensor_accessors_le(last_epoch, first_epoch, allocator)); +} + +TEST_SUITE(FF_TEST_SUITE) { + TEST_CASE("RealmBackend e2e Training Replicate Op (CPU Model Parallelism)") { + std::vector fake_args = + make_fake_realm_args(/*num_cpus=*/2_p, /*num_gpus=*/0_n); + int fake_argc = fake_args.size(); + char **fake_argv = fake_args.data(); + + RealmManager manager = RealmManager{&fake_argc, &fake_argv}; + ControllerTaskResult result = + manager.start_controller([](RealmContext &ctx) { + Allocator allocator = ctx.get_current_device_allocator(); + + positive_int batch_size = 10_p; + positive_int data_dim = 16_p; + positive_int hidden_dim = 32_p; + positive_int output_dim = 1_p; + + // 10,2 + TensorShape output_tensor_shape = TensorShape{ + TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT}; + + // 10,2 + TensorShape label_tensor_shape = TensorShape{ + TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT}; + + GenericTensorAccessorW label_tensor = + allocator.allocate_tensor(label_tensor_shape); + + // construct computation graph + ParallelComputationGraph pcg = empty_parallel_computation_graph(); + + // input tensor + // 10, 16 + TensorShape input_tensor_shape = TensorShape{ + TensorDims{FFOrdered{batch_size, data_dim}}, DataType::FLOAT}; + + // parallel layer -> input tensor + ParallelLayerAddedResult inputs_layer = + pcg_add_input_layer(pcg, input_tensor_shape); + parallel_tensor_guid_t t_input = + require_only_key(inputs_layer.outputs, TensorSlotName::OUTPUT); + + // parallel layer -> input tensor 2 + ParallelLayerAddedResult inputs_layer_2 = + pcg_add_input_layer(pcg, input_tensor_shape); + parallel_tensor_guid_t t_input_2 = + require_only_key(inputs_layer_2.outputs, TensorSlotName::OUTPUT); + + // binary ADD attribute + ElementBinaryAttrs add_attrs = ElementBinaryAttrs{ + OperatorType::EW_ADD, + DataType::FLOAT, + false, + false, + }; + + // parallel layer -> perform add + ParallelLayerAddedResult add_operator_1 = + add_parallel_layer(pcg, + make_layer_attrs(add_attrs), + { + { + TensorSlotName::LHS_INPUT, + t_input, + }, + { + TensorSlotName::RHS_INPUT, + t_input_2, + }, + }, + {/* weight */}); + + parallel_tensor_guid_t t_add_1 = + require_only_key(add_operator_1.outputs, TensorSlotName::OUTPUT); + + // parallel layer -> perform replicate + const positive_int replicate_degree = 2_p; + ReplicateAttrs repl_attrs = ReplicateAttrs(replicate_degree); + ParallelLayerAddedResult repl_operator_1 = + add_parallel_layer(pcg, + make_layer_attrs(repl_attrs), + { + { + TensorSlotName::INPUT, + t_add_1, + }, + }, + /*weight=*/{}); + // output of replicate layer + parallel_tensor_guid_t t_repl_1 
diff --git a/lib/realm-execution/test/src/realm-execution/test_op_replicate.cc b/lib/realm-execution/test/src/realm-execution/test_op_replicate.cc
new file mode 100644
index 0000000000..632f08d239
--- /dev/null
+++ b/lib/realm-execution/test/src/realm-execution/test_op_replicate.cc
@@ -0,0 +1,472 @@
+#include "internal/realm_test_utils.h"
+#include "kernels/allocation.h"
+#include "kernels/compare_tensor_accessors.h"
+#include "kernels/copy_tensor_accessor.h"
+#include "kernels/format_accessor_contents.h"
+#include "kernels/tensor_accessor_reductions.h"
+#include "op-attrs/operator_task_space_to_operator_task_space_mapping.h"
+#include "op-attrs/ops/element_unary.h"
+#include "op-attrs/ops/linear.h"
+#include "op-attrs/ops/replicate.h"
+#include "op-attrs/parallel_tensor_shape.h"
+#include "op-attrs/tensor_shape.dtg.h"
+#include "op-attrs/tensor_slot_name.dtg.h"
+#include "pcg/device_type.dtg.h"
+#include "pcg/machine_space_coordinate.dtg.h"
+#include "pcg/mapped_parallel_computation_graph/operator_atomic_task_shard_binding.dtg.h"
+#include "pcg/parallel_computation_graph/parallel_computation_graph.h"
+#include "pcg/parallel_computation_graph/parallel_computation_graph_builder.h"
+#include "pcg/parallel_computation_graph/parallel_layer_guid_t.dtg.h"
+#include "pcg/parallel_computation_graph/parallel_tensor_guid_t.dtg.h"
+#include "realm-execution/distributed_ff_handle.h"
+#include "realm-execution/dynamic_tensor_accessor_from_instance.h"
+#include "realm-execution/pcg_instance.h"
+#include "realm-execution/realm_context.h"
+#include "realm-execution/realm_manager.h"
+#include "task-spec/permissions.h"
+#include "test/utils/doctest/check_kv.h"
+#include "utils/containers/require_only_key.h"
+#include <doctest/doctest.h>
+
+namespace test {
+
+using namespace ::FlexFlow;
+namespace Realm = ::FlexFlow::Realm;
+
+template <typename T>
+static ParallelLayerAttrs make_layer_attrs(T const &op_attrs) {
+  return ParallelLayerAttrs{
+      /*op_attrs=*/PCGOperatorAttrs{op_attrs},
+      /*name=*/std::nullopt,
+  };
+}
+
+static bool did_loss_decrease(GenericTensorAccessorR const &first_epoch,
+                              GenericTensorAccessorR const &last_epoch,
+                              Allocator &allocator) {
+  return tensor_accessor_all(
+      compare_tensor_accessors_le(last_epoch, first_epoch, allocator));
+}
+
+TEST_SUITE(FF_TEST_SUITE) {
+  TEST_CASE("RealmBackend e2e Training Replicate Op (CPU Model Parallelism)") {
+    std::vector<char *> fake_args =
+        make_fake_realm_args(/*num_cpus=*/2_p, /*num_gpus=*/0_n);
+    int fake_argc = fake_args.size();
+    char **fake_argv = fake_args.data();
+
+    RealmManager manager = RealmManager{&fake_argc, &fake_argv};
+    ControllerTaskResult result =
+        manager.start_controller([](RealmContext &ctx) {
+          Allocator allocator = ctx.get_current_device_allocator();
+
+          positive_int batch_size = 10_p;
+          positive_int data_dim = 16_p;
+          positive_int hidden_dim = 32_p;
+          positive_int output_dim = 1_p;
+
+          // 10, 1
+          TensorShape output_tensor_shape = TensorShape{
+              TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT};
+
+          // 10, 1
+          TensorShape label_tensor_shape = TensorShape{
+              TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT};
+
+          GenericTensorAccessorW label_tensor =
+              allocator.allocate_tensor(label_tensor_shape);
+
+          // construct computation graph
+          ParallelComputationGraph pcg = empty_parallel_computation_graph();
+
+          // input tensor
+          // 10, 16
+          TensorShape input_tensor_shape = TensorShape{
+              TensorDims{FFOrdered{batch_size, data_dim}}, DataType::FLOAT};
+
+          // parallel layer -> input tensor
+          ParallelLayerAddedResult inputs_layer =
+              pcg_add_input_layer(pcg, input_tensor_shape);
+          parallel_tensor_guid_t t_input =
+              require_only_key(inputs_layer.outputs, TensorSlotName::OUTPUT);
+
+          // parallel layer -> input tensor 2
+          ParallelLayerAddedResult inputs_layer_2 =
+              pcg_add_input_layer(pcg, input_tensor_shape);
+          parallel_tensor_guid_t t_input_2 =
+              require_only_key(inputs_layer_2.outputs, TensorSlotName::OUTPUT);
+
+          // binary ADD attribute
+          ElementBinaryAttrs add_attrs = ElementBinaryAttrs{
+              OperatorType::EW_ADD,
+              DataType::FLOAT,
+              false,
+              false,
+          };
+
+          // parallel layer -> perform add
+          ParallelLayerAddedResult add_operator_1 =
+              add_parallel_layer(pcg,
+                                 make_layer_attrs(add_attrs),
+                                 {
+                                     {
+                                         TensorSlotName::LHS_INPUT,
+                                         t_input,
+                                     },
+                                     {
+                                         TensorSlotName::RHS_INPUT,
+                                         t_input_2,
+                                     },
+                                 },
+                                 {/* weight */});
+
+          parallel_tensor_guid_t t_add_1 =
+              require_only_key(add_operator_1.outputs, TensorSlotName::OUTPUT);
+
+          // parallel layer -> perform replicate
+          const positive_int replicate_degree = 2_p;
+          ReplicateAttrs repl_attrs = ReplicateAttrs{replicate_degree};
+          ParallelLayerAddedResult repl_operator_1 =
+              add_parallel_layer(pcg,
+                                 make_layer_attrs(repl_attrs),
+                                 {
+                                     {
+                                         TensorSlotName::INPUT,
+                                         t_add_1,
+                                     },
+                                 },
+                                 /*weight=*/{});
+          // output of replicate layer
+          parallel_tensor_guid_t t_repl_1 =
+              require_only_key(repl_operator_1.outputs, TensorSlotName::OUTPUT);
+
+          // parallel layer -> perform ReLU
+          ParallelLayerAddedResult relu_operator_1 =
+              add_parallel_layer(pcg,
+                                 make_layer_attrs(make_relu_attrs()),
+                                 /*inputs=*/
+                                 {
+                                     {
+                                         TensorSlotName::INPUT,
+                                         t_repl_1,
+                                     },
+                                 },
+                                 /*weights=*/{});
+          // output of relu layer
+          parallel_tensor_guid_t t_relu_1 =
+              require_only_key(relu_operator_1.outputs, TensorSlotName::OUTPUT);
+
+          // machine
+          MachineSpaceCoordinate cpu0{0_n, 0_n, DeviceType::CPU};
+          MachineSpaceCoordinate cpu1{0_n, 1_n, DeviceType::CPU};
+
+          ParallelTensorSpaceCoordinate tensor_coord0{
+              /*sum_component=*/0_n,
+              /*discard_copy_component=*/0_n,
+              /*shard_component=*/FFOrdered{0_n}};
+          ParallelTensorSpaceCoordinate tensor_coord1{
+              /*sum_component=*/0_n,
+              /*discard_copy_component=*/1_n,
+              /*shard_component=*/FFOrdered{0_n}};
+          MappedParallelComputationGraph mpcg{
+              pcg,
+              {{inputs_layer.parallel_layer,
+                MappedOperatorTaskGroup{
+                    {{cpu0,
+                      OperatorAtomicTaskShardBinding{
+                          {{TensorSlotName::OUTPUT, tensor_coord0}}}}}}},
+               {inputs_layer_2.parallel_layer,
+                MappedOperatorTaskGroup{
+                    {{cpu0,
+                      OperatorAtomicTaskShardBinding{
+                          {{TensorSlotName::OUTPUT, tensor_coord0}}}}}}},
+               {add_operator_1.parallel_layer,
+                MappedOperatorTaskGroup{
+                    {{cpu0,
+                      OperatorAtomicTaskShardBinding{{
+                          {TensorSlotName::LHS_INPUT, tensor_coord0},
+                          {TensorSlotName::RHS_INPUT, tensor_coord0},
+                          {TensorSlotName::OUTPUT, tensor_coord0},
+                      }}}}}},
+               {repl_operator_1.parallel_layer,
+                MappedOperatorTaskGroup{{
+                    {cpu0,
+                     OperatorAtomicTaskShardBinding{{
+                         {TensorSlotName::OUTPUT, tensor_coord0},
+                     }}},
+                    {cpu1,
+                     OperatorAtomicTaskShardBinding{{
+                         {TensorSlotName::OUTPUT, tensor_coord1},
+                     }}},
+                }}},
+               {relu_operator_1.parallel_layer,
+                MappedOperatorTaskGroup{{
+                    {cpu0,
+                     OperatorAtomicTaskShardBinding{{
+                         {TensorSlotName::INPUT, tensor_coord0},
+                         {TensorSlotName::OUTPUT, tensor_coord0},
+                     }}},
+                    {cpu1,
+                     OperatorAtomicTaskShardBinding{{
+                         {TensorSlotName::INPUT, tensor_coord1},
+                         {TensorSlotName::OUTPUT, tensor_coord1},
+                     }}},
+                }}}},
+          };
+
+          MappedOperatorTaskGroup loss_mapping{
+              {{cpu0,
+                OperatorAtomicTaskShardBinding{{
+                    {TensorSlotName::INPUT, tensor_coord0},
+                    {TensorSlotName::LOGIT, tensor_coord0},
+                }}}}};
+
+          // instantiate computation graph
+          LossAttrs loss_attrs = LossAttrs{
+              NonconfigurableLossAttrs{LossFunction::CATEGORICAL_CROSSENTROPY}};
+          OptimizerAttrs optimizer_attrs =
+              OptimizerAttrs{SGDOptimizerAttrs{/*lr=*/0.001,
+                                               /*momentum=*/0.9,
+                                               /*nesterov=*/false,
+                                               /*weight_decay=*/0.001}};
+
+          std::unordered_map input_tensors;
+
+          DistributedFfHandle device_handle = create_distributed_ff_handle(
+              ctx,
+              /*workSpaceSize=*/1024 * 1024,
+              /*allowTensorOpMathConversion=*/true);
+          PCGInstance pcg_instance = create_pcg_instance(
+              /*ctx=*/ctx,
+              /*mpcg=*/mpcg,
+              /*optimizer=*/optimizer_attrs,
+              /*loss=*/std::nullopt,
+              /*input_tensors=*/input_tensors,
+              /*profiling_settings=*/ProfilingSettings{0, 0},
+              /*device_handle=*/device_handle,
+              /*iteration_config=*/FFIterationConfig{1_p});
+
+          // begin training loop
+          int num_epochs = 1;
+          for (int i = 0; i < num_epochs; i++) {
+            perform_all_passes_for_pcg_instance(
+                /*instance=*/pcg_instance,
+                /*profiling_settings=*/ProfilingSettings{0, 0},
+                /*device_handle=*/device_handle,
+                /*iteration_config=*/FFIterationConfig{1_p});
+          }
+        });
+    result.wait();
+  }
+}
+
+TEST_SUITE(FF_CUDA_TEST_SUITE) {
+  TEST_CASE("RealmBackend e2e Training Replicate Op (GPU Model Parallelism)") {
+    std::vector<char *> fake_args =
+        make_fake_realm_args(/*num_cpus=*/1_p, /*num_gpus=*/2_n);
+    int fake_argc = fake_args.size();
+    char **fake_argv = fake_args.data();
+
+    RealmManager manager = RealmManager{&fake_argc, &fake_argv};
+
+    ControllerTaskResult result =
+        manager.start_controller([](RealmContext &ctx) {
+          Allocator allocator = ctx.get_current_device_allocator();
+
+          positive_int batch_size = 10_p;
+          positive_int data_dim = 16_p;
+          positive_int hidden_dim = 32_p;
+          positive_int output_dim = 1_p;
+
+          // 10, 1
+          TensorShape output_tensor_shape = TensorShape{
+              TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT};
+
+          // 10, 1
+          TensorShape label_tensor_shape = TensorShape{
+              TensorDims{FFOrdered{batch_size, output_dim}}, DataType::FLOAT};
+
+          GenericTensorAccessorW label_tensor =
+              allocator.allocate_tensor(label_tensor_shape);
+
+          // construct computation graph
+          ParallelComputationGraph pcg = empty_parallel_computation_graph();
+
+          // input tensor
+          // 10, 16
+          TensorShape input_tensor_shape = TensorShape{
+              TensorDims{FFOrdered{batch_size, data_dim}}, DataType::FLOAT};
+
+          // parallel layer -> input tensor
+          ParallelLayerAddedResult inputs_layer =
+              pcg_add_input_layer(pcg, input_tensor_shape);
+          parallel_tensor_guid_t t_input =
+              require_only_key(inputs_layer.outputs, TensorSlotName::OUTPUT);
+
+          // parallel layer -> input tensor 2
+          ParallelLayerAddedResult inputs_layer_2 =
+              pcg_add_input_layer(pcg, input_tensor_shape);
+          parallel_tensor_guid_t t_input_2 =
+              require_only_key(inputs_layer_2.outputs, TensorSlotName::OUTPUT);
+
+          // binary ADD attribute
+          ElementBinaryAttrs add_attrs = ElementBinaryAttrs{
+              OperatorType::EW_ADD,
+              DataType::FLOAT,
+              false,
+              false,
+          };
+
+          // parallel layer -> perform add
+          ParallelLayerAddedResult add_operator_1 =
+              add_parallel_layer(pcg,
+                                 make_layer_attrs(add_attrs),
+                                 {
+                                     {
+                                         TensorSlotName::LHS_INPUT,
+                                         t_input,
+                                     },
+                                     {
+                                         TensorSlotName::RHS_INPUT,
+                                         t_input_2,
+                                     },
+                                 },
+                                 {/* weight */});
+
+          parallel_tensor_guid_t t_add_1 =
+              require_only_key(add_operator_1.outputs, TensorSlotName::OUTPUT);
+
+          // parallel layer -> perform replicate
+          const positive_int replicate_degree = 2_p;
+          ReplicateAttrs repl_attrs = ReplicateAttrs{replicate_degree};
+          ParallelLayerAddedResult repl_operator_1 =
+              add_parallel_layer(pcg,
+                                 make_layer_attrs(repl_attrs),
+                                 {
+                                     {
+                                         TensorSlotName::INPUT,
+                                         t_add_1,
+                                     },
+                                 },
+                                 /*weight=*/{});
+          // output of replicate layer
+          parallel_tensor_guid_t t_repl_1 =
+              require_only_key(repl_operator_1.outputs, TensorSlotName::OUTPUT);
+
+          // parallel layer -> perform ReLU
+          ParallelLayerAddedResult relu_operator_1 =
+              add_parallel_layer(pcg,
+                                 make_layer_attrs(make_relu_attrs()),
+                                 /*inputs=*/
+                                 {
+                                     {
+                                         TensorSlotName::INPUT,
+                                         t_repl_1,
+                                     },
+                                 },
+                                 /*weights=*/{});
+          // output of relu layer
+          parallel_tensor_guid_t t_relu_1 =
+              require_only_key(relu_operator_1.outputs, TensorSlotName::OUTPUT);
+
+          // machine
+          MachineSpaceCoordinate gpu0{0_n, 0_n, DeviceType::GPU};
+          MachineSpaceCoordinate gpu1{0_n, 1_n, DeviceType::GPU};
+          ParallelTensorSpaceCoordinate tensor_coord0{0_n, 0_n, FFOrdered{0_n}};
+          ParallelTensorSpaceCoordinate tensor_coord1{0_n, 1_n, FFOrdered{0_n}};
+          MappedParallelComputationGraph mpcg{
+              pcg,
+              {
+                  {inputs_layer.parallel_layer,
+                   MappedOperatorTaskGroup{
+                       {{gpu0,
+                         OperatorAtomicTaskShardBinding{
+                             {{TensorSlotName::OUTPUT, tensor_coord0}}}}}}},
+                  {inputs_layer_2.parallel_layer,
+                   MappedOperatorTaskGroup{
+                       {{gpu0,
+                         OperatorAtomicTaskShardBinding{
+                             {{TensorSlotName::OUTPUT, tensor_coord0}}}}}}},
+                  {add_operator_1.parallel_layer,
+                   MappedOperatorTaskGroup{
+                       {{gpu0,
+                         OperatorAtomicTaskShardBinding{{
+                             {TensorSlotName::LHS_INPUT, tensor_coord0},
+                             {TensorSlotName::RHS_INPUT, tensor_coord0},
+                             {TensorSlotName::OUTPUT, tensor_coord0},
+                         }}}}}},
+                  {repl_operator_1.parallel_layer,
+                   MappedOperatorTaskGroup{
+                       {{gpu0,
+                         OperatorAtomicTaskShardBinding{{
+                             {TensorSlotName::OUTPUT, tensor_coord0},
+                         }}},
+                        {gpu1,
+                         OperatorAtomicTaskShardBinding{{
+                             {TensorSlotName::OUTPUT, tensor_coord1},
+                         }}}}}},
+                  {relu_operator_1.parallel_layer,
+                   MappedOperatorTaskGroup{{
+                       {gpu0,
+                        OperatorAtomicTaskShardBinding{{
+                            {TensorSlotName::INPUT, tensor_coord0},
+                            {TensorSlotName::OUTPUT, tensor_coord0},
+                        }}},
+                       {gpu1,
+                        OperatorAtomicTaskShardBinding{{
+                            {TensorSlotName::INPUT, tensor_coord1},
+                            {TensorSlotName::OUTPUT, tensor_coord1},
+                        }}},
+                   }}},
+              },
+          };
+
+          MappedOperatorTaskGroup loss_mapping{
+              {{gpu0,
+                OperatorAtomicTaskShardBinding{{
+                    {TensorSlotName::INPUT, tensor_coord0},
+                    {TensorSlotName::LOGIT, tensor_coord0},
+                }}}}};
+
+          // instantiate computation graph
+          LossAttrs loss_attrs = LossAttrs{
+              NonconfigurableLossAttrs{LossFunction::CATEGORICAL_CROSSENTROPY}};
+          OptimizerAttrs optimizer_attrs =
+              OptimizerAttrs{SGDOptimizerAttrs{/*lr=*/0.001,
+                                               /*momentum=*/0.9,
+                                               /*nesterov=*/false,
+                                               /*weight_decay=*/0.001}};
+
+          std::unordered_map input_tensors;
+
+          DistributedFfHandle device_handle = create_distributed_ff_handle(
+              ctx,
+              /*workSpaceSize=*/1024 * 1024,
+              /*allowTensorOpMathConversion=*/true);
+
+          PCGInstance pcg_instance = create_pcg_instance(
+              /*ctx=*/ctx,
+              /*mpcg=*/mpcg,
+              /*optimizer=*/optimizer_attrs,
+              /*loss=*/std::nullopt,
+              /*input_tensors=*/input_tensors,
+              /*profiling_settings=*/ProfilingSettings{0, 0},
+              /*device_handle=*/device_handle,
+              /*iteration_config=*/FFIterationConfig{1_p});
+
+          // begin training loop
+          int num_epochs = 1;
+          for (int i = 0; i < num_epochs; i++) {
+            perform_all_passes_for_pcg_instance(
+                /*instance=*/pcg_instance,
+                /*profiling_settings=*/ProfilingSettings{0, 0},
+                /*device_handle=*/device_handle,
+                /*iteration_config=*/FFIterationConfig{1_p});
+          }
+        });
+    result.wait();
+  }
+}
+} // namespace test
diff --git a/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml b/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml
index 8f8f6467c8..2bd0714512 100644
--- a/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml
+++ b/lib/task-spec/include/task-spec/dynamic_graph/training_operation_attrs.dtg.toml
@@ -25,3 +25,7 @@ key = "loss"
 [[values]]
 type = "::FlexFlow::CopyAttrs"
 key = "copy"
+
+[[values]]
+type = "::FlexFlow::ReplicateAttrs"
+key = "replicate"
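Adding this variant regenerates TrainingOperationAttrs so that the accessors used elsewhere in this patch (is_replicate(), the ReplicateAttrs alternative in visit) exist. A sketch of the generated surface, inferred from usage in this patch rather than from the dtgen output itself:

    TrainingOperationAttrs attrs = TrainingOperationAttrs{ReplicateAttrs{2_p}};
    bool is_repl = attrs.is_replicate(); // true
    // and visit(...) dispatches to a [&](ReplicateAttrs const &) overload,
    // as in spawn_dynamic_node_invocation above.
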
diff --git a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc
index 4c1b9d4609..7a28e254aa 100644
--- a/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc
+++ b/lib/task-spec/src/task-spec/dynamic_graph/copy_insertion.cc
@@ -25,15 +25,39 @@ bool node_is_copy(DynamicNodeAttrs const &n) {
   return n.op_attrs.has_value() && n.op_attrs.value().is_copy();
 }
 
+static bool is_replicate_invocation(DynamicNodeInvocation const &i) {
+  return i.node_attrs.op_attrs.has_value() &&
+         i.node_attrs.op_attrs.value().is_replicate();
+}
+
 bool value_is_mapped(DynamicValueAttrs const &n) {
   return n.mapping.has_value();
 }
 
 bool no_part_of_graph_is_copy_inserted(DynamicOpenDataflowGraph const &g) {
-  auto slot_is_mapped = [](DynamicTensorSlot const &) -> bool { return false; };
-
-  return no_part_of_dynamic_graph_satisfies(
-      g, node_is_copy, value_is_mapped, slot_is_mapped);
+  // Check all non-replicate invocations; replicate tensors have their
+  // mapping set by construction, so they are skipped.
+  for (DynamicNodeInvocation const &i : g.invocations) {
+    if (is_replicate_invocation(i)) {
+      continue;
+    }
+    if (node_is_copy(i.node_attrs)) {
+      return false;
+    }
+    for (auto const &[slot, value] : i.inputs) {
+      if (value_is_mapped(value)) {
+        return false;
+      }
+    }
+    for (auto const &[slot, value] : i.outputs) {
+      if (value_is_mapped(value)) {
+        return false;
+      }
+    }
+  }
+  return true;
 }
 
 bool graph_is_fully_copy_inserted(DynamicOpenDataflowGraph const &g) {
@@ -85,6 +109,11 @@ std::unordered_set<DynamicNodeInvocation> perform_copy_insertion_for_invocation(
     std::unordered_map<DynamicValueAttrs, DynamicValueAttrs> const
         &unmapped_value_to_mapped_source_value) {
+  // Replicate nodes have no MappedOperatorTaskGroup: their mapping is
+  // already fully set, so pass them through unchanged; no copies needed.
+  if (is_replicate_invocation(i)) {
+    return {i};
+  }
   MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping);
 
   auto map_tensor = [&](DynamicTensorSlot const &slot,
@@ -157,6 +186,14 @@ DynamicOpenDataflowGraph
   std::unordered_map<DynamicValueAttrs, DynamicValueAttrs>
       unmapped_value_to_mapped_source_value;
   for (DynamicNodeInvocation const &i : g.invocations) {
+    // Replicate outputs already carry their full mapping, so each one
+    // serves as its own mapped source value.
+    if (is_replicate_invocation(i)) {
+      for (auto const &[slot, value] : i.outputs) {
+        unmapped_value_to_mapped_source_value.insert(std::pair{value, value});
+      }
+      continue;
+    }
     for (auto const &[slot, value] : i.outputs) {
       unmapped_value_to_mapped_source_value.insert(
           std::pair{value,
diff --git a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc
index 246f9a3242..3d48a0dc2b 100644
--- a/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc
+++ b/lib/task-spec/src/task-spec/dynamic_graph/make_dynamic_open_dataflow_graph_from_mapped_pcg.cc
@@ -7,11 +7,127 @@
 #include "task-spec/dynamic_graph/dynamic_open_dataflow_graph.h"
 #include "task-spec/dynamic_graph/dynamic_tensor_role.h"
 #include "utils/containers/generate_map.h"
+#include "utils/containers/get_only.h"
 #include 
 #include 
 #include 
 
 namespace FlexFlow {
+
+static bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+    get_input_mapping_for_replicate(
+        MappedParallelComputationGraph const &mpcg,
+        parallel_layer_guid_t const &replicate_layer) {
+
+  auto [input_slot_name, input_tensor_guid] =
+      get_only(get_incoming_tensors(mpcg.pcg, replicate_layer));
+
+  // find the layer that produces this tensor
+  for (auto const &[layer, _] : get_parallel_layer_attrs_mapping(mpcg.pcg)) {
+    for (auto const &[slot_name, t] : get_outgoing_tensors(mpcg.pcg, layer)) {
+      if (t == input_tensor_guid) {
+        MappedOperatorTaskGroup producer_mapping = mpcg.mapped_tasks.at(layer);
+        return get_tensor_bindings_for_slot_name(producer_mapping, slot_name);
+      }
+    }
+  }
+
+  PANIC("could not find producer of replicate layer input tensor");
+}
+
+static std::unordered_map<parallel_layer_guid_t, TensorSlotName>
+    get_consumers_of_tensor(MappedParallelComputationGraph const &mpcg,
+                            parallel_tensor_guid_t const &tensor) {
+  std::unordered_map<parallel_layer_guid_t, TensorSlotName> result;
+  for (auto const &[layer, _] : get_parallel_layer_attrs_mapping(mpcg.pcg)) {
+    for (auto const &[slot_name, t] : get_incoming_tensors(mpcg.pcg, layer)) {
+      if (t == tensor) {
+        result.insert({layer, slot_name});
+      }
+    }
+  }
+  return result;
+}
+
+static bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+    build_replicated_output_mapping(
+        MappedParallelComputationGraph const &mpcg,
+        parallel_layer_guid_t const &replicate_layer) {
+
+  auto [output_slot_name, output_tensor_guid] =
+      get_only(get_outgoing_tensors(mpcg.pcg, replicate_layer));
+
+  auto consumers = get_consumers_of_tensor(mpcg, output_tensor_guid);
+  ASSERT(!consumers.empty());
+
+  // Union all consumer bindings: each consumer shard maps to a distinct
+  // (discard_copy, machine) pair, since replicas always live on different
+  // machines.
+  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate> result;
+  for (auto const &[consumer_layer, slot_name] : consumers) {
+    MappedOperatorTaskGroup consumer_mapping =
+        mpcg.mapped_tasks.at(consumer_layer);
+    bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate> binding =
+        get_tensor_bindings_for_slot_name(consumer_mapping, slot_name);
+    for (auto const &[p, m] : binding) {
+      result.equate(p, m);
+    }
+  }
+  return result;
+}
+
+static DynamicNodeInvocation
+    build_replicate_invocation(parallel_layer_guid_t const &layer,
+                               ParallelLayerAttrs const &attrs,
+                               MappedParallelComputationGraph const &mpcg) {
+  auto incoming = get_incoming_tensors(mpcg.pcg, layer);
+  ASSERT(!incoming.empty(),
+         "replicate layer has no incoming tensors; "
+         "check PCG edge construction in test");
+  auto [input_slot_name, input_tensor_guid] = get_only(incoming);
+
+  ParallelTensorAttrs input_attrs =
+      get_parallel_tensor_attrs(mpcg.pcg, input_tensor_guid);
+  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+      input_mapping = get_input_mapping_for_replicate(mpcg, layer);
+
+  DynamicValueAttrs input_value{
+      /*tensor_guid=*/dynamic_tensor_guid_t{input_tensor_guid},
+      /*parallel_tensor_shape=*/input_attrs.shape,
+      /*shard_coord=*/std::nullopt,
+      /*mapping=*/input_mapping,
+      /*accessor=*/std::nullopt,
+      /*role=*/std::nullopt,
+  };
+
+  auto [output_slot_name, output_tensor_guid] =
+      get_only(get_outgoing_tensors(mpcg.pcg, layer));
+  ParallelTensorAttrs output_attrs =
+      get_parallel_tensor_attrs(mpcg.pcg, output_tensor_guid);
+
+  DynamicValueAttrs output_value{
+      /*tensor_guid=*/dynamic_tensor_guid_t{output_tensor_guid},
+      /*parallel_tensor_shape=*/output_attrs.shape,
+      /*shard_coord=*/std::nullopt,
+      /*mapping=*/build_replicated_output_mapping(mpcg, layer),
+      /*accessor=*/std::nullopt,
+      /*role=*/std::nullopt,
+  };
+
+  DynamicNodeAttrs node_attrs{
+      /*task_type=*/std::nullopt,
+      /*device_coord=*/std::nullopt,
+      /*mapping=*/std::nullopt,
+      /*op_attrs=*/TrainingOperationAttrs{attrs.op_attrs.get<ReplicateAttrs>()},
+      /*pcg_layer_guid=*/dynamic_layer_guid_t{layer},
+      /*per_device_op_state=*/std::nullopt,
+  };
+
+  return DynamicNodeInvocation{
+      /*inputs=*/{
+          {DynamicTensorSlot{input_slot_name, std::nullopt}, input_value}},
+      /*node_attrs=*/node_attrs,
+      /*outputs=*/
+      {{DynamicTensorSlot{output_slot_name, std::nullopt}, output_value}},
+  };
+}
 
 DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg(
     MappedParallelComputationGraph const &mpcg) {
@@ -19,6 +135,15 @@ DynamicOpenDataflowGraph make_dynamic_open_dataflow_graph_from_mapped_pcg(
 
   for (auto const &[layer, attrs] :
        get_parallel_layer_attrs_mapping(mpcg.pcg)) {
+
+    if (attrs.op_attrs.has<ReplicateAttrs>()) {
+      // Replicate layers get a dedicated invocation with mappings prebuilt.
+      DynamicNodeInvocation repl_inv =
+          build_replicate_invocation(layer, attrs, mpcg);
+      result.invocations.emplace(repl_inv);
+      continue;
+    }
+
     DynamicNodeAttrs result_attrs{
         /*task_type=*/std::nullopt,
         /*device_coord=*/std::nullopt,
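A worked example of build_replicated_output_mapping, matching the CPU test earlier in this patch: with replicate degree 2 and the two consumer shards bound to cpu0 and cpu1, the union of consumer bindings keys each machine by its discard-copy component.

    bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate> expected;
    expected.equate(
        ParallelTensorSpaceCoordinate{0_n, 0_n, FFOrdered{0_n}}, cpu0);
    expected.equate(
        ParallelTensorSpaceCoordinate{0_n, 1_n, FFOrdered{0_n}}, cpu1);
    // build_replicated_output_mapping(mpcg, repl_layer) == expected
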
diff --git a/lib/task-spec/src/task-spec/dynamic_graph/pass_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/pass_expansion.cc
index 0cee06368f..aed5f2c4c3 100644
--- a/lib/task-spec/src/task-spec/dynamic_graph/pass_expansion.cc
+++ b/lib/task-spec/src/task-spec/dynamic_graph/pass_expansion.cc
@@ -4,6 +4,7 @@
 #include "utils/containers/are_all_same.h"
+#include "utils/containers/get_only.h"
 #include "utils/containers/merge_disjoint_maps.h"
 #include "utils/containers/transform.h"
 
 namespace FlexFlow {
 
@@ -109,6 +110,46 @@ DynamicNodeInvocation perform_bwd_pass_expansion_for_invocation(
       transform(invocation.inputs, to_grad),
   };
 }
+
+static std::unordered_set<DynamicNodeInvocation>
+    perform_pass_expansion_for_replicate(
+        DynamicNodeInvocation const &invocation) {
+
+  auto const &[input_slot, input] = get_only(invocation.inputs);
+  auto const &[output_slot, output] = get_only(invocation.outputs);
+
+  // forward: INPUT/FWD → OUTPUT/FWD (copy to replicas)
+  DynamicNodeInvocation fwd{
+      /*inputs=*/{{pass_expand_slot(input_slot, FwbTensorType::FORWARD),
+                   pass_expand_value(input, FwbTensorType::FORWARD)}},
+      /*node_attrs=*/
+      pass_expand_node(invocation.node_attrs, DynamicTaskType::FWD),
+      /*outputs=*/
+      {{pass_expand_slot(output_slot, FwbTensorType::FORWARD),
+        pass_expand_value(output, FwbTensorType::FORWARD)}},
+  };
+
+  // backward: OUTPUT/FWD + OUTPUT/GRAD → INPUT/GRAD (reduce gradients).
+  // The backward node carries the output's (replicated) mapping so it
+  // knows which replicas to reduce from.
+  DynamicNodeAttrs bwd_node_attrs = invocation.node_attrs;
+  bwd_node_attrs.task_type = DynamicTaskType::BWD;
+
+  DynamicNodeInvocation bwd{
+      /*inputs=*/{
+          {pass_expand_slot(output_slot, FwbTensorType::FORWARD),
+           pass_expand_value(output, FwbTensorType::FORWARD)},
+          {pass_expand_slot(output_slot, FwbTensorType::GRADIENT),
+           pass_expand_value(output, FwbTensorType::GRADIENT)},
+      },
+      /*node_attrs=*/bwd_node_attrs,
+      /*outputs=*/
+      {{pass_expand_slot(input_slot, FwbTensorType::GRADIENT),
+        pass_expand_value(input, FwbTensorType::GRADIENT)}},
+  };
+
+  return {fwd, bwd};
+}
 
 DynamicOpenDataflowGraph
     perform_pass_expansion(DynamicOpenDataflowGraph const &g) {
@@ -117,6 +158,10 @@ DynamicOpenDataflowGraph
   DynamicOpenDataflowGraph result = flatmap_dynamic_invocation_set(
       g, [](DynamicNodeInvocation const &invocation) {
+        if (invocation.node_attrs.op_attrs.has_value() &&
+            invocation.node_attrs.op_attrs.value().is_replicate()) {
+          return perform_pass_expansion_for_replicate(invocation);
+        }
         if (invocation.inputs.empty()) {
           return std::unordered_set<DynamicNodeInvocation>{
               perform_fwd_pass_expansion_for_invocation(invocation),
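For a replicate invocation the expansion therefore yields exactly two nodes, summarized below; the ASSERT is a doctest-style sanity check one could add, assuming an invocation built as in the tests above:

    //   fwd: {INPUT/FWD}               -> {OUTPUT/FWD}  (copy to replicas)
    //   bwd: {OUTPUT/FWD, OUTPUT/GRAD} -> {INPUT/GRAD}  (reduce replica grads)
    std::unordered_set<DynamicNodeInvocation> expanded =
        perform_pass_expansion_for_replicate(invocation);
    ASSERT(expanded.size() == 2);
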
diff --git a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc
index fb6efb96d0..f30a4d8470 100644
--- a/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc
+++ b/lib/task-spec/src/task-spec/dynamic_graph/shard_expansion.cc
@@ -39,7 +39,6 @@ bool graph_is_fully_shard_expanded(DynamicOpenDataflowGraph const &g) {
                                              value_is_shard_expanded,
                                              slot_is_shard_expanded);
 }
-
 static bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
     restrict_tensor_mapping_keys_to_coord(
         bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate> const
@@ -85,6 +84,113 @@ static DynamicNodeInvocation shard_invocation_for_binding(
   };
 }
 
+static std::unordered_set<DynamicNodeInvocation>
+    perform_shard_expansion_for_replicate(DynamicNodeInvocation const &i) {
+  auto const &[input_slot, input] = get_only(i.inputs);
+  auto const &[output_slot, output] = get_only(i.outputs);
+
+  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+      input_mapping = assert_unwrap(input.mapping);
+  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+      output_mapping = assert_unwrap(output.mapping);
+
+  return transform(output_mapping.left_values(),
+                   [&](ParallelTensorSpaceCoordinate const &p) {
+                     ParallelTensorSpaceCoordinate input_p{
+                         /*sum_component=*/p.sum_component,
+                         /*discard_copy_component=*/nonnegative_int{0},
+                         /*shard_components=*/p.shard_components,
+                     };
+                     return shard_invocation_for_binding(
+                         i,
+                         output_mapping.at_l(p),
+                         OperatorAtomicTaskShardBinding{{
+                             {input_slot.slot_name, input_p},
+                             {output_slot.slot_name, p},
+                         }});
+                   });
+}
+
+static std::unordered_set<DynamicNodeInvocation>
+    perform_shard_expansion_for_replicate_bwd(DynamicNodeInvocation const &i) {
+
+  std::optional<DynamicValueAttrs> output_grad_opt;
+  std::optional<DynamicValueAttrs> output_fwd_opt;
+  std::optional<DynamicTensorSlot> output_grad_slot_opt;
+  std::optional<DynamicTensorSlot> output_fwd_slot_opt;
+
+  for (auto const &[slot, value] : i.inputs) {
+    if (slot.slot_tensor_role == DynamicTensorRole{FwbTensorType::GRADIENT}) {
+      output_grad_slot_opt = slot;
+      output_grad_opt = value;
+    } else {
+      output_fwd_slot_opt = slot;
+      output_fwd_opt = value;
+    }
+  }
+
+  DynamicValueAttrs output_grad = assert_unwrap(output_grad_opt);
+  DynamicValueAttrs output_fwd = assert_unwrap(output_fwd_opt);
+  DynamicTensorSlot output_grad_slot = assert_unwrap(output_grad_slot_opt);
+  DynamicTensorSlot output_fwd_slot = assert_unwrap(output_fwd_slot_opt);
+  auto const &[input_grad_slot, input_grad] = get_only(i.outputs);
+
+  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+      output_grad_mapping = assert_unwrap(output_grad.mapping);
+  bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+      input_grad_mapping = assert_unwrap(input_grad.mapping);
+
+  std::unordered_map<FFOrdered<nonnegative_int>,
+                     std::unordered_set<ParallelTensorSpaceCoordinate>>
+      by_shard;
+  for (auto const &p : output_grad_mapping.left_values()) {
+    by_shard[p.shard_components].insert(p);
+  }
+
+  std::unordered_set<DynamicNodeInvocation> result;
+  for (auto const &[shard_components, replica_coords] : by_shard) {
+    ParallelTensorSpaceCoordinate src_p{
+        nonnegative_int{0}, nonnegative_int{0}, shard_components};
+    MachineSpaceCoordinate src_machine = input_grad_mapping.at_l(src_p);
+
+    bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>
+        replica_mapping;
+    for (auto const &p : replica_coords) {
+      replica_mapping.equate(p, output_grad_mapping.at_l(p));
+    }
+
+    DynamicValueAttrs sharded_output_grad = output_grad;
+    sharded_output_grad.mapping = replica_mapping;
+    sharded_output_grad.shard_coord = src_p;
+
+    DynamicValueAttrs sharded_output_fwd = output_fwd;
+    sharded_output_fwd.mapping = replica_mapping;
+    sharded_output_fwd.shard_coord = src_p;
+
+    DynamicValueAttrs sharded_input_grad = input_grad;
+    sharded_input_grad.mapping =
+        bidict<ParallelTensorSpaceCoordinate, MachineSpaceCoordinate>{
+            {src_p, src_machine}};
+    sharded_input_grad.shard_coord = src_p;
+
+    DynamicNodeAttrs sharded_node = i.node_attrs;
+    sharded_node.device_coord = src_machine;
+
+    result.insert(DynamicNodeInvocation{
+        /*inputs=*/{
+            {output_fwd_slot, sharded_output_fwd},
+            {output_grad_slot, sharded_output_grad},
+        },
+        /*node_attrs=*/sharded_node,
+        /*outputs=*/
+        {
+            {input_grad_slot, sharded_input_grad},
+        },
+    });
+  }
+  return result;
+}
+
 static std::unordered_set<DynamicNodeInvocation>
     perform_shard_expansion_for_copy(DynamicNodeInvocation const &i) {
   auto [input_slot, input] = get_only(i.inputs);
@@ -121,6 +227,22 @@ std::unordered_set<DynamicNodeInvocation>
     return perform_shard_expansion_for_copy(i);
   }
 
+  // forward replicate
+  if (i.node_attrs.op_attrs.has_value() &&
+      i.node_attrs.op_attrs.value().is_replicate() &&
+      i.node_attrs.task_type.has_value() &&
+      i.node_attrs.task_type.value() == DynamicTaskType::FWD) {
+    return perform_shard_expansion_for_replicate(i);
+  }
+
+  // backward replicate
+  if (i.node_attrs.op_attrs.has_value() &&
+      i.node_attrs.op_attrs.value().is_replicate() &&
+      i.node_attrs.task_type.has_value() &&
+      i.node_attrs.task_type.value() == DynamicTaskType::BWD) {
+    return perform_shard_expansion_for_replicate_bwd(i);
+  }
+
   MappedOperatorTaskGroup mapping = assert_unwrap(i.node_attrs.mapping);
 
   std::unordered_set<MachineSpaceCoordinate> shard_machine_coords =
diff --git a/lib/task-spec/src/task-spec/ops/impl/element_binary.cc b/lib/task-spec/src/task-spec/ops/impl/element_binary.cc
index 13465d7a5f..c8460af538 100644
--- a/lib/task-spec/src/task-spec/ops/impl/element_binary.cc
+++ b/lib/task-spec/src/task-spec/ops/impl/element_binary.cc
@@ -36,8 +36,8 @@ static std::optional<float> forward_task_impl(TaskArgumentAccessor const &acc) {
   ProfilingSettings profiling = acc.get_profiling_settings();
   DeviceType kernel_device_type = acc.get_kernel_device_type();
 
-  ElementBinaryPerDeviceState per_device_state =
-      acc.get_per_device_op_state().require_element_binary().value();
+  std::optional<ElementBinaryPerDeviceState> per_device_state =
+      acc.get_per_device_op_state().require_element_binary();
   ElementBinaryAttrs attrs = acc.get_op_attrs().require_element_binary();
   device_handle_t handle = acc.get_ff_handle();
 
@@ -62,8 +62,8 @@ static std::optional<float> backward_task_impl(TaskArgumentAccessor const &acc) {
   ProfilingSettings profiling = acc.get_profiling_settings();
   DeviceType kernel_device_type = acc.get_kernel_device_type();
 
-  ElementBinaryPerDeviceState per_device_state =
-      acc.get_per_device_op_state().require_element_binary().value();
+  std::optional<ElementBinaryPerDeviceState> per_device_state =
+      acc.get_per_device_op_state().require_element_binary();
   ElementBinaryAttrs attrs = acc.get_op_attrs().require_element_binary();
   device_handle_t handle = acc.get_ff_handle();
 
diff --git a/lib/task-spec/src/task-spec/ops/impl/element_unary.cc b/lib/task-spec/src/task-spec/ops/impl/element_unary.cc
index d66ff9ab8d..9a092b90b8 100644
--- a/lib/task-spec/src/task-spec/ops/impl/element_unary.cc
+++ b/lib/task-spec/src/task-spec/ops/impl/element_unary.cc
@@ -35,8 +35,8 @@ static std::optional<float>
   ProfilingSettings profiling = acc.get_profiling_settings();
   DeviceType kernel_device_type = acc.get_kernel_device_type();
 
-  ElementUnaryPerDeviceState per_device_state =
-      acc.get_per_device_op_state().require_element_unary().value();
+  std::optional<ElementUnaryPerDeviceState> per_device_state =
+      acc.get_per_device_op_state().require_element_unary();
 
   return profile(forward_kernel,
                  profiling,
@@ -62,8 +62,8 @@ static std::optional<float>
   ProfilingSettings profiling = acc.get_profiling_settings();
   DeviceType kernel_device_type = acc.get_kernel_device_type();
 
-  ElementUnaryPerDeviceState per_device_state =
-      acc.get_per_device_op_state().require_element_unary().value();
+  std::optional<ElementUnaryPerDeviceState> per_device_state =
+      acc.get_per_device_op_state().require_element_unary();
 
   return profile(backward_kernel,
                  profiling,
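With these changes the task impls pass std::optional state through to the kernels instead of unconditionally unwrapping it. A sketch of the receiving side; the signature is illustrative, not the actual kernel declaration:

    static void forward_kernel_dispatch(
        std::optional<ElementUnaryPerDeviceState> const &state) {
      if (state.has_value()) {
        // initialized path, e.g. GPU kernels that need per-device descriptors
      } else {
        // stateless path, e.g. CPU kernels with no init task
      }
    }
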