Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Registers the two activity types used by the 'committees' platform.
-- Both are inserted with isCodeContribution = false and isCollaboration = false.
INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
('added-to-committee', 'committees', false, false, 'Member is added to a committee', 'Added to committee'),
('removed-from-committee', 'committees', false, false, 'Member is removed from a committee', 'Removed from committee');
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { IS_PROD_ENV } from '@crowd/common'

// Main: FIVETRAN_INGEST.SFDC_CONNECTOR_PROD_PLATFORM.COMMUNITY__C
// Joins:
// - ANALYTICS.SILVER_DIM.COMMITTEE (committee metadata + project slug)
// - ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS (segment resolution)
// - ANALYTICS.SILVER_DIM.USERS (member identity: email, lf_username, name)
// - ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.USERS (actor name + email)
// - ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.ACCOUNTS (org data)

// CTE fragment: distinct (SOURCE_ID, slug) pairs from the segments table, restricted to
// rows that have a parent slug, a grandparents slug and a source id.
// NOTE(review): presumably these are the leaf-level (sub-project) segments — confirm.
const CDP_MATCHED_SEGMENTS = `
cdp_matched_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
WHERE s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)`

// CTE fragment: Salesforce accounts that have a website; accounts without one are
// excluded, so the LEFT JOIN in the main query yields NULL org columns for them.
const ORG_ACCOUNTS = `
org_accounts AS (
SELECT account_id, account_name, website, domain_aliases
FROM ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.ACCOUNTS
WHERE website IS NOT NULL
)`

/**
 * Builds the Snowflake query that exports committee membership rows
 * (COMMUNITY__C) joined with committee, segment, member, actor and
 * organization data.
 *
 * Rows are de-duplicated to one per membership SFID.
 *
 * @param sinceTimestamp - When omitted, a full export is produced. When
 *   provided, the query returns the union of (a) memberships modified or
 *   soft-deleted after this timestamp and (b) all memberships belonging to
 *   segments created at or after this timestamp.
 *   NOTE(review): the value is interpolated directly into the SQL text, so it
 *   is assumed to come from trusted internal state — confirm against callers.
 * @returns The SQL text, trimmed of leading/trailing whitespace.
 */
