Current infrastructure state:
PlainSource
JVM implementation
abstract class BaseSingleSourceLogic
- Extended by:
- ExternalSingleSourceLogic
- SingleSourceLogic
- Extends abstract class akka.stream.stage.GraphStageLogic
- Mixin:
- PromiseControl (trait with impure default method implementation)
- Extends akka.stream.stage.GraphStageLogic
- Mixin:
- scaladsl.Control (trait with pure default implementation)
- abstract class akka.kafka.internal.MetricsControl (not used in our implementation)
- abstract class akka.kafka.internal.StageIdLogging (not used in our implementation)
- SourceLogicSubscription (interface with (pure?) default implementation). Used to manage subscription AsyncCallbacks
- Declares self: GraphStageLogic =>
- MessageBuilder (pure trait)
- SourceLogicBuffer (trait with impure default implementation). Used to buffer ConsumerRecord<K, V>
- Declares self: GraphStageLogic with StageIdLogging =>
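The self-type declarations listed above (self: GraphStageLogic =>) are what let the JVM traits call stage-logic methods such as getAsyncCallback directly. A minimal sketch of that pattern follows, assuming a stub in place of the real Akka class; StubStageLogic, SubscriptionSketch, SingleSourceSketch and the simplified getAsyncCallback signature are hypothetical and only illustrate the mixin mechanics.

```scala
// Hypothetical stand-in for akka.stream.stage.GraphStageLogic (not the Akka class).
abstract class StubStageLogic {
  // The real method returns an AsyncCallback[T]; a plain function is used here.
  def getAsyncCallback[T](handler: T => Unit): T => Unit = handler
}

// Can only be mixed into a StubStageLogic, mirroring `self: GraphStageLogic =>`.
trait SubscriptionSketch { self: StubStageLogic =>
  protected var assigned: Set[Int] = Set.empty

  // The self-type makes the host's getAsyncCallback visible inside the trait.
  lazy val partitionAssignedCB: Set[Int] => Unit =
    getAsyncCallback[Set[Int]](ps => assigned = assigned ++ ps)
}

// The concrete logic gets the subscription behaviour by mixing the trait in.
final class SingleSourceSketch extends StubStageLogic with SubscriptionSketch {
  def assignedPartitions: Set[Int] = assigned
}

object SelfTypeDemo {
  def run(): Set[Int] = {
    val logic = new SingleSourceSketch
    logic.partitionAssignedCB(Set(0, 1))
    logic.partitionAssignedCB(Set(2))
    logic.assignedPartitions
  }
}
```

C# has no direct equivalent of a self-typed trait, which is presumably why the port has to fold such traits into classes or pass their dependencies in explicitly.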
abstract class SingleSourceLogic
- Extended by:
- TransactionalSourceLogic
- Extends BaseSingleSourceLogic
- Used in:
- PlainSource Logic
abstract class KafkaSourceStage
- Extends GraphStageWithMaterializedValue
final class PlainSource
- Extends KafkaSourceStage
- Logic: SingleSourceLogic
C# implementation
IControl
- Implemented by:
- abstract class PromiseControl
IMessageBuilder
- Implemented by:
- PlainMessageBuilder
- CommittableMessageBuilderBase
- CommittableSourceMessageBuilder
- OffsetContextBuilder
- ITransactionalMessageBuilderStage
- TransactionalMessageBuilder
- TransactionalSourceLogic
abstract class PromiseControl
- Extended by:
- BaseSingleSourceControl
- Hacks:
- GraphStageLogic.GetAsyncCallback method is passed in the constructor to be used to create AsyncCallback functions
- All required methods from BaseSingleSourceLogic are passed as Action and Func
BaseSingleSourceControl
- Hacks:
- PerformShutdown method, stored as readonly field, passed from Logic as Action
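The constructor-injection workaround described in the hacks above can be pictured roughly as follows. The sketch is written in Scala to stay comparable with the JVM side; in the C# code the function values would be Func and Action delegates. LogicSketch, ControlSketch and the simplified getAsyncCallback are hypothetical stand-ins, not the actual classes.

```scala
// Hypothetical stand-in for the control object that lives outside the stage logic.
final class ControlSketch(
    makeCallback: (() => Unit) => (() => Unit), // plays the role of GetAsyncCallback
    performShutdown: () => Unit                 // plays the role of the PerformShutdown Action
) {
  // The control wraps the logic's method once and stores the result.
  private val shutdownCallback: () => Unit = makeCallback(performShutdown)

  def shutdown(): Unit = shutdownCallback()
}

// Hypothetical stand-in for the stage logic that owns the real behaviour.
final class LogicSketch {
  var shutdownCalls: Int = 0

  // Simplified: the real callback would marshal the call onto the stage's execution context.
  def getAsyncCallback(action: () => Unit): () => Unit = () => action()

  def performShutdown(): Unit = shutdownCalls += 1

  // The logic hands its own methods to the control as function values.
  val control: ControlSketch = new ControlSketch(getAsyncCallback, () => performShutdown())
}
```

Nothing in the types forces ControlSketch to route a call through makeCallback, which is the kind of risk the threading concerns in this issue point at.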
abstract class BaseSingleSourceLogic
- Extends Akka.Streams.Stage.GraphStageLogic
- Difference:
- JVM MessageBuilder is implemented as an IMessageBuilder readonly field, configured by a factory method passed in as a constructor parameter.
  - Possible problem: the wrong message builder class can be passed in by the factory method.
- JVM PromiseControl is implemented as IPromiseControl -> BaseSingleSourceControl instance, held as a readonly field.
  - Possible problems:
    - Threading: AsyncCallbacks are passed around, and there is no guarantee that this will actually work at runtime.
    - Methods that might need to be called as an AsyncCallback might not be called as such.
- JVM SourceLogicBuffer is implemented using ConcurrentQueue.
  - Possible problem: the implementation is possibly missing the filtering function declared in the JVM version.
- JVM SourceLogicSubscription functions are folded into the class.
  - Hack: SourceLogicSubscription methods are declared as abstract methods, so every class extending this class has to provide its own Kafka subscription handling code.
  - Possible problem: implementations might use the wrong Kafka subscription handling code, since this code can end up duplicated in multiple places.
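On the buffer filtering concern: assuming the JVM filtering function drops buffered records whose partitions have been revoked (an assumption about its intent, not a copy of the JVM code), the behaviour the port may be missing could be sketched like this. TopicPartitionSketch, RecordSketch and BufferSketch are hypothetical stand-ins for the Kafka and stage types.

```scala
// Hypothetical stand-ins for Kafka's TopicPartition and ConsumerRecord.
final case class TopicPartitionSketch(topic: String, partition: Int)
final case class RecordSketch(topic: String, partition: Int, value: String)

final class BufferSketch {
  // Records waiting to be pushed downstream, oldest first.
  private var buffer: Vector[RecordSketch] = Vector.empty

  def enqueue(record: RecordSketch): Unit = buffer = buffer :+ record

  // Drops every buffered record that belongs to a revoked partition.
  def filterRevoked(revoked: Set[TopicPartitionSketch]): Unit =
    if (revoked.nonEmpty) {
      buffer = buffer.filterNot { r =>
        revoked.contains(TopicPartitionSketch(r.topic, r.partition))
      }
    }

  def values: List[String] = buffer.toList.map(_.value)
}
```

A ConcurrentQueue cannot be filtered in place, so an equivalent in the C# port would likely have to drain and rebuild the queue, or skip revoked records at dequeue time.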
SingleSourceStageLogic
- Extends BaseSingleSourceLogic
abstract class KafkaSourceStage
- Extends GraphStageWithMaterializedValue
PlainSourceStage (PlainSource in JVM)
- Extends KafkaSourceStage
- Logic: SingleSourceStageLogic