Skip to content

Commit 1c30528

Browse files
removing statestore dependency
1 parent 09485ca commit 1c30528

File tree

5 files changed

+98
-36
lines changed

5 files changed

+98
-36
lines changed

application_sdk/activities/metadata_extraction/sql.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,19 @@ async def _set_state(self, workflow_args: Dict[str, Any]):
136136
handler = self.handler_class(sql_client)
137137
self._state[workflow_id].handler = handler
138138

139+
# Handle credentials - support both StateStore and direct approaches
139140
if "credential_guid" in workflow_args:
141+
# StateStore approach - load credentials from SecretStore
140142
credentials = SecretStoreInput.extract_credentials(
141143
workflow_args["credential_guid"]
142144
)
143145
await sql_client.load(credentials)
146+
elif "credentials" in workflow_args:
147+
# Direct approach - use credentials directly from workflow_args
148+
credentials = workflow_args["credentials"]
149+
await sql_client.load(credentials)
150+
else:
151+
logger.warning("No credentials found in workflow_args")
144152

145153
self._state[workflow_id].sql_client = sql_client
146154

application_sdk/clients/temporal.py

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from application_sdk.clients.workflow import WorkflowClient
2424
from application_sdk.constants import (
2525
APPLICATION_NAME,
26+
DEPLOYMENT_NAME,
2627
MAX_CONCURRENT_ACTIVITIES,
2728
WORKFLOW_AUTH_ENABLED,
2829
WORKFLOW_HOST,
@@ -265,13 +266,16 @@ def __init__(
265266
def get_worker_task_queue(self) -> str:
266267
"""Get the worker task queue name.
267268
268-
The task queue name is derived from the application name and is used
269-
to route workflow tasks to appropriate workers.
269+
The task queue name is derived from the application name and deployment name
270+
and is used to route workflow tasks to appropriate workers.
270271
271272
Returns:
272-
str: The task queue name, which is the same as the application name.
273+
str: The task queue name in format "app_name-deployment_name".
273274
"""
274-
return self.application_name
275+
if DEPLOYMENT_NAME:
276+
return f"{self.application_name}-{DEPLOYMENT_NAME}"
277+
else:
278+
return self.application_name
275279

276280
def get_connection_string(self) -> str:
277281
"""Get the Temporal server connection string.
@@ -307,10 +311,13 @@ async def load(self) -> None:
307311
ConnectionError: If connection to the Temporal server fails.
308312
ValueError: If authentication is enabled but credentials are missing.
309313
"""
314+
# Set TLS based on host - disable for localhost/127.0.0.1, enable for others
315+
tls_enabled = self.host not in ["127.0.0.1", "localhost"]
316+
310317
connection_options: Dict[str, Any] = {
311318
"target_host": self.get_connection_string(),
312319
"namespace": self.namespace,
313-
"tls": True,
320+
"tls": tls_enabled,
314321
}
315322

316323
if self.auth_enabled:
@@ -355,12 +362,24 @@ async def start_workflow(
355362
WorkflowFailureError: If the workflow fails to start.
356363
ValueError: If the client is not loaded.
357364
"""
365+
# Check if credentials should be stored based on credentialSource
366+
should_store_credentials = False
358367
if "credentials" in workflow_args:
359-
# remove credentials from workflow_args and add reference to credentials
360-
workflow_args["credential_guid"] = SecretStoreOutput.store_credentials(
361-
workflow_args["credentials"]
362-
)
363-
del workflow_args["credentials"]
368+
credential_source = workflow_args["credentials"].get("credentialSource", "")
369+
should_store_credentials = credential_source == "direct"
370+
371+
if should_store_credentials:
372+
# Only store credentials if credentialSource is "direct"
373+
workflow_args["credential_guid"] = SecretStoreOutput.store_credentials(
374+
workflow_args["credentials"]
375+
)
376+
del workflow_args["credentials"]
377+
else:
378+
# For non-direct credential sources, keep credentials in workflow_args
379+
# but don't store them in SecretStore
380+
logger.info(
381+
f"Skipping credential storage for credentialSource: {credential_source}"
382+
)
364383

365384
workflow_id = workflow_args.get("workflow_id")
366385
output_prefix = workflow_args.get("output_prefix", "/tmp/output")
@@ -376,19 +395,36 @@ async def start_workflow(
376395
}
377396
)
378397

398+
# Determine configuration approach based on credential storage
399+
if should_store_credentials:
400+
# StateStore approach - store configuration and pass only workflow_id with flag
379401
StateStoreOutput.store_configuration(workflow_id, workflow_args)
380-
381-
logger.info(f"Created workflow config with ID: {workflow_id}")
402+
args = [{"workflow_id": workflow_id, "_use_statestore": True}]
403+
logger.info(
404+
f"Created workflow config with ID: {workflow_id} (StateStore approach)"
405+
)
406+
else:
407+
# Direct approach - pass full configuration with flag
408+
workflow_args["_use_statestore"] = False
409+
args = [workflow_args]
410+
logger.info(f"Created workflow with ID: {workflow_id} (direct approach)")
382411

383412
try:
384-
# Pass the full workflow_args to the workflow
413+
# Get task_queue from workflow_args or credentials or use default
414+
task_queue = (
415+
workflow_args.get("task_queue")
416+
or workflow_args.get("credentials", {}).get("task_queue")
417+
or self.worker_task_queue
418+
)
419+
420+
# Pass the conditional args to the workflow
385421
if not self.client:
386422
raise ValueError("Client is not loaded")
387423
handle = await self.client.start_workflow(
388424
workflow_class, # type: ignore
389-
args=[{"workflow_id": workflow_id}],
425+
args=args,
390426
id=workflow_id,
391-
task_queue=self.worker_task_queue,
427+
task_queue=task_queue,
392428
cron_schedule=workflow_args.get("cron_schedule", ""),
393429
execution_timeout=WORKFLOW_MAX_TIMEOUT_HOURS,
394430
)

application_sdk/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#: Name of the application, used for identification
3232
APPLICATION_NAME = os.getenv("ATLAN_APPLICATION_NAME", "default")
3333
#: Name of the deployment, used to distinguish between different deployments of the same application
34-
DEPLOYMENT_NAME = os.getenv("ATLAN_DEPLOYMENT_NAME", "")
34+
DEPLOYMENT_NAME = os.getenv("ATLAN_DEPLOYMENT_NAME", "atlan")
3535
#: Host address for the application's HTTP server
3636
APP_HOST = str(os.getenv("ATLAN_APP_HTTP_HOST", "localhost"))
3737
#: Port number for the application's HTTP server

application_sdk/workflows/__init__.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,32 @@ async def run(self, workflow_config: Dict[str, Any]) -> None:
6464
"""Run the workflow with the given configuration.
6565
6666
This method provides the base implementation for workflow execution. It:
67-
1. Extracts workflow configuration from the state store
67+
1. Extracts workflow configuration from the state store or uses direct configuration
6868
2. Sets up workflow run ID and retry policy
6969
3. Executes the preflight check activity
7070
7171
Args:
72-
workflow_config (Dict[str, Any]): Includes workflow_id and other parameters
73-
workflow_id is used to extract the workflow configuration from the
74-
state store.
72+
workflow_config (Dict[str, Any]): Either includes workflow_id and _use_statestore flag
73+
for StateStore approach, or contains the complete workflow configuration for
74+
direct approach.
7575
"""
76-
# Get the workflow configuration from the state store
77-
workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
78-
self.activities_cls.get_workflow_args,
79-
workflow_config, # Pass the whole config containing workflow_id
80-
retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
81-
start_to_close_timeout=self.default_start_to_close_timeout,
82-
heartbeat_timeout=self.default_heartbeat_timeout,
83-
)
76+
# Determine configuration approach based on _use_statestore flag
77+
use_statestore = workflow_config.get("_use_statestore", False)
78+
79+
if use_statestore:
80+
# StateStore approach - retrieve workflow configuration from state store
81+
workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
82+
self.activities_cls.get_workflow_args,
83+
workflow_config, # Pass the whole config containing workflow_id
84+
retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
85+
start_to_close_timeout=self.default_start_to_close_timeout,
86+
heartbeat_timeout=self.default_heartbeat_timeout,
87+
)
88+
logger.info("Retrieved workflow configuration from StateStore")
89+
else:
90+
# Direct approach - use passed configuration directly
91+
workflow_args = workflow_config
92+
logger.info("Using workflow configuration passed directly")
8493

8594
logger.info("Starting workflow execution")
8695

application_sdk/workflows/metadata_extraction/sql.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,16 +190,25 @@ async def run(self, workflow_config: Dict[str, Any]) -> None:
190190
workflow_success = False
191191

192192
try:
193+
# Let the base workflow handle the hybrid approach and preflight check
193194
await super().run(workflow_config)
194195

195-
# Get the workflow configuration from the state store
196-
workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
197-
self.activities_cls.get_workflow_args,
198-
workflow_config, # Pass the whole config containing workflow_id
199-
retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
200-
start_to_close_timeout=self.default_start_to_close_timeout,
201-
heartbeat_timeout=self.default_heartbeat_timeout,
202-
)
196+
# For the direct approach, use workflow_config as workflow_args
197+
# For the StateStore approach, we need to retrieve it again
198+
use_statestore = workflow_config.get("_use_statestore", False)
199+
200+
if use_statestore:
201+
# StateStore approach - retrieve workflow configuration from state store
202+
workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
203+
self.activities_cls.get_workflow_args,
204+
workflow_config,
205+
retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
206+
start_to_close_timeout=self.default_start_to_close_timeout,
207+
heartbeat_timeout=self.default_heartbeat_timeout,
208+
)
209+
else:
210+
# Direct approach - use passed configuration directly
211+
workflow_args = workflow_config
203212

204213
workflow_run_id = workflow.info().run_id
205214
workflow_args["workflow_run_id"] = workflow_run_id

0 commit comments

Comments
 (0)