export const buildSourceQuery = (sinceTimestamp?: string): string => {
  let select = `
  SELECT
    c.SFID,
    c._FIVETRAN_DELETED AS FIVETRAN_DELETED,
    c.CONTACTEMAIL__C,
    c.CREATEDBYID,
    c.COLLABORATION_NAME__C,
    c.ACCOUNT__C,
    c.ROLE__C,
    c.CREATEDDATE::TIMESTAMP_NTZ AS CREATEDDATE,
    c.LASTMODIFIEDDATE::TIMESTAMP_NTZ AS LASTMODIFIEDDATE,
    c._FIVETRAN_SYNCED::TIMESTAMP_NTZ AS FIVETRAN_SYNCED,
    cm.COMMITTEE_ID,
    cm.COMMITTEE_NAME,
    cm.PROJECT_ID,
    cm.PROJECT_NAME,
    cm.PROJECT_SLUG,
    su.EMAIL AS SU_EMAIL,
    su.LF_USERNAME,
    su.PRIMARY_SOURCE_USER_ID,
    su.FIRST_NAME AS SU_FIRST_NAME,
    su.LAST_NAME AS SU_LAST_NAME,
    su.FULL_NAME AS SU_FULL_NAME,
    bu.FIRST_NAME AS BU_FIRST_NAME,
    bu.LAST_NAME AS BU_LAST_NAME,
    bu.EMAIL AS BU_EMAIL,
    org.account_name AS ACCOUNT_NAME,
    org.website AS ORG_WEBSITE,
    org.domain_aliases AS ORG_DOMAIN_ALIASES
  FROM FIVETRAN_INGEST.SFDC_CONNECTOR_PROD_PLATFORM.COMMUNITY__C c
  JOIN ANALYTICS.SILVER_DIM.COMMITTEE cm
    ON c.COLLABORATION_NAME__C = cm.COMMITTEE_ID
  INNER JOIN cdp_matched_segments cms
    ON cms.slug = cm.PROJECT_SLUG
    AND cms.sourceId = cm.PROJECT_ID
  LEFT JOIN ANALYTICS.SILVER_DIM.USERS su
    ON LOWER(c.CONTACTEMAIL__C) = LOWER(su.EMAIL)
  LEFT JOIN ANALYTICS.BRONZE_FIVETRAN_SALESFORCE_B2B.USERS bu
    ON c.CREATEDBYID = bu.USER_ID
  LEFT JOIN org_accounts org
    ON c.ACCOUNT__C = org.account_id
  WHERE c.LASTMODIFIEDDATE IS NOT NULL`

  // Limit to a single project in non-prod to avoid exporting all project data
  if (!IS_PROD_ENV) {
    select += ` AND cm.PROJECT_SLUG = 'cncf'`
  }

  // The joins can fan out a membership into several rows; keep one per SFID.
  const dedup = `
  QUALIFY ROW_NUMBER() OVER (PARTITION BY c.SFID ORDER BY org.website DESC) = 1`

  // Full export: no incremental filters.
  if (!sinceTimestamp) {
    return `
  WITH ${ORG_ACCOUNTS},
  ${CDP_MATCHED_SEGMENTS}
  ${select}
  ${dedup}`.trim()
  }

  // Incremental export.
  //
  // The first branch must also pick up soft-deleted rows: the transformer turns
  // FIVETRAN_DELETED rows into removal activities, but a Fivetran soft delete
  // sets _FIVETRAN_DELETED and bumps _FIVETRAN_SYNCED while LASTMODIFIEDDATE
  // (a Salesforce source column) may keep its original value. Filtering on
  // LASTMODIFIEDDATE alone would therefore never export new removals.
  return `
  WITH ${ORG_ACCOUNTS},
  ${CDP_MATCHED_SEGMENTS},
  new_cdp_segments AS (
    SELECT DISTINCT
      s.SOURCE_ID AS sourceId,
      s.slug
    FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
    WHERE s.CREATED_TS >= '${sinceTimestamp}'
      AND s.PARENT_SLUG IS NOT NULL
      AND s.GRANDPARENTS_SLUG IS NOT NULL
      AND s.SOURCE_ID IS NOT NULL
  )

  -- Committee memberships updated or soft-deleted since last export
  ${select}
  AND (
    c.LASTMODIFIEDDATE > '${sinceTimestamp}'
    OR (c._FIVETRAN_DELETED = TRUE AND c._FIVETRAN_SYNCED > '${sinceTimestamp}')
  )
  ${dedup}

  UNION

  -- All committee memberships in newly created segments
  ${select}
  AND EXISTS (
    SELECT 1 FROM new_cdp_segments ncs
    WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
  )
  ${dedup}`.trim()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { COMMITTEES_GRID, CommitteesActivityType } from '@crowd/integrations'
import { getServiceChildLogger } from '@crowd/logging'
import {
IActivityData,
IOrganizationIdentity,
OrganizationIdentityType,
OrganizationSource,
PlatformType,
} from '@crowd/types'

import { TransformedActivity, TransformerBase } from '../../../core/transformerBase'

const log = getServiceChildLogger('committeesCommitteesTransformer')

/**
 * Transforms committee membership rows (as produced by the committees source
 * query) into added-to-committee / removed-from-committee activities.
 */
export class CommitteesCommitteesTransformer extends TransformerBase {
  readonly platform = PlatformType.COMMITTEES

  /**
   * Converts one exported row into an activity plus the segment it belongs to.
   *
   * A row is skipped (with a warning) when it has no contact email, no usable
   * COMMITTEE_ID, or no project slug / project id to resolve the segment.
   *
   * @param row - Raw row keyed by the column aliases of the source query.
   * @returns The transformed activity and its segment, or `null` when skipped.
   */
  transformRow(row: Record<string, unknown>): TransformedActivity | null {
    const email = (row.CONTACTEMAIL__C as string | null)?.trim() || null
    if (!email) {
      log.warn(
        { sfid: row.SFID, committeeId: row.COMMITTEE_ID, rawEmail: row.CONTACTEMAIL__C },
        'Skipping row: missing email',
      )
      return null
    }

    // COMMITTEE_ID is typed as unknown at runtime; guard instead of letting
    // `.trim()` throw on null / non-string values.
    const rawCommitteeId = row.COMMITTEE_ID
    const committeeId = typeof rawCommitteeId === 'string' ? rawCommitteeId.trim() : ''
    if (!committeeId) {
      log.warn(
        { sfid: row.SFID, rawCommitteeId, email },
        'Skipping row: missing or invalid committeeId',
      )
      return null
    }

    // Validate the segment up front so no work is done for rows that will be dropped.
    const segmentSlug = (row.PROJECT_SLUG as string | null)?.trim() || null
    const segmentSourceId = (row.PROJECT_ID as string | null)?.trim() || null
    if (!segmentSlug || !segmentSourceId) {
      log.warn(
        { sfid: row.SFID, committeeId, segmentSlug, segmentSourceId },
        'Skipping row: missing segment slug or sourceId',
      )
      return null
    }

    const fivetranDeleted = row.FIVETRAN_DELETED as boolean
    const lfUsername = (row.LF_USERNAME as string | null)?.trim() || null
    const suFullName = (row.SU_FULL_NAME as string | null)?.trim() || null
    const suFirstName = (row.SU_FIRST_NAME as string | null)?.trim() || null
    const suLastName = (row.SU_LAST_NAME as string | null)?.trim() || null

    // Prefer the full name, then whatever name parts exist, then the email local part.
    const displayName =
      suFullName ||
      (suFirstName && suLastName ? `${suFirstName} ${suLastName}` : suFirstName || suLastName) ||
      email.split('@')[0]

    // A soft-deleted membership row represents a removal; any other row an addition.
    const type = fivetranDeleted
      ? CommitteesActivityType.REMOVED_FROM_COMMITTEE
      : CommitteesActivityType.ADDED_TO_COMMITTEE

    const sourceId = (row.PRIMARY_SOURCE_USER_ID as string | null)?.trim() || undefined
    const identities = this.buildMemberIdentities({
      email,
      platformUsername: null,
      sourceId,
      lfUsername,
    })

    // Additions are dated by record creation; removals by the Fivetran sync time.
    const activityTimestamp =
      type === CommitteesActivityType.ADDED_TO_COMMITTEE
        ? (row.CREATEDDATE as string | null) || null
        : (row.FIVETRAN_SYNCED as string | null) || null

    const committeeName = (row.COMMITTEE_NAME as string | null) || null

    const activity: IActivityData = {
      type,
      platform: PlatformType.COMMITTEES,
      timestamp: activityTimestamp,
      score: COMMITTEES_GRID[type].score,
      // NOTE(review): the add and the remove activity of one membership share this
      // sourceId. That is only safe if the downstream uniqueness key also includes
      // the activity type — confirm, otherwise add the type as a discriminator.
      sourceId: `${committeeId}-${row.SFID}`,
      sourceParentId: null,
      channel: committeeName,
      member: {
        displayName,
        identities,
        organizations: this.buildOrganizations(row),
      },
      attributes: {
        committeeId: (row.COLLABORATION_NAME__C as string | null) || null,
        committeeName,
        role: (row.ROLE__C as string | null) || null,
        projectId: (row.PROJECT_ID as string | null) || null,
        projectName: (row.PROJECT_NAME as string | null) || null,
        organizationId: (row.ACCOUNT__C as string | null) || null,
        organizationName: (row.ACCOUNT_NAME as string | null) || null,
        member: {
          userId: (row.PRIMARY_SOURCE_USER_ID as string | null) || null,
          firstName: (row.SU_FIRST_NAME as string | null) || null,
          lastName: (row.SU_LAST_NAME as string | null) || null,
          email,
        },
        actor: {
          userId: (row.CREATEDBYID as string | null) || null,
          firstName: (row.BU_FIRST_NAME as string | null) || null,
          lastName: (row.BU_LAST_NAME as string | null) || null,
          email: (row.BU_EMAIL as string | null) || null,
        },
        activityDate: activityTimestamp,
      },
    }

    return { activity, segment: { slug: segmentSlug, sourceId: segmentSourceId } }
  }

  /**
   * Builds the member's organization from the account columns of the row.
   *
   * @param row - Raw row keyed by the column aliases of the source query.
   * @returns `undefined` when the row has neither a website nor domain aliases;
   *   otherwise a single-element array. The website becomes a verified primary
   *   domain. Domain aliases (comma-separated) become verified alternative
   *   domains, except for accounts flagged by `isIndividualNoAccount`.
   */
  private buildOrganizations(
    row: Record<string, unknown>,
  ): IActivityData['member']['organizations'] {
    const website = (row.ORG_WEBSITE as string | null)?.trim() || null
    const domainAliases = (row.ORG_DOMAIN_ALIASES as string | null)?.trim() || null

    if (!website && !domainAliases) {
      return undefined
    }

    const displayName = (row.ACCOUNT_NAME as string | null)?.trim() || website

    // NOTE(review): isIndividualNoAccount is defined outside this file; it presumably
    // flags the placeholder account used for individuals — confirm.
    const isIndividual = this.isIndividualNoAccount(displayName)

    const identities: IOrganizationIdentity[] = website
      ? [
          {
            platform: PlatformType.COMMITTEES,
            value: website,
            type: OrganizationIdentityType.PRIMARY_DOMAIN,
            verified: true,
          },
        ]
      : []

    // Aliases are only attached to regular accounts.
    if (!isIndividual && domainAliases) {
      for (const alias of domainAliases.split(',')) {
        const trimmed = alias.trim()
        if (trimmed) {
          identities.push({
            platform: PlatformType.COMMITTEES,
            value: trimmed,
            type: OrganizationIdentityType.ALTERNATIVE_DOMAIN,
            verified: true,
          })
        }
      }
    }

    return [
      {
        displayName,
        source: OrganizationSource.COMMITTEES,
        identities,
      },
    ]
  }
}
11 changes: 11 additions & 0 deletions services/apps/snowflake_connectors/src/integrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
import { PlatformType } from '@crowd/types'

import { buildSourceQuery as committeesCommitteesBuildQuery } from './committees/committees/buildSourceQuery'
import { CommitteesCommitteesTransformer } from './committees/committees/transformer'
import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
import { CventTransformer } from './cvent/event-registrations/transformer'
import { buildSourceQuery as tncCertificatesBuildQuery } from './tnc/certificates/buildSourceQuery'
Expand All @@ -20,6 +22,15 @@ export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
export { DataSourceName } from './types'

const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
[PlatformType.COMMITTEES]: {
sources: [
{
name: DataSourceName.COMMITTEES_COMMITTEES,
buildSourceQuery: committeesCommitteesBuildQuery,
transformer: new CommitteesCommitteesTransformer(),
},
],
},
[PlatformType.CVENT]: {
sources: [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum DataSourceName {
TNC_ENROLLMENTS = 'enrollments',
TNC_CERTIFICATES = 'certificates',
TNC_COURSES = 'courses',
COMMITTEES_COMMITTEES = 'committees',
}

export interface DataSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ export const ORG_DB_ATTRIBUTE_SOURCE_PRIORITY = [
OrganizationAttributeSource.ENRICHMENT_PEOPLEDATALABS,
OrganizationAttributeSource.CVENT,
OrganizationAttributeSource.TNC,
OrganizationAttributeSource.COMMITTEES,
// legacy - keeping this for backward compatibility
OrganizationAttributeSource.ENRICHMENT,
OrganizationAttributeSource.GITHUB,
Expand Down
11 changes: 11 additions & 0 deletions services/libs/integrations/src/integrations/committees/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { IActivityScoringGrid } from '@crowd/types'

/**
 * Activity types emitted for the committees platform. The string values are
 * the `activityType` identifiers stored for this platform.
 */
export enum CommitteesActivityType {
  /** A member was added to a committee. */
  ADDED_TO_COMMITTEE = 'added-to-committee',
  /** A member was removed from a committee. */
  REMOVED_FROM_COMMITTEE = 'removed-from-committee',
}

/**
 * Scoring grid for committee activities: maps every activity type to the score
 * assigned to activities of that type.
 */
export const COMMITTEES_GRID: Record<CommitteesActivityType, IActivityScoringGrid> = {
  [CommitteesActivityType.ADDED_TO_COMMITTEE]: { score: 1 },
  // A removal reverses an addition, so it carries the negated score rather than
  // boosting the member's engagement.
  // NOTE(review): mirrors how other platform grids reportedly score reversal
  // events (e.g. unstar, member leave) with a negative value — confirm.
  [CommitteesActivityType.REMOVED_FROM_COMMITTEE]: { score: -1 },
}
2 changes: 2 additions & 0 deletions services/libs/integrations/src/integrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ export * from './cvent/types'

export * from './tnc/types'

export * from './committees/types'

export * from './activityDisplayService'
2 changes: 2 additions & 0 deletions services/libs/types/src/enums/organizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export enum OrganizationSource {
UI = 'ui',
CVENT = 'cvent',
TNC = 'tnc',
COMMITTEES = 'committees',
}

export enum OrganizationMergeSuggestionType {
Expand Down Expand Up @@ -42,6 +43,7 @@ export enum OrganizationAttributeSource {
ENRICHMENT_PEOPLEDATALABS = 'enrichment-peopledatalabs',
CVENT = 'cvent',
TNC = 'tnc',
COMMITTEES = 'committees',
// legacy - keeping this for backward compatibility
ENRICHMENT = 'enrichment',
GITHUB = 'github',
Expand Down
1 change: 1 addition & 0 deletions services/libs/types/src/enums/platforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export enum PlatformType {
GIT = 'git',
CRUNCHBASE = 'crunchbase',
GROUPSIO = 'groupsio',
COMMITTEES = 'committees',
CONFLUENCE = 'confluence',
GERRIT = 'gerrit',
JIRA = 'jira',
Expand Down
Loading