Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 50 additions & 36 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_load.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule ExCubicIngestion.Schema.CubicLoad do

alias Ecto.Changeset
alias ExCubicIngestion.Repo
alias ExCubicIngestion.Schema.CubicOdsTableSnapshot
alias ExCubicIngestion.Schema.CubicTable

@derive {Jason.Encoder,
Expand Down Expand Up @@ -138,45 +139,14 @@ defmodule ExCubicIngestion.Schema.CubicLoad do

@doc """
Get loads with the 'ready' status by getting all the active tables and
querying for the first {limit} loads by table. For ODS, because there might be a
Qlik restart between the 'ready' and 'ingesting' status, we should check to make
sure we only start since the last snapshot load's s3_modified. Because of this
logic, some 'ready' loads will be left behind. That's because they will be
pointing to non-existing objects (Note: Qlik removes all objects on a restart).
querying for loads by table.
"""
@spec get_status_ready(integer()) :: [t()]
def get_status_ready(limit \\ 100) do
@spec get_status_ready :: [t()]
def get_status_ready do
# we need to get 'ready' loads only for active tables
CubicTable.all_with_ods_table_snapshot()
|> Enum.map(fn {table_rec, ods_table_snapshot_rec} ->
# get the last load with snapshot key
last_snapshot_load_rec =
if not is_nil(ods_table_snapshot_rec) do
Repo.one(
from(load in not_deleted(),
where: load.s3_key == ^ods_table_snapshot_rec.snapshot_s3_key,
order_by: [desc: load.s3_modified],
limit: 1
)
)
end

# query for 'ready' loads, with limit
ready_loads =
from(load in not_deleted(),
where: load.status == "ready" and load.table_id == ^table_rec.id,
order_by: [load.s3_modified, load.s3_key],
limit: ^limit
)

# if we know the last snapshot load, then filter down to 'ready' loads
# after the snapshot
if is_nil(last_snapshot_load_rec) do
ready_loads
else
from(load in ready_loads, where: load.s3_modified >= ^last_snapshot_load_rec.s3_modified)
end
end)
|> Enum.map(&get_status_ready_for_table_query(&1))
|> Enum.filter(&(!is_nil(&1)))
|> Enum.flat_map(&Repo.all(&1))
end

Expand Down Expand Up @@ -249,4 +219,48 @@ defmodule ExCubicIngestion.Schema.CubicLoad do

DateTime.truncate(datetime_with_msec, :second)
end

@doc """
Construct a query for loads with the 'ready' status, filtered by the table and
ordered by the S3 modified and key values.

For ODS tables, because there might be a Qlik restart between the 'ready' and
'ingesting' statuses, we should make sure we only start from the last snapshot
load's s3_modified. Because of this logic, some 'ready' loads will be left
behind, as they will be pointing to non-existing objects (Note: Qlik removes
all objects on a restart). Lastly, if no snapshot load exists, just return nil,
as loads may not have come through yet.
"""
# NOTE: the snapshot element may be nil (non-ODS tables), so the spec must
# allow it — the first clause pattern-matches on nil explicitly.
@spec get_status_ready_for_table_query(
        {CubicTable.t(), CubicOdsTableSnapshot.t() | nil},
        integer()
      ) :: Ecto.Queryable.t() | nil
def get_status_ready_for_table_query(table_and_ods_table_snapshot, limit \\ 100)

# non-ODS table (no snapshot record): plain 'ready' query for the table
def get_status_ready_for_table_query({table_rec, nil}, limit) do
  from(load in not_deleted(),
    where: load.status == "ready" and load.table_id == ^table_rec.id,
    order_by: [load.s3_modified, load.s3_key],
    limit: ^limit
  )
end

# ODS table: only consider loads at or after the latest snapshot load
def get_status_ready_for_table_query({table_rec, ods_table_snapshot_rec}, limit) do
  # get the last load with the snapshot key (most recent s3_modified wins)
  last_snapshot_load_rec =
    Repo.one(
      from(load in not_deleted(),
        where: load.s3_key == ^ods_table_snapshot_rec.snapshot_s3_key,
        order_by: [desc: load.s3_modified],
        limit: 1
      )
    )

  # if no snapshot load has arrived yet, implicitly return nil — there is
  # nothing safe to ingest for this table
  if not is_nil(last_snapshot_load_rec) do
    from(load in not_deleted(),
      where:
        load.status == "ready" and load.table_id == ^table_rec.id and
          load.s3_modified >= ^last_snapshot_load_rec.s3_modified,
      order_by: [load.s3_modified, load.s3_key],
      limit: ^limit
    )
  end
end
end
161 changes: 104 additions & 57 deletions ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_load_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
end

describe "get_status_ready/0" do
test "getting non-ODS 'ready' loads", %{
test "getting only 'ready' loads after updating one to 'archived'", %{
table: table,
load_objects: load_objects
} do
Expand All @@ -126,62 +126,6 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
# assert that the last record inserted comes back
assert rest_new_load_recs == CubicLoad.get_status_ready()
end

test "getting ODS 'ready' loads" do
# insert ODS table and snapshot
ods_table =
Repo.insert!(%CubicTable{
name: "cubic_ods_qlik__sample",
s3_prefix: "cubic/ods_qlik/SAMPLE/",
is_raw: true
})

ods_snapshot_s3_key = "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz"
ods_snapshot = ~U[2022-01-01 20:49:50Z]

Repo.insert!(%CubicOdsTableSnapshot{
table_id: ods_table.id,
snapshot: ods_snapshot,
snapshot_s3_key: ods_snapshot_s3_key
})

# insert loads
ods_load =
Repo.insert!(%CubicLoad{
table_id: ods_table.id,
status: "ready",
s3_key: ods_snapshot_s3_key,
s3_modified: ods_snapshot,
s3_size: 197,
is_raw: true
})

Repo.insert!(%CubicLoad{
table_id: ods_table.id,
status: "ready",
s3_key: "cubic/ods_qlik/SAMPLE/LOAD2.csv.gz",
s3_modified: ~U[2022-01-02 20:49:50Z],
s3_size: 197,
is_raw: true
})

# only get the first load because of limit
assert [ods_load] == CubicLoad.get_status_ready(1)

# add new snapshot load
new_ods_load =
Repo.insert!(%CubicLoad{
table_id: ods_table.id,
status: "ready",
s3_key: "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz",
s3_modified: ~U[2022-01-03 20:49:50Z],
s3_size: 197,
is_raw: true
})

# ignoring loads prior to this last snapshot load
assert [new_ods_load] == CubicLoad.get_status_ready()
end
end

describe "all_by_status_in/2" do
Expand Down Expand Up @@ -296,4 +240,107 @@ defmodule ExCubicIngestion.Schema.CubicLoadTest do
Enum.map(new_load_rec_ids, &CubicLoad.get!(&1).status)
end
end

describe "get_status_ready_for_table_query/3" do
  test "getting non-ODS 'ready' loads", %{
    table: table,
    load_objects: load_objects
  } do
    {:ok, inserted_load_recs} =
      CubicLoad.insert_new_from_objects_with_table(load_objects, table)

    # a nil snapshot means non-ODS: the query covers all 'ready' loads
    query = CubicLoad.get_status_ready_for_table_query({table, nil})

    assert inserted_load_recs == Repo.all(query)
  end

  test "getting ODS 'ready' loads" do
    # set up an ODS table along with its snapshot record
    sample_table =
      Repo.insert!(%CubicTable{
        name: "cubic_ods_qlik__sample",
        s3_prefix: "cubic/ods_qlik/SAMPLE/",
        is_raw: true
      })

    snapshot_s3_key = "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz"
    snapshot_modified = ~U[2022-01-01 20:49:50Z]

    sample_table_snapshot =
      Repo.insert!(%CubicOdsTableSnapshot{
        table_id: sample_table.id,
        snapshot: snapshot_modified,
        snapshot_s3_key: snapshot_s3_key
      })

    # the first load is the snapshot load itself
    first_load =
      Repo.insert!(%CubicLoad{
        table_id: sample_table.id,
        status: "ready",
        s3_key: snapshot_s3_key,
        s3_modified: snapshot_modified,
        s3_size: 197,
        is_raw: true
      })

    # a second, later load for the same table
    Repo.insert!(%CubicLoad{
      table_id: sample_table.id,
      status: "ready",
      s3_key: "cubic/ods_qlik/SAMPLE/LOAD2.csv.gz",
      s3_modified: ~U[2022-01-02 20:49:50Z],
      s3_size: 197,
      is_raw: true
    })

    # with a limit of 1, only the earliest 'ready' load comes back
    limited_query =
      CubicLoad.get_status_ready_for_table_query({sample_table, sample_table_snapshot}, 1)

    assert [first_load] == Repo.all(limited_query)

    # a newer snapshot load moves the starting point forward
    latest_snapshot_load =
      Repo.insert!(%CubicLoad{
        table_id: sample_table.id,
        status: "ready",
        s3_key: snapshot_s3_key,
        s3_modified: ~U[2022-01-03 20:49:50Z],
        s3_size: 197,
        is_raw: true
      })

    # loads older than the latest snapshot load are ignored
    unlimited_query =
      CubicLoad.get_status_ready_for_table_query({sample_table, sample_table_snapshot})

    assert [latest_snapshot_load] == Repo.all(unlimited_query)
  end

  test "no query available because no snapshot load" do
    # set up an ODS table whose snapshot load has not arrived yet
    sample_table =
      Repo.insert!(%CubicTable{
        name: "cubic_ods_qlik__sample",
        s3_prefix: "cubic/ods_qlik/SAMPLE/",
        is_raw: true
      })

    sample_table_snapshot =
      Repo.insert!(%CubicOdsTableSnapshot{
        table_id: sample_table.id,
        snapshot: ~U[2022-01-01 20:49:50Z],
        snapshot_s3_key: "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz"
      })

    # only a non-snapshot load exists for the table
    Repo.insert!(%CubicLoad{
      table_id: sample_table.id,
      status: "ready",
      s3_key: "cubic/ods_qlik/SAMPLE/LOAD2.csv.gz",
      s3_modified: ~U[2022-01-02 20:49:50Z],
      s3_size: 197,
      is_raw: true
    })

    # without a snapshot load present, no query can be constructed
    assert is_nil(
             CubicLoad.get_status_ready_for_table_query({sample_table, sample_table_snapshot})
           )
  end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ defmodule ExCubicIngestion.StartIngestionTest do
s3_size: 197
})

