Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/aws/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (t *topic) PublishMessage(ctx context.Context, orderingKey string, attrs ma
return aws.ToString(result.MessageId), nil
}

func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, implCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, implCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
ackDeadline = utils.Clamp(ackDeadline, time.Second, 12*time.Hour)

if maxConcurrency == 0 {
Expand Down
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/azure/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (t *topic) processMessage(
return err
}

func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
receiver, err := t.client.NewReceiverForSubscription(t.topicCfg.ProviderName, subCfg.ProviderName, nil)
if err != nil {
panic(fmt.Sprintf("failed to create pubsub receiver for subscription %s: %s", subCfg.EncoreName, err))
Expand Down
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/encorecloud/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (t *topic) PublishMessage(ctx context.Context, orderingKey string, attrs ma
return t.mgr.client.PublishToTopic(ctx, t.cfg.ProviderName, orderingKey, attrs, data)
}

func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
if subCfg.ID == "" {
panic("encorecloud pubsub subscriptions must have an ID")
}
Expand Down
4 changes: 2 additions & 2 deletions runtimes/go/pubsub/internal/gcp/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func (mgr *Manager) getClientForProject(projectID string) *pubsub.Client {
return client
}

// numGoroutines computes the number of goroutines to use for the subscription,
// adaptivePullConcurrency computes the number of pull connections to use for the subscription,
// by adaptively taking into account gRPC stream limits and the number of subscriptions.
func numGoroutines(numSubs int) int {
func adaptivePullConcurrency(numSubs int) int {
if numSubs < 1 {
numSubs = 1 // avoid division by zero
}
Expand Down
13 changes: 9 additions & 4 deletions runtimes/go/pubsub/internal/gcp/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (t *topic) PublishMessage(ctx context.Context, orderingKey string, attrs ma
return id, err
}

func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
if subCfg.PushOnly && subCfg.ID == "" {
panic("push-only subscriptions must have a subscription ID")
}
Expand Down Expand Up @@ -104,8 +104,12 @@ func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadlin
}
subscription.ReceiveSettings.MaxOutstandingMessages = maxConcurrency

if experiments.AdaptiveGCPPubSubGoroutines.Enabled(t.mgr.experiments) {
// Compute the number of goroutines to use for this subscription.
// Set the number of pull connections
if pullConcurrency > 0 {
// User explicitly configured PullConcurrency
subscription.ReceiveSettings.NumGoroutines = pullConcurrency
} else if experiments.AdaptiveGCPPubSubGoroutines.Enabled(t.mgr.experiments) {
// Fall back to adaptive calculation if experiment is enabled
streamingSubsInProject := 0
for _, topic := range t.mgr.runtime.PubsubTopics {
for _, sub := range topic.Subscriptions {
Expand All @@ -114,8 +118,9 @@ func (t *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadlin
}
}
}
subscription.ReceiveSettings.NumGoroutines = numGoroutines(streamingSubsInProject)
subscription.ReceiveSettings.NumGoroutines = adaptivePullConcurrency(streamingSubsInProject)
}
// Otherwise, use GCP's default (10)

// Start the subscription with the GCP library
go func() {
Expand Down
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/noop/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ func (t *Topic) PublishMessage(ctx context.Context, orderingKey string, attrs ma
return "", ErrNoop
}

func (t *Topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, _ time.Duration, _ *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (t *Topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, subCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
// no-op
}
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/nsq/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type messageWrapper struct {
Data json.RawMessage
}

func (l *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, implCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (l *topic) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, implCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
if implCfg.PushOnly {
panic("push-only subscriptions are not supported by nsq")
}
Expand Down
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/test/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (t *TestTopic[T]) PublishMessage(ctx context.Context, orderingKey string, a
}

// Subscribe will register a new subscriber for the pub sub topic. By default these will not be called during tests
func (t *TestTopic[T]) Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, implCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
func (t *TestTopic[T]) Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *types.RetryPolicy, implCfg *config.PubsubSubscription, f types.RawSubscriptionCallback) {
t.m.Lock()
defer t.m.Unlock()
t.subscribers[implCfg.EncoreName] = f
Expand Down
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/internal/types/private.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ type RawSubscriptionCallback func(ctx context.Context, msgID string, publishTime
// TopicImplementation gives us a private API to implementing topics, which we can change without impacting the public API
type TopicImplementation interface {
PublishMessage(ctx context.Context, orderingKey string, attrs map[string]string, data []byte) (id string, err error)
Subscribe(logger *zerolog.Logger, maxConcurrency int, ackDeadline time.Duration, retryPolicy *RetryPolicy, implCfg *config.PubsubSubscription, f RawSubscriptionCallback)
Subscribe(logger *zerolog.Logger, maxConcurrency int, pullConcurrency int, ackDeadline time.Duration, retryPolicy *RetryPolicy, implCfg *config.PubsubSubscription, f RawSubscriptionCallback)
}
2 changes: 1 addition & 1 deletion runtimes/go/pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewSubscription[T any](topic *Topic[T], name string, cfg SubscriptionConfig
Logger()

// Subscribe to the topic
topic.topic.Subscribe(&log, cfg.MaxConcurrency, cfg.AckDeadline, cfg.RetryPolicy, subscription, func(ctx context.Context, msgID string, publishTime time.Time, deliveryAttempt int, attrs map[string]string, data []byte) (err error) {
topic.topic.Subscribe(&log, cfg.MaxConcurrency, cfg.PullConcurrency, cfg.AckDeadline, cfg.RetryPolicy, subscription, func(ctx context.Context, msgID string, publishTime time.Time, deliveryAttempt int, attrs map[string]string, data []byte) (err error) {
if ctx.Err() != nil {
return ctx.Err()
}
Expand Down
14 changes: 14 additions & 0 deletions runtimes/go/pubsub/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ type SubscriptionConfig[T any] struct {
// [GCP Push Delivery Rate]: https://cloud.google.com/pubsub/docs/push#push_delivery_rate
MaxConcurrency int

// PullConcurrency is the number of concurrent pull connections to use
// for this subscription. Each connection can pull multiple messages, and this
// setting controls the number of parallel streaming connections to the provider.
//
// This is useful for GCP Pub/Sub to control quota consumption, as each
// streaming connection generates operations for heartbeats and state management.
//
// If not set, it defaults to the cloud provider's implementation default (10 for GCP Pub/Sub).
// Set to a lower value to reduce quota consumption for low-traffic subscriptions.
//
// This setting only affects GCP Pub/Sub pull subscriptions and is ignored by
// other providers.
PullConcurrency int

// Filter is a boolean expression using =, !=, IN, &&
// It is used to filter which messages are forwarded from the
// topic to a subscription
Expand Down