Skip to content

Commit ddb4b4b

Browse files
committed
Index expansion jobs to index only affected concepts/mappings and not all always
1 parent 6a67ea5 commit ddb4b4b

2 files changed

Lines changed: 62 additions & 45 deletions

File tree

core/collections/models.py

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,10 +1200,11 @@ def apply_parameters(self, queryset, is_concept_queryset):
12001200
parameters = ExpansionParameters(self.parameters, is_concept_queryset)
12011201
return parameters.apply(queryset)
12021202

1203-
def index_concepts(self):
1204-
if self.concepts.exists():
1203+
def index_concepts(self, concept_versioned_ids=None):
1204+
should_run = len(concept_versioned_ids) > 0 if concept_versioned_ids is not None else self.concepts.exists()
1205+
if should_run:
12051206
if get(settings, 'TEST_MODE', False):
1206-
index_expansion_concepts(self.id, self.concepts.count())
1207+
index_expansion_concepts(self.id, self.concepts.count(), concept_versioned_ids)
12071208
else:
12081209
task = None
12091210
try:
@@ -1212,15 +1213,17 @@ def index_concepts(self):
12121213
name=index_expansion_concepts.__name__
12131214
)
12141215
index_expansion_concepts.apply_async(
1215-
(self.id, self.concepts.count(), ), task_id=task.id, queue=task.queue, persist_args=True)
1216+
(self.id, self.concepts.count(), concept_versioned_ids),
1217+
task_id=task.id, queue=task.queue, persist_args=True)
12161218
except AlreadyQueued:
12171219
if task:
12181220
task.delete()
12191221

1220-
def index_mappings(self):
1221-
if self.mappings.exists():
1222+
def index_mappings(self, mapping_versioned_ids=None):
1223+
should_run = len(mapping_versioned_ids) > 0 if mapping_versioned_ids is not None else self.mappings.exists()
1224+
if should_run:
12221225
if get(settings, 'TEST_MODE', False):
1223-
index_expansion_mappings(self.id, self.mappings.count())
1226+
index_expansion_mappings(self.id, self.mappings.count(), mapping_versioned_ids)
12241227
else:
12251228
task = None
12261229
try:
@@ -1229,7 +1232,8 @@ def index_mappings(self):
12291232
name=index_expansion_mappings.__name__
12301233
)
12311234
index_expansion_mappings.apply_async(
1232-
(self.id, self.mappings.count()), task_id=task.id, queue=task.queue, persist_args=True)
1235+
(self.id, self.mappings.count(), mapping_versioned_ids),
1236+
task_id=task.id, queue=task.queue, persist_args=True)
12331237
except AlreadyQueued:
12341238
if task:
12351239
task.delete()
@@ -1252,29 +1256,28 @@ def to_ref_list_separated(cls, references):
12521256
def delete_references(self, references):
12531257
refs, _ = self.to_ref_list_separated(references)
12541258

1255-
index_concepts = False
1256-
index_mappings = False
1259+
index_concepts = []
1260+
index_mappings = []
12571261
any_ref_with_resources = False
12581262
for reference in refs:
12591263
concepts = reference.concepts
12601264
if concepts.exists():
12611265
any_ref_with_resources = True
1262-
index_concepts = True
1263-
filters = {'versioned_object_id__in': list(concepts.values_list('versioned_object_id', flat=True))}
1266+
resources_updated = list(concepts.values_list('versioned_object_id', flat=True))
1267+
filters = {'versioned_object_id__in': resources_updated}
1268+
index_concepts = [*index_concepts, *resources_updated]
12641269
self.concepts.set(self.concepts.exclude(**filters))
12651270
batch_index_resources.apply_async(('concept', filters), queue='indexing', permanent=False)
12661271
mappings = reference.mappings
12671272
if mappings.exists():
12681273
any_ref_with_resources = True
1269-
index_mappings = True
1270-
filters = {'versioned_object_id__in': list(mappings.values_list('versioned_object_id', flat=True))}
1274+
resources_updated = list(mappings.values_list('versioned_object_id', flat=True))
1275+
filters = {'versioned_object_id__in': resources_updated}
1276+
index_mappings = [*index_mappings, *resources_updated]
12711277
self.mappings.set(self.mappings.exclude(**filters))
12721278
batch_index_resources.apply_async(('mapping', filters), queue='indexing', permanent=False)
12731279

1274-
if index_concepts:
1275-
self.index_concepts()
1276-
if index_mappings:
1277-
self.index_mappings()
1280+
self.index_resources(index_concepts, index_mappings)
12781281

