From e4c3092e77a39198963d7d753339129f928582da Mon Sep 17 00:00:00 2001 From: Joe Ayoub Date: Fri, 19 Jun 2026 15:29:30 +0100 Subject: [PATCH] handle 409 as retries for pendo-audience destination --- .../pendo-audiences/metadata.json | 378 ------------------ ...orm-batch.test.ts => engage-batch.test.ts} | 241 +++++++++-- .../__tests__/engage-single.test.ts | 151 +++++++ .../syncAudience/__tests__/perform.test.ts | 104 ----- .../pendo-audiences/syncAudience/fields.ts | 24 -- .../pendo-audiences/syncAudience/functions.ts | 127 ++++-- .../syncAudience/generated-types.ts | 10 - .../pendo-audiences/syncAudience/index.ts | 8 +- 8 files changed, 455 insertions(+), 588 deletions(-) delete mode 100644 packages/destination-actions/src/destinations/pendo-audiences/metadata.json rename packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/{perform-batch.test.ts => engage-batch.test.ts} (50%) create mode 100644 packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-single.test.ts delete mode 100644 packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform.test.ts diff --git a/packages/destination-actions/src/destinations/pendo-audiences/metadata.json b/packages/destination-actions/src/destinations/pendo-audiences/metadata.json deleted file mode 100644 index e89000cb2c9..00000000000 --- a/packages/destination-actions/src/destinations/pendo-audiences/metadata.json +++ /dev/null @@ -1,378 +0,0 @@ -{ - "slug": "actions-pendo-audiences", - "name": "Pendo Audiences", - "mode": "cloud", - "description": "Sync Segment Engage Audiences to Pendo Segments.", - "authentication": { - "scheme": "custom", - "fields": { - "integrationKey": { - "label": "Integration Key", - "description": "Your Pendo Integration Key. Found in Pendo under Settings > Integrations > Integration Keys.", - "type": "password", - "required": true, - "multiple": false, - "choices": null, - "default": null, - "depends_on": null - }, - "region": { - "label": "Region", - "description": "The region your Pendo account is hosted in.", - "type": "string", - "required": true, - "multiple": false, - "choices": [ - { - "label": "US", - "value": "US" - }, - { - "label": "US1", - "value": "US1" - }, - { - "label": "EU", - "value": "EU" - }, - { - "label": "JP", - "value": "JP" - }, - { - "label": "AU", - "value": "AU" - } - ], - "default": "DEFAULT", - "depends_on": null - } - } - }, - "audienceConfig": { - "mode": { - "type": "synced", - "full_audience_sync": false - }, - "audienceFields": { - "audienceName": { - "label": "Pendo Segment Name", - "description": "A name for the Pendo Segment. Leave blank to use the Segment audience name.", - "type": "string", - "required": false, - "multiple": false, - "choices": null, - "default": null, - "depends_on": null - } - }, - "supportsAudienceFunctions": true - }, - "actions": { - "syncAudience": { - "title": "Sync to Pendo Segment", - "description": "Sync Segment Engage Audience membership to a Pendo Segment by adding or removing visitors.", - "platform": "cloud", - "defaultSubscription": "type = \"identify\" or type = \"track\"", - "hidden": false, - "hasPerformBatch": true, - "syncMode": null, - "hooks": null, - "dynamicFields": null, - "fields": { - "visitorId": { - "label": "Visitor ID", - "description": "The Pendo Visitor ID for the user. Maps to the userId in Segment by default.", - "type": "string", - "required": true, - "multiple": false, - "allowNull": false, - "dynamic": false, - "default": { - "@path": "$.userId" - }, - "choices": null, - "placeholder": null, - "properties": null, - "category": null, - "depends_on": null, - "readOnly": null, - "hidden": null, - "minimum": null, - "maximum": null, - "defaultObjectUI": null, - "disabledInputMethods": null, - "displayMode": null, - "format": null, - "additionalProperties": false - }, - "traitsOrProperties": { - "label": "Traits or Properties", - "description": "Traits or Properties object from the identify() or track() call emitted by Engage.", - "type": "object", - "required": true, - "multiple": false, - "allowNull": false, - "dynamic": false, - "default": { - "@if": { - "exists": { - "@path": "$.traits" - }, - "then": { - "@path": "$.traits" - }, - "else": { - "@path": "$.properties" - } - } - }, - "choices": null, - "placeholder": null, - "properties": null, - "category": null, - "depends_on": null, - "readOnly": null, - "hidden": true, - "minimum": null, - "maximum": null, - "defaultObjectUI": null, - "disabledInputMethods": null, - "displayMode": null, - "format": null, - "additionalProperties": false - }, - "segmentAudienceKey": { - "label": "Segment Audience Key", - "description": "Segment Audience Key. Used to determine whether the user is being added to or removed from the Pendo Segment.", - "type": "string", - "required": true, - "multiple": false, - "allowNull": false, - "dynamic": false, - "default": { - "@path": "$.context.personas.computation_key" - }, - "choices": null, - "placeholder": null, - "properties": null, - "category": null, - "depends_on": null, - "readOnly": null, - "hidden": true, - "minimum": null, - "maximum": null, - "defaultObjectUI": null, - "disabledInputMethods": null, - "displayMode": null, - "format": null, - "additionalProperties": false - }, - "segmentAudienceId": { - "label": "Segment External Audience ID", - "description": "The External Audience ID from Segment, which maps to the Pendo Segment ID.", - "type": "string", - "required": true, - "multiple": false, - "allowNull": false, - "dynamic": false, - "default": { - "@path": "$.context.personas.external_audience_id" - }, - "choices": null, - "placeholder": null, - "properties": null, - "category": null, - "depends_on": null, - "readOnly": null, - "hidden": true, - "minimum": null, - "maximum": null, - "defaultObjectUI": null, - "disabledInputMethods": null, - "displayMode": null, - "format": null, - "additionalProperties": false - }, - "enable_batching": { - "label": "Batch events", - "description": "When enabled, events are batched and sent to Pendo using the batch patch endpoint (up to 1000 visitors per request).", - "type": "boolean", - "required": true, - "multiple": false, - "allowNull": false, - "dynamic": false, - "default": true, - "choices": null, - "placeholder": null, - "properties": null, - "category": null, - "depends_on": null, - "readOnly": null, - "hidden": null, - "minimum": null, - "maximum": null, - "defaultObjectUI": null, - "disabledInputMethods": null, - "displayMode": null, - "format": null, - "additionalProperties": false - }, - "batch_size": { - "label": "Max Batch Size", - "description": "Maximum number of visitors to include in a single batch request. Must be between 1 and 1000.", - "type": "integer", - "required": false, - "multiple": false, - "allowNull": false, - "dynamic": false, - "default": 1000, - "choices": null, - "placeholder": null, - "properties": null, - "category": null, - "depends_on": null, - "readOnly": null, - "hidden": null, - "minimum": 1, - "maximum": 1000, - "defaultObjectUI": null, - "disabledInputMethods": null, - "displayMode": null, - "format": null, - "additionalProperties": false - } - } - } - }, - "presets": [ - { - "name": "Entities Audience Membership Changed", - "type": "specificEvent", - "partnerAction": "syncAudience", - "mapping": { - "visitorId": { - "@path": "$.userId" - }, - "traitsOrProperties": { - "@if": { - "exists": { - "@path": "$.traits" - }, - "then": { - "@path": "$.traits" - }, - "else": { - "@path": "$.properties" - } - } - }, - "segmentAudienceKey": { - "@path": "$.context.personas.computation_key" - }, - "segmentAudienceId": { - "@path": "$.context.personas.external_audience_id" - }, - "enable_batching": true, - "batch_size": 1000 - }, - "eventSlug": "warehouse_audience_membership_changed_identify" - }, - { - "name": "Associated Entity Added", - "type": "specificEvent", - "partnerAction": "syncAudience", - "mapping": { - "visitorId": { - "@path": "$.userId" - }, - "traitsOrProperties": { - "@if": { - "exists": { - "@path": "$.traits" - }, - "then": { - "@path": "$.traits" - }, - "else": { - "@path": "$.properties" - } - } - }, - "segmentAudienceKey": { - "@path": "$.context.personas.computation_key" - }, - "segmentAudienceId": { - "@path": "$.context.personas.external_audience_id" - }, - "enable_batching": true, - "batch_size": 1000 - }, - "eventSlug": "warehouse_entity_added_track" - }, - { - "name": "Associated Entity Removed", - "type": "specificEvent", - "partnerAction": "syncAudience", - "mapping": { - "visitorId": { - "@path": "$.userId" - }, - "traitsOrProperties": { - "@if": { - "exists": { - "@path": "$.traits" - }, - "then": { - "@path": "$.traits" - }, - "else": { - "@path": "$.properties" - } - } - }, - "segmentAudienceKey": { - "@path": "$.context.personas.computation_key" - }, - "segmentAudienceId": { - "@path": "$.context.personas.external_audience_id" - }, - "enable_batching": true, - "batch_size": 1000 - }, - "eventSlug": "warehouse_entity_removed_track" - }, - { - "name": "Journeys Step Entered", - "type": "specificEvent", - "partnerAction": "syncAudience", - "mapping": { - "visitorId": { - "@path": "$.userId" - }, - "traitsOrProperties": { - "@if": { - "exists": { - "@path": "$.traits" - }, - "then": { - "@path": "$.traits" - }, - "else": { - "@path": "$.properties" - } - } - }, - "segmentAudienceKey": { - "@path": "$.context.personas.computation_key" - }, - "segmentAudienceId": { - "@path": "$.context.personas.external_audience_id" - }, - "enable_batching": true, - "batch_size": 1000 - }, - "eventSlug": "journeys_step_entered_track" - } - ] -} diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform-batch.test.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-batch.test.ts similarity index 50% rename from packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform-batch.test.ts rename to packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-batch.test.ts index 297a3680a0c..53bb74a7f73 100644 --- a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform-batch.test.ts +++ b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-batch.test.ts @@ -1,5 +1,5 @@ import nock from 'nock' -import { createTestEvent, createTestIntegration } from '@segment/actions-core' +import { createTestEvent, createTestIntegration, RetryableError } from '@segment/actions-core' import type { SegmentEvent } from '@segment/actions-core' import Destination from '../../index' import { REGIONS, SEGMENT_ENDPOINT } from '../../constants' @@ -32,8 +32,6 @@ function makeEvent(userId: string, audienceValue: boolean, type: 'identify' | 't const baseMapping = { visitorId: { '@path': '$.userId' }, - traitsOrProperties: { '@path': '$.traits' }, - segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID, enable_batching: false } @@ -64,18 +62,18 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses.length).toBe(3) expect(responses[0]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user2'] }] }, - body: { visitorId: 'user2', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user2'] }] } }) expect(responses[2]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user3'] }] }, - body: { visitorId: 'user3', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user3', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user3'] }] } }) }) }) @@ -99,13 +97,13 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses.length).toBe(2) expect(responses[0]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'remove', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] }, - body: { visitorId: 'user2', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] } }) }) }) @@ -137,27 +135,107 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses.length).toBe(4) expect(responses[0]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] }, - body: { visitorId: 'user2', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] } }) expect(responses[2]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user3'] }] }, - body: { visitorId: 'user3', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user3', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user3'] }] } }) expect(responses[3]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'remove', path: '/visitors', value: ['user4'] }] }, - body: { visitorId: 'user4', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user4', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user4'] }] } }) }) }) + describe('executeBatch - track events', () => { + it('should resolve membership from track event properties and PATCH add/remove accordingly', async () => { + const expectedPatchJSON = { + patch: [ + { op: 'add', path: '/visitors', value: ['user1'] }, + { op: 'remove', path: '/visitors', value: ['user2'] } + ] + } + + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, expectedPatchJSON) + .reply(200, { + multistatus: [ + { status: 200, message: 'success', operation: 'add' }, + { status: 200, message: 'success', operation: 'remove' } + ] + }) + + const responses = await testDestination.executeBatch('syncAudience', { + events: [makeEvent('user1', true, 'track'), makeEvent('user2', false, 'track')], + settings, + mapping: batchMapping + }) + + expect(responses.length).toBe(2) + expect(responses[0]).toMatchObject({ + status: 200, + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } + }) + expect(responses[1]).toMatchObject({ + status: 200, + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] } + }) + }) + + it('should return a 400 error for a track event with no membership in properties', async () => { + const expectedPatchJSON = { + patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] + } + + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, expectedPatchJSON) + .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'add' }] }) + + const events: SegmentEvent[] = [ + makeEvent('user1', true, 'track'), + createTestEvent({ + type: 'track', + event: 'Audience Entered', + userId: 'user2', + properties: {}, // computation_key missing from properties → membership undeterminable + context: { + personas: { computation_class: 'audience', computation_key: 'test_audience', external_audience_id: SEGMENT_ID } + } + }) + ] + + const responses = await testDestination.executeBatch('syncAudience', { + events, + settings, + mapping: batchMapping + }) + + expect(responses.length).toBe(2) + expect(responses[0]).toMatchObject({ + status: 200, + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } + }) + expect(responses[1]).toMatchObject({ + status: 400, + errormessage: 'Unable to determine audience membership for this event', + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true } + }) + expect(responses[1]).not.toHaveProperty('body') + }) + }) + describe('executeBatch - validation errors', () => { it('should return a schema error for a payload with undefined visitorId and succeed for the valid one', async () => { const expectedPatchJSON = { @@ -170,14 +248,20 @@ describe('Pendo Audiences - syncAudience', () => { const events: SegmentEvent[] = [ createTestEvent({ + type: 'identify', userId: 'user1', traits: { test_audience: true, customId: 'user1' }, - context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } + context: { + personas: { computation_class: 'audience', computation_key: 'test_audience', external_audience_id: SEGMENT_ID } + } }), createTestEvent({ + type: 'identify', userId: 'user2', traits: { test_audience: true }, // no customId → visitorId resolves to undefined - context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } + context: { + personas: { computation_class: 'audience', computation_key: 'test_audience', external_audience_id: SEGMENT_ID } + } }) ] @@ -190,8 +274,8 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses.length).toBe(2) expect(responses[0]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true, customId: 'user1' }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 400, @@ -202,7 +286,7 @@ describe('Pendo Audiences - syncAudience', () => { it('should return a 400 error for all payloads when visitorId is empty string', async () => { // Empty strings survive schema validation (required only rejects undefined/null) and - // reach our custom if(!visitorId) guard in sendBatch + // reach our custom if(!visitorId) guard in send const responses = await testDestination.executeBatch('syncAudience', { events: [makeEvent('user1', true)], settings, @@ -213,8 +297,10 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses[0]).toMatchObject({ status: 400, errormessage: 'Visitor ID is required', - body: { visitorId: '', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: '', segmentAudienceId: SEGMENT_ID, enable_batching: true } }) + // Validation error never reached Pendo, so no request body was sent to the destination + expect(responses[0]).not.toHaveProperty('body') }) it('should return a 400 error for all payloads when segmentAudienceId is empty string', async () => { @@ -228,26 +314,77 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses[0]).toMatchObject({ status: 400, errormessage: 'Missing Pendo Segment ID', - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: '' } + sent: { visitorId: 'user1', segmentAudienceId: '', enable_batching: true } }) + expect(responses[0]).not.toHaveProperty('body') expect(responses[1]).toMatchObject({ status: 400, errormessage: 'Missing Pendo Segment ID', - body: { visitorId: 'user2', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: '' } + sent: { visitorId: 'user2', segmentAudienceId: '', enable_batching: true } }) + expect(responses[1]).not.toHaveProperty('body') }) it('should return a schema error for all payloads when segmentAudienceId is missing', async () => { // No nock mock needed — performBatch is never called when all payloads fail schema validation + // Omit segmentAudienceId entirely so the field is absent and fails schema validation + const { segmentAudienceId, ...mappingWithoutSegmentId } = batchMapping const responses = await testDestination.executeBatch('syncAudience', { events: [makeEvent('user1', true), makeEvent('user2', false)], settings, - mapping: { ...batchMapping, segmentAudienceId: undefined } + mapping: mappingWithoutSegmentId }) expect(responses.length).toBe(2) - expect(responses[0]).toMatchObject({ status: 400, errortype: 'PAYLOAD_VALIDATION_FAILED', errormessage: "The root value is missing the required field 'segmentAudienceId'." }) - expect(responses[1]).toMatchObject({ status: 400, errortype: 'PAYLOAD_VALIDATION_FAILED', errormessage: "The root value is missing the required field 'segmentAudienceId'." }) + expect(responses[0]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: "The root value is missing the required field 'segmentAudienceId'." + }) + expect(responses[1]).toMatchObject({ + status: 400, + errortype: 'PAYLOAD_VALIDATION_FAILED', + errormessage: "The root value is missing the required field 'segmentAudienceId'." + }) + }) + + it('should return a 400 error when audience membership cannot be determined', async () => { + const expectedPatchJSON = { + patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] + } + + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, expectedPatchJSON) + .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'add' }] }) + + const events: SegmentEvent[] = [ + makeEvent('user1', true), + createTestEvent({ + type: 'identify', + userId: 'user2', + // computation_class is not "audience", so core cannot resolve membership for this event + context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } + }) + ] + + const responses = await testDestination.executeBatch('syncAudience', { + events, + settings, + mapping: batchMapping + }) + + expect(responses.length).toBe(2) + expect(responses[0]).toMatchObject({ + status: 200, + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } + }) + expect(responses[1]).toMatchObject({ + status: 400, + errormessage: 'Unable to determine audience membership for this event', + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true } + }) + expect(responses[1]).not.toHaveProperty('body') }) }) @@ -265,14 +402,14 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses[0]).toMatchObject({ status: 500, errormessage: 'Internal Server Error', - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 500, errormessage: 'Internal Server Error', - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user2'] }] }, - body: { visitorId: 'user2', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user2'] }] } }) }) @@ -289,14 +426,14 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses[0]).toMatchObject({ status: 403, errormessage: 'Forbidden', - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 403, errormessage: 'Forbidden', - sent: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] }, - body: { visitorId: 'user2', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] } }) }) @@ -320,14 +457,30 @@ describe('Pendo Audiences - syncAudience', () => { expect(responses[0]).toMatchObject({ status: 400, errormessage: 'Error adding visitor to segment', - sent: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }, - body: { visitorId: 'user1', traitsOrProperties: { test_audience: true }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user1', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] } }) expect(responses[1]).toMatchObject({ status: 200, - sent: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] }, - body: { visitorId: 'user2', traitsOrProperties: { test_audience: false }, segmentAudienceKey: 'test_audience', segmentAudienceId: SEGMENT_ID } + sent: { visitorId: 'user2', segmentAudienceId: SEGMENT_ID, enable_batching: true }, + body: { patch: [{ op: 'remove', path: '/visitors', value: ['user2'] }] } }) }) + + it('should throw a RetryableError (429) for the whole batch when the API returns 409', async () => { + // 409 is transient ("operation in progress"); the entire PATCH failed, so we retry the whole + // batch by throwing rather than marking individual items as failed. + nock(REGIONS.DEFAULT.domain).patch(`${segmentBase}/visitor`).reply(409, { message: 'Conflict' }) + + const promise = testDestination.executeBatch('syncAudience', { + events: [makeEvent('user1', true), makeEvent('user2', false)], + settings, + mapping: batchMapping + }) + + await expect(promise).rejects.toThrow(RetryableError) + await expect(promise).rejects.toThrow('Pendo returned a 409. Segment is returning a 429 to trigger a retry.') + await expect(promise).rejects.toMatchObject({ status: 429, code: 'RETRYABLE_ERROR' }) + }) }) }) diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-single.test.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-single.test.ts new file mode 100644 index 00000000000..97e75e3dcb4 --- /dev/null +++ b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/engage-single.test.ts @@ -0,0 +1,151 @@ +import nock from 'nock' +import { createTestEvent, createTestIntegration, RetryableError } from '@segment/actions-core' +import Destination from '../../index' +import { REGIONS, SEGMENT_ENDPOINT } from '../../constants' + +const testDestination = createTestIntegration(Destination) + +const settings = { + integrationKey: 'test-integration-key', + region: REGIONS.DEFAULT.name +} + +const SEGMENT_ID = 'seg-abc123' +const segmentBase = `/${SEGMENT_ENDPOINT}/${SEGMENT_ID}` + +const baseMapping = { + visitorId: { '@path': '$.userId' }, + segmentAudienceId: SEGMENT_ID, + enable_batching: false +} + +function makeEvent(userId: string, audienceValue: boolean, type: 'identify' | 'track' = 'identify') { + return createTestEvent({ + type, + userId, + traits: type === 'identify' ? { test_audience: audienceValue } : undefined, + properties: type === 'track' ? { test_audience: audienceValue } : undefined, + context: { + personas: { + computation_class: 'audience', + computation_key: 'test_audience', + external_audience_id: SEGMENT_ID + } + } + }) +} + +describe('Pendo Audiences - syncAudience perform', () => { + afterEach(() => { + nock.cleanAll() + }) + + it('should add a visitor to the audience successfully', async () => { + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }) + .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'add' }] }) + + const responses = await testDestination.testAction('syncAudience', { + event: makeEvent('user1', true), + settings, + mapping: baseMapping + }) + + expect(responses[0].status).toBe(200) + }) + + it('should remove a visitor from the audience successfully', async () => { + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, { patch: [{ op: 'remove', path: '/visitors', value: ['user1'] }] }) + .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'remove' }] }) + + const responses = await testDestination.testAction('syncAudience', { + event: makeEvent('user1', false), + settings, + mapping: baseMapping + }) + + expect(responses[0].status).toBe(200) + }) + + it('should add a visitor from a track event successfully', async () => { + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }) + .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'add' }] }) + + const responses = await testDestination.testAction('syncAudience', { + event: makeEvent('user1', true, 'track'), + settings, + mapping: baseMapping + }) + + expect(responses[0].status).toBe(200) + }) + + it('should remove a visitor from a track event successfully', async () => { + nock(REGIONS.DEFAULT.domain) + .patch(`${segmentBase}/visitor`, { patch: [{ op: 'remove', path: '/visitors', value: ['user1'] }] }) + .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'remove' }] }) + + const responses = await testDestination.testAction('syncAudience', { + event: makeEvent('user1', false, 'track'), + settings, + mapping: baseMapping + }) + + expect(responses[0].status).toBe(200) + }) + + it('should throw a validation error when visitorId is not provided', async () => { + await expect( + testDestination.testAction('syncAudience', { + event: makeEvent('user1', true), + settings, + mapping: { ...baseMapping, visitorId: '' } + }) + ).rejects.toThrow("The root value is missing the required field 'visitorId'.") + }) + + it('should throw a validation error when audience membership cannot be determined', async () => { + const event = createTestEvent({ + type: 'identify', + userId: 'user1', + // computation_class is not "audience", so core cannot resolve membership + context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } + }) + + await expect( + testDestination.testAction('syncAudience', { + event, + settings, + mapping: baseMapping + }) + ).rejects.toThrow('Unable to determine audience membership for this event') + }) + + it('should throw an IntegrationError when the API returns 500', async () => { + nock(REGIONS.DEFAULT.domain).patch(`${segmentBase}/visitor`).reply(500, { message: 'Internal Server Error' }) + + await expect( + testDestination.testAction('syncAudience', { + event: makeEvent('user1', true), + settings, + mapping: baseMapping + }) + ).rejects.toThrow('Internal Server Error') + }) + + it('should throw a RetryableError (429) when the API returns 409', async () => { + nock(REGIONS.DEFAULT.domain).patch(`${segmentBase}/visitor`).reply(409, { message: 'Conflict' }) + + const promise = testDestination.testAction('syncAudience', { + event: makeEvent('user1', true), + settings, + mapping: baseMapping + }) + + await expect(promise).rejects.toThrow(RetryableError) + await expect(promise).rejects.toThrow('Pendo returned a 409. Segment is returning a 429 to trigger a retry.') + await expect(promise).rejects.toMatchObject({ status: 429, code: 'RETRYABLE_ERROR' }) + }) +}) diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform.test.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform.test.ts deleted file mode 100644 index 7b1a1a26a4c..00000000000 --- a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/__tests__/perform.test.ts +++ /dev/null @@ -1,104 +0,0 @@ -import nock from 'nock' -import { createTestEvent, createTestIntegration } from '@segment/actions-core' -import Destination from '../../index' -import { REGIONS, SEGMENT_ENDPOINT } from '../../constants' - -const testDestination = createTestIntegration(Destination) - -const settings = { - integrationKey: 'test-integration-key', - region: REGIONS.DEFAULT.name -} - -const SEGMENT_ID = 'seg-abc123' -const segmentBase = `/${SEGMENT_ENDPOINT}/${SEGMENT_ID}` - -const baseMapping = { - visitorId: { '@path': '$.userId' }, - traitsOrProperties: { '@path': '$.traits' }, - segmentAudienceKey: 'test_audience', - segmentAudienceId: SEGMENT_ID, - enable_batching: false -} - -describe('Pendo Audiences - syncAudience perform', () => { - afterEach(() => { - nock.cleanAll() - }) - - it('should add a visitor to the audience successfully', async () => { - nock(REGIONS.DEFAULT.domain) - .patch(`${segmentBase}/visitor`, { patch: [{ op: 'add', path: '/visitors', value: ['user1'] }] }) - .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'add' }] }) - - const event = createTestEvent({ - userId: 'user1', - traits: { test_audience: true }, - context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } - }) - - const responses = await testDestination.testAction('syncAudience', { - event, - settings, - mapping: baseMapping - }) - - expect(responses[0].status).toBe(200) - }) - - it('should remove a visitor from the audience successfully', async () => { - nock(REGIONS.DEFAULT.domain) - .patch(`${segmentBase}/visitor`, { patch: [{ op: 'remove', path: '/visitors', value: ['user1'] }] }) - .reply(200, { multistatus: [{ status: 200, message: 'success', operation: 'remove' }] }) - - const event = createTestEvent({ - userId: 'user1', - traits: { test_audience: false }, - context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } - }) - - const responses = await testDestination.testAction('syncAudience', { - event, - settings, - mapping: baseMapping - }) - - expect(responses[0].status).toBe(200) - }) - - it('should throw a validation error when visitorId is not provided', async () => { - const event = createTestEvent({ - userId: 'user1', - traits: { test_audience: true }, - context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } - }) - - await expect( - testDestination.testAction('syncAudience', { - event, - settings, - mapping: { ...baseMapping, visitorId: '' } - }) - ).rejects.toThrow("The root value is missing the required field 'visitorId'.") - }) - - it('should throw an IntegrationError when the API returns 500', async () => { - nock(REGIONS.DEFAULT.domain) - .patch(`${segmentBase}/visitor`) - .reply(500, { message: 'Internal Server Error' }) - - const event = createTestEvent({ - userId: 'user1', - traits: { test_audience: true }, - context: { personas: { computation_key: 'test_audience', external_audience_id: SEGMENT_ID } } - }) - - await expect( - testDestination.testAction('syncAudience', { - event, - settings, - mapping: baseMapping - }) - ).rejects.toThrow("Internal Server Error") - }) -}) diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/fields.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/fields.ts index 7a87a28a227..9338a4b7cb0 100644 --- a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/fields.ts +++ b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/fields.ts @@ -10,30 +10,6 @@ export const fields: Record = { '@path': '$.userId' } }, - traitsOrProperties: { - label: 'Traits or Properties', - description: 'Traits or Properties object from the identify() or track() call emitted by Engage.', - type: 'object', - required: true, - unsafe_hidden: true, - default: { - '@if': { - exists: { '@path': '$.traits' }, - then: { '@path': '$.traits' }, - else: { '@path': '$.properties' } - } - } - }, - segmentAudienceKey: { - label: 'Segment Audience Key', - description: 'Segment Audience Key. Used to determine whether the user is being added to or removed from the Pendo Segment.', - type: 'string', - required: true, - unsafe_hidden: true, - default: { - '@path': '$.context.personas.computation_key' - } - }, segmentAudienceId: { label: 'Segment External Audience ID', description: 'The External Audience ID from Segment, which maps to the Pendo Segment ID.', diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/functions.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/functions.ts index 629155fc049..46fab531c08 100644 --- a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/functions.ts +++ b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/functions.ts @@ -1,16 +1,41 @@ -import { IntegrationError, ErrorCodes, getErrorCodeFromHttpStatus, RequestClient, MultiStatusResponse, JSONLikeObject, PayloadValidationError } from '@segment/actions-core' +import { + IntegrationError, + ErrorCodes, + getErrorCodeFromHttpStatus, + RequestClient, + MultiStatusResponse, + JSONLikeObject, + PayloadValidationError, + InvalidAudienceMembershipError, + RetryableError, + AudienceMembership +} from '@segment/actions-core' import type { Payload } from './generated-types' import type { AddMap, RemoveMap, PatchBodyJSON, BatchPatchResponse, BatchMultistatusItem } from './types' import { SEGMENT_ENDPOINT } from '../constants' import { getDomain } from '../functions' -export async function send(request: RequestClient, region: string, payload: Payload[], isBatch: boolean): Promise { +export async function send( + request: RequestClient, + region: string, + payload: Payload[], + isBatch: boolean, + audienceMemberships: AudienceMembership[] +): Promise { const msResponse = new MultiStatusResponse() const segmentId = payload[0]?.segmentAudienceId if (!segmentId) { payload.forEach((p, index) => { - handleError('PayloadValidationError', 'Missing Pendo Segment ID', isBatch, msResponse, index, p, 400) + handleError( + 'PayloadValidationError', + 'Missing Pendo Segment ID', + isBatch, + msResponse, + index, + 400, + p as unknown as JSONLikeObject + ) }) return msResponse } @@ -19,13 +44,33 @@ export async function send(request: RequestClient, region: string, payload: Payl const removes: RemoveMap = new Map() payload.forEach((p, index) => { - const { visitorId, traitsOrProperties, segmentAudienceKey } = p + const { visitorId } = p if (!visitorId) { - handleError('PayloadValidationError','Visitor ID is required', isBatch, msResponse, index, p, 400) + handleError( + 'PayloadValidationError', + 'Visitor ID is required', + isBatch, + msResponse, + index, + 400, + p as unknown as JSONLikeObject + ) return } - const isAdding = Boolean(traitsOrProperties[segmentAudienceKey]) - if (isAdding) { + const membership = Array.isArray(audienceMemberships) ? audienceMemberships[index] : undefined + if (typeof membership !== 'boolean') { + handleError( + 'InvalidAudienceMembershipError', + 'Unable to determine audience membership for this event', + isBatch, + msResponse, + index, + 400, + p as unknown as JSONLikeObject + ) + return + } + if (membership) { adds.set(index, visitorId) } else { removes.set(index, visitorId) @@ -55,7 +100,6 @@ export async function send(request: RequestClient, region: string, payload: Payl } try { - const response = await request( `${getDomain(region)}/${SEGMENT_ENDPOINT}/${segmentId}/visitor`, { @@ -75,50 +119,85 @@ export async function send(request: RequestClient, region: string, payload: Payl if (isSuccess) { msResponse.setSuccessResponseAtIndex(index, { status: item.status, - body: p as unknown as JSONLikeObject, - sent: buildSent(item.operation, visitorId) + sent: p as unknown as JSONLikeObject, + body: buildPendoRequest(item.operation, [visitorId]) }) } else { - handleError('IntegrationError', item.message, isBatch, msResponse, index, p, item.status, buildSent(item.operation, visitorId)) + handleError( + 'IntegrationError', + item.message, + isBatch, + msResponse, + index, + item.status, + p as unknown as JSONLikeObject, + buildPendoRequest(item.operation, [visitorId]) + ) } }) }) - } - catch (error) { + } catch (error) { const status = (error?.response?.status as number) || 500 const message = (error?.message as string) || 'An error occurred while syncing visitors to Pendo Segment.' + // Pendo returns 409 ("Operation in progress") when a write to the segment is already underway. + // This is transient, so throw a RetryableError to let Segment retry the whole request later. + // Throwing (rather than setting per-item responses) retries the entire batch, which is correct + // since the single PATCH for all visitors is what Pendo rejected. + if (status === 409) { + throw new RetryableError('Pendo returned a 409. Segment is returning a 429 to trigger a retry.', 429) + } + const allIndices = [...adds.keys(), ...removes.keys()] allIndices.forEach((index) => { const visitorId = adds.get(index) ?? removes.get(index) const op = adds.has(index) ? 'add' : 'remove' - handleError('IntegrationError', message, isBatch, msResponse, index, payload[index], status, buildSent(op, visitorId as string)) + handleError( + 'IntegrationError', + message, + isBatch, + msResponse, + index, + status, + payload[index] as unknown as JSONLikeObject, + buildPendoRequest(op, [visitorId as string]) + ) }) } - if(isBatch) { + if (isBatch) { return msResponse } return } -function buildSent(op: 'add' | 'remove', visitorId: string): JSONLikeObject { - return { patch: [{ op, path: '/visitors', value: [visitorId] }] } as unknown as JSONLikeObject +function buildPendoRequest(op: 'add' | 'remove', visitorIds: string[]): JSONLikeObject { + return { patch: [{ op, path: '/visitors', value: visitorIds }] } as unknown as JSONLikeObject } -function handleError(errType: 'PayloadValidationError' | 'IntegrationError', message: string, isBatch: boolean, msResponse: MultiStatusResponse, index: number, payload: Payload, status = 400, sent?: JSONLikeObject): void { +function handleError( + errType: 'PayloadValidationError' | 'IntegrationError' | 'InvalidAudienceMembershipError', + message: string, + isBatch: boolean, + msResponse: MultiStatusResponse, + index: number, + status = 400, + sent?: JSONLikeObject, + body?: JSONLikeObject +): void { if (!isBatch) { - if(errType === 'PayloadValidationError') { + if (errType === 'PayloadValidationError') { throw new PayloadValidationError(message) - } - else { - throw new IntegrationError( message, getErrorCodeFromHttpStatus(status) || ErrorCodes.UNKNOWN_ERROR, status) + } else if (errType === 'InvalidAudienceMembershipError' ) { + throw new InvalidAudienceMembershipError(message) + } else { + throw new IntegrationError(message, getErrorCodeFromHttpStatus(status) || ErrorCodes.UNKNOWN_ERROR, status) } } msResponse.setErrorResponseAtIndex(index, { status, - body: payload as unknown as JSONLikeObject, errormessage: message, - ...(sent && { sent }) + ...(sent && { sent }), + ...(body && { body }) }) } diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/generated-types.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/generated-types.ts index a2d1f08b983..06937f7c521 100644 --- a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/generated-types.ts +++ b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/generated-types.ts @@ -5,16 +5,6 @@ export interface Payload { * The Pendo Visitor ID for the user. Maps to the userId in Segment by default. */ visitorId: string - /** - * Traits or Properties object from the identify() or track() call emitted by Engage. - */ - traitsOrProperties: { - [k: string]: unknown - } - /** - * Segment Audience Key. Used to determine whether the user is being added to or removed from the Pendo Segment. - */ - segmentAudienceKey: string /** * The External Audience ID from Segment, which maps to the Pendo Segment ID. */ diff --git a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/index.ts b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/index.ts index 32f7e4a80f5..e0273c2cb18 100644 --- a/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/index.ts +++ b/packages/destination-actions/src/destinations/pendo-audiences/syncAudience/index.ts @@ -9,11 +9,11 @@ const action: ActionDefinition = { description: 'Sync Segment Engage Audience membership to a Pendo Segment by adding or removing visitors.', defaultSubscription: 'type = "identify" or type = "track"', fields, - perform: (request, { payload, settings }) => { - return send(request, settings.region, [payload], false) + perform: (request, { payload, settings, audienceMembership }) => { + return send(request, settings.region, [payload], false, [audienceMembership]) }, - performBatch: (request, { payload, settings }) => { - return send(request, settings.region, payload, true) + performBatch: (request, { payload, settings, audienceMembership}) => { + return send(request, settings.region, payload, true, audienceMembership ?? []) } }