Plugin architecture needs to be gradually redesigned to fix the inherent fragility of the current implementation #248

@Arkatufus

Current infrastructure state:

PlainSource

JVM implementation

abstract class BaseSingleSourceLogic

  • Extended by:
    • ExternalSingleSourceLogic
    • SingleSourceLogic
  • Extends abstract class akka.stream.stage.GraphStageLogic
  • Mixin:
    • PromiseControl (trait with impure default method implementation)
      • Extends akka.stream.stage.GraphStageLogic
      • Mixin:
        • scaladsl.Control (trait with pure default implementation)
    • abstract class akka.kafka.internal.MetricsControl: (not used in our implementation)
    • abstract class akka.kafka.internal.StageIdLogging: (not used in our implementation)
    • SourceLogicSubscription: (interface with (pure?) default implementation)
      Used to manage subscription AsyncCallbacks
      • Declares self: GraphStageLogic =>
    • MessageBuilder (pure trait)
    • SourceLogicBuffer: (trait with impure default implementation) Used to buffer ConsumerRecord<K, V>
      • Declares self: GraphStageLogic with StageIdLogging =>

abstract class SingleSourceLogic

  • Extended by:
    • TransactionalSourceLogic
  • Extends BaseSingleSourceLogic
  • Used in:
    • PlainSource Logic

abstract class KafkaSourceStage

  • Extends GraphStageWithMaterializedValue

final class PlainSource

  • Extends KafkaSourceStage
  • Logic: SingleSourceLogic

C# implementation

IControl

  • Implemented by:
    • abstract class PromiseControl

IMessageBuilder

  • Implemented by:
    • PlainMessageBuilder
    • CommittableMessageBuilderBase
    • CommittableSourceMessageBuilder
    • OffsetContextBuilder
    • ITransactionalMessageBuilderStage
    • TransactionalMessageBuilder
    • TransactionalSourceLogic

abstract class PromiseControl

  • Extended by:
    • BaseSingleSourceControl
  • Hacks:
    • The GraphStageLogic.GetAsyncCallback method is passed into the constructor so it can be used to create AsyncCallback functions
    • All required methods from BaseSingleSourceLogic are passed in as Action and Func delegates
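
To make the PromiseControl hacks above concrete, here is a minimal sketch of the delegate-injection pattern. It is not the actual plugin code: StageLogicStandIn stands in for GraphStageLogic, and every other name (PromiseControlSketch, SourceControlSketch, SourceLogicSketch, performStop) is hypothetical. It assumes the callback factory is only reachable from inside the logic class, which is why it has to be handed over as a delegate.

```csharp
using System;

// Stand-in for Akka.Streams.Stage.GraphStageLogic (assumption: only the shape matters here).
public abstract class StageLogicStandIn
{
    // In this sketch the "callback" simply wraps the handler. A real stage
    // would marshal the call onto the stage's own execution context.
    protected Action GetAsyncCallback(Action handler) => () => handler();
}

// The control cannot inherit from the logic, so everything it needs
// from the logic is injected as delegates.
public abstract class PromiseControlSketch
{
    private readonly Action _stopCallback;

    protected PromiseControlSketch(Func<Action, Action> getAsyncCallback, Action performStop)
    {
        // The callback is created here, outside the logic that owns the factory.
        _stopCallback = getAsyncCallback(performStop);
    }

    public void Stop() => _stopCallback();
}

public sealed class SourceControlSketch : PromiseControlSketch
{
    public SourceControlSketch(Func<Action, Action> getAsyncCallback, Action performStop)
        : base(getAsyncCallback, performStop) { }
}

public sealed class SourceLogicSketch : StageLogicStandIn
{
    public bool Stopped { get; private set; }
    public SourceControlSketch Control { get; }

    public SourceLogicSketch()
    {
        // The logic hands over its own callback factory and its own stop method.
        Control = new SourceControlSketch(GetAsyncCallback, () => Stopped = true);
    }
}
```

Nothing in these types ties performStop to the logic that owns the callback factory, so any Action can be passed in. The compiler cannot catch a mismatch, which is one way the fragility described in this issue can show up.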

BaseSingleSourceControl

  • Hacks:
    • The PerformShutdown method is passed in from the Logic as an Action and stored in a readonly field

abstract class BaseSingleSourceLogic

  • Extends Akka.Streams.Stage.GraphStageLogic
  • Difference:
    • JVM MessageBuilder is implemented as an IMessageBuilder held in a readonly field and created by a factory method passed in as a constructor parameter.
      Possible problems:
      • The factory method can return the wrong message builder class
    • JVM PromiseControl is implemented as an IPromiseControl -> BaseSingleSourceControl instance held in a readonly field
      Possible problems:
      • Threading: AsyncCallbacks are passed around, and there is no guarantee that this will actually work at runtime
      • Methods that need to be invoked through an AsyncCallback might not be invoked that way
    • JVM SourceLogicBuffer is implemented using ConcurrentQueue
      Possible problems:
      • The implementation is possibly missing the filtering function declared in the JVM version
    • JVM SourceLogicSubscription functions are folded into the class
      Hacks:
      • SourceLogicSubscription methods are declared as abstract methods, so every class extending this class has to provide its own Kafka subscription handling code
        Possible problems:
        • Implementations might use the wrong Kafka subscription handling code, since this code can end up duplicated in multiple places
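
As an illustration of the SourceLogicBuffer point above, here is a rough sketch of what a filtering function over a ConcurrentQueue-backed buffer could look like. It assumes the JVM filter drops buffered records that belong to revoked partitions. That reading, along with the BufferedRecord, SourceBufferSketch and FilterRevoked names, is an assumption and not taken from the current C# code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a consumed Kafka record.
public sealed class BufferedRecord
{
    public BufferedRecord(string topic, int partition, string value)
    {
        Topic = topic;
        Partition = partition;
        Value = value;
    }

    public string Topic { get; }
    public int Partition { get; }
    public string Value { get; }
}

public sealed class SourceBufferSketch
{
    private readonly ConcurrentQueue<BufferedRecord> _buffer = new ConcurrentQueue<BufferedRecord>();

    public int Count => _buffer.Count;

    public void Enqueue(BufferedRecord record) => _buffer.Enqueue(record);

    public bool TryDequeue(out BufferedRecord record) => _buffer.TryDequeue(out record);

    // Drops buffered records that belong to revoked partitions.
    // ConcurrentQueue has no remove-by-predicate, so each record currently in
    // the queue is dequeued once and re-enqueued only if it should be kept.
    // This is not atomic with respect to concurrent producers, so it should
    // only run on the stage's own thread (for example through an async callback).
    public void FilterRevoked(ISet<(string Topic, int Partition)> revoked)
    {
        var pending = _buffer.Count;
        for (var i = 0; i < pending; i++)
        {
            if (!_buffer.TryDequeue(out var record)) break;
            if (!revoked.Contains((record.Topic, record.Partition)))
                _buffer.Enqueue(record);
        }
    }
}
```

The relative order of the kept records is preserved only if nothing else enqueues while the filter runs, which ties back to the AsyncCallback concern listed above.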

SingleSourceStageLogic

  • Extends BaseSingleSourceLogic

abstract class KafkaSourceStage

  • Extends GraphStageWithMaterializedValue

PlainSourceStage (PlainSource in JVM)

  • Extends KafkaSourceStage
  • Logic: SingleSourceStageLogic