12791282
if any_ref_with_resources:
12801283
removed_reference_ids = [ref.id for ref in self.to_ref_list(references)]
@@ -1321,7 +1324,7 @@ def add_references( # pylint: disable=too-many-locals,too-many-statements,too-m
13211324
else:
13221325
exclude_refs += [*existing_exclude_refs.all()]
13231326

1324-
index_concepts = index_mappings = False
1327+
index_concepts = index_mappings = []
13251328

13261329
# attempt_reevaluate is False for delete reference(s)
13271330
should_reevaluate = force_reevaluate or (attempt_reevaluate and not self.is_auto_generated)
@@ -1421,17 +1424,21 @@ def get_ref_results(ref): # pylint: disable=too-many-branches,too-many-locals
14211424

14221425
for reference in include_refs:
14231426
concepts, mappings = get_ref_results(reference)
1424-
concepts_results = self.__include_resources(self.concepts, concepts, True)
1425-
if not index_concepts:
1426-
index_concepts = concepts_results
1427-
mappings_results = self.__include_resources(self.mappings, mappings, False)
1428-
if not index_mappings:
1429-
index_mappings = mappings_results
1427+
concepts_updated = self.__include_resources(self.concepts, concepts, True)
1428+
if index and concepts_updated is not False:
1429+
index_concepts = [*index_concepts, *concepts_updated.values_list('versioned_object_id', flat=True)]
1430+
mappings_updated = self.__include_resources(self.mappings, mappings, False)
1431+
if index and mappings_updated is not False:
1432+
index_mappings = [*index_mappings, *mappings_updated.values_list('versioned_object_id', flat=True)]
14301433

14311434
for reference in exclude_refs:
14321435
concepts, mappings = get_ref_results(reference)
1433-
index_concepts = self.__exclude_resources(reference, self.concepts, concepts, Concept)
1434-
index_mappings = self.__exclude_resources(reference, self.mappings, mappings, Mapping)
1436+
concepts_updated = self.__exclude_resources(reference, self.concepts, concepts, Concept)
1437+
mappings_updated = self.__exclude_resources(reference, self.mappings, mappings, Mapping)
1438+
if index and concepts_updated is not False:
1439+
index_concepts = [*index_concepts, *concepts_updated.values_list('versioned_object_id', flat=True)]
1440+
if index and mappings_updated is not False:
1441+
index_mappings = [*index_mappings, *mappings_updated.values_list('versioned_object_id', flat=True)]
14351442

14361443
self.explicit_collection_versions.add(*compact(explicit_valueset_versions))
14371444
self.explicit_source_versions.add(*compact(explicit_system_versions))
@@ -1455,30 +1462,30 @@ def __dedupe_mappings(self):
14551462
self.mappings.set(self.mappings.order_by('versioned_object_id', '-id').distinct('versioned_object_id'))
14561463

14571464
def __include_resources(self, rel, resources, is_concept_queryset):
1458-
should_index = resources.exists()
1459-
if should_index:
1460-
rel.add(*self.apply_parameters(resources, is_concept_queryset))
1461-
return should_index
1465+
resources_updated = False
1466+
if resources.exists():
1467+
resources_updated = self.apply_parameters(resources, is_concept_queryset)
1468+
rel.add(*resources_updated)
1469+
return resources_updated
14621470

14631471
@staticmethod
14641472
def __exclude_resources(ref, rel, resources, klass):
1465-
should_index = resources.exists()
1466-
if should_index:
1473+
resources_updated = False
1474+
if resources.exists():
14671475
if ref.resource_version:
1476+
resources_updated = resources
14681477
rel.remove(*resources)
14691478
else:
1470-
rel.remove(
1471-
*klass.objects.filter(
1472-
versioned_object_id__in=resources.values_list('versioned_object_id', flat=True)
1473-
)
1474-
)
1475-
return should_index
1479+
resources_updated = klass.objects.filter(
1480+
versioned_object_id__in=resources.values_list('versioned_object_id', flat=True))
1481+
rel.remove(*resources_updated)
1482+
return resources_updated
14761483

14771484
def index_resources(self, concepts, mappings):
14781485
if concepts:
1479-
self.index_concepts()
1486+
self.index_concepts(concepts)
14801487
if mappings:
1481-
self.index_mappings()
1488+
self.index_mappings(mappings)
14821489

14831490
def seed_children(self, index=True, force_reevaluate=False):
14841491
return self.add_references(

core/common/tasks.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -559,24 +559,34 @@ def batch_index_resources(resource, filters, update_indexed=False):
559559
ignore_result=True, autoretry_for=(Exception, WorkerLostError, ), retry_kwargs={'max_retries': 2, 'countdown': 2},
560560
acks_late=True, reject_on_worker_lost=True, base=QueueOnceCustomTask
561561
)
562-
def index_expansion_concepts(expansion_id, count=None): # pylint: disable=unused-argument
562+
def index_expansion_concepts(expansion_id, count=None, concept_versioned_ids=None): # pylint: disable=unused-argument
563563
from core.collections.models import Expansion
564564
expansion = Expansion.objects.filter(id=expansion_id).first()
565565
if expansion:
566566
from core.concepts.documents import ConceptDocument
567-
expansion.batch_index(expansion.concepts, ConceptDocument)
567+
from core.concepts.models import Concept
568+
if concept_versioned_ids:
569+
queryset = Concept.objects.filter(versioned_object_id__in=concept_versioned_ids)
570+
else:
571+
queryset = expansion.concepts
572+
expansion.batch_index(queryset, ConceptDocument)
568573

569574

570575
@app.task(
571576
ignore_result=True, autoretry_for=(Exception, WorkerLostError, ), retry_kwargs={'max_retries': 2, 'countdown': 2},
572577
acks_late=True, reject_on_worker_lost=True, base=QueueOnceCustomTask
573578
)
574-
def index_expansion_mappings(expansion_id, count=None): # pylint: disable=unused-argument
579+
def index_expansion_mappings(expansion_id, count=None, mapping_versioned_ids=None): # pylint: disable=unused-argument
575580
from core.collections.models import Expansion
576581
expansion = Expansion.objects.filter(id=expansion_id).first()
577582
if expansion:
578583
from core.mappings.documents import MappingDocument
579-
expansion.batch_index(expansion.mappings, MappingDocument)
584+
from core.mappings.models import Mapping
585+
if mapping_versioned_ids:
586+
queryset = Mapping.objects.filter(versioned_object_id__in=mapping_versioned_ids)
587+
else:
588+
queryset = expansion.mappings
589+
expansion.batch_index(queryset, MappingDocument)
580590

581591

582592
@app.task

0 commit comments

Comments
 (0)