-
Notifications
You must be signed in to change notification settings - Fork 734
feat: committees implementation [CM-1066] #3995
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5f5e903
bdc9da9
2f5b708
5f1703e
6debc97
510704b
b3da22c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES | ||
| ('added-to-committee', 'committees', false, false, 'Member is added to a committee', 'Added to committee'), | ||
| ('removed-from-committee', 'committees', false, false, 'Member is removed from a committee', 'Removed from committee'); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| import { IS_PROD_ENV } from '@crowd/common' | ||
|
|
||
| // Main: FIVETRAN_INGEST.SFDC_CONNECTOR_PROD_PLATFORM.COMMUNITY__C | ||
| // Joins: | ||
| // - ANALYTICS.SILVER_DIM.COMMITTEE (committee metadata + project slug) | ||
| // - ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS (segment resolution) | ||
| // - ANALYTICS.SILVER_DIM.USERS (member identity: email, lf_username, name) | ||
| // - ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.USERS (actor name + email) | ||
| // - ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.ACCOUNTS (org data) | ||
|
|
||
| const CDP_MATCHED_SEGMENTS = ` | ||
| cdp_matched_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| )` | ||
|
|
||
| const ORG_ACCOUNTS = ` | ||
| org_accounts AS ( | ||
| SELECT account_id, account_name, website, domain_aliases | ||
| FROM ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.ACCOUNTS | ||
| WHERE website IS NOT NULL | ||
| )` | ||
|
|
||
| export const buildSourceQuery = (sinceTimestamp?: string): string => { | ||
| let select = ` | ||
| SELECT | ||
| c.SFID, | ||
| c._FIVETRAN_DELETED AS FIVETRAN_DELETED, | ||
| c.CONTACTEMAIL__C, | ||
| c.CREATEDBYID, | ||
| c.COLLABORATION_NAME__C, | ||
| c.ACCOUNT__C, | ||
| c.ROLE__C, | ||
| c.CREATEDDATE::TIMESTAMP_NTZ AS CREATEDDATE, | ||
| c.LASTMODIFIEDDATE::TIMESTAMP_NTZ AS LASTMODIFIEDDATE, | ||
| c._FIVETRAN_SYNCED::TIMESTAMP_NTZ AS FIVETRAN_SYNCED, | ||
| cm.COMMITTEE_ID, | ||
| cm.COMMITTEE_NAME, | ||
| cm.PROJECT_ID, | ||
| cm.PROJECT_NAME, | ||
| cm.PROJECT_SLUG, | ||
| su.EMAIL AS SU_EMAIL, | ||
| su.LF_USERNAME, | ||
| su.PRIMARY_SOURCE_USER_ID, | ||
| su.FIRST_NAME AS SU_FIRST_NAME, | ||
| su.LAST_NAME AS SU_LAST_NAME, | ||
| su.FULL_NAME AS SU_FULL_NAME, | ||
| bu.FIRST_NAME AS BU_FIRST_NAME, | ||
| bu.LAST_NAME AS BU_LAST_NAME, | ||
| bu.EMAIL AS BU_EMAIL, | ||
| org.account_name AS ACCOUNT_NAME, | ||
| org.website AS ORG_WEBSITE, | ||
| org.domain_aliases AS ORG_DOMAIN_ALIASES | ||
| FROM FIVETRAN_INGEST.SFDC_CONNECTOR_PROD_PLATFORM.COMMUNITY__C c | ||
| JOIN ANALYTICS.SILVER_DIM.COMMITTEE cm | ||
| ON c.COLLABORATION_NAME__C = cm.COMMITTEE_ID | ||
| INNER JOIN cdp_matched_segments cms | ||
| ON cms.slug = cm.PROJECT_SLUG | ||
| AND cms.sourceId = cm.PROJECT_ID | ||
| LEFT JOIN ANALYTICS.SILVER_DIM.USERS su | ||
| ON LOWER(c.CONTACTEMAIL__C) = LOWER(su.EMAIL) | ||
| LEFT JOIN ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.USERS bu | ||
| ON c.CREATEDBYID = bu.USER_ID | ||
| LEFT JOIN org_accounts org | ||
| ON c.ACCOUNT__C = org.account_id | ||
| WHERE c.LASTMODIFIEDDATE IS NOT NULL` | ||
|
|
||
| // Limit to a single project in non-prod to avoid exporting all project data | ||
| if (!IS_PROD_ENV) { | ||
| select += ` AND cm.PROJECT_SLUG = 'cncf'` | ||
| } | ||
|
|
||
| const dedup = ` | ||
| QUALIFY ROW_NUMBER() OVER (PARTITION BY c.SFID ORDER BY org.website DESC) = 1` | ||
|
|
||
| if (!sinceTimestamp) { | ||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS} | ||
| ${select} | ||
| ${dedup}`.trim() | ||
| } | ||
|
|
||
| return ` | ||
| WITH ${ORG_ACCOUNTS}, | ||
| ${CDP_MATCHED_SEGMENTS}, | ||
| new_cdp_segments AS ( | ||
| SELECT DISTINCT | ||
| s.SOURCE_ID AS sourceId, | ||
| s.slug | ||
| FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s | ||
| WHERE s.CREATED_TS >= '${sinceTimestamp}' | ||
| AND s.PARENT_SLUG IS NOT NULL | ||
| AND s.GRANDPARENTS_SLUG IS NOT NULL | ||
| AND s.SOURCE_ID IS NOT NULL | ||
| ) | ||
|
|
||
| -- Updated committee memberships since last export | ||
| ${select} | ||
| AND c.LASTMODIFIEDDATE > '${sinceTimestamp}' | ||
| ${dedup} | ||
|
|
||
| UNION | ||
|
|
||
| -- All committee memberships in newly created segments | ||
| ${select} | ||
| AND EXISTS ( | ||
| SELECT 1 FROM new_cdp_segments ncs | ||
| WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId | ||
| ) | ||
| ${dedup}`.trim() | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,174 @@ | ||||||||||||||||||||||||||||
| import { COMMITTEES_GRID, CommitteesActivityType } from '@crowd/integrations' | ||||||||||||||||||||||||||||
| import { getServiceChildLogger } from '@crowd/logging' | ||||||||||||||||||||||||||||
| import { | ||||||||||||||||||||||||||||
| IActivityData, | ||||||||||||||||||||||||||||
| IOrganizationIdentity, | ||||||||||||||||||||||||||||
| OrganizationIdentityType, | ||||||||||||||||||||||||||||
| OrganizationSource, | ||||||||||||||||||||||||||||
| PlatformType, | ||||||||||||||||||||||||||||
| } from '@crowd/types' | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import { TransformedActivity, TransformerBase } from '../../../core/transformerBase' | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const log = getServiceChildLogger('committeesCommitteesTransformer') | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| export class CommitteesCommitteesTransformer extends TransformerBase { | ||||||||||||||||||||||||||||
| readonly platform = PlatformType.COMMITTEES | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| transformRow(row: Record<string, unknown>): TransformedActivity | null { | ||||||||||||||||||||||||||||
| const email = (row.CONTACTEMAIL__C as string | null)?.trim() || null | ||||||||||||||||||||||||||||
| if (!email) { | ||||||||||||||||||||||||||||
| log.warn( | ||||||||||||||||||||||||||||
| { sfid: row.SFID, committeeId: row.COMMITTEE_ID, rawEmail: row.CONTACTEMAIL__C }, | ||||||||||||||||||||||||||||
| 'Skipping row: missing email', | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| return null | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const committeeId = (row.COMMITTEE_ID as string).trim() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| const committeeId = (row.COMMITTEE_ID as string).trim() | |
| const rawCommitteeId = row.COMMITTEE_ID | |
| const committeeId = | |
| typeof rawCommitteeId === 'string' && rawCommitteeId.trim().length > 0 | |
| ? rawCommitteeId.trim() | |
| : null | |
| if (!committeeId) { | |
| log.warn( | |
| { sfid: row.SFID, rawCommitteeId: row.COMMITTEE_ID, email }, | |
| 'Skipping row: missing or invalid committeeId', | |
| ) | |
| return null | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same sourceId for add and remove causes overwrites
High Severity
The activity sourceId is ${committeeId}-${row.SFID}, which is identical for both "added-to-committee" and "removed-from-committee" events on the same membership record. When a record transitions from active to soft-deleted (FIVETRAN_DELETED flips to true), the removal activity will share the same sourceId as the original addition activity, causing the downstream pipeline to overwrite the "added" event with the "removed" event. The type (or another discriminator) needs to be part of the sourceId to produce distinct activities.
Additional Locations (1)
Copilot
AI
Apr 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
activity.sourceId is set to committeeId (COMMITTEE_ID). That value is shared by many rows/members, so activities will collide on the unique key (tenant/platform/type/sourceId/segmentId) and later rows can overwrite or fail to insert. Use a per-membership unique identifier like row.SFID for sourceId (and keep COMMITTEE_ID as an attribute and/or sourceParentId).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| import { IActivityScoringGrid } from '@crowd/types' | ||
|
|
||
| export enum CommitteesActivityType { | ||
| ADDED_TO_COMMITTEE = 'added-to-committee', | ||
| REMOVED_FROM_COMMITTEE = 'removed-from-committee', | ||
| } | ||
|
|
||
| export const COMMITTEES_GRID: Record<CommitteesActivityType, IActivityScoringGrid> = { | ||
| [CommitteesActivityType.ADDED_TO_COMMITTEE]: { score: 1 }, | ||
| [CommitteesActivityType.REMOVED_FROM_COMMITTEE]: { score: 1 }, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removal activity score inconsistent with codebase patternMedium Severity
|
||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incremental export misses soft-deleted records for removal events
High Severity
The incremental filter uses only
c.LASTMODIFIEDDATE > sinceTimestampto find changed records, but the transformer relies on_FIVETRAN_DELETEDto generateremoved-from-committeeactivities. When Fivetran soft-deletes a record, it sets_FIVETRAN_DELETEDand updates_FIVETRAN_SYNCED, butLASTMODIFIEDDATE(a Salesforce source column) may retain its original value. This means newly deleted records won't match the incremental filter, and removal activities will never be generated in incremental exports. No other connector in the codebase uses_FIVETRAN_DELETED, so this gap is unique to committees. The filter needs to also check_FIVETRAN_SYNCEDfor deleted records.