diff --git a/config/schema/artifacts/datastore_config.yaml b/config/schema/artifacts/datastore_config.yaml index 61028816d..ae3276abb 100644 --- a/config/schema/artifacts/datastore_config.yaml +++ b/config/schema/artifacts/datastore_config.yaml @@ -1236,6 +1236,9 @@ index_templates: type: integer nested_fields2|the_seasons: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1310,6 +1313,9 @@ index_templates: type: integer widget_options|colors: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1476,6 +1482,9 @@ index_templates: type: integer fees|amount_cents: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1527,6 +1536,9 @@ indices: type: integer shapes|coordinates: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1593,6 +1605,9 @@ indices: type: integer owner_ids: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1625,6 +1640,9 @@ indices: type: keyword __typename: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1654,6 +1672,9 @@ indices: type: integer manufacturer_id: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1690,6 +1711,9 @@ indices: type: keyword nationality: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1722,6 +1746,9 @@ indices: type: keyword manufacturer_id: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1753,6 +1780,9 @@ indices: type: keyword __typename: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1778,6 +1808,9 @@ indices: format: strict_date active: type: boolean + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1803,6 +1836,9 @@ indices: type: keyword name: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1835,6 +1871,9 @@ indices: created_at: type: date format: strict_date_time + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -2156,13 +2195,116 @@ scripts: // No timestamp values matched the params, so return `false`. return false; - update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d: + update_index_data_079bafcf4d739acd8659a631377fa9c8: context: update script: lang: painless source: |- - // --- Helper Functions --- // - void setup(Map source, String relationship, Map counts) { + // ============================================================ + // Helper Functions + // ============================================================ + + // Encodes a list of strings into a length-prefixed string. + // Each part is encoded as "length:value" and parts are concatenated directly. + // This encoding is unambiguous regardless of what characters the values contain. + String encodeKey(List parts) { + StringBuilder sb = new StringBuilder(); + for (String part : parts) { + sb.append(part.length()); + sb.append(':'); + sb.append(part); + } + return sb.toString(); + } + + // Decodes a length-prefixed string back into a list of strings. + List decodeKey(String key) { + List parts = new ArrayList(); + int i = 0; + while (i < key.length()) { + int colonPos = key.indexOf(":", i); + int length = Integer.parseInt(key.substring(i, colonPos)); + int valueStart = colonPos + 1; + parts.add(key.substring(valueStart, valueStart + length)); + i = valueStart + length; + } + return parts; + } + + // Builds a nested element key (as an encoded string) from path segments. + // List segments contribute their matched identifier value; object segments contribute their field name. + // Returns "" if no path segments are configured (i.e., this is not a nested sourced event). + String buildNestedElementKey(String relationship, Map nestedSourcedPaths, Map pathIdentifiers) { + List pathSegments = (List) nestedSourcedPaths.get(relationship); + if (pathSegments == null) { + return ""; + } + List parts = new ArrayList(); + for (Map segment : pathSegments) { + if ("list".equals(segment.get("type"))) { + parts.add(pathIdentifiers[segment.sourceField]); + } else { + parts.add(segment.get("field")); + } + } + return encodeKey(parts); + } + + // Builds the versions key by combining the relationship name with the element key parts. + // For top-level events (empty element key), returns just the relationship name. + String buildVersionsKey(String relationship, String nestedElementKey) { + if (nestedElementKey.isEmpty()) { + return relationship; + } + List parts = decodeKey(nestedElementKey); + parts.add(0, relationship); + return encodeKey(parts); + } + + // Finds an element in a list where element[matchField] equals matchValue. Returns null if not found. + def findInList(List elements, String matchField, String matchValue) { + for (Map element : elements) { + if (matchValue.equals(element[matchField])) { + return element; + } + } + return null; + } + + // Navigates from `source` through `pathSegments` to find the target nested element. + // Returns the matched element, or null if the path doesn't exist or no match is found. + def navigateToNestedElement(Map source, List pathSegments, List keyParts) { + Map current = source; + + for (int i = 0; i < pathSegments.size(); i++) { + Map segment = (Map) pathSegments.get(i); + String field = (String) segment.get("field"); + + if (!current.containsKey(field)) { + return null; + } + + if ("list".equals(segment.get("type"))) { + current = (Map) findInList((List) current.get(field), (String) segment.get("matchField"), (String) keyParts.get(i)); + } else { + current = (Map) current.get(field); + } + + if (current == null) { + return null; + } + } + + return current; + } + + + // ============================================================ + // Main Functions + // ============================================================ + + // Initializes internal bookkeeping structures (__sources, __versions, __counts, __nested_sourced_data). + void setup(Map source, String versionsKey, String relationship, String nestedElementKey, Map counts) { if (source.__sources == null) { source.__sources = []; } @@ -2171,8 +2313,17 @@ scripts: source.__versions = [:]; } - if (source.__versions[relationship] == null) { - source.__versions[relationship] = [:]; + if (source.__versions[versionsKey] == null) { + source.__versions[versionsKey] = [:]; + } + + if (!nestedElementKey.isEmpty()) { + if (source.__nested_sourced_data == null) { + source.__nested_sourced_data = [:]; + } + if (source.__nested_sourced_data[relationship] == null) { + source.__nested_sourced_data[relationship] = [:]; + } } if (counts != null && source.__counts == null) { @@ -2180,23 +2331,24 @@ scripts: } } - void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) { - Map relationshipVersionsMap = source.__versions.get(relationship); - List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); + // Validates that this event is allowed: no relationship mutation and no stale version. + void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion, String versionsKey) { + Map versionsMap = source.__versions[versionsKey]; - if (previousSourceIdsForRelationship.size() > 0) { + // Check that no other source ID has previously written to this target. + List previousSourceIds = versionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); + if (previousSourceIds.size() > 0) { throw new IllegalArgumentException( "Cannot update document " + id + " " + "with data from related " + relationship + " " + sourceId + " " + - "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + + "because the related " + relationship + " has apparently changed (was: " + previousSourceIds + "), " + "but mutations of relationships used with `sourced_from` are not supported because " + "allowing it could break ElasticGraph's out-of-order processing guarantees." - ); + ); } - Number maybeDocVersion = relationshipVersionsMap.get(sourceId); - - // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. + // Check that the event version is newer than what we've already seen. + Number maybeDocVersion = versionsMap.get(sourceId); long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); if (docVersion >= eventVersion) { @@ -2207,6 +2359,7 @@ scripts: } } + // Applies top-level fields to the document via putAll, and merges __counts. void applyTopLevelFields(Map source, String id, Map topLevelFields, Map counts) { source.id = id; source.putAll(topLevelFields); @@ -2216,8 +2369,50 @@ scripts: } } - void recordSource(Map source, String relationship, String sourceId, long eventVersion) { - source.__versions[relationship][sourceId] = eventVersion; + // Stores nested sourced fields in the __nested_sourced_data buffer for later application. + void storeNestedSourcedData(Map source, String relationship, Map nestedSourcedFields, String nestedElementKey) { + if (nestedSourcedFields.isEmpty()) { + return; + } + + ((Map) source.__nested_sourced_data[relationship]).put(nestedElementKey, nestedSourcedFields); + } + + // Applies nested sourced data from the __nested_sourced_data buffer to matched nested elements. + // Reads path config from the nestedSourcedPaths param. + // Called after every event so that after a self-event's putAll overwrites nested arrays, + // the buffered data gets re-applied. + void applyNestedSourcedData(Map source, Map nestedSourcedPaths) { + if (source.__nested_sourced_data == null) { + return; + } + + for (sourcedEntry in source.__nested_sourced_data.entrySet()) { + String relationship = (String) sourcedEntry.getKey(); + Map dataByKey = (Map) sourcedEntry.getValue(); + List pathSegments = (List) nestedSourcedPaths.get(relationship); + + if (pathSegments == null || dataByKey == null) { + continue; + } + + for (elementEntry in dataByKey.entrySet()) { + List keyParts = decodeKey((String) elementEntry.getKey()); + if (keyParts.size() != pathSegments.size()) { + continue; + } + + Map target = (Map) navigateToNestedElement(source, pathSegments, keyParts); + if (target != null) { + target.putAll((Map) elementEntry.getValue()); + } + } + } + } + + // Records the event version in __versions and adds the relationship to __sources. + void recordSource(Map source, String versionsKey, String relationship, String sourceId, long eventVersion) { + source.__versions[versionsKey][sourceId] = eventVersion; // Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list. // This ensures deterministic ordering of its elements regardless of event ingestion order, and lets us check membership in O(log N) time. @@ -2234,15 +2429,26 @@ scripts: } } - // --- Main script body --- // + // ============================================================ + // Main Execution + // ============================================================ + Map source = ctx._source; String id = params.id; String relationship = params.relationship; String sourceId = params.sourceId; - long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles + long eventVersion = (long) params.version; Map counts = params.__counts; + Map nestedSourcedFields = params.nestedSourcedFields; + Map nestedSourcedPathIdentifiers = params.nestedSourcedPathIdentifiers; + Map nestedSourcedPaths = params.nestedSourcedPaths; + + String nestedElementKey = buildNestedElementKey(relationship, nestedSourcedPaths, nestedSourcedPathIdentifiers); + String versionsKey = buildVersionsKey(relationship, nestedElementKey); - setup(source, relationship, counts); - validateSource(source, id, relationship, sourceId, eventVersion); + setup(source, versionsKey, relationship, nestedElementKey, counts); + validateSource(source, id, relationship, sourceId, eventVersion, versionsKey); applyTopLevelFields(source, id, params.topLevelFields, counts); - recordSource(source, relationship, sourceId, eventVersion); + storeNestedSourcedData(source, relationship, nestedSourcedFields, nestedElementKey); + applyNestedSourcedData(source, nestedSourcedPaths); + recordSource(source, versionsKey, relationship, sourceId, eventVersion); diff --git a/config/schema/artifacts/runtime_metadata.yaml b/config/schema/artifacts/runtime_metadata.yaml index 19a30bf8f..42aa4630b 100644 --- a/config/schema/artifacts/runtime_metadata.yaml +++ b/config/schema/artifacts/runtime_metadata.yaml @@ -3085,7 +3085,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: full_address: cardinality: one @@ -3275,7 +3275,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -3314,7 +3314,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -3414,7 +3414,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: created_at: cardinality: one @@ -3724,7 +3724,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -3897,7 +3897,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: created_at: cardinality: one @@ -4382,7 +4382,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: ceo: cardinality: one @@ -4540,7 +4540,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: created_at: cardinality: one @@ -5501,7 +5501,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -5700,7 +5700,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -5767,7 +5767,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: active: cardinality: one @@ -6304,7 +6304,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: name: cardinality: one @@ -6654,7 +6654,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: formed_on routing_value_source: league - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: country_code: cardinality: one @@ -7754,7 +7754,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: created_at routing_value_source: workspace_id2 - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: amount_cents: cardinality: one @@ -7832,7 +7832,7 @@ object_types_by_name: version: cardinality: one relationship: widget - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: widget_cost: cardinality: one @@ -8062,7 +8062,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: introduced_on routing_value_source: primary_continent - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: details: cardinality: one @@ -9107,7 +9107,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: name: cardinality: one @@ -9129,7 +9129,7 @@ object_types_by_name: relationship: workspace rollover_timestamp_value_source: widget.created_at routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: workspace_name: cardinality: one @@ -9364,4 +9364,4 @@ static_script_ids_by_scoped_name: field/as_day_of_week: field_as_day_of_week_f2b5c7d9e8f75bf2457b52412bfb6537 field/as_time_of_day: field_as_time_of_day_ed82aba44fc66bff5635bec4305c1c66 filter/by_time_of_day: filter_by_time_of_day_ea12d0561b24961789ab68ed38435612 - update/index_data: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + update/index_data: update_index_data_079bafcf4d739acd8659a631377fa9c8 diff --git a/config/schema/artifacts_with_apollo/datastore_config.yaml b/config/schema/artifacts_with_apollo/datastore_config.yaml index 61028816d..ae3276abb 100644 --- a/config/schema/artifacts_with_apollo/datastore_config.yaml +++ b/config/schema/artifacts_with_apollo/datastore_config.yaml @@ -1236,6 +1236,9 @@ index_templates: type: integer nested_fields2|the_seasons: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1310,6 +1313,9 @@ index_templates: type: integer widget_options|colors: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1476,6 +1482,9 @@ index_templates: type: integer fees|amount_cents: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1527,6 +1536,9 @@ indices: type: integer shapes|coordinates: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1593,6 +1605,9 @@ indices: type: integer owner_ids: type: integer + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1625,6 +1640,9 @@ indices: type: keyword __typename: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1654,6 +1672,9 @@ indices: type: integer manufacturer_id: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1690,6 +1711,9 @@ indices: type: keyword nationality: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1722,6 +1746,9 @@ indices: type: keyword manufacturer_id: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1753,6 +1780,9 @@ indices: type: keyword __typename: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1778,6 +1808,9 @@ indices: format: strict_date active: type: boolean + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1803,6 +1836,9 @@ indices: type: keyword name: type: keyword + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -1835,6 +1871,9 @@ indices: created_at: type: date format: strict_date_time + __nested_sourced_data: + type: object + dynamic: 'false' __sources: type: keyword __versions: @@ -2156,13 +2195,116 @@ scripts: // No timestamp values matched the params, so return `false`. return false; - update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d: + update_index_data_079bafcf4d739acd8659a631377fa9c8: context: update script: lang: painless source: |- - // --- Helper Functions --- // - void setup(Map source, String relationship, Map counts) { + // ============================================================ + // Helper Functions + // ============================================================ + + // Encodes a list of strings into a length-prefixed string. + // Each part is encoded as "length:value" and parts are concatenated directly. + // This encoding is unambiguous regardless of what characters the values contain. + String encodeKey(List parts) { + StringBuilder sb = new StringBuilder(); + for (String part : parts) { + sb.append(part.length()); + sb.append(':'); + sb.append(part); + } + return sb.toString(); + } + + // Decodes a length-prefixed string back into a list of strings. + List decodeKey(String key) { + List parts = new ArrayList(); + int i = 0; + while (i < key.length()) { + int colonPos = key.indexOf(":", i); + int length = Integer.parseInt(key.substring(i, colonPos)); + int valueStart = colonPos + 1; + parts.add(key.substring(valueStart, valueStart + length)); + i = valueStart + length; + } + return parts; + } + + // Builds a nested element key (as an encoded string) from path segments. + // List segments contribute their matched identifier value; object segments contribute their field name. + // Returns "" if no path segments are configured (i.e., this is not a nested sourced event). + String buildNestedElementKey(String relationship, Map nestedSourcedPaths, Map pathIdentifiers) { + List pathSegments = (List) nestedSourcedPaths.get(relationship); + if (pathSegments == null) { + return ""; + } + List parts = new ArrayList(); + for (Map segment : pathSegments) { + if ("list".equals(segment.get("type"))) { + parts.add(pathIdentifiers[segment.sourceField]); + } else { + parts.add(segment.get("field")); + } + } + return encodeKey(parts); + } + + // Builds the versions key by combining the relationship name with the element key parts. + // For top-level events (empty element key), returns just the relationship name. + String buildVersionsKey(String relationship, String nestedElementKey) { + if (nestedElementKey.isEmpty()) { + return relationship; + } + List parts = decodeKey(nestedElementKey); + parts.add(0, relationship); + return encodeKey(parts); + } + + // Finds an element in a list where element[matchField] equals matchValue. Returns null if not found. + def findInList(List elements, String matchField, String matchValue) { + for (Map element : elements) { + if (matchValue.equals(element[matchField])) { + return element; + } + } + return null; + } + + // Navigates from `source` through `pathSegments` to find the target nested element. + // Returns the matched element, or null if the path doesn't exist or no match is found. + def navigateToNestedElement(Map source, List pathSegments, List keyParts) { + Map current = source; + + for (int i = 0; i < pathSegments.size(); i++) { + Map segment = (Map) pathSegments.get(i); + String field = (String) segment.get("field"); + + if (!current.containsKey(field)) { + return null; + } + + if ("list".equals(segment.get("type"))) { + current = (Map) findInList((List) current.get(field), (String) segment.get("matchField"), (String) keyParts.get(i)); + } else { + current = (Map) current.get(field); + } + + if (current == null) { + return null; + } + } + + return current; + } + + + // ============================================================ + // Main Functions + // ============================================================ + + // Initializes internal bookkeeping structures (__sources, __versions, __counts, __nested_sourced_data). + void setup(Map source, String versionsKey, String relationship, String nestedElementKey, Map counts) { if (source.__sources == null) { source.__sources = []; } @@ -2171,8 +2313,17 @@ scripts: source.__versions = [:]; } - if (source.__versions[relationship] == null) { - source.__versions[relationship] = [:]; + if (source.__versions[versionsKey] == null) { + source.__versions[versionsKey] = [:]; + } + + if (!nestedElementKey.isEmpty()) { + if (source.__nested_sourced_data == null) { + source.__nested_sourced_data = [:]; + } + if (source.__nested_sourced_data[relationship] == null) { + source.__nested_sourced_data[relationship] = [:]; + } } if (counts != null && source.__counts == null) { @@ -2180,23 +2331,24 @@ scripts: } } - void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) { - Map relationshipVersionsMap = source.__versions.get(relationship); - List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); + // Validates that this event is allowed: no relationship mutation and no stale version. + void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion, String versionsKey) { + Map versionsMap = source.__versions[versionsKey]; - if (previousSourceIdsForRelationship.size() > 0) { + // Check that no other source ID has previously written to this target. + List previousSourceIds = versionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); + if (previousSourceIds.size() > 0) { throw new IllegalArgumentException( "Cannot update document " + id + " " + "with data from related " + relationship + " " + sourceId + " " + - "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + + "because the related " + relationship + " has apparently changed (was: " + previousSourceIds + "), " + "but mutations of relationships used with `sourced_from` are not supported because " + "allowing it could break ElasticGraph's out-of-order processing guarantees." - ); + ); } - Number maybeDocVersion = relationshipVersionsMap.get(sourceId); - - // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. + // Check that the event version is newer than what we've already seen. + Number maybeDocVersion = versionsMap.get(sourceId); long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); if (docVersion >= eventVersion) { @@ -2207,6 +2359,7 @@ scripts: } } + // Applies top-level fields to the document via putAll, and merges __counts. void applyTopLevelFields(Map source, String id, Map topLevelFields, Map counts) { source.id = id; source.putAll(topLevelFields); @@ -2216,8 +2369,50 @@ scripts: } } - void recordSource(Map source, String relationship, String sourceId, long eventVersion) { - source.__versions[relationship][sourceId] = eventVersion; + // Stores nested sourced fields in the __nested_sourced_data buffer for later application. + void storeNestedSourcedData(Map source, String relationship, Map nestedSourcedFields, String nestedElementKey) { + if (nestedSourcedFields.isEmpty()) { + return; + } + + ((Map) source.__nested_sourced_data[relationship]).put(nestedElementKey, nestedSourcedFields); + } + + // Applies nested sourced data from the __nested_sourced_data buffer to matched nested elements. + // Reads path config from the nestedSourcedPaths param. + // Called after every event so that after a self-event's putAll overwrites nested arrays, + // the buffered data gets re-applied. + void applyNestedSourcedData(Map source, Map nestedSourcedPaths) { + if (source.__nested_sourced_data == null) { + return; + } + + for (sourcedEntry in source.__nested_sourced_data.entrySet()) { + String relationship = (String) sourcedEntry.getKey(); + Map dataByKey = (Map) sourcedEntry.getValue(); + List pathSegments = (List) nestedSourcedPaths.get(relationship); + + if (pathSegments == null || dataByKey == null) { + continue; + } + + for (elementEntry in dataByKey.entrySet()) { + List keyParts = decodeKey((String) elementEntry.getKey()); + if (keyParts.size() != pathSegments.size()) { + continue; + } + + Map target = (Map) navigateToNestedElement(source, pathSegments, keyParts); + if (target != null) { + target.putAll((Map) elementEntry.getValue()); + } + } + } + } + + // Records the event version in __versions and adds the relationship to __sources. + void recordSource(Map source, String versionsKey, String relationship, String sourceId, long eventVersion) { + source.__versions[versionsKey][sourceId] = eventVersion; // Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list. // This ensures deterministic ordering of its elements regardless of event ingestion order, and lets us check membership in O(log N) time. @@ -2234,15 +2429,26 @@ scripts: } } - // --- Main script body --- // + // ============================================================ + // Main Execution + // ============================================================ + Map source = ctx._source; String id = params.id; String relationship = params.relationship; String sourceId = params.sourceId; - long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles + long eventVersion = (long) params.version; Map counts = params.__counts; + Map nestedSourcedFields = params.nestedSourcedFields; + Map nestedSourcedPathIdentifiers = params.nestedSourcedPathIdentifiers; + Map nestedSourcedPaths = params.nestedSourcedPaths; + + String nestedElementKey = buildNestedElementKey(relationship, nestedSourcedPaths, nestedSourcedPathIdentifiers); + String versionsKey = buildVersionsKey(relationship, nestedElementKey); - setup(source, relationship, counts); - validateSource(source, id, relationship, sourceId, eventVersion); + setup(source, versionsKey, relationship, nestedElementKey, counts); + validateSource(source, id, relationship, sourceId, eventVersion, versionsKey); applyTopLevelFields(source, id, params.topLevelFields, counts); - recordSource(source, relationship, sourceId, eventVersion); + storeNestedSourcedData(source, relationship, nestedSourcedFields, nestedElementKey); + applyNestedSourcedData(source, nestedSourcedPaths); + recordSource(source, versionsKey, relationship, sourceId, eventVersion); diff --git a/config/schema/artifacts_with_apollo/runtime_metadata.yaml b/config/schema/artifacts_with_apollo/runtime_metadata.yaml index 8e7ff906c..953a0338f 100644 --- a/config/schema/artifacts_with_apollo/runtime_metadata.yaml +++ b/config/schema/artifacts_with_apollo/runtime_metadata.yaml @@ -3114,7 +3114,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: full_address: cardinality: one @@ -3304,7 +3304,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -3343,7 +3343,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -3464,7 +3464,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: created_at: cardinality: one @@ -3826,7 +3826,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -3999,7 +3999,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: created_at: cardinality: one @@ -4484,7 +4484,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: ceo: cardinality: one @@ -4642,7 +4642,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: created_at: cardinality: one @@ -5624,7 +5624,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -5823,7 +5823,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: __typename: cardinality: one @@ -5890,7 +5890,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: active: cardinality: one @@ -6433,7 +6433,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: name: cardinality: one @@ -6783,7 +6783,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: formed_on routing_value_source: league - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: country_code: cardinality: one @@ -7883,7 +7883,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: created_at routing_value_source: workspace_id2 - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: amount_cents: cardinality: one @@ -7961,7 +7961,7 @@ object_types_by_name: version: cardinality: one relationship: widget - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: widget_cost: cardinality: one @@ -8191,7 +8191,7 @@ object_types_by_name: relationship: __self rollover_timestamp_value_source: introduced_on routing_value_source: primary_continent - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: details: cardinality: one @@ -9236,7 +9236,7 @@ object_types_by_name: cardinality: one relationship: __self routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: name: cardinality: one @@ -9258,7 +9258,7 @@ object_types_by_name: relationship: workspace rollover_timestamp_value_source: widget.created_at routing_value_source: id - script_id: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + script_id: update_index_data_079bafcf4d739acd8659a631377fa9c8 top_level_fields_params: workspace_name: cardinality: one @@ -9536,4 +9536,4 @@ static_script_ids_by_scoped_name: field/as_day_of_week: field_as_day_of_week_f2b5c7d9e8f75bf2457b52412bfb6537 field/as_time_of_day: field_as_time_of_day_ed82aba44fc66bff5635bec4305c1c66 filter/by_time_of_day: filter_by_time_of_day_ea12d0561b24961789ab68ed38435612 - update/index_data: update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d + update/index_data: update_index_data_079bafcf4d739acd8659a631377fa9c8 diff --git a/elasticgraph-admin/spec/integration/elastic_graph/admin/index_definition_configurator/shared_examples.rb b/elasticgraph-admin/spec/integration/elastic_graph/admin/index_definition_configurator/shared_examples.rb index 639e39234..a68cc4d7a 100644 --- a/elasticgraph-admin/spec/integration/elastic_graph/admin/index_definition_configurator/shared_examples.rb +++ b/elasticgraph-admin/spec/integration/elastic_graph/admin/index_definition_configurator/shared_examples.rb @@ -42,7 +42,7 @@ def simulate_presence_of_extra_setting(admin, index_definition_name, name, value let(:output_io) { StringIO.new } let(:clock) { class_double(::Time, now: ::Time.utc(2024, 3, 20, 12, 0, 0)) } let(:mapping_removal_note_snippet) { "extra fields listed here will not actually get removed" } - let(:index_meta_fields) { ["__sources", "__typename", "__versions"] } + let(:index_meta_fields) { ["__nested_sourced_data", "__sources", "__typename", "__versions"] } it "idempotently creates an index or index template, avoiding unneeded datastore write calls" do expect { diff --git a/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition.rb b/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition.rb index c95dc2f7c..c1024792f 100644 --- a/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition.rb +++ b/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition.rb @@ -33,7 +33,8 @@ def self.with(name:, runtime_metadata:, config:, datastore_clients_by_name:, sch env_index_config: env_index_config, defined_clusters: config.clusters.keys.to_set, datastore_clients_by_name: datastore_clients_by_name, - has_had_multiple_sources: runtime_metadata.has_had_multiple_sources + has_had_multiple_sources: runtime_metadata.has_had_multiple_sources, + nested_sourced_paths: runtime_metadata.nested_sourced_paths.transform_values { |segments| segments.map(&:to_painless_param) } } if (rollover = runtime_metadata.rollover) diff --git a/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/index.rb b/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/index.rb index 1b99e7a4e..a85c309fb 100644 --- a/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/index.rb +++ b/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/index.rb @@ -15,11 +15,12 @@ class DatastoreCore module IndexDefinition class Index < Support::MemoizableData.define( :name, :route_with, :default_sort_clauses, :current_sources, :fields_by_path, - :env_index_config, :defined_clusters, :datastore_clients_by_name, :env_agnostic_settings, :has_had_multiple_sources + :env_index_config, :defined_clusters, :datastore_clients_by_name, :env_agnostic_settings, :has_had_multiple_sources, + :nested_sourced_paths ) # `Data.define` provides all these methods: # @dynamic name, route_with, default_sort_clauses, current_sources, fields_by_path, env_index_config, env_agnostic_settings - # @dynamic defined_clusters, datastore_clients_by_name, initialize, has_had_multiple_sources + # @dynamic defined_clusters, datastore_clients_by_name, initialize, has_had_multiple_sources, nested_sourced_paths # `include IndexDefinition::Base` provides all these methods. Steep should be able to detect it # but can't for some reason so we have to declare them with `@dynamic`. diff --git a/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/rollover_index_template.rb b/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/rollover_index_template.rb index 9a70836da..44ea8bda7 100644 --- a/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/rollover_index_template.rb +++ b/elasticgraph-datastore_core/lib/elastic_graph/datastore_core/index_definition/rollover_index_template.rb @@ -23,11 +23,11 @@ module IndexDefinition class RolloverIndexTemplate < Support::MemoizableData.define( :name, :route_with, :default_sort_clauses, :current_sources, :fields_by_path, :env_index_config, :index_args, :defined_clusters, :datastore_clients_by_name, :timestamp_field_path, :frequency, - :env_agnostic_settings, :has_had_multiple_sources + :env_agnostic_settings, :has_had_multiple_sources, :nested_sourced_paths ) # `Data.define` provides all these methods: # @dynamic name, route_with, default_sort_clauses, current_sources, fields_by_path, env_index_config, env_agnostic_settings - # @dynamic index_args, defined_clusters, datastore_clients_by_name, timestamp_field_path, frequency, initialize, has_had_multiple_sources + # @dynamic index_args, defined_clusters, datastore_clients_by_name, timestamp_field_path, frequency, initialize, has_had_multiple_sources, nested_sourced_paths # `include IndexDefinition::Base` provides all these methods. Steep should be able to detect it # but can't for some reason so we have to declare them with `@dynamic`. diff --git a/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/index_definition.rbs b/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/index_definition.rbs index b12ac2343..8d087e939 100644 --- a/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/index_definition.rbs +++ b/elasticgraph-datastore_core/sig/elastic_graph/datastore_core/index_definition.rbs @@ -26,6 +26,7 @@ module ElasticGraph def current_sources: () -> ::Set[::String] def fields_by_path: () -> ::Hash[::String, SchemaArtifacts::RuntimeMetadata::IndexField] def has_had_multiple_sources: () -> bool + def nested_sourced_paths: () -> ::Hash[::String, ::Array[::Hash[::String, ::String]]] def env_index_config: () -> Configuration::IndexDefinition def env_agnostic_settings: () -> ::Hash[::String, untyped] def defined_clusters: () -> ::Set[::String] diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/operation/update.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/operation/update.rb index 32e04c04a..5766e597f 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/operation/update.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/operation/update.rb @@ -145,11 +145,14 @@ def script_params prepared_record: prepared_record ) - # The normal indexing script uses `__counts`. Other indexing scripts (e.g. the ones generated - # for derived indexing) do not use `__counts` so there's no point in spending effort on computing - # it. Plus, the logic below raises an exception in that case, so it's important we avoid it. + # The normal indexing script uses `__counts` and `nestedSourcedPaths`. Other indexing scripts + # (e.g. the ones generated for derived indexing) do not use these so there's no point in + # spending effort on computing them. Plus, the logic below raises an exception in that case, + # so it's important we avoid it. return initial_params unless update_target.for_normal_indexing? + initial_params["nestedSourcedPaths"] = destination_index_def.nested_sourced_paths + CountAccumulator.merge_list_counts_into( initial_params, mapping: destination_index_mapping, diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb index 6d77a53e5..6dffcadc3 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb @@ -265,7 +265,7 @@ def type_name_for_index(index_name) upsert: {}, script: a_hash_including( id: /WidgetCurrency_from_Widget_/, - params: {"topLevelFields" => {"name" => ["thing1"]}, "id" => "USD"} + params: {"nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => ["thing1"]}, "id" => "USD"} ) ) end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb index 1a15a9e62..0c54b2975 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb @@ -73,7 +73,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"name" => ["thing1"]}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => ["thing1"]}, "id" => "17" }}, scripted_upsert: true, @@ -102,7 +102,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: INDEX_DATA_UPDATE_SCRIPT_ID, params: { - "topLevelFields" => {"name" => "thing1"}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "nestedSourcedPaths" => {}, "topLevelFields" => {"name" => "thing1"}, "id" => "17", "staticValue" => 47, "sourceType" => "Widget", @@ -156,7 +156,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"name" => []}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => []}, "id" => "17" }}, scripted_upsert: true, @@ -177,7 +177,7 @@ module Operation {update: {_id: "embedded_workspace_id", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"name" => ["thing1"]}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => ["thing1"]}, "id" => "embedded_workspace_id" }}, scripted_upsert: true, @@ -202,7 +202,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"embedded_values.missing_field" => [], "name" => nil}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"embedded_values.missing_field" => [], "name" => nil}, "id" => "17" }}, scripted_upsert: true, @@ -229,6 +229,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => { "embedded_values" => ["thing1"], "name" => { @@ -261,7 +262,7 @@ module Operation { script: {id: operations.first.update_target.script_id, params: { # Float-typed integer values are coerced to true ints before indexing - "topLevelFields" => {"size" => [an_instance_of(::Integer).and(eq_to(4))]}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"size" => [an_instance_of(::Integer).and(eq_to(4))]}, "id" => "17" }}, scripted_upsert: true, @@ -282,7 +283,7 @@ module Operation {update: {_id: "17", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"name" => ["thing1"]}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => ["thing1"]}, "id" => "17" }}, scripted_upsert: true, @@ -291,7 +292,7 @@ module Operation {update: {_id: "18", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"name" => ["thing1"]}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => ["thing1"]}, "id" => "18" }}, scripted_upsert: true, @@ -300,7 +301,7 @@ module Operation {update: {_id: "19", _index: "widget_workspaces", retry_on_conflict: Update::CONFLICT_RETRIES}}, { script: {id: operations.first.update_target.script_id, params: { - "topLevelFields" => {"name" => ["thing1"]}, + "nestedSourcedFields" => {}, "nestedSourcedPathIdentifiers" => {}, "topLevelFields" => {"name" => ["thing1"]}, "id" => "19" }}, scripted_upsert: true, diff --git a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rb b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rb index 9e4f26eb0..59a44c093 100644 --- a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rb +++ b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rb @@ -8,6 +8,7 @@ require "elastic_graph/schema_artifacts/runtime_metadata/hash_dumper" require "elastic_graph/schema_artifacts/runtime_metadata/index_field" +require "elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment" require "elastic_graph/schema_artifacts/runtime_metadata/sort_field" module ElasticGraph @@ -16,22 +17,24 @@ module RuntimeMetadata # Runtime metadata related to a datastore index definition. # # @private - class IndexDefinition < ::Data.define(:route_with, :rollover, :default_sort_fields, :current_sources, :fields_by_path, :has_had_multiple_sources) + class IndexDefinition < ::Data.define(:route_with, :rollover, :default_sort_fields, :current_sources, :fields_by_path, :has_had_multiple_sources, :nested_sourced_paths) ROUTE_WITH = "route_with" ROLLOVER = "rollover" DEFAULT_SORT_FIELDS = "default_sort_fields" CURRENT_SOURCES = "current_sources" FIELDS_BY_PATH = "fields_by_path" HAS_HAD_MULTIPLE_SOURCES = "has_had_multiple_sources" + NESTED_SOURCED_PATHS = "nested_sourced_paths" - def initialize(route_with:, rollover:, default_sort_fields:, current_sources:, fields_by_path:, has_had_multiple_sources:) + def initialize(route_with:, rollover:, default_sort_fields:, current_sources:, fields_by_path:, has_had_multiple_sources:, nested_sourced_paths: {}) super( route_with: route_with, rollover: rollover, default_sort_fields: default_sort_fields, current_sources: current_sources.to_set, fields_by_path: fields_by_path, - has_had_multiple_sources: has_had_multiple_sources + has_had_multiple_sources: has_had_multiple_sources, + nested_sourced_paths: nested_sourced_paths ) end @@ -42,7 +45,8 @@ def self.from_hash(hash) default_sort_fields: hash[DEFAULT_SORT_FIELDS]&.map { |h| SortField.from_hash(h) } || [], current_sources: hash[CURRENT_SOURCES] || [], fields_by_path: (hash[FIELDS_BY_PATH] || {}).transform_values { |h| IndexField.from_hash(h) }, - has_had_multiple_sources: hash[HAS_HAD_MULTIPLE_SOURCES] || false + has_had_multiple_sources: hash[HAS_HAD_MULTIPLE_SOURCES] || false, + nested_sourced_paths: (hash[NESTED_SOURCED_PATHS] || {}).transform_values { |segments| segments.map { |h| NestedSourcedPathSegment.from_hash(h) } } ) end @@ -53,6 +57,7 @@ def to_dumpable_hash DEFAULT_SORT_FIELDS => default_sort_fields.map(&:to_dumpable_hash), FIELDS_BY_PATH => HashDumper.dump_hash(fields_by_path, &:to_dumpable_hash), HAS_HAD_MULTIPLE_SOURCES => (true if has_had_multiple_sources), + NESTED_SOURCED_PATHS => nested_sourced_paths.transform_values { |segments| segments.map(&:to_dumpable_hash) }, ROLLOVER => rollover&.to_dumpable_hash, ROUTE_WITH => route_with } diff --git a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params.rb b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params.rb new file mode 100644 index 000000000..b989390b5 --- /dev/null +++ b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params.rb @@ -0,0 +1,54 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/params" + +module ElasticGraph + module SchemaArtifacts + module RuntimeMetadata + # Bundles the param configuration for nested sourced_from update targets. + # `field_params` defines which fields to extract from the event and write onto + # the target nested element. `path_identifier_params` defines which values to + # extract from the event to identify which nested element to target. + # + # @private + class NestedSourcedDataParams < ::Data.define(:field_params, :path_identifier_params) + FIELD_PARAMS = "field_params" + PATH_IDENTIFIER_PARAMS = "path_identifier_params" + + EMPTY = new(field_params: {}, path_identifier_params: {}) + + def self.from_hash(hash) + new( + field_params: Param.load_params_hash(hash[FIELD_PARAMS] || {}), + path_identifier_params: Param.load_params_hash(hash[PATH_IDENTIFIER_PARAMS] || {}) + ) + end + + def to_dumpable_hash + { + FIELD_PARAMS => Param.dump_params_hash(field_params), + PATH_IDENTIFIER_PARAMS => Param.dump_params_hash(path_identifier_params) + } + end + + def empty? + field_params.empty? && path_identifier_params.empty? + end + + # Resolves params into script-ready values from the given prepared record. + def script_params_for(prepared_record) + { + "nestedSourcedFields" => field_params.transform_values { |param| param.value_for(prepared_record) }, + "nestedSourcedPathIdentifiers" => path_identifier_params.transform_values { |param| param.value_for(prepared_record) } + } + end + end + end + end +end diff --git a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment.rb b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment.rb new file mode 100644 index 000000000..09d7e76c0 --- /dev/null +++ b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment.rb @@ -0,0 +1,66 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +module ElasticGraph + module SchemaArtifacts + module RuntimeMetadata + # Represents a segment in a nested sourced path that navigates into a list field, + # matching an element by a key field. + # + # @private + class ListPathSegment < ::Data.define(:field, :match_field, :source_field) + # @dynamic to_painless_param + TYPE = "list" + FIELD = "field" + MATCH_FIELD = "matchField" + SOURCE_FIELD = "sourceField" + + def to_dumpable_hash + {"type" => TYPE, FIELD => field, MATCH_FIELD => match_field, SOURCE_FIELD => source_field} + end + + alias_method :to_painless_param, :to_dumpable_hash + + def self.from_hash(hash) + new(field: hash[FIELD], match_field: hash[MATCH_FIELD], source_field: hash[SOURCE_FIELD]) + end + end + + # Represents a segment in a nested sourced path that navigates into an object field. + # + # @private + class ObjectPathSegment < ::Data.define(:field) + # @dynamic to_painless_param + TYPE = "object" + FIELD = "field" + + def to_dumpable_hash + {"type" => TYPE, FIELD => field} + end + + alias_method :to_painless_param, :to_dumpable_hash + + def self.from_hash(hash) + new(field: hash[FIELD]) + end + end + + # @private + module NestedSourcedPathSegment + def self.from_hash(hash) + case hash["type"] + when ListPathSegment::TYPE + ListPathSegment.from_hash(hash) + when ObjectPathSegment::TYPE + ObjectPathSegment.from_hash(hash) + end + end + end + end + end +end diff --git a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb index 37c021ae5..b0a8ff2d8 100644 --- a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb +++ b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/runtime_metadata/update_target.rb @@ -7,6 +7,7 @@ # frozen_string_literal: true require "elastic_graph/constants" +require "elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params" require "elastic_graph/schema_artifacts/runtime_metadata/params" module ElasticGraph @@ -23,6 +24,7 @@ class UpdateTarget < ::Data.define( :routing_value_source, :rollover_timestamp_value_source, :top_level_fields_params, + :nested_sourced_data_params, :metadata_params ) TYPE = "type" @@ -32,6 +34,7 @@ class UpdateTarget < ::Data.define( ROUTING_VALUE_SOURCE = "routing_value_source" ROLLOVER_TIMESTAMP_VALUE_SOURCE = "rollover_timestamp_value_source" TOP_LEVEL_FIELDS_PARAMS = "top_level_fields_params" + NESTED_SOURCED_DATA_PARAMS = "nested_sourced_data_params" METADATA_PARAMS = "metadata_params" def self.from_hash(hash) @@ -43,6 +46,7 @@ def self.from_hash(hash) routing_value_source: hash[ROUTING_VALUE_SOURCE], rollover_timestamp_value_source: hash[ROLLOVER_TIMESTAMP_VALUE_SOURCE], top_level_fields_params: Param.load_params_hash(hash[TOP_LEVEL_FIELDS_PARAMS] || {}), + nested_sourced_data_params: NestedSourcedDataParams.from_hash(hash[NESTED_SOURCED_DATA_PARAMS] || {}), metadata_params: Param.load_params_hash(hash[METADATA_PARAMS] || {}) ) end @@ -52,6 +56,7 @@ def to_dumpable_hash # Keys here are ordered alphabetically; please keep them that way. ID_SOURCE => id_source, METADATA_PARAMS => Param.dump_params_hash(metadata_params), + NESTED_SOURCED_DATA_PARAMS => nested_sourced_data_params.to_dumpable_hash, RELATIONSHIP => relationship, ROLLOVER_TIMESTAMP_VALUE_SOURCE => rollover_timestamp_value_source, ROUTING_VALUE_SOURCE => routing_value_source, @@ -66,15 +71,18 @@ def for_normal_indexing? end def params_for(doc_id:, event:, prepared_record:) - top_level_fields = top_level_fields_params.to_h do |name, param| - [name, param.value_for(prepared_record)] + top_level_fields = top_level_fields_params.transform_values do |param| + param.value_for(prepared_record) end - meta = metadata_params.to_h do |name, param| - [name, param.value_for(event)] + meta = metadata_params.transform_values do |param| + param.value_for(event) end - meta.merge({"id" => doc_id, "topLevelFields" => top_level_fields}) + meta.merge( + {"id" => doc_id, "topLevelFields" => top_level_fields}, + nested_sourced_data_params.script_params_for(prepared_record) + ) end end end diff --git a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rbs b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rbs index fc6bacfcd..d1e52c042 100644 --- a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rbs +++ b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/index_definition.rbs @@ -8,6 +8,7 @@ module ElasticGraph attr_reader current_sources: ::Set[::String] attr_reader fields_by_path: ::Hash[::String, IndexField] attr_reader has_had_multiple_sources: bool + attr_reader nested_sourced_paths: ::Hash[::String, ::Array[nestedSourcedPathSegment]] def initialize: ( route_with: ::String, @@ -15,7 +16,8 @@ module ElasticGraph default_sort_fields: ::Array[SortField], current_sources: ::Set[::String], fields_by_path: ::Hash[::String, IndexField], - has_had_multiple_sources: bool + has_had_multiple_sources: bool, + ?nested_sourced_paths: ::Hash[::String, ::Array[nestedSourcedPathSegment]] ) -> void def with: ( @@ -24,7 +26,8 @@ module ElasticGraph ?default_sort_fields: ::Array[SortField], ?current_sources: ::Enumerable[::String], ?fields_by_path: ::Hash[::String, IndexField], - ?has_had_multiple_sources: bool + ?has_had_multiple_sources: bool, + ?nested_sourced_paths: ::Hash[::String, ::Array[nestedSourcedPathSegment]] ) -> IndexDefinition end @@ -35,6 +38,7 @@ module ElasticGraph CURRENT_SOURCES: "current_sources" FIELDS_BY_PATH: "fields_by_path" HAS_HAD_MULTIPLE_SOURCES: "has_had_multiple_sources" + NESTED_SOURCED_PATHS: "nested_sourced_paths" def initialize: ( route_with: ::String, @@ -42,7 +46,8 @@ module ElasticGraph default_sort_fields: ::Array[SortField], current_sources: ::Enumerable[::String], fields_by_path: ::Hash[::String, IndexField], - has_had_multiple_sources: bool + has_had_multiple_sources: bool, + ?nested_sourced_paths: ::Hash[::String, ::Array[nestedSourcedPathSegment]] ) -> void def self.from_hash: (::Hash[::String, untyped]) -> IndexDefinition diff --git a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params.rbs b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params.rbs new file mode 100644 index 000000000..3a00c7632 --- /dev/null +++ b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_data_params.rbs @@ -0,0 +1,23 @@ +module ElasticGraph + module SchemaArtifacts + module RuntimeMetadata + class NestedSourcedDataParamsSuperType + attr_reader field_params: paramsHash + attr_reader path_identifier_params: paramsHash + + def initialize: (field_params: paramsHash, path_identifier_params: paramsHash) -> void + end + + class NestedSourcedDataParams < NestedSourcedDataParamsSuperType + FIELD_PARAMS: "field_params" + PATH_IDENTIFIER_PARAMS: "path_identifier_params" + EMPTY: NestedSourcedDataParams + + def self.from_hash: (::Hash[::String, untyped]) -> NestedSourcedDataParams + def to_dumpable_hash: () -> ::Hash[::String, untyped] + def empty?: () -> bool + def script_params_for: (::Hash[::String, untyped]) -> ::Hash[::String, ::Hash[::String, untyped]] + end + end + end +end diff --git a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment.rbs b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment.rbs new file mode 100644 index 000000000..dffd3d525 --- /dev/null +++ b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment.rbs @@ -0,0 +1,45 @@ +module ElasticGraph + module SchemaArtifacts + module RuntimeMetadata + class ListPathSegmentSuperType + attr_reader field: ::String + attr_reader match_field: ::String + attr_reader source_field: ::String + + def initialize: (field: ::String, match_field: ::String, source_field: ::String) -> void + end + + class ListPathSegment < ListPathSegmentSuperType + TYPE: "list" + FIELD: "field" + MATCH_FIELD: "matchField" + SOURCE_FIELD: "sourceField" + + def self.from_hash: (::Hash[::String, untyped]) -> ListPathSegment + def to_dumpable_hash: () -> ::Hash[::String, ::String] + alias to_painless_param to_dumpable_hash + end + + class ObjectPathSegmentSuperType + attr_reader field: ::String + + def initialize: (field: ::String) -> void + end + + class ObjectPathSegment < ObjectPathSegmentSuperType + TYPE: "object" + FIELD: "field" + + def self.from_hash: (::Hash[::String, untyped]) -> ObjectPathSegment + def to_dumpable_hash: () -> ::Hash[::String, ::String] + alias to_painless_param to_dumpable_hash + end + + type nestedSourcedPathSegment = ListPathSegment | ObjectPathSegment + + module NestedSourcedPathSegment + def self.from_hash: (::Hash[::String, untyped]) -> nestedSourcedPathSegment? + end + end + end +end diff --git a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/update_target.rbs b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/update_target.rbs index 4a43bfdb3..99647cd38 100644 --- a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/update_target.rbs +++ b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts/runtime_metadata/update_target.rbs @@ -9,6 +9,7 @@ module ElasticGraph attr_reader routing_value_source: ::String? attr_reader rollover_timestamp_value_source: ::String? attr_reader top_level_fields_params: paramsHash + attr_reader nested_sourced_data_params: NestedSourcedDataParams attr_reader metadata_params: paramsHash def initialize: ( @@ -19,6 +20,7 @@ module ElasticGraph routing_value_source: ::String?, rollover_timestamp_value_source: ::String?, top_level_fields_params: paramsHash, + nested_sourced_data_params: NestedSourcedDataParams, metadata_params: paramsHash ) -> void @@ -30,6 +32,7 @@ module ElasticGraph ?routing_value_source: ::String?, ?rollover_timestamp_value_source: ::String?, ?top_level_fields_params: paramsHash, + ?nested_sourced_data_params: NestedSourcedDataParams, ?metadata_params: paramsHash ) -> UpdateTarget @@ -46,6 +49,7 @@ module ElasticGraph ROUTING_VALUE_SOURCE: "routing_value_source" ROLLOVER_TIMESTAMP_VALUE_SOURCE: "rollover_timestamp_value_source" TOP_LEVEL_FIELDS_PARAMS: "top_level_fields_params" + NESTED_SOURCED_DATA_PARAMS: "nested_sourced_data_params" METADATA_PARAMS: "metadata_params" def self.from_hash: (::Hash[::String, untyped]) -> UpdateTarget diff --git a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/schema_spec.rb b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/schema_spec.rb index c9f3733eb..132a82dc7 100644 --- a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/schema_spec.rb +++ b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/schema_spec.rb @@ -43,6 +43,7 @@ module RuntimeMetadata routing_value_source: "cost.currency_name", rollover_timestamp_value_source: "currency_introduced_on", top_level_fields_params: {"workspace_id" => DynamicParam.new(source_path: "wid", cardinality: :one)}, + nested_sourced_data_params: NestedSourcedDataParams::EMPTY, metadata_params: {"relationshipName" => StaticParam.new(value: "currency")} ), UpdateTarget.new( @@ -53,6 +54,7 @@ module RuntimeMetadata routing_value_source: nil, rollover_timestamp_value_source: nil, top_level_fields_params: {}, + nested_sourced_data_params: NestedSourcedDataParams::EMPTY, metadata_params: {} ) ], @@ -311,6 +313,7 @@ module RuntimeMetadata routing_value_source: nil, rollover_timestamp_value_source: nil, top_level_fields_params: {"workspace_id" => dynamic_param_with(cardinality: :many)}, + nested_sourced_data_params: NestedSourcedDataParams::EMPTY, metadata_params: {} )]), "IndexDefinitionNamesOnly" => object_type_with(index_definition_names: ["foo", "bar"]), diff --git a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb index d3ee76a61..2050a653b 100644 --- a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb +++ b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/runtime_metadata/update_target_spec.rb @@ -26,6 +26,7 @@ module RuntimeMetadata routing_value_source: nil, rollover_timestamp_value_source: nil, top_level_fields_params: {}, + nested_sourced_data_params: NestedSourcedDataParams::EMPTY, metadata_params: {} ) end @@ -84,9 +85,9 @@ module RuntimeMetadata } ) - without_id_or_top_level_fields = params.except("id", "topLevelFields") + without_omitted_fields = params.except("id", "topLevelFields", "nestedSourcedFields", "nestedSourcedPathIdentifiers") - expect(without_id_or_top_level_fields).to eq( + expect(without_omitted_fields).to eq( "foo" => 43, "bar" => "hello", "bazz" => [12] diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/factory.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/factory.rb index cd1bf8a8f..a993b8ab3 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/factory.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/factory.rb @@ -273,13 +273,14 @@ def new_field_source(relationship_name:, field_path:) end @@field_source_new = prevent_non_factory_instantiation_of(SchemaElements::FieldSource) - def new_relationship(field, cardinality:, related_type:, foreign_key:, direction:) + def new_relationship(field, cardinality:, related_type:, foreign_key:, direction:, indexing_only: false) @@relationship_new.call( field, cardinality: cardinality, related_type: related_type, foreign_key: foreign_key, - direction: direction + direction: direction, + indexing_only: indexing_only ) end @@relationship_new = prevent_non_factory_instantiation_of(SchemaElements::Relationship) diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb index a01df76c5..a8cd351f9 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb @@ -272,7 +272,8 @@ def runtime_metadata_for_source_type metadata_params: {}, top_level_fields_params: fields.map(&:source_field).to_h do |f| [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)] - end + end, + nested_sourced_data_params: SchemaArtifacts::RuntimeMetadata::NestedSourcedDataParams::EMPTY ) end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb index a6400db64..8bef98b55 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/index.rb @@ -38,7 +38,11 @@ module Indexing # @return [RolloverConfig, nil] rollover configuration for the index # @!attribute [r] has_had_multiple_sources_flag # @return [Boolean] whether this index has ever had multiple sources - class Index < Struct.new(:name, :default_sort_pairs, :settings, :schema_def_state, :indexed_type, :routing_field_path, :rollover_config, :has_had_multiple_sources_flag) + # @!attribute [r] nested_sourced_paths + # @return [Hash>] + # map from relationship name to the path segments that the painless script uses to + # navigate to nested elements whose fields are sourced from another type via `sourced_from`. + class Index < Struct.new(:name, :default_sort_pairs, :settings, :schema_def_state, :indexed_type, :routing_field_path, :rollover_config, :has_had_multiple_sources_flag, :nested_sourced_paths) include Mixins::HasReadableToSAndInspect.new { |i| i.name } # @param name [String] name of the index @@ -55,7 +59,7 @@ def initialize(name, settings, schema_def_state, indexed_type) settings = DEFAULT_SETTINGS.merge(Support::HashUtil.flatten_and_stringify_keys(settings, prefix: "index")) - super(name, [], settings, schema_def_state, indexed_type, nil, nil, false) + super(name, [], settings, schema_def_state, indexed_type, nil, nil, false, {}) schema_def_state.after_user_definition_complete do # `id` is the field Elasticsearch/OpenSearch use for routing by default: @@ -251,6 +255,13 @@ def to_index_template_config } end + # Registers the nested sourced path segments for a relationship on this index. + # Called by `SourcedUpdateTargetsResolver` during schema resolution. + # @api private + def register_nested_sourced_paths(relationship_name, path_segments) + nested_sourced_paths[relationship_name] = path_segments + end + # @return [SchemaArtifacts::RuntimeMetadata::IndexDefinition] runtime metadata for this index def runtime_metadata SchemaArtifacts::RuntimeMetadata::IndexDefinition.new( @@ -264,7 +275,8 @@ def runtime_metadata direction: direction ) end, - has_had_multiple_sources: has_had_multiple_sources_flag + has_had_multiple_sources: has_had_multiple_sources_flag, + nested_sourced_paths: nested_sourced_paths ) end @@ -298,6 +310,13 @@ def mappings .then { |mapping| ListCountsMapping.merged_into(mapping, for_type: indexed_type) } .then do |fm| internal_fields = { + "__nested_sourced_data" => { + "type" => "object", + # __nested_sourced_data stores sourced data for nested sourced_from fields. Its keys are not + # statically known (they're relationship names and composite element keys), so we + # set dynamic to "false" to allow arbitrary keys in _source without indexing them. + "dynamic" => "false" + }, "__sources" => {"type" => "keyword"}, "__versions" => { "type" => "object", diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver.rb new file mode 100644 index 000000000..541aa0bb7 --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver.rb @@ -0,0 +1,191 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/errors" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Resolves a chain of `parent_relationship` links from a leaf embedded type up to the + # root indexed type. Produces a `ResolvedNestedChain` describing the nested path and + # match fields at each level. + # + # @private + class NestedRelationshipChainResolver + def initialize(schema_def_state:) + @schema_def_state = schema_def_state + end + + # Resolves the chain starting from `starting_relationship` (which must have a + # `parent_ref`) on `starting_type`. + # + # Returns a tuple of [resolved_chain, errors]. + # If errors is non-empty, resolved_chain will be nil. + def resolve(starting_relationship, starting_type) + errors = [] # : ::Array[::String] + chain = [] # : ::Array[PathSegment] + visited_types = ::Set.new([starting_type.name]) + + current_rel, current_type = resolve_chain( + starting_relationship, starting_type, chain, errors, visited_types + ) + + return [nil, errors] if errors.any? + + # The recursion terminated because current_rel has no parent_ref — + # this is the root relationship. Validate that current_type is indexed. + unless current_type.root_document_type? + errors << "The `parent_relationship` chain from #{rel_description(starting_type, starting_relationship)} " \ + "terminates at `#{current_type.name}`, but `#{current_type.name}` is not an indexed type. " \ + "The chain must terminate at an indexed type." + return [nil, errors] + end + + resolved_chain = ResolvedNestedChain.new( + root_indexed_type: current_type, + path_segments: chain.reverse, # reverse so root-to-leaf order + root_relationship: current_rel + ) + + [resolved_chain, errors] + end + + private + + # Recursively walks from leaf to root, building path segments in reverse. + # Returns the final [relationship, type] tuple when the chain terminates + # (i.e., no more parent_ref), or short-circuits on errors. + def resolve_chain(current_rel, current_type, chain, errors, visited_types) + ref = current_rel.parent_ref + return [current_rel, current_type] unless ref + + parent_type, parent_rel = validate_link(current_rel, current_type, ref, errors, visited_types) + return [current_rel, current_type] if errors.any? + + build_path_segment(current_rel, current_type, parent_type, chain, errors) + return [current_rel, current_type] if errors.any? + + visited_types.add(parent_type.name) + resolve_chain(parent_rel, parent_type, chain, errors, visited_types) + end + + # Validates a single link in the chain: checks indexing_only, circular refs, + # parent type existence, parent relationship existence, and source type consistency. + # Returns [parent_type, parent_rel] on success, or appends to errors and returns nils. + def validate_link(current_rel, current_type, ref, errors, visited_types) + unless current_rel.indexing_only + errors << "#{rel_description(current_type, current_rel)} uses `parent_relationship` but is not declared with " \ + "`indexing_only: true`. Relationships with `parent_relationship` must be indexing-only." + return [nil, nil] + end + + parent_type_name = ref.type_ref.name + if visited_types.include?(parent_type_name) + errors << "#{rel_description(current_type, current_rel)} creates a circular `parent_relationship` chain " \ + "— `#{parent_type_name}` was already visited. The chain must terminate at a root indexed type." + return [nil, nil] + end + + parent_type = ref.type_ref.as_object_type + unless parent_type + errors << "#{rel_description(current_type, current_rel)} references parent type " \ + "`#{parent_type_name}` via `parent_relationship`, but that type does not exist." + return [nil, nil] + end + + parent_rel = parent_type.relationships_by_name[ref.relationship_name] + unless parent_rel + errors << "#{rel_description(current_type, current_rel)} references parent relationship " \ + "`#{parent_type.name}.#{ref.relationship_name}` via `parent_relationship`, " \ + "but that relationship does not exist. Is it misspelled?" + return [nil, nil] + end + + current_source_type_name = current_rel.related_type.unwrap_non_null.name + parent_source_type_name = parent_rel.related_type.unwrap_non_null.name + unless current_source_type_name == parent_source_type_name + errors << "#{rel_description(current_type, current_rel)} relates to `#{current_source_type_name}`, " \ + "but its parent relationship `#{parent_type.name}.#{ref.relationship_name}` relates to " \ + "`#{parent_source_type_name}`. All relationships in a `parent_relationship` chain must relate to the same source type." + return [nil, nil] + end + + [parent_type, parent_rel] + end + + # Builds a PathSegment for the current level and appends it to chain. + # Validates the embedding field exists and (for list segments) that the child type has an id field. + def build_path_segment(current_rel, current_type, parent_type, chain, errors) + embedding_field = find_embedding_field(parent_type, current_type, errors) + return if errors.any? + + unless embedding_field + errors << "#{rel_description(current_type, current_rel)} declares `#{parent_type.name}` as its parent type " \ + "via `parent_relationship`, but `#{parent_type.name}` has no field of type `#{current_type.name}`." + return + end + + if embedding_field.type.list? + unless current_type.indexing_fields_by_name_in_index["id"] + errors << "#{rel_description(current_type, current_rel)} requires an `id` field on `#{current_type.name}` " \ + "for nested element matching, but `#{current_type.name}` has no field named `id`." + return + end + end + + # We use "id" as the match field, consistent with how ElasticGraph relationships always join on `id` + # via foreign keys. In the future, it would be nice if this field name were configurable. + chain << PathSegment.new( + parent_type: parent_type, + embedding_field: embedding_field, + match_field: "id", + source_field: current_rel.foreign_key + ) + end + + def find_embedding_field(parent_type, child_type, errors) + matches = parent_type.graphql_fields_by_name.values.select do |field| + field.type.fully_unwrapped.name == child_type.name + end + + if matches.size > 1 + field_names = matches.map(&:name).join(", ") + errors << "`#{parent_type.name}` has multiple fields of type `#{child_type.name}` (#{field_names}). " \ + "Ambiguous embedding path for `parent_relationship` — cannot determine which field to use." + nil + else + matches.first + end + end + + def rel_description(type, relationship) + "`#{type.name}.#{relationship.name}`" + end + end + + # The result of resolving a nested relationship chain. + # + # @private + ResolvedNestedChain = ::Data.define( + :root_indexed_type, # ObjectType - the indexed type at the root + :path_segments, # Array - ordered root-to-leaf + :root_relationship # Relationship - the root relationship (no parent_relationship) + ) + + # A single segment of the nested path. + # + # @private + PathSegment = ::Data.define( + :parent_type, # ObjectType - the parent type at this level + :embedding_field, # Field - the field on parent_type that embeds the child type + :match_field, # String - field on the nested type to match (e.g., "id") + :source_field # String - field on the source type with the match value (from `via`) + ) + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb new file mode 100644 index 000000000..5a004e007 --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rb @@ -0,0 +1,194 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/schema_artifacts/runtime_metadata/nested_sourced_path_segment" +require "elastic_graph/schema_artifacts/runtime_metadata/params" +require "elastic_graph/schema_definition/indexing/update_target_factory" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Responsible for resolving a nested relationship chain and a set of `sourced_from` fields + # into an `UpdateTarget` for updating nested elements within a root indexed type. + # + # @private + class NestedUpdateTargetResolver + def initialize( + object_type:, + relationship:, + sourced_fields:, + resolved_chain:, + field_path_resolver:, + schema_def_state: + ) + @object_type = object_type + @relationship = relationship + @sourced_fields = sourced_fields + @resolved_chain = resolved_chain + @field_path_resolver = field_path_resolver + @schema_def_state = schema_def_state + end + + # Returns a tuple of [update_target, errors]. + # If errors is non-empty, update_target will be nil. + def resolve + errors = [] # : ::Array[::String] + + if relationship.many? + errors << "`#{object_type.name}.#{relationship.name}` is a `relates_to_many` relationship, " \ + "but nested `sourced_from` is only supported on a `relates_to_one` relationship." + return [nil, errors] + end + + field_params = resolve_nested_sourced_data_params(errors) + return [nil, errors] if field_params.empty? && errors.any? + + path_identifier_params = build_path_identifier_params + nested_sourced_paths = build_nested_sourced_paths + routing_value_source = resolve_routing(errors) + rollover_timestamp_value_source = resolve_rollover(errors) + validate_has_had_multiple_sources(errors) + + if errors.any? + [nil, errors] + else + yield resolved_chain.root_indexed_type, relationship, nested_sourced_paths + + nested_sourced_data_params = SchemaArtifacts::RuntimeMetadata::NestedSourcedDataParams.new( + field_params: field_params, + path_identifier_params: path_identifier_params + ) + + update_target = UpdateTargetFactory.new_normal_indexing_update_target( + type: resolved_chain.root_indexed_type.name, + relationship: relationship.name, + id_source: resolved_chain.root_relationship.foreign_key, + top_level_fields_params: {}, + nested_sourced_data_params: nested_sourced_data_params, + routing_value_source: routing_value_source, + rollover_timestamp_value_source: rollover_timestamp_value_source + ) + + [update_target, errors] + end + end + + private + + # @dynamic object_type, relationship, sourced_fields, resolved_chain, field_path_resolver, schema_def_state + attr_reader :object_type, :relationship, :sourced_fields, :resolved_chain, :field_path_resolver, :schema_def_state + + def related_type + @related_type ||= schema_def_state.object_types_by_name[relationship.related_type.unwrap_non_null.name] + end + + def resolve_nested_sourced_data_params(errors) + sourced_fields.filter_map do |field| + field_source = field.source # : SchemaElements::FieldSource + referenced_field_path = field_path_resolver.resolve_public_path(related_type, field_source.field_path) do |parent_field| + !parent_field.type.list? + end + + if referenced_field_path.nil? + errors << "`#{object_type.name}.#{field.name}` has an invalid `sourced_from` argument: " \ + "`#{related_type.name}.#{field_source.field_path}` does not exist as an indexing field." + nil + else + param = SchemaArtifacts::RuntimeMetadata::DynamicParam.new( + source_path: referenced_field_path.path_in_index, + cardinality: :one + ) + [field.name_in_index, param] + end + end.to_h + end + + def build_path_identifier_params + resolved_chain.path_segments.filter_map do |segment| + # Only list segments need identifier fields — object segments have no ambiguity. + next unless segment.embedding_field.type.list? + + source_field = segment.source_field + [source_field, SchemaArtifacts::RuntimeMetadata::DynamicParam.new( + source_path: source_field, + cardinality: :one + )] + end.to_h + end + + def build_nested_sourced_paths + resolved_chain.path_segments.map do |segment| + if segment.embedding_field.type.list? + SchemaArtifacts::RuntimeMetadata::ListPathSegment.new( + field: segment.embedding_field.name_in_index, + match_field: segment.match_field, + source_field: segment.source_field + ) + else + SchemaArtifacts::RuntimeMetadata::ObjectPathSegment.new( + field: segment.embedding_field.name_in_index + ) + end + end + end + + def resolve_routing(errors) + root_rel = resolved_chain.root_relationship + root_index = resolved_chain.root_indexed_type.index_def + + routing_value_source = root_rel.routing_value_source_for_index(root_index) do |local_need| + errors << "Cannot update `#{resolved_chain.root_indexed_type.name}` documents with nested sourced data from " \ + "`#{relationship.name}` events, because `#{resolved_chain.root_indexed_type.name}` uses custom shard routing " \ + "but we don't know what field to use to route the update requests. To fix it, add a call like this to the " \ + "`#{resolved_chain.root_indexed_type.name}.#{root_rel.name}` relationship definition: " \ + "`rel.equivalent_field \"[#{related_type.name} field]\", locally_named: \"#{local_need}\"`." + return [nil, errors] + end + + if routing_value_source + field_path = field_path_resolver.resolve_public_path(related_type, routing_value_source) do |parent_field| + !parent_field.type.list? + end + field_path&.path_in_index + end + end + + def resolve_rollover(errors) + root_rel = resolved_chain.root_relationship + root_index = resolved_chain.root_indexed_type.index_def + + rollover_value_source = root_rel.rollover_timestamp_value_source_for_index(root_index) do |local_need| + errors << "Cannot update `#{resolved_chain.root_indexed_type.name}` documents with nested sourced data from " \ + "`#{relationship.name}` events, because `#{resolved_chain.root_indexed_type.name}` uses a rollover index " \ + "but we don't know what field to use to select an index for the update requests. To fix it, add a call like this to the " \ + "`#{resolved_chain.root_indexed_type.name}.#{root_rel.name}` relationship definition: " \ + "`rel.equivalent_field \"[#{related_type.name} field]\", locally_named: \"#{local_need}\"`." + return [nil, errors] + end + + if rollover_value_source + field_path = field_path_resolver.resolve_public_path(related_type, rollover_value_source) do |parent_field| + !parent_field.type.list? + end + field_path&.path_in_index + end + end + + def validate_has_had_multiple_sources(errors) + root_type = resolved_chain.root_indexed_type + root_index_def = root_type.index_def + if root_index_def && !root_index_def.has_had_multiple_sources_flag + errors << "Type `#{root_type.name}` has nested `sourced_from` fields (via `#{object_type.name}.#{relationship.name}`) " \ + "but its index `#{root_index_def.name}` has not been configured with `has_had_multiple_sources!`. " \ + "To resolve this, add `i.has_had_multiple_sources!` within the `t.index \"#{root_index_def.name}\"` block." + end + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_update_targets_resolver.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_update_targets_resolver.rb new file mode 100644 index 000000000..222c259ec --- /dev/null +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/sourced_update_targets_resolver.rb @@ -0,0 +1,167 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/errors" +require "elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver" +require "elastic_graph/schema_definition/indexing/nested_update_target_resolver" +require "elastic_graph/schema_definition/indexing/relationship_resolver" +require "elastic_graph/schema_definition/indexing/update_target_resolver" + +module ElasticGraph + module SchemaDefinition + module Indexing + # Resolves all `sourced_from` relationships across the schema into update targets, + # keyed by the source type name that publishes the events. + # + # @private + class SourcedUpdateTargetsResolver + def initialize(schema_def_state:) + @schema_def_state = schema_def_state + @sourced_field_errors = [] # : ::Array[::String] + @relationship_errors = [] # : ::Array[::String] + @sourced_update_targets_by_type_name = ::Hash.new { |h, k| h[k] = [] } # : ::Hash[untyped, ::Array[SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + end + + # Returns a map of object type name → list of sourced update targets for that type. + def resolve + @schema_def_state.object_types_by_name.except(*@schema_def_state.namespace_types_by_name.keys).values.each do |object_type| + resolve_for_type(object_type) + end + + raise_if_errors + @sourced_update_targets_by_type_name + end + + private + + def resolve_for_type(object_type) + fields_with_sources_by_relationship_name = + if object_type.own_index_def.nil? + # only indexed types can have `sourced_from` fields, and resolving `fields_with_sources` on an unindexed union type + # such as `_Entity` when we are using apollo can lead to exceptions when multiple entity types have the same field name + # that use different mapping types. + {} # : ::Hash[::String, ::Array[SchemaElements::Field]] + else + object_type + .fields_with_sources + .group_by { |f| (_ = f.source).relationship_name } + end + + defined_relationships = object_type.relationships_by_name.keys + + (defined_relationships | fields_with_sources_by_relationship_name.keys).each do |relationship_name| + sourced_fields = fields_with_sources_by_relationship_name.fetch(relationship_name) { [] } + relationship_resolver = RelationshipResolver.new( + schema_def_state: @schema_def_state, + object_type: object_type, + relationship_name: relationship_name, + sourced_fields: sourced_fields + ) + + resolved_relationship, relationship_error = relationship_resolver.resolve + @relationship_errors << relationship_error if relationship_error + + if object_type.own_index_def && resolved_relationship && sourced_fields.any? + resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields) + end + end + + # Process nested sourced_from fields on non-indexed types. + if object_type.own_index_def.nil? + resolve_nested_update_targets(object_type) + end + end + + def resolve_top_level_update_target(object_type, resolved_relationship, sourced_fields) + update_target_resolver = UpdateTargetResolver.new( + object_type: object_type, + resolved_relationship: resolved_relationship, + sourced_fields: sourced_fields, + field_path_resolver: @schema_def_state.field_path_resolver + ) + + update_target, errors = update_target_resolver.resolve + @sourced_update_targets_by_type_name[resolved_relationship.related_type.name] << update_target if update_target + @sourced_field_errors.concat(errors) + + # Validate that has_had_multiple_sources! has been called when sourced_from is used + if (index_def = object_type.own_index_def) && !index_def.has_had_multiple_sources_flag + @sourced_field_errors << "Type `#{object_type.name}` uses `sourced_from` fields but its index `#{index_def.name}` " \ + "has not been configured with `has_had_multiple_sources!`. To resolve this, add `i.has_had_multiple_sources!` within the " \ + "`t.index \"#{index_def.name}\"` block. This flag is required because indices with multiple sources can contain " \ + "incomplete documents, and ElasticGraph needs to know this to apply proper filtering. Once set, this flag should remain even " \ + "if you later remove all `sourced_from` fields, as the index may still contain historical incomplete documents." + end + end + + def resolve_nested_update_targets(object_type) + nested_relationships = object_type.relationships_by_name + .select { |_, rel| rel.parent_ref } + + return if nested_relationships.empty? + + fields_with_sources_by_relationship_name = object_type + .indexing_fields_by_name_in_index.values + .reject { |f| f.source.nil? } + .group_by { |f| (_ = f.source).relationship_name } + + nested_relationships.each do |rel_name, relationship| + empty_fields = [] # : ::Array[SchemaElements::Field] + sourced_fields = fields_with_sources_by_relationship_name.fetch(rel_name) { empty_fields } + + next if sourced_fields.empty? + + chain_resolver = NestedRelationshipChainResolver.new(schema_def_state: @schema_def_state) + resolved_chain, chain_errors = chain_resolver.resolve(relationship, object_type) + + if chain_errors.any? + @sourced_field_errors.concat(chain_errors) + next + end + + resolved_chain = _ = resolved_chain # : ResolvedNestedChain + resolver = NestedUpdateTargetResolver.new( + object_type: object_type, + relationship: relationship, + sourced_fields: sourced_fields, + resolved_chain: resolved_chain, + field_path_resolver: @schema_def_state.field_path_resolver, + schema_def_state: @schema_def_state + ) + + update_target, resolve_errors = resolver.resolve do |indexed_type, rel, nested_sourced_paths| + indexed_type.index_def.register_nested_sourced_paths(rel.name, nested_sourced_paths) + end + @sourced_field_errors.concat(resolve_errors) + + next unless update_target + + related_type_name = relationship.related_type.unwrap_non_null.name + @sourced_update_targets_by_type_name[related_type_name] << update_target + end + end + + def raise_if_errors + full_errors = [] # : ::Array[::String] + + if @sourced_field_errors.any? + full_errors << "Schema had #{@sourced_field_errors.size} error(s) related to `sourced_from` fields:\n\n#{@sourced_field_errors.map.with_index(1) { |e, i| "#{i}. #{e}" }.join("\n\n")}" + end + + if @relationship_errors.any? + full_errors << "Schema had #{@relationship_errors.size} error(s) related to relationship fields:\n\n#{@relationship_errors.map.with_index(1) { |e, i| "#{i}. #{e}" }.join("\n\n")}" + end + + unless full_errors.empty? + raise Errors::SchemaError, full_errors.join("\n\n") + end + end + end + end + end +end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb index 95790c774..e17d66d16 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/update_target_factory.rb @@ -18,19 +18,21 @@ def self.new_normal_indexing_update_target( id_source:, top_level_fields_params:, routing_value_source:, - rollover_timestamp_value_source: + rollover_timestamp_value_source:, + nested_sourced_data_params: SchemaArtifacts::RuntimeMetadata::NestedSourcedDataParams::EMPTY ) SchemaArtifacts::RuntimeMetadata::UpdateTarget.new( type: type, relationship: relationship, script_id: INDEX_DATA_UPDATE_SCRIPT_ID, id_source: id_source, + routing_value_source: routing_value_source, + rollover_timestamp_value_source: rollover_timestamp_value_source, + top_level_fields_params: top_level_fields_params, + nested_sourced_data_params: nested_sourced_data_params, metadata_params: standard_metadata_params.merge({ "relationship" => SchemaArtifacts::RuntimeMetadata::StaticParam.new(value: relationship) - }), - top_level_fields_params: top_level_fields_params, - routing_value_source: routing_value_source, - rollover_timestamp_value_source: rollover_timestamp_value_source + }) ) end diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/results.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/results.rb index 59a8c3891..68e47212b 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/results.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/results.rb @@ -12,8 +12,7 @@ require "elastic_graph/schema_artifacts/artifacts_helper_methods" require "elastic_graph/schema_definition/indexing/event_envelope" require "elastic_graph/schema_definition/indexing/json_schema_with_metadata" -require "elastic_graph/schema_definition/indexing/relationship_resolver" -require "elastic_graph/schema_definition/indexing/update_target_resolver" +require "elastic_graph/schema_definition/indexing/sourced_update_targets_resolver" require "elastic_graph/schema_definition/mixins/has_readable_to_s_and_inspect" require "elastic_graph/schema_definition/schema_elements/field_path" require "elastic_graph/schema_definition/scripting/file_system_repository" @@ -143,11 +142,11 @@ def build_dynamic_scripts end def build_runtime_metadata - extra_update_targets_by_object_type_name = identify_extra_update_targets_by_object_type_name + sourced_update_targets_by_type_name = Indexing::SourcedUpdateTargetsResolver.new(schema_def_state: state).resolve object_types_by_name = all_types .select { |t| t.respond_to?(:graphql_fields_by_name) } - .to_h { |type| [type.name, (_ = type).runtime_metadata(extra_update_targets_by_object_type_name.fetch(type.name) { [] })] } + .to_h { |type| [type.name, (_ = type).runtime_metadata(sourced_update_targets_by_type_name.fetch(type.name) { [] })] } scalar_types_by_name = state.scalar_types_by_name.transform_values(&:runtime_metadata) @@ -180,80 +179,6 @@ def build_runtime_metadata ).tap { |rm| verify_runtime_metadata(rm) } end - # Builds a map, keyed by object type name, of extra `update_targets` that have been generated - # from any fields that use `sourced_from` on other types. - def identify_extra_update_targets_by_object_type_name - sourced_field_errors = [] # : ::Array[::String] - relationship_errors = [] # : ::Array[::String] - - state.object_types_by_name.except(*state.namespace_types_by_name.keys).values.each_with_object( - ::Hash.new { |h, k| h[k] = [] } # : ::Hash[untyped, ::Array[SchemaArtifacts::RuntimeMetadata::UpdateTarget]] - ) do |object_type, accum| - fields_with_sources_by_relationship_name = - if object_type.own_index_def.nil? - # only indexed types can have `sourced_from` fields, and resolving `fields_with_sources` on an unindexed union type - # such as `_Entity` when we are using apollo can lead to exceptions when multiple entity types have the same field name - # that use different mapping types. - {} # : ::Hash[::String, ::Array[SchemaElements::Field]] - else - object_type - .fields_with_sources - .group_by { |f| (_ = f.source).relationship_name } - end - - defined_relationships = object_type.relationships_by_name.keys - - (defined_relationships | fields_with_sources_by_relationship_name.keys).each do |relationship_name| - sourced_fields = fields_with_sources_by_relationship_name.fetch(relationship_name) { [] } - relationship_resolver = Indexing::RelationshipResolver.new( - schema_def_state: state, - object_type: object_type, - relationship_name: relationship_name, - sourced_fields: sourced_fields - ) - - resolved_relationship, relationship_error = relationship_resolver.resolve - relationship_errors << relationship_error if relationship_error - - if object_type.own_index_def && resolved_relationship && sourced_fields.any? - update_target_resolver = Indexing::UpdateTargetResolver.new( - object_type: object_type, - resolved_relationship: resolved_relationship, - sourced_fields: sourced_fields, - field_path_resolver: state.field_path_resolver - ) - - update_target, errors = update_target_resolver.resolve - accum[resolved_relationship.related_type.name] << update_target if update_target - sourced_field_errors.concat(errors) - - # Validate that has_had_multiple_sources! has been called when sourced_from is used - if (index_def = object_type.own_index_def) && !index_def.has_had_multiple_sources_flag - sourced_field_errors << "Type `#{object_type.name}` uses `sourced_from` fields but its index `#{index_def.name}` " \ - "has not been configured with `has_had_multiple_sources!`. To resolve this, add `i.has_had_multiple_sources!` within the " \ - "`t.index \"#{index_def.name}\"` block. This flag is required because indices with multiple sources can contain " \ - "incomplete documents, and ElasticGraph needs to know this to apply proper filtering. Once set, this flag should remain even " \ - "if you later remove all `sourced_from` fields, as the index may still contain historical incomplete documents." - end - end - end - end.tap do - full_errors = [] # : ::Array[::String] - - if sourced_field_errors.any? - full_errors << "Schema had #{sourced_field_errors.size} error(s) related to `sourced_from` fields:\n\n#{sourced_field_errors.map.with_index(1) { |e, i| "#{i}. #{e}" }.join("\n\n")}" - end - - if relationship_errors.any? - full_errors << "Schema had #{relationship_errors.size} error(s) related to relationship fields:\n\n#{relationship_errors.map.with_index(1) { |e, i| "#{i}. #{e}" }.join("\n\n")}" - end - - unless full_errors.empty? - raise Errors::SchemaError, full_errors.join("\n\n") - end - end - end - # Generates the SDL defined by your schema. Intended to be called only once # at the very end (after evaluating the "main" template). `Evaluator` calls this # automatically at the end. diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/relationship.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/relationship.rb index 29c7ec2f0..3acc8c590 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/relationship.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/relationship.rb @@ -37,24 +37,42 @@ module SchemaElements # end # end class Relationship < DelegateClass(Field) - # @dynamic related_type, hide_relationship_runtime_metadata, hide_relationship_runtime_metadata= + # @dynamic related_type, foreign_key, hide_relationship_runtime_metadata, hide_relationship_runtime_metadata=, parent_ref, indexing_only + + # References a parent relationship in a nested sourced_from chain. + # @private + ParentRef = ::Data.define(:type_ref, :relationship_name) # @return [ObjectType, InterfaceType, UnionType] the type this relationship relates to attr_reader :related_type + # @return [String] the foreign key field name (the `via` parameter) + # @private + attr_reader :foreign_key + # @private attr_accessor :hide_relationship_runtime_metadata + # @return [ParentRef, nil] reference to the parent relationship in a nested sourced_from chain # @private - def initialize(field, cardinality:, related_type:, foreign_key:, direction:) + attr_reader :parent_ref + + # @return [Boolean] true if this relationship is for indexing only (not exposed in GraphQL) + # @private + attr_reader :indexing_only + + # @private + def initialize(field, cardinality:, related_type:, foreign_key:, direction:, indexing_only: false) super(field) self.hide_relationship_runtime_metadata = false @cardinality = cardinality @related_type = related_type @foreign_key = foreign_key @direction = direction + @indexing_only = indexing_only @equivalent_field_paths_by_local_path = {} @additional_filter = {} + @parent_ref = nil end # Adds additional filter conditions to a relationship beyond the foreign key. @@ -136,6 +154,59 @@ def equivalent_field(path, locally_named: path) end end + # Indicates that this relationship chains through a parent relationship to reach the root indexed type. + # + # Use this API when defining relationships on embedded (non-indexed) types that need to use `sourced_from` + # on their fields. By chaining relationships through parent types, ElasticGraph can resolve the path from + # the nested type up to the root indexed type and properly update nested fields when source events arrive. + # + # @param parent_type_name [String] name of the parent type in the nesting hierarchy + # @param parent_relationship_name [String] name of the relationship on the parent type + # @return [void] + # + # @example Define a nested sourced_from relationship chain + # ElasticGraph.define_schema do |schema| + # schema.object_type "Team" do |t| + # t.field "id", "ID!" + # t.field "name", "String" + # t.field "players", "[Player!]" + # t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true + # t.index "teams" do |i| + # i.has_had_multiple_sources! + # end + # end + # + # schema.object_type "Player" do |t| + # t.field "id", "ID!" + # t.field "name", "String" + # t.field "goalsScored", "Int" do |f| + # f.sourced_from "statLine", "goals" + # end + # t.relates_to_one "statLine", "StatLine", via: "playerId", dir: :in, indexing_only: true do |r| + # r.parent_relationship "Team", "statLines" + # end + # end + # + # schema.object_type "StatLine" do |t| + # t.field "id", "ID!" + # t.field "teamId", "ID" + # t.field "playerId", "ID" + # t.field "goals", "Int" + # t.index "stat_lines" + # end + # end + def parent_relationship(parent_type_name, parent_relationship_name) + if @parent_ref + raise Errors::SchemaError, "`parent_relationship` has been called multiple times on `#{parent_type.name}.#{name}`, " \ + "but each relationship can have only one `parent_relationship`." + end + + @parent_ref = ParentRef.new( + type_ref: schema_def_state.type_ref(parent_type_name), + relationship_name: parent_relationship_name + ) + end + # Gets the `routing_value_source` from this relationship for the given `index`, based on the configured # routing used by `index` and the configured equivalent fields. # diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/type_with_subfields.rb b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/type_with_subfields.rb index 4fc5f9c2f..692281b2a 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/type_with_subfields.rb +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/schema_elements/type_with_subfields.rb @@ -574,7 +574,8 @@ def relates_to(field_name, type, via:, dir:, foreign_key_type:, cardinality:, re cardinality: cardinality, related_type: schema_def_state.type_ref(related_type).to_final_form, foreign_key: via, - direction: dir + direction: dir, + indexing_only: indexing_only ) field.relationship = relationship diff --git a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless index b618f13fa..9bc8dabe6 100644 --- a/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless +++ b/elasticgraph-schema_definition/lib/elastic_graph/schema_definition/scripting/scripts/update/index_data.painless @@ -1,5 +1,108 @@ -// --- Helper Functions --- // -void setup(Map source, String relationship, Map counts) { +// ============================================================ +// Helper Functions +// ============================================================ + +// Encodes a list of strings into a length-prefixed string. +// Each part is encoded as "length:value" and parts are concatenated directly. +// This encoding is unambiguous regardless of what characters the values contain. +String encodeKey(List parts) { + StringBuilder sb = new StringBuilder(); + for (String part : parts) { + sb.append(part.length()); + sb.append(':'); + sb.append(part); + } + return sb.toString(); +} + +// Decodes a length-prefixed string back into a list of strings. +List decodeKey(String key) { + List parts = new ArrayList(); + int i = 0; + while (i < key.length()) { + int colonPos = key.indexOf(":", i); + int length = Integer.parseInt(key.substring(i, colonPos)); + int valueStart = colonPos + 1; + parts.add(key.substring(valueStart, valueStart + length)); + i = valueStart + length; + } + return parts; +} + +// Builds a nested element key (as an encoded string) from path segments. +// List segments contribute their matched identifier value; object segments contribute their field name. +// Returns "" if no path segments are configured (i.e., this is not a nested sourced event). +String buildNestedElementKey(String relationship, Map nestedSourcedPaths, Map pathIdentifiers) { + List pathSegments = (List) nestedSourcedPaths.get(relationship); + if (pathSegments == null) { + return ""; + } + List parts = new ArrayList(); + for (Map segment : pathSegments) { + if ("list".equals(segment.get("type"))) { + parts.add(pathIdentifiers[segment.sourceField]); + } else { + parts.add(segment.get("field")); + } + } + return encodeKey(parts); +} + +// Builds the versions key by combining the relationship name with the element key parts. +// For top-level events (empty element key), returns just the relationship name. +String buildVersionsKey(String relationship, String nestedElementKey) { + if (nestedElementKey.isEmpty()) { + return relationship; + } + List parts = decodeKey(nestedElementKey); + parts.add(0, relationship); + return encodeKey(parts); +} + +// Finds an element in a list where element[matchField] equals matchValue. Returns null if not found. +def findInList(List elements, String matchField, String matchValue) { + for (Map element : elements) { + if (matchValue.equals(element[matchField])) { + return element; + } + } + return null; +} + +// Navigates from `source` through `pathSegments` to find the target nested element. +// Returns the matched element, or null if the path doesn't exist or no match is found. +def navigateToNestedElement(Map source, List pathSegments, List keyParts) { + Map current = source; + + for (int i = 0; i < pathSegments.size(); i++) { + Map segment = (Map) pathSegments.get(i); + String field = (String) segment.get("field"); + + if (!current.containsKey(field)) { + return null; + } + + if ("list".equals(segment.get("type"))) { + current = (Map) findInList((List) current.get(field), (String) segment.get("matchField"), (String) keyParts.get(i)); + } else { + current = (Map) current.get(field); + } + + if (current == null) { + return null; + } + } + + return current; +} + + +// ============================================================ +// Main Functions +// ============================================================ + +// Initializes internal bookkeeping structures (__sources, __versions, __counts, __nested_sourced_data). +void setup(Map source, String versionsKey, String relationship, String nestedElementKey, Map counts) { if (source.__sources == null) { source.__sources = []; } @@ -8,8 +111,17 @@ void setup(Map source, String relationship, Map counts) { source.__versions = [:]; } - if (source.__versions[relationship] == null) { - source.__versions[relationship] = [:]; + if (source.__versions[versionsKey] == null) { + source.__versions[versionsKey] = [:]; + } + + if (!nestedElementKey.isEmpty()) { + if (source.__nested_sourced_data == null) { + source.__nested_sourced_data = [:]; + } + if (source.__nested_sourced_data[relationship] == null) { + source.__nested_sourced_data[relationship] = [:]; + } } if (counts != null && source.__counts == null) { @@ -17,23 +129,24 @@ void setup(Map source, String relationship, Map counts) { } } -void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) { - Map relationshipVersionsMap = source.__versions.get(relationship); - List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); +// Validates that this event is allowed: no relationship mutation and no stale version. +void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion, String versionsKey) { + Map versionsMap = source.__versions[versionsKey]; - if (previousSourceIdsForRelationship.size() > 0) { + // Check that no other source ID has previously written to this target. + List previousSourceIds = versionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList()); + if (previousSourceIds.size() > 0) { throw new IllegalArgumentException( "Cannot update document " + id + " " + "with data from related " + relationship + " " + sourceId + " " + - "because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " + + "because the related " + relationship + " has apparently changed (was: " + previousSourceIds + "), " + "but mutations of relationships used with `sourced_from` are not supported because " + "allowing it could break ElasticGraph's out-of-order processing guarantees." - ); + ); } - Number maybeDocVersion = relationshipVersionsMap.get(sourceId); - - // Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null. + // Check that the event version is newer than what we've already seen. + Number maybeDocVersion = versionsMap.get(sourceId); long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue(); if (docVersion >= eventVersion) { @@ -44,6 +157,7 @@ void validateSource(Map source, String id, String relationship, String sourceId, } } +// Applies top-level fields to the document via putAll, and merges __counts. void applyTopLevelFields(Map source, String id, Map topLevelFields, Map counts) { source.id = id; source.putAll(topLevelFields); @@ -53,8 +167,50 @@ void applyTopLevelFields(Map source, String id, Map topLevelFields, Map counts) } } -void recordSource(Map source, String relationship, String sourceId, long eventVersion) { - source.__versions[relationship][sourceId] = eventVersion; +// Stores nested sourced fields in the __nested_sourced_data buffer for later application. +void storeNestedSourcedData(Map source, String relationship, Map nestedSourcedFields, String nestedElementKey) { + if (nestedSourcedFields.isEmpty()) { + return; + } + + ((Map) source.__nested_sourced_data[relationship]).put(nestedElementKey, nestedSourcedFields); +} + +// Applies nested sourced data from the __nested_sourced_data buffer to matched nested elements. +// Reads path config from the nestedSourcedPaths param. +// Called after every event so that after a self-event's putAll overwrites nested arrays, +// the buffered data gets re-applied. +void applyNestedSourcedData(Map source, Map nestedSourcedPaths) { + if (source.__nested_sourced_data == null) { + return; + } + + for (sourcedEntry in source.__nested_sourced_data.entrySet()) { + String relationship = (String) sourcedEntry.getKey(); + Map dataByKey = (Map) sourcedEntry.getValue(); + List pathSegments = (List) nestedSourcedPaths.get(relationship); + + if (pathSegments == null || dataByKey == null) { + continue; + } + + for (elementEntry in dataByKey.entrySet()) { + List keyParts = decodeKey((String) elementEntry.getKey()); + if (keyParts.size() != pathSegments.size()) { + continue; + } + + Map target = (Map) navigateToNestedElement(source, pathSegments, keyParts); + if (target != null) { + target.putAll((Map) elementEntry.getValue()); + } + } + } +} + +// Records the event version in __versions and adds the relationship to __sources. +void recordSource(Map source, String versionsKey, String relationship, String sourceId, long eventVersion) { + source.__versions[versionsKey][sourceId] = eventVersion; // Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list. // This ensures deterministic ordering of its elements regardless of event ingestion order, and lets us check membership in O(log N) time. @@ -71,15 +227,26 @@ void recordSource(Map source, String relationship, String sourceId, long eventVe } } -// --- Main script body --- // +// ============================================================ +// Main Execution +// ============================================================ + Map source = ctx._source; String id = params.id; String relationship = params.relationship; String sourceId = params.sourceId; -long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles +long eventVersion = (long) params.version; Map counts = params.__counts; +Map nestedSourcedFields = params.nestedSourcedFields; +Map nestedSourcedPathIdentifiers = params.nestedSourcedPathIdentifiers; +Map nestedSourcedPaths = params.nestedSourcedPaths; + +String nestedElementKey = buildNestedElementKey(relationship, nestedSourcedPaths, nestedSourcedPathIdentifiers); +String versionsKey = buildVersionsKey(relationship, nestedElementKey); -setup(source, relationship, counts); -validateSource(source, id, relationship, sourceId, eventVersion); +setup(source, versionsKey, relationship, nestedElementKey, counts); +validateSource(source, id, relationship, sourceId, eventVersion, versionsKey); applyTopLevelFields(source, id, params.topLevelFields, counts); -recordSource(source, relationship, sourceId, eventVersion); +storeNestedSourcedData(source, relationship, nestedSourcedFields, nestedElementKey); +applyNestedSourcedData(source, nestedSourcedPaths); +recordSource(source, versionsKey, relationship, sourceId, eventVersion); diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/factory.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/factory.rbs index e40b2fa50..10b39933b 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/factory.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/factory.rbs @@ -112,7 +112,8 @@ module ElasticGraph cardinality: SchemaElements::Relationship::cardinality, related_type: SchemaElements::TypeReference, foreign_key: ::String, - direction: SchemaElements::foreignKeyDirection + direction: SchemaElements::foreignKeyDirection, + ?indexing_only: bool ) -> SchemaElements::Relationship @@relationship_new: ::Method diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs index 0dc296381..e898655a7 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/index.rbs @@ -8,8 +8,10 @@ module ElasticGraph attr_reader rollover_config: RolloverConfig? attr_reader has_had_multiple_sources_flag: bool attr_reader indexed_type: indexableType + attr_reader nested_sourced_paths: ::Hash[::String, ::Array[SchemaArtifacts::RuntimeMetadata::nestedSourcedPathSegment]] def uses_custom_routing?: () -> bool + def register_nested_sourced_paths: (::String, ::Array[SchemaArtifacts::RuntimeMetadata::nestedSourcedPathSegment]) -> void def to_index_config: () -> ::Hash[::String, untyped] def to_index_template_config: () -> ::Hash[::String, untyped] def runtime_metadata: () -> SchemaArtifacts::RuntimeMetadata::IndexDefinition diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver.rbs new file mode 100644 index 000000000..44c487038 --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_relationship_chain_resolver.rbs @@ -0,0 +1,37 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class NestedRelationshipChainResolver + @schema_def_state: State + + def initialize: (schema_def_state: State) -> void + def resolve: (SchemaElements::Relationship, untyped) -> [ResolvedNestedChain?, ::Array[::String]] + + private + + def resolve_chain: (SchemaElements::Relationship, untyped, ::Array[PathSegment], ::Array[::String], ::Set[::String]) -> [SchemaElements::Relationship, untyped] + def validate_link: (SchemaElements::Relationship, untyped, SchemaElements::Relationship::ParentRef, ::Array[::String], ::Set[::String]) -> [untyped, untyped] + def build_path_segment: (SchemaElements::Relationship, untyped, untyped, ::Array[PathSegment], ::Array[::String]) -> void + def find_embedding_field: (untyped, untyped, ::Array[::String]) -> SchemaElements::Field? + def rel_description: (untyped, SchemaElements::Relationship) -> ::String + end + + class ResolvedNestedChain + attr_reader root_indexed_type: untyped + attr_reader path_segments: ::Array[PathSegment] + attr_reader root_relationship: SchemaElements::Relationship + + def initialize: (root_indexed_type: untyped, path_segments: ::Array[PathSegment], root_relationship: SchemaElements::Relationship) -> void + end + + class PathSegment + attr_reader parent_type: untyped + attr_reader embedding_field: SchemaElements::Field + attr_reader match_field: ::String + attr_reader source_field: ::String + + def initialize: (parent_type: untyped, embedding_field: SchemaElements::Field, match_field: ::String, source_field: ::String) -> void + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs new file mode 100644 index 000000000..e5343ad4e --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/nested_update_target_resolver.rbs @@ -0,0 +1,43 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class NestedUpdateTargetResolver + @object_type: untyped + @relationship: SchemaElements::Relationship + @sourced_fields: ::Array[SchemaElements::Field] + @resolved_chain: ResolvedNestedChain + @field_path_resolver: SchemaElements::FieldPath::Resolver + @schema_def_state: State + @related_type: untyped + + def initialize: ( + object_type: untyped, + relationship: SchemaElements::Relationship, + sourced_fields: ::Array[SchemaElements::Field], + resolved_chain: ResolvedNestedChain, + field_path_resolver: SchemaElements::FieldPath::Resolver, + schema_def_state: State + ) -> void + + def resolve: () { (untyped, SchemaElements::Relationship, ::Array[SchemaArtifacts::RuntimeMetadata::nestedSourcedPathSegment]) -> void } -> [SchemaArtifacts::RuntimeMetadata::UpdateTarget?, ::Array[::String]] + + private + + attr_reader object_type: untyped + attr_reader relationship: SchemaElements::Relationship + attr_reader sourced_fields: ::Array[SchemaElements::Field] + attr_reader resolved_chain: ResolvedNestedChain + attr_reader field_path_resolver: SchemaElements::FieldPath::Resolver + attr_reader schema_def_state: State + + def related_type: () -> untyped + def resolve_nested_sourced_data_params: (::Array[::String]) -> SchemaArtifacts::RuntimeMetadata::paramsHash + def build_path_identifier_params: () -> SchemaArtifacts::RuntimeMetadata::paramsHash + def build_nested_sourced_paths: () -> ::Array[SchemaArtifacts::RuntimeMetadata::nestedSourcedPathSegment] + def resolve_routing: (::Array[::String]) -> untyped + def resolve_rollover: (::Array[::String]) -> untyped + def validate_has_had_multiple_sources: (::Array[::String]) -> void + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_update_targets_resolver.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_update_targets_resolver.rbs new file mode 100644 index 000000000..9eb131fc2 --- /dev/null +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/sourced_update_targets_resolver.rbs @@ -0,0 +1,22 @@ +module ElasticGraph + module SchemaDefinition + module Indexing + class SourcedUpdateTargetsResolver + @schema_def_state: State + @sourced_field_errors: ::Array[::String] + @relationship_errors: ::Array[::String] + @sourced_update_targets_by_type_name: ::Hash[::String, ::Array[SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + + def initialize: (schema_def_state: State) -> void + def resolve: () -> ::Hash[::String, ::Array[SchemaArtifacts::RuntimeMetadata::UpdateTarget]] + + private + + def resolve_for_type: (untyped) -> void + def resolve_top_level_update_target: (untyped, untyped, ::Array[SchemaElements::Field]) -> void + def resolve_nested_update_targets: (untyped) -> void + def raise_if_errors: () -> void + end + end + end +end diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs index 4375b8138..a950336a0 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/indexing/update_target_factory.rbs @@ -8,7 +8,8 @@ module ElasticGraph id_source: ::String, top_level_fields_params: SchemaArtifacts::RuntimeMetadata::paramsHash, routing_value_source: ::String?, - rollover_timestamp_value_source: ::String? + rollover_timestamp_value_source: ::String?, + ?nested_sourced_data_params: SchemaArtifacts::RuntimeMetadata::NestedSourcedDataParams ) -> SchemaArtifacts::RuntimeMetadata::UpdateTarget private diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/results.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/results.rbs index c35f971f2..5de76d5e4 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/results.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/results.rbs @@ -39,7 +39,6 @@ module ElasticGraph def generate_datastore_config: () -> ::Hash[::String, untyped] def build_dynamic_scripts: () -> ::Array[Scripting::Script] def build_runtime_metadata: () -> SchemaArtifacts::RuntimeMetadata::Schema - def identify_extra_update_targets_by_object_type_name: () -> ::Hash[::String, ::Array[SchemaArtifacts::RuntimeMetadata::UpdateTarget]] def generate_sdl: () -> ::String def build_public_json_schema: () -> ::Hash[::String, untyped] def json_schema_indexing_field_types_by_name: () -> ::Hash[::String, Indexing::_FieldType] diff --git a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/schema_elements/relationship.rbs b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/schema_elements/relationship.rbs index ee1d03122..6fa82a6fd 100644 --- a/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/schema_elements/relationship.rbs +++ b/elasticgraph-schema_definition/sig/elastic_graph/schema_definition/schema_elements/relationship.rbs @@ -8,25 +8,42 @@ module ElasticGraph class Relationship < RelationshipSupertype type cardinality = :one | :many attr_reader related_type: TypeReference + attr_reader foreign_key: ::String attr_accessor hide_relationship_runtime_metadata: bool + attr_reader parent_ref: Relationship::ParentRef? + attr_reader indexing_only: bool @cardinality: cardinality @related_type: TypeReference @foreign_key: ::String @direction: foreignKeyDirection + @indexing_only: bool @equivalent_field_paths_by_local_path: ::Hash[::String, ::String] @additional_filter: ::Hash[::String, untyped] + @parent_ref: Relationship::ParentRef? + + class ParentRefSuperType + attr_reader type_ref: TypeReference + attr_reader relationship_name: ::String + + def initialize: (type_ref: TypeReference, relationship_name: ::String) -> void + end + + class ParentRef < ParentRefSuperType + end def initialize: ( Field, cardinality: cardinality, related_type: TypeReference, foreign_key: ::String, - direction: foreignKeyDirection + direction: foreignKeyDirection, + ?indexing_only: bool ) -> void def additional_filter: (::Hash[::String, untyped]) -> void def equivalent_field: (::String, ?locally_named: ::String) -> void + def parent_relationship: (::String, ::String) -> void def routing_value_source_for_index: [T] (Indexing::Index) { (::String) -> bot } -> ::String? def rollover_timestamp_value_source_for_index: [T] (Indexing::Index) { (::String) -> bot } -> ::String? def validate_equivalent_fields: (SchemaElements::FieldPath::Resolver) -> ::Array[::String] diff --git a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/datastore_config/index_mappings/miscellaneous_spec.rb b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/datastore_config/index_mappings/miscellaneous_spec.rb index 0e0922eb6..7d9523044 100644 --- a/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/datastore_config/index_mappings/miscellaneous_spec.rb +++ b/elasticgraph-schema_definition/spec/unit/elastic_graph/schema_definition/datastore_config/index_mappings/miscellaneous_spec.rb @@ -347,7 +347,7 @@ module SchemaDefinition mapping = generate_mapping.call(graphql_only: true) # Verify that it does not have a property for `size` or `options.size` - expect(mapping.fetch("properties").keys).to contain_exactly("id", "options", "__sources", "__versions", "__typename") + expect(mapping.fetch("properties").keys).to contain_exactly("id", "options", "__nested_sourced_data", "__sources", "__versions", "__typename") expect(mapping.fetch("properties")).to include({ "id" => {"type" => "keyword"}, "options" => { diff --git a/elasticgraph-support/lib/elastic_graph/constants.rb b/elasticgraph-support/lib/elastic_graph/constants.rb index f0449d88c..b749cbf69 100644 --- a/elasticgraph-support/lib/elastic_graph/constants.rb +++ b/elasticgraph-support/lib/elastic_graph/constants.rb @@ -140,7 +140,7 @@ module ElasticGraph # # Note: this constant is automatically kept up-to-date by our `schema_artifacts:dump` rake task. # @private - INDEX_DATA_UPDATE_SCRIPT_ID = "update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d" + INDEX_DATA_UPDATE_SCRIPT_ID = "update_index_data_079bafcf4d739acd8659a631377fa9c8" # When an update script has a no-op result we often want to communicate more information about # why it was a no-op back to ElatsicGraph from the script. The only way to do that is to throw diff --git a/spec_support/lib/elastic_graph/spec_support/runtime_metadata_support.rb b/spec_support/lib/elastic_graph/spec_support/runtime_metadata_support.rb index 705031d12..4d156166e 100644 --- a/spec_support/lib/elastic_graph/spec_support/runtime_metadata_support.rb +++ b/spec_support/lib/elastic_graph/spec_support/runtime_metadata_support.rb @@ -72,6 +72,7 @@ def derived_indexing_update_target_with( routing_value_source: routing_value_source, rollover_timestamp_value_source: rollover_timestamp_value_source, top_level_fields_params: top_level_fields_params, + nested_sourced_data_params: NestedSourcedDataParams::EMPTY, metadata_params: metadata_params ) end @@ -93,6 +94,7 @@ def normal_indexing_update_target_with( routing_value_source: routing_value_source, rollover_timestamp_value_source: rollover_timestamp_value_source, top_level_fields_params: top_level_fields_params, + nested_sourced_data_params: NestedSourcedDataParams::EMPTY, metadata_params: metadata_params ) end