-
Notifications
You must be signed in to change notification settings - Fork 12
task: Add query transformer #588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
* feat: basic eventstore completed * fix: workflow and activity state cleanup * refactor: move triggers to models * fix: Fixed UI bug * style: fix minor comment typo * feat: Updated to use programmatic subscription with manifest file * Merge branch 'main' into APP-6781-event-based-workflows * chore: minor fix post-merge * fix: general bug fixes * fix: general bug fixes * fix: Adding support for dropping events when no matcher is found * fix: Bug fixes * fix: fixes for http bindings * Merge remote-tracking branch 'origin/APP-6781-event-based-workflows' into APP-6781-event-based-workflows * fix: precommits * fix: event naming * fix: unit tests * fix: unit tests * fix: application->server * fix: Removed the example sample app to move them to sample-apps repo * chore: Removing unneeded code * fix: unit tests * fix: minor PR fixes * chore: Updated eventstore name to be retrieved from env * fix: Minor PR fixes * fix: Minor fix self.event_subscriptions never set
🎉 Snyk checks have passed. No issues have been found so far.✅ security/snyk check is complete. No issues have been found. (View Details) ✅ license/snyk check is complete. No issues have been found. (View Details) ✅ code/snyk check is complete. No issues have been found. (View Details) |
📜 Docstring Coverage ReportRESULT: PASSED (minimum: 30.0%, actual: 72.5%) Detailed Coverage Report |
📦 Trivy Vulnerability Scan Results
Report Summary
Scan Result Details✅ No vulnerabilities found during the scan for |
📦 Trivy Secret Scan Results
Report Summary
Scan Result Details✅ No secrets found during the scan for |
|
🛠 Docs available at: https://k.atlan.dev/application-sdk/add_query_transformer |
☂️ Python Coverage
Overall Coverage
New FilesNo new covered files... Modified Files
|
|
🛠 Full Test Coverage Report: https://k.atlan.dev/coverage/application-sdk/pr/588 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds support for Atlas Query transformation and enhances event-driven workflow triggers in the FastAPI server.
- Introduces a new
Querytransformer class and its YAML template for Atlas entities. - Refactors event publishing (
EventStore.publish_event) and updates server to use Dapr-based event routing withEventWorkflowTrigger. - Updates tests to exercise the new event endpoint and adjusts project tooling (Dapr port).
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/server/fastapi/test__init__.py | Refactored event-trigger tests to use httpx client |
| pyproject.toml | Changed Dapr app port from 3000 to 8000 |
| application_sdk/transformers/query/templates/query.yaml | Added Atlas Query entity template |
| application_sdk/transformers/atlas/sql.py | Added Query transformer class |
| application_sdk/test_utils/hypothesis/strategies/server/fastapi/init.py | Switched event strategy from specific to generic |
| application_sdk/server/fastapi/models.py | Added EventWorkflowRequest/Response models |
| application_sdk/server/fastapi/init.py | Overhauled event routing, Dapr subscriptions, drop |
| application_sdk/outputs/eventstore.py | Replaced create_event with publish_event and enrich metadata |
| application_sdk/constants.py | Introduced EVENT_STORE_NAME constant |
Comments suppressed due to low confidence (4)
tests/unit/server/fastapi/test__init__.py:96
- [nitpick] The test docstring still refers to "hypothesis generated event data" although Hypothesis is no longer used; please update or remove the misleading docstring.
async def test_event_trigger_success(self):
docs/docs/concepts/server.md:28
- The description no longer mentions
event_id,event_filters, or routing via Dapr; please update to reflect the new properties ofEventWorkflowTrigger.
* `EventWorkflowTrigger`: Triggers a workflow based on incoming events
application_sdk/test_utils/hypothesis/strategies/server/fastapi/init.py:64
- Using
st.builds(Event, event_type=..., event_name=..., data={})will pass those as positional args after metadata, likely resulting in an invalid constructor call; consider supplying a defaultmetadataargument or using keyword-only builds.
Event,
application_sdk/transformers/atlas/sql.py:1244
- [nitpick] The transformer reads
obj.get("querytxt")but the field name is inconsistent with typical SQL conventions; please confirm the correct input key or rename to match the template'slongRawQueryattribute.
attributes["longRawQuery"] = obj.get("querytxt")
| start-dapr = "dapr run --enable-api-logging --log-level debug --app-id app --app-port 8000 --dapr-http-port 3500 --dapr-grpc-port 50001 --dapr-http-max-request-size 1024 --resources-path components" | ||
| start-temporal = "temporal server start-dev --db-filename /tmp/temporal.db" | ||
| start-deps.shell = "poe start-dapr & poe start-temporal &" | ||
| stop-deps.shell = "lsof -ti:3000,3500,7233,50001 | xargs kill -9 2>/dev/null || true" |
Copilot
AI
Jun 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Dapr stop-deps command still targets port 3000, but the app now runs on 8000; update the stop-deps shell task to kill the correct port.
| stop-deps.shell = "lsof -ti:3000,3500,7233,50001 | xargs kill -9 2>/dev/null || true" | |
| stop-deps.shell = "lsof -ti:8000,3500,7233,50001 | xargs kill -9 2>/dev/null || true" |
| ) | ||
|
|
||
| app.include_router(workflow_router, prefix="/workflows/v1") | ||
| self.app.include_router(self.workflow_router, prefix="/workflows/v1") |
Copilot
AI
Jun 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Including the same router inside each trigger registration can lead to duplicate routes; move include_router calls out of the loop to a single centralized inclusion.
| self.app.include_router(self.workflow_router, prefix="/workflows/v1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Attribute Mapping Fails, Data Duplicates
The Query.get_attributes method duplicates data in custom_attributes. The loop intended for unmapped fields incorrectly adds fields already mapped to attributes (e.g., "querytxt", "dbname", "schemaname"). This occurs because the check k not in attributes fails: input keys are mapped to different attribute names (e.g., "longRawQuery", "defaultDatabaseQualifiedName"), so the original input keys are not found in the attributes dictionary, leading to their re-addition to custom_attributes.
application_sdk/transformers/atlas/sql.py#L1274-L1278
application-sdk/application_sdk/transformers/atlas/sql.py
Lines 1274 to 1278 in 9f0b35b
| # Add any other fields not mapped above to custom_attributes | |
| for k, v in obj.items(): | |
| if k not in attributes and k not in custom_attributes: | |
| custom_attributes[k] = v |
Bug: Missing Fundamental Attributes in Query Entity
The Query.get_attributes method is missing the required Atlas entity attributes 'name' and 'qualified_name'. These are fundamental attributes consistently set by all other entity transformers (Database, Schema, Table, Column, Function, Procedure, TagAttachment), and their omission will likely cause issues when creating Atlas Query entities.
application_sdk/transformers/atlas/sql.py#L1239-L1284
application-sdk/application_sdk/transformers/atlas/sql.py
Lines 1239 to 1284 in 9f0b35b
| attributes = {} | |
| custom_attributes = {} | |
| # Map fields from input to Query entity attributes | |
| attributes["longRawQuery"] = obj.get("querytxt") | |
| attributes["rawQueryText"] = obj.get("querytxt") | |
| attributes["defaultDatabaseQualifiedName"] = obj.get("dbname") | |
| attributes["defaultSchemaQualifiedName"] = obj.get("schemaname") | |
| attributes["parentQualifiedName"] = obj["parentQualifiedName"] | |
| attributes["collectionQualifiedName"] = obj["collectionQualifiedName"] | |
| attributes["isPrivate"] = obj.get("isPrivate") | |
| attributes["isSqlSnippet"] = obj.get("isSqlSnippet") | |
| attributes["isVisualQuery"] = obj.get("isVisualQuery") | |
| attributes["visualBuilderSchemaBase64"] = obj.get( | |
| "visualBuilderSchemaBase64" | |
| ) | |
| attributes["variablesSchemaBase64"] = obj.get("variablesSchemaBase64") | |
| # Deprecated field for backward compatibility | |
| attributes["rawQuery"] = obj.get("rawQuery") | |
| # Place username and sessionid in custom_attributes | |
| if obj.get("username"): | |
| custom_attributes["username"] = obj["username"] | |
| if obj.get("sessionid"): | |
| custom_attributes["sessionid"] = obj["sessionid"] | |
| if obj.get("starttime"): | |
| custom_attributes["starttime"] = obj["starttime"] | |
| if obj.get("endtime"): | |
| custom_attributes["endtime"] = obj["endtime"] | |
| if obj.get("session_starttime"): | |
| custom_attributes["session_starttime"] = obj["session_starttime"] | |
| if obj.get("session_endtime"): | |
| custom_attributes["session_endtime"] = obj["session_endtime"] | |
| # Add any other fields not mapped above to custom_attributes | |
| for k, v in obj.items(): | |
| if k not in attributes and k not in custom_attributes: | |
| custom_attributes[k] = v | |
| return { | |
| "attributes": attributes, | |
| "custom_attributes": custom_attributes, | |
| "entity_class": Query, | |
| } |
Was this report helpful? Give feedback by reacting with 👍 or 👎
Changelog
Additional context (e.g. screenshots, logs, links)
Checklist
Copyleft License Compliance