-
Notifications
You must be signed in to change notification settings - Fork 734
feat(snowflake-connectors): meetings implementation [CM-1034] #3998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ea73a87
f2bf2c8
6039d06
b3c3880
fc137a6
d7f4e8c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES | ||
| ('invited-meeting', 'meetings', false, false, 'User is invited to a meeting', 'Invited to a meeting'), | ||
| ('attended-meeting', 'meetings', false, false, 'User attends a meeting', 'Attended a meeting'); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,24 +89,26 @@ export class TransformerConsumer { | |
| let resolveSkippedCount = 0 | ||
|
|
||
| for await (const row of this.s3Service.streamParquetRows(job.s3Path)) { | ||
| const result = source.transformer.safeTransformRow(row) | ||
| if (!result) { | ||
| const results = source.transformer.safeTransformRow(row) | ||
| if (!results) { | ||
| transformSkippedCount++ | ||
| continue | ||
| } | ||
|
|
||
| const resolved = await this.integrationResolver.resolve(platform, result.segment) | ||
| if (!resolved) { | ||
| resolveSkippedCount++ | ||
| continue | ||
| for (const result of results) { | ||
| const resolved = await this.integrationResolver.resolve(platform, result.segment) | ||
| if (!resolved) { | ||
| resolveSkippedCount++ | ||
| continue | ||
| } | ||
|
|
||
| await this.emitter.createAndProcessActivityResult( | ||
| resolved.segmentId, | ||
| resolved.integrationId, | ||
| result.activity, | ||
| ) | ||
| transformedCount++ | ||
| } | ||
|
Comment on lines
+98
to
111
|
||
|
|
||
| await this.emitter.createAndProcessActivityResult( | ||
| resolved.segmentId, | ||
| resolved.integrationId, | ||
| result.activity, | ||
| ) | ||
| transformedCount++ | ||
| } | ||
|
|
||
| const skippedCount = transformSkippedCount + resolveSkippedCount | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| import { IS_PROD_ENV } from '@crowd/common' | ||
|
|
||
| const CDP_MATCHED_SEGMENTS = ` | ||
| cdp_matched_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| )` | ||
|
|
||
| const ORG_ACCOUNTS = ` | ||
| org_accounts AS ( | ||
| SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES | ||
| FROM analytics.bronze_fivetran_salesforce.accounts | ||
| WHERE website IS NOT NULL | ||
| UNION ALL | ||
| SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES | ||
| FROM analytics.bronze_fivetran_salesforce_b2b.accounts | ||
| WHERE website IS NOT NULL | ||
| )` | ||
|
|
||
| export const buildSourceQuery = (sinceTimestamp?: string): string => { | ||
| let select = ` | ||
| SELECT | ||
| t.PRIMARY_KEY, | ||
| t.MEETING_ID, | ||
| t.MEETING_NAME, | ||
| t.PROJECT_ID, | ||
| t.PROJECT_NAME, | ||
| t.PROJECT_SLUG, | ||
| t.ACCOUNT_ID, | ||
| t.ACCOUNT_NAME, | ||
| t.MEETING_DATE, | ||
| t.MEETING_TIME, | ||
| t.INVITEE_FULL_NAME, | ||
| t.INVITEE_LF_SSO, | ||
| t.INVITEE_LF_USER_ID, | ||
| t.INVITEE_EMAIL, | ||
| t.INVITEE_ATTENDED, | ||
| t.WAS_INVITED, | ||
| t.RAW_COMMITTEE_TYPE, | ||
| org.website AS ORG_WEBSITE, | ||
| org.domain_aliases AS ORG_DOMAIN_ALIASES, | ||
| org.logo_url AS LOGO_URL, | ||
| org.industry AS ORGANIZATION_INDUSTRY, | ||
| CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE | ||
| FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE t | ||
| INNER JOIN cdp_matched_segments cms | ||
| ON cms.slug = t.PROJECT_SLUG | ||
| AND cms.sourceId = t.PROJECT_ID | ||
| LEFT JOIN org_accounts org | ||
| ON t.ACCOUNT_ID = org.account_id | ||
| WHERE (t.WAS_INVITED = TRUE OR t.INVITEE_ATTENDED = TRUE)` | ||
|
|
||
| if (!IS_PROD_ENV) { | ||
| select += ` AND t.PROJECT_SLUG = 'cncf'` | ||
| } | ||
|
|
||
| const dedup = ` | ||
| QUALIFY ROW_NUMBER() OVER (PARTITION BY t.PRIMARY_KEY ORDER BY org.website DESC) = 1` | ||
|
|
||
| if (!sinceTimestamp) { | ||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS} | ||
| ${select} | ||
| ${dedup}`.trim() | ||
| } | ||
|
|
||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS}, | ||
| new_cdp_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.CREATED_TS >= '${sinceTimestamp}' | ||
| AND s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| ) | ||
|
|
||
| -- Updated records in existing segments | ||
| ${select} | ||
| AND t.MEETING_DATE >= '${sinceTimestamp}'::DATE | ||
|
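There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. **Incremental query uses business date, may miss records** (Medium Severity): The incremental filter uses `t.MEETING_DATE >= '${sinceTimestamp}'::DATE`, i.e. the meeting's business date rather than a record-modification timestamp, so rows for meetings dated before `sinceTimestamp` that were added or updated afterwards may be missed. |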
||
| ${dedup} | ||
|
|
||
| UNION | ||
|
|
||
| -- All records in newly created segments | ||
| ${select} | ||
| AND EXISTS ( | ||
| SELECT 1 FROM new_cdp_segments ncs | ||
| WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId | ||
| ) | ||
| ${dedup}`.trim() | ||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This migration introduces new `activityTypes` rows, but there is no rollback SQL (the corresponding `U1775219382__addMeetingsActivityTypes.sql` is empty).
If rollbacks are supported in this repo, add a down migration that deletes these inserted activity types (scoped to platform/type) to keep migrations reversible.