Skip to content

APP-5362 : feat: integrate uvloop for improved asyncio performance #179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion application_sdk/activities/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from functools import wraps
from typing import Any, Awaitable, Callable, Optional, TypeVar, cast

import uvloop
from temporalio import activity

from application_sdk.common.logger_adaptors import get_logger

logger = get_logger(__name__)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

F = TypeVar("F", bound=Callable[..., Awaitable[Any]])

Expand Down
3 changes: 2 additions & 1 deletion application_sdk/application/fastapi/routers/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
import uuid

import psutil
import uvloop
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse

from application_sdk.common.logger_adaptors import get_logger

logger = get_logger(__name__)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
router = APIRouter(
prefix="/server",
tags=["server"],
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/clients/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from enum import Enum
from typing import Any, Dict, List

import uvloop
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
from temporalio import activity
Expand All @@ -19,6 +20,7 @@
from application_sdk.common.logger_adaptors import get_logger

activity.logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class SQLConstants(Enum):
Expand Down
3 changes: 2 additions & 1 deletion application_sdk/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from typing import Any, Callable, Dict, Iterator, Optional, Union

import pandas as pd
import uvloop

from application_sdk.activities import ActivitiesState
from application_sdk.common.logger_adaptors import get_logger
from application_sdk.inputs import Input

logger = get_logger(__name__)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

executor = ThreadPoolExecutor()

Expand Down
2 changes: 2 additions & 0 deletions application_sdk/handlers/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, List, Optional, Set, Tuple

import pandas as pd
import uvloop

from application_sdk.application.fastapi.models import MetadataType
from application_sdk.clients.sql import SQLClient
Expand All @@ -13,6 +14,7 @@
from application_sdk.inputs.sql_query import SQLQueryInput

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class SQLConstants(Enum):
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/inputs/sql_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Iterator, Optional, Union

import pandas as pd
import uvloop
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
Expand All @@ -15,6 +16,7 @@
from application_sdk.inputs import Input

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


def _get_sql_query(
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/outputs/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import orjson
import pyarrow as pa
import pyarrow.parquet as pq
import uvloop
from temporalio import activity

from application_sdk.common.logger_adaptors import get_logger
from application_sdk.outputs.objectstore import ObjectStoreOutput

activity.logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class ChunkedObjectStoreWriterInterface(ABC):
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/test_utils/workflow_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import time
from typing import Any, Optional

import uvloop
from temporalio.client import WorkflowExecutionStatus, WorkflowHandle

from application_sdk.clients.temporal import TemporalClient

logger = logging.getLogger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


async def monitor_workflow_execution_and_write_status(
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import threading
from typing import Any, List, Sequence

import uvloop
from temporalio.types import CallableType
Comment on lines +11 to 12
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential thread safety issues with uvloop initialization.

Setting the event loop policy at module level can lead to race conditions when using threading. The worker creates daemon threads that run their own event loops, which could conflict with the global policy setting.

Move the uvloop initialization into the worker thread:

-asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

 class Worker:
     async def start(self, daemon: bool = False, *args: Any, **kwargs: Any) -> None:
         if daemon:
             worker_thread = threading.Thread(
-                target=lambda: asyncio.run(self.start(daemon=False)), daemon=True
+                target=lambda: self._run_worker_with_uvloop(), daemon=True
             )
             worker_thread.start()
             return

+    def _run_worker_with_uvloop(self):
+        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+        asyncio.run(self.start(daemon=False))

Also applies to: 18-19


from application_sdk.clients.temporal import TemporalClient
from application_sdk.common.logger_adaptors import get_logger

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the event loop policy sets the event loop at all the asyncio instances in the application sdk and in the apps, hence decided to keep it here as worker is used in all the apps



class Worker:
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/workflows/metadata_extraction/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import timedelta
from typing import Any, Callable, Coroutine, Dict, List, Sequence, Type

import uvloop
from temporalio import workflow
from temporalio.common import RetryPolicy

Expand All @@ -21,6 +22,7 @@
from application_sdk.workflows.metadata_extraction import MetadataExtractionWorkflow

workflow.logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


@workflow.defn
Expand Down
2 changes: 2 additions & 0 deletions application_sdk/workflows/query_extraction/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import timedelta
from typing import Any, Callable, Coroutine, Dict, List, Sequence, Type

import uvloop
from temporalio import workflow
from temporalio.common import RetryPolicy

Expand All @@ -20,6 +21,7 @@
from application_sdk.workflows.query_extraction import QueryExtractionWorkflow

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


@workflow.defn
Expand Down
3 changes: 3 additions & 0 deletions examples/application_custom_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import uuid
from typing import Any, Dict

import uvloop
from fastapi import APIRouter

from application_sdk.application.fastapi import FastAPIApplication, HttpWorkflowTrigger
from application_sdk.handlers import HandlerInterface
from application_sdk.workflows import WorkflowInterface

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class CustomHandler(HandlerInterface):
async def load(self, **kwargs: Any) -> None:
Expand Down
4 changes: 4 additions & 0 deletions examples/application_fastapi.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
from typing import Any, Dict, List

import uvloop

from application_sdk.application.fastapi import FastAPIApplication, HttpWorkflowTrigger
from application_sdk.handlers.sql import SQLHandler
from application_sdk.workflows import WorkflowInterface

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class SampleSQLHandler(SQLHandler):
async def prepare(self, credentials: Dict[str, Any], **kwargs) -> None:
Expand Down
2 changes: 2 additions & 0 deletions examples/application_hello_world.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from typing import Any, Callable, Dict, Sequence

import uvloop
from temporalio import workflow

from application_sdk.activities import ActivitiesInterface
Expand All @@ -12,6 +13,7 @@
APPLICATION_NAME = "hello-world"

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


@workflow.defn
Expand Down
3 changes: 3 additions & 0 deletions examples/application_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from typing import Any, Dict
from urllib.parse import quote_plus

import uvloop

from application_sdk.activities.metadata_extraction.sql import (
SQLMetadataExtractionActivities,
)
Expand All @@ -46,6 +48,7 @@
APPLICATION_NAME = "postgres"

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class PostgreSQLClient(AsyncSQLClient):
Expand Down
4 changes: 3 additions & 1 deletion examples/application_sql_miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from typing import Any, Dict
from urllib.parse import quote_plus

import uvloop

from application_sdk.activities.query_extraction.sql import SQLQueryExtractionActivities
from application_sdk.clients.sql import SQLClient
from application_sdk.clients.temporal import TemporalClient
Expand All @@ -31,7 +33,7 @@
from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow

logger = get_logger(__name__)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
APPLICATION_NAME = "snowflake"


Expand Down
2 changes: 2 additions & 0 deletions examples/application_sql_with_custom_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from typing import Any, Dict
from urllib.parse import quote_plus

import uvloop
from pyatlan.model.assets import Database

from application_sdk.activities.metadata_extraction.sql import (
Expand All @@ -52,6 +53,7 @@
DATABASE_DIALECT = "postgresql"

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class PostgreSQLClient(AsyncSQLClient):
Expand Down
2 changes: 2 additions & 0 deletions examples/application_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import timedelta
from typing import Any, Callable, Dict, List

import uvloop
from temporalio import activity, workflow

from application_sdk.activities import ActivitiesInterface
Expand All @@ -21,6 +22,7 @@
from application_sdk.workflows import WorkflowInterface

logger = get_logger(__name__)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class SampleActivities(ActivitiesInterface):
Expand Down
4 changes: 4 additions & 0 deletions examples/run_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import asyncio

import uvloop

from application_sdk.clients.temporal import TemporalClient
from application_sdk.test_utils.workflow_monitoring import run_and_monitor_workflow
from examples.application_hello_world import application_hello_world
Expand All @@ -13,6 +15,8 @@
application_sql_with_custom_transformer,
)

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


async def main():
temporal_client = TemporalClient()
Expand Down
Loading
Loading