From 7cedd11ba738a6c10fc768e93d9780c6d5a183e5 Mon Sep 17 00:00:00 2001 From: Dylan Jeffers Date: Thu, 26 Mar 2026 11:44:46 -0700 Subject: [PATCH 1/2] Drop scheduled release Celery task from discovery provider. Remove publish_scheduled_releases from worker scheduling and replace test imports with a DB-only helper so integration coverage remains after the task migration. Made-with: Cursor --- .../publish_scheduled_releases_helpers.py | 50 +++++++++++ ..._fan_remix_contest_started_notification.py | 10 ++- packages/discovery-provider/src/app.py | 6 -- .../src/tasks/publish_scheduled_releases.py | 85 ------------------- 4 files changed, 56 insertions(+), 95 deletions(-) create mode 100644 packages/discovery-provider/integration_tests/tasks/publish_scheduled_releases_helpers.py delete mode 100644 packages/discovery-provider/src/tasks/publish_scheduled_releases.py diff --git a/packages/discovery-provider/integration_tests/tasks/publish_scheduled_releases_helpers.py b/packages/discovery-provider/integration_tests/tasks/publish_scheduled_releases_helpers.py new file mode 100644 index 00000000000..7d6b1e7e8a5 --- /dev/null +++ b/packages/discovery-provider/integration_tests/tasks/publish_scheduled_releases_helpers.py @@ -0,0 +1,50 @@ +"""DB-only scheduled release publish (parity with @pedalboard/publish-scheduled-releases). + +Used by integration tests after Celery task removal from discovery-provider. +""" + +from sqlalchemy import func + +from src.models.playlists.playlist import Playlist +from src.models.tracks.track import Track +from src.tasks.entity_manager.utils import create_remix_contest_notification + +batch_size = 100 + + +def publish_scheduled_releases_session(session): + tracks_to_release = ( + session.query(Track) + .filter( + Track.is_unlisted == True, + Track.is_scheduled_release == True, + Track.release_date != None, + Track.release_date < func.current_timestamp(), + ) + .order_by(Track.created_at.asc()) + .limit(batch_size) + .all() + ) + if len(tracks_to_release) == 0: + pass + else: + for track in tracks_to_release: + track.is_unlisted = False + create_remix_contest_notification(session, track) + + playlists_to_release = ( + session.query(Playlist) + .filter( + Playlist.is_private == True, + Playlist.is_album == True, + Playlist.is_scheduled_release == True, + Playlist.release_date != None, + Playlist.release_date < func.current_timestamp(), + ) + .order_by(Playlist.created_at.asc()) + .limit(batch_size) + .all() + ) + + for playlist in playlists_to_release: + playlist.is_private = False diff --git a/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py b/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py index 9110e385df0..6ebf6c19d21 100644 --- a/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py +++ b/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py @@ -12,7 +12,9 @@ from src.models.tracks.track import Track from src.queries.get_notifications import NotificationType from src.tasks.entity_manager.entity_manager import entity_manager_update -from src.tasks.publish_scheduled_releases import _publish_scheduled_releases +from integration_tests.tasks.publish_scheduled_releases_helpers import ( + publish_scheduled_releases_session, +) from src.utils.db_session import get_db logger = logging.getLogger(__name__) @@ -532,7 +534,7 @@ def test_fan_remix_contest_started_notification_on_scheduled_release(app): assert len(notifications) == 0 # Run the scheduled release publish - _publish_scheduled_releases(session) + publish_scheduled_releases_session(session) # Verify track was made public track = session.query(Track).filter_by(track_id=TEST_TRACK_ID).first() @@ -777,7 +779,7 @@ def test_fan_remix_contest_started_notification_no_duplicate_on_scheduled_releas with db.scoped_session() as session: # Run the scheduled release publish - _publish_scheduled_releases(session) + publish_scheduled_releases_session(session) # Verify track was made public track = session.query(Track).filter_by(track_id=TEST_TRACK_ID).first() @@ -1049,7 +1051,7 @@ def test_fan_remix_contest_started_notification_no_duplicate_with_existing_on_sc assert notifications_before[0].user_ids == [FOLLOWER_ID] # Run the scheduled release publish - _publish_scheduled_releases(session) + publish_scheduled_releases_session(session) # Verify track was made public track = session.query(Track).filter_by(track_id=TEST_TRACK_ID).first() diff --git a/packages/discovery-provider/src/app.py b/packages/discovery-provider/src/app.py index 13f8e4b002f..696035c2889 100644 --- a/packages/discovery-provider/src/app.py +++ b/packages/discovery-provider/src/app.py @@ -301,7 +301,6 @@ def configure_celery(celery, test_config=None): "src.tasks.cache_current_nodes", "src.tasks.update_aggregates", "src.tasks.cache_entity_counts", - "src.tasks.publish_scheduled_releases", "src.tasks.create_engagement_notifications", "src.tasks.create_listen_streak_reminder_notifications", "src.tasks.create_remix_contest_notifications", @@ -380,10 +379,6 @@ def configure_celery(celery, test_config=None): "task": "update_aggregates", "schedule": timedelta(minutes=10), }, - "publish_scheduled_releases": { - "task": "publish_scheduled_releases", - "schedule": timedelta(minutes=1), - }, "create_engagement_notifications": { "task": "create_engagement_notifications", "schedule": timedelta(minutes=10), @@ -450,7 +445,6 @@ def configure_celery(celery, test_config=None): redis_inst.delete(UPDATE_DELIST_STATUSES_LOCK) redis_inst.delete(REPAIR_AUDIO_ANALYSES_LOCK) redis_inst.delete("update_aggregates_lock") - redis_inst.delete("publish_scheduled_releases_lock") redis_inst.delete("create_engagement_notifications") redis_inst.delete(index_core_lock_key) # delete cached final_poa_block in case it has changed diff --git a/packages/discovery-provider/src/tasks/publish_scheduled_releases.py b/packages/discovery-provider/src/tasks/publish_scheduled_releases.py deleted file mode 100644 index 670036d5b94..00000000000 --- a/packages/discovery-provider/src/tasks/publish_scheduled_releases.py +++ /dev/null @@ -1,85 +0,0 @@ -from sqlalchemy import func - -from src.models.playlists.playlist import Playlist -from src.models.tracks.track import Track -from src.tasks.celery_app import celery -from src.tasks.entity_manager.utils import create_remix_contest_notification -from src.utils.structured_logger import StructuredLogger, log_duration -from src.utils.web3_provider import get_eth_web3 - -logger = StructuredLogger(__name__) -web3 = get_eth_web3() -publish_scheduled_releases_cursor_key = "publish_scheduled_releases_cursor" -batch_size = 100 - - -@log_duration(logger) -def _publish_scheduled_releases(session): - tracks_to_release = ( - session.query(Track) - .filter( - Track.is_unlisted == True, - Track.is_scheduled_release == True, - Track.release_date != None, # Filter for non-null release_date - Track.release_date < func.current_timestamp(), - ) - .order_by(Track.created_at.asc()) - .limit(batch_size) - .all() - ) - if len(tracks_to_release) == 0: - return - - logger.info(f"Found {len(tracks_to_release)} tracks ready for release") - - for track in tracks_to_release: - logger.debug(f"Releasing track {track.track_id}") - track.is_unlisted = False - create_remix_contest_notification(session, track) - - playlists_to_release = ( - session.query(Playlist) - .filter( - Playlist.is_private == True, - Playlist.is_album == True, # Only support albums for now - Playlist.is_scheduled_release == True, - Playlist.release_date != None, # Filter for non-null release_date - Playlist.release_date < func.current_timestamp(), - ) - .order_by(Playlist.created_at.asc()) - .limit(batch_size) - .all() - ) - logger.debug(f"Found {len(playlists_to_release)} albums ready for release") - - for playlist in playlists_to_release: - logger.debug(f"Releasing album {playlist.playlist_id}") - playlist.is_private = False - - -# ####### CELERY TASKS ####### # -@celery.task(name="publish_scheduled_releases", bind=True) -def publish_scheduled_releases(self): - redis = publish_scheduled_releases.redis - db = publish_scheduled_releases.db - - # Define lock acquired boolean - have_lock = False - # Define redis lock object - update_lock = redis.lock( - "publish_scheduled_releases_lock", blocking_timeout=25, timeout=600 - ) - try: - have_lock = update_lock.acquire(blocking=False) - if have_lock: - with db.scoped_session() as session: - _publish_scheduled_releases(session) - - else: - logger.debug("Failed to acquire lock") - except Exception as e: - logger.error(f"ERROR caching node info {e}") - raise e - finally: - if have_lock: - update_lock.release() From 1cf4067dfca57015bc436698255d0ab6373a59c6 Mon Sep 17 00:00:00 2001 From: Dylan Jeffers Date: Thu, 26 Mar 2026 11:46:32 -0700 Subject: [PATCH 2/2] Fix lint --- .../tasks/test_fan_remix_contest_started_notification.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py b/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py index 6ebf6c19d21..ebf16cce508 100644 --- a/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py +++ b/packages/discovery-provider/integration_tests/tasks/test_fan_remix_contest_started_notification.py @@ -6,15 +6,15 @@ from web3.datastructures import AttributeDict from integration_tests.challenges.index_helpers import UpdateTask +from integration_tests.tasks.publish_scheduled_releases_helpers import ( + publish_scheduled_releases_session, +) from integration_tests.utils import populate_mock_db from src.challenges.challenge_event_bus import ChallengeEventBus, setup_challenge_bus from src.models.notifications.notification import Notification from src.models.tracks.track import Track from src.queries.get_notifications import NotificationType from src.tasks.entity_manager.entity_manager import entity_manager_update -from integration_tests.tasks.publish_scheduled_releases_helpers import ( - publish_scheduled_releases_session, -) from src.utils.db_session import get_db logger = logging.getLogger(__name__)