From 8e060812e5084ef9b4a56cf529e1ef9fe4a32fc6 Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Tue, 27 Sep 2022 21:59:03 -0400 Subject: [PATCH] fix: processing of ODS loads without snapshot load --- .../ex_cubic_ingestion/schema/cubic_load.ex | 86 ++++++---- .../schema/cubic_load_test.exs | 161 +++++++++++------- .../start_ingestion_test.exs | 54 +++++- 3 files changed, 201 insertions(+), 100 deletions(-) diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex index 5e2c4c3..6716f4b 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex @@ -21,6 +21,7 @@ defmodule ExCubicIngestion.Schema.CubicLoad do alias Ecto.Changeset alias ExCubicIngestion.Repo + alias ExCubicIngestion.Schema.CubicOdsTableSnapshot alias ExCubicIngestion.Schema.CubicTable @derive {Jason.Encoder, @@ -138,45 +139,14 @@ defmodule ExCubicIngestion.Schema.CubicLoad do @doc """ Get loads with the 'ready' status by getting all the active tables and - querying for the first {limit} loads by table. For ODS, because there might be a - Qlik restart between the 'ready' and 'ingesting' status, we should check to make - sure we only start since the last snapshot load's s3_modified. Because of this - logic, some 'ready' loads will be left behind. That's because they will be - pointing to non-existing objects (Note: Qlik removes all objects on a restart). + querying for loads by table. ODS tables with no snapshot load yet are skipped. 
""" - @spec get_status_ready(integer()) :: [t()] - def get_status_ready(limit \\ 100) do + @spec get_status_ready :: [t()] + def get_status_ready do # we need to get 'ready' loads only for active tables CubicTable.all_with_ods_table_snapshot() - |> Enum.map(fn {table_rec, ods_table_snapshot_rec} -> - # get the last load with snapshot key - last_snapshot_load_rec = - if not is_nil(ods_table_snapshot_rec) do - Repo.one( - from(load in not_deleted(), - where: load.s3_key == ^ods_table_snapshot_rec.snapshot_s3_key, - order_by: [desc: load.s3_modified], - limit: 1 - ) - ) - end - - # query for 'ready' loads, with limit - ready_loads = - from(load in not_deleted(), - where: load.status == "ready" and load.table_id == ^table_rec.id, - order_by: [load.s3_modified, load.s3_key], - limit: ^limit - ) - - # if we know the last snapshot load, then filter down to 'ready' loads - # after the snapshot - if is_nil(last_snapshot_load_rec) do - ready_loads - else - from(load in ready_loads, where: load.s3_modified >= ^last_snapshot_load_rec.s3_modified) - end - end) + |> Enum.map(&get_status_ready_for_table_query/1) + |> Enum.reject(&is_nil/1) |> Enum.flat_map(&Repo.all(&1)) end @@ -249,4 +219,48 @@ defmodule ExCubicIngestion.Schema.CubicLoad do DateTime.truncate(datetime_with_msec, :second) end + + @doc """ + Construct query for loads with the ready status, filtered by the table and ordered by + the S3 modified and key values. For tables that are ODS, because there might be a + Qlik restart between the 'ready' and 'ingesting' status, we should check to make + sure we only start since the last snapshot load's s3_modified. Because of this + logic, some 'ready' loads will be left behind. That's because they will be + pointing to non-existing objects (Note: Qlik removes all objects on a restart). + Lastly, if no snapshot load exists, just return nil as loads may not have come + through yet. 
+ """ + @spec get_status_ready_for_table_query({CubicTable.t(), CubicOdsTableSnapshot.t() | nil}, integer()) :: + Ecto.Queryable.t() | nil + def get_status_ready_for_table_query(table_and_ods_table_snapshot, limit \\ 100) + + def get_status_ready_for_table_query({table_rec, nil}, limit) do + from(load in not_deleted(), + where: load.status == "ready" and load.table_id == ^table_rec.id, + order_by: [load.s3_modified, load.s3_key], + limit: ^limit + ) + end + + def get_status_ready_for_table_query({table_rec, ods_table_snapshot_rec}, limit) do + # most recent load stored under the snapshot key; nil when none has been recorded + last_snapshot_load_rec = + Repo.one( + from(load in not_deleted(), + where: load.s3_key == ^ods_table_snapshot_rec.snapshot_s3_key, + order_by: [desc: load.s3_modified], + limit: 1 + ) + ) + # without a snapshot load the `if` yields nil, which is the "no query" result + if not is_nil(last_snapshot_load_rec) do + from(load in not_deleted(), + where: + load.status == "ready" and load.table_id == ^table_rec.id and + load.s3_modified >= ^last_snapshot_load_rec.s3_modified, + order_by: [load.s3_modified, load.s3_key], + limit: ^limit + ) + end + end end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs index e6a101d..55c5984 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs @@ -112,7 +112,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do end describe "get_status_ready/0" do - test "getting non-ODS 'ready' loads", %{ + test "getting only 'ready' loads after updating one to 'archived'", %{ table: table, load_objects: load_objects } do @@ -126,62 +126,6 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do # assert that the last record inserted comes back assert rest_new_load_recs == CubicLoad.get_status_ready() end - - test "getting ODS 'ready' loads" do - # insert ODS table and snapshot - ods_table = - Repo.insert!(%CubicTable{ - name: 
"cubic_ods_qlik__sample", - s3_prefix: "cubic/ods_qlik/SAMPLE/", - is_raw: true - }) - - ods_snapshot_s3_key = "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz" - ods_snapshot = ~U[2022-01-01 20:49:50Z] - - Repo.insert!(%CubicOdsTableSnapshot{ - table_id: ods_table.id, - snapshot: ods_snapshot, - snapshot_s3_key: ods_snapshot_s3_key - }) - - # insert loads - ods_load = - Repo.insert!(%CubicLoad{ - table_id: ods_table.id, - status: "ready", - s3_key: ods_snapshot_s3_key, - s3_modified: ods_snapshot, - s3_size: 197, - is_raw: true - }) - - Repo.insert!(%CubicLoad{ - table_id: ods_table.id, - status: "ready", - s3_key: "cubic/ods_qlik/SAMPLE/LOAD2.csv.gz", - s3_modified: ~U[2022-01-02 20:49:50Z], - s3_size: 197, - is_raw: true - }) - - # only get the first load because of limit - assert [ods_load] == CubicLoad.get_status_ready(1) - - # add new snapshot load - new_ods_load = - Repo.insert!(%CubicLoad{ - table_id: ods_table.id, - status: "ready", - s3_key: "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz", - s3_modified: ~U[2022-01-03 20:49:50Z], - s3_size: 197, - is_raw: true - }) - - # ignoring loads prior to this last snapshot load - assert [new_ods_load] == CubicLoad.get_status_ready() - end end describe "all_by_status_in/2" do @@ -296,4 +240,107 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do Enum.map(new_load_rec_ids, &CubicLoad.get!(&1).status) end end + + describe "get_status_ready_for_table_query/2" do + test "getting non-ODS 'ready' loads", %{ + table: table, + load_objects: load_objects + } do + {:ok, new_load_recs} = CubicLoad.insert_new_from_objects_with_table(load_objects, table) + + assert new_load_recs == Repo.all(CubicLoad.get_status_ready_for_table_query({table, nil})) + end + + test "getting ODS 'ready' loads" do + # insert ODS table and snapshot + ods_table = + Repo.insert!(%CubicTable{ + name: "cubic_ods_qlik__sample", + s3_prefix: "cubic/ods_qlik/SAMPLE/", + is_raw: true + }) + + ods_snapshot_s3_key = "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz" + ods_snapshot = ~U[2022-01-01 
20:49:50Z] + + ods_table_snapshot = + Repo.insert!(%CubicOdsTableSnapshot{ + table_id: ods_table.id, + snapshot: ods_snapshot, + snapshot_s3_key: ods_snapshot_s3_key + }) + + # insert the snapshot load, followed by a later non-snapshot load + ods_load_1 = + Repo.insert!(%CubicLoad{ + table_id: ods_table.id, + status: "ready", + s3_key: ods_snapshot_s3_key, + s3_modified: ods_snapshot, + s3_size: 197, + is_raw: true + }) + + Repo.insert!(%CubicLoad{ + table_id: ods_table.id, + status: "ready", + s3_key: "cubic/ods_qlik/SAMPLE/LOAD2.csv.gz", + s3_modified: ~U[2022-01-02 20:49:50Z], + s3_size: 197, + is_raw: true + }) + + # with a limit of 1, only the earliest load (the snapshot load) comes back + assert [ods_load_1] == + Repo.all( + CubicLoad.get_status_ready_for_table_query({ods_table, ods_table_snapshot}, 1) + ) + + # add a newer load under the snapshot key (modified after both loads above) + new_ods_load = + Repo.insert!(%CubicLoad{ + table_id: ods_table.id, + status: "ready", + s3_key: ods_snapshot_s3_key, + s3_modified: ~U[2022-01-03 20:49:50Z], + s3_size: 197, + is_raw: true + }) + + # loads modified before the newest snapshot load are ignored + assert [new_ods_load] == + Repo.all( + CubicLoad.get_status_ready_for_table_query({ods_table, ods_table_snapshot}) + ) + end + + test "no query available because no snapshot load" do + # insert ODS table and snapshot + ods_table = + Repo.insert!(%CubicTable{ + name: "cubic_ods_qlik__sample", + s3_prefix: "cubic/ods_qlik/SAMPLE/", + is_raw: true + }) + + ods_table_snapshot = + Repo.insert!(%CubicOdsTableSnapshot{ + table_id: ods_table.id, + snapshot: ~U[2022-01-01 20:49:50Z], + snapshot_s3_key: "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz" + }) + + # insert a 'ready' load whose key is not the snapshot key, so no snapshot load exists + Repo.insert!(%CubicLoad{ + table_id: ods_table.id, + status: "ready", + s3_key: "cubic/ods_qlik/SAMPLE/LOAD2.csv.gz", + s3_modified: ~U[2022-01-02 20:49:50Z], + s3_size: 197, + is_raw: true + }) + + assert is_nil(CubicLoad.get_status_ready_for_table_query({ods_table, ods_table_snapshot})) + end + end end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/start_ingestion_test.exs 
b/ex_cubic_ingestion/test/ex_cubic_ingestion/start_ingestion_test.exs index 20125dd..e83b6d1 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/start_ingestion_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/start_ingestion_test.exs @@ -53,8 +53,6 @@ defmodule ExCubicIngestion.StartIngestionTest do s3_size: 197 }) - dmap_load_id = dmap_load.id - ods_load = Repo.insert!(%CubicLoad{ table_id: ods_table.id, @@ -64,20 +62,62 @@ defmodule ExCubicIngestion.StartIngestionTest do s3_size: 197 }) - ods_load_id = ods_load.id - :ok = StartIngestion.run() # snapshot was updated assert CubicOdsTableSnapshot.get_by!(%{table_id: ods_table.id}).snapshot == ods_snapshot # status was updated - assert CubicLoad.get!(dmap_load_id).status == "ingesting" + assert CubicLoad.get!(dmap_load.id).status == "ingesting" - assert CubicLoad.get!(ods_load_id).status == "ingesting" + assert CubicLoad.get!(ods_load.id).status == "ingesting" # job have been queued - assert_enqueued(worker: Ingest, args: %{load_rec_ids: [dmap_load_id, ods_load_id]}) + assert_enqueued(worker: Ingest, args: %{load_rec_ids: [dmap_load.id, ods_load.id]}) + end + + test "ignoring ODS loads without snapshots" do + dmap_table = + Repo.insert!(%CubicTable{ + name: "cubic_dmap__sample", + s3_prefix: "cubic/dmap/sample/" + }) + + ods_table = + Repo.insert!(%CubicTable{ + name: "cubic_ods_qlik__sample", + s3_prefix: "cubic/ods_qlik/SAMPLE/" + }) + + # insert ODS table snapshot record, with no snapshot timestamp set + Repo.insert!(%CubicOdsTableSnapshot{ + table_id: ods_table.id, + snapshot: nil, + snapshot_s3_key: "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz" + }) + + # insert a 'ready' DMAP load + dmap_load = + Repo.insert!(%CubicLoad{ + table_id: dmap_table.id, + status: "ready", + s3_key: "cubic/dmap/sample/20220101.csv.gz", + s3_modified: ~U[2022-01-01 20:49:50Z], + s3_size: 197 + }) + + # add a 'ready' CT ODS load; no load exists under the snapshot key, so it should not be queued + Repo.insert!(%CubicLoad{ + table_id: ods_table.id, + status: "ready", + s3_key: "cubic/ods_qlik/SAMPLE__ct/20220102-204950123.csv.gz", + s3_modified: ~U[2022-01-02 
20:49:50Z], + s3_size: 197 + }) + + :ok = StartIngestion.run() + + assert_enqueued(worker: Ingest, args: %{load_rec_ids: [dmap_load.id]}) end end