dmap_load_id = dmap_load.id

ods_load =
Repo.insert!(%CubicLoad{
table_id: ods_table.id,
Expand All @@ -64,20 +62,62 @@ defmodule ExCubicIngestion.StartIngestionTest do
s3_size: 197
})

ods_load_id = ods_load.id

:ok = StartIngestion.run()

# snapshot was updated
assert CubicOdsTableSnapshot.get_by!(%{table_id: ods_table.id}).snapshot == ods_snapshot

# status was updated
assert CubicLoad.get!(dmap_load_id).status == "ingesting"
assert CubicLoad.get!(dmap_load.id).status == "ingesting"

assert CubicLoad.get!(ods_load_id).status == "ingesting"
assert CubicLoad.get!(ods_load.id).status == "ingesting"

# job have been queued
assert_enqueued(worker: Ingest, args: %{load_rec_ids: [dmap_load_id, ods_load_id]})
assert_enqueued(worker: Ingest, args: %{load_rec_ids: [dmap_load.id, ods_load.id]})
end

test "ignoring ODS loads without snapshots" do
  # a non-ODS (DMAP) table, which never requires a snapshot
  dmap_table =
    Repo.insert!(%CubicTable{
      name: "cubic_dmap__sample",
      s3_prefix: "cubic/dmap/sample/"
    })

  ods_table =
    Repo.insert!(%CubicTable{
      name: "cubic_ods_qlik__sample",
      s3_prefix: "cubic/ods_qlik/SAMPLE/"
    })

  # insert the ODS table snapshot record with a nil snapshot, i.e. the
  # snapshot load itself has not come through yet
  Repo.insert!(%CubicOdsTableSnapshot{
    table_id: ods_table.id,
    snapshot: nil,
    snapshot_s3_key: "cubic/ods_qlik/SAMPLE/LOAD1.csv.gz"
  })

  # insert loads
  dmap_load =
    Repo.insert!(%CubicLoad{
      table_id: dmap_table.id,
      status: "ready",
      s3_key: "cubic/dmap/sample/20220101.csv.gz",
      s3_modified: ~U[2022-01-01 20:49:50Z],
      s3_size: 197
    })

  # add a 'ready' CT ODS load — this should be skipped because its table's
  # snapshot load is still missing
  Repo.insert!(%CubicLoad{
    table_id: ods_table.id,
    status: "ready",
    s3_key: "cubic/ods_qlik/SAMPLE__ct/20220102-204950123.csv.gz",
    s3_modified: ~U[2022-01-02 20:49:50Z],
    s3_size: 197
  })

  :ok = StartIngestion.run()

  # only the DMAP load is enqueued; the ODS load without a snapshot is ignored
  assert_enqueued(worker: Ingest, args: %{load_rec_ids: [dmap_load.id]})
end
end

Expand Down