Skip to content

[RFC-0004] Allow disabling of insecure HTTP connections for alert providers #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions controllers/event_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/fluxcd/pkg/ssa"
. "github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/memorystore"
"github.com/sethvargo/go-limiter/noopstore"
prommetrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/slok/go-http-metrics/middleware"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -45,20 +45,17 @@ func TestEventHandler(t *testing.T) {
}),
})

store, err := memorystore.New(&memorystore.Config{
Interval: 5 * time.Minute,
})
if err != nil {
t.Fatalf("failed to create memory storage")
}
store, err := noopstore.New()
g.Expect(err).ToNot(HaveOccurred())

eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient, true)
serverEndpoint := "127.0.0.1:56789"
eventServer := server.NewEventServer(serverEndpoint, logf.Log, k8sClient, true, true)
stopCh := make(chan struct{})
go eventServer.ListenAndServe(stopCh, eventMdlw, store)

rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
req = r
w.WriteHeader(200)
w.WriteHeader(http.StatusOK)
}))
defer rcvServer.Close()
defer close(stopCh)
Expand All @@ -77,6 +74,9 @@ func TestEventHandler(t *testing.T) {
Address: rcvServer.URL,
},
}

g.Expect(err).ToNot(HaveOccurred())

g.Expect(k8sClient.Create(context.Background(), provider)).To(Succeed())
g.Eventually(func() bool {
var obj notifyv1.Provider
Expand Down Expand Up @@ -170,9 +170,9 @@ func TestEventHandler(t *testing.T) {
testSent := func() {
buf := &bytes.Buffer{}
g.Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed())
res, err := http.Post("http://localhost:56789/", "application/json", buf)
res, err := http.Post("http://"+serverEndpoint, "application/json", buf)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted
g.Expect(res.StatusCode).To(Equal(http.StatusAccepted))
}

testForwarded := func() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
k8s.io/api v0.25.3
k8s.io/apimachinery v0.25.3
k8s.io/client-go v0.25.3
k8s.io/kubectl v0.24.0
sigs.k8s.io/cli-utils v0.33.0
sigs.k8s.io/controller-runtime v0.13.0
sigs.k8s.io/yaml v1.3.0
Expand Down Expand Up @@ -151,7 +152,6 @@ require (
k8s.io/component-base v0.25.2 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
k8s.io/kubectl v0.24.0 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/kustomize/api v0.12.1 // indirect
Expand Down
18 changes: 17 additions & 1 deletion internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strings"
"time"
Expand All @@ -44,7 +45,6 @@ import (

func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
r.Context()
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
Expand Down Expand Up @@ -243,6 +243,22 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
continue
}

webhookUrl, err := url.Parse(webhook)
if err != nil {
s.logger.Error(nil, "Error parsing webhook url",
"reconciler kind", v1beta1.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
continue
}

if !s.supportHttpScheme && webhookUrl.Scheme == "http" {
s.logger.Error(nil, "http scheme is blocked",
"reconciler kind", v1beta1.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having reviewed #435, I think we can implement this block in a different way such that it becomes more apparent to the user that their Provider and Alert won't work.
At the beginning of this function, handleEvent(), all the alerts are listed and alerts are matched against the event. While doing so, the alerts are checked to be Ready. Not ready alerts are ignored.
In Provider reconciler, we can parse the address of the webhook and mark the object as stalled, as per the RFC https://github.com/fluxcd/flux2/tree/main/rfcs/0004-insecure-http#design-details and Ready=False. But if the address is specified in a secret ref, the address in the secret can change without updating the Provider object. So, when secretRef is present, we can just fail the reconciliation with Ready=False and allow it to retry with exponential backoff.
Because the Provider is not ready, the associated Alert would also become not ready and that intern would make the event handler to drop the event early in the above function. The failure in the configuration would be visible on the object itself, compared to the current implementation where it'll be just logged and may not be visible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose we delay this PR until v1beta2 is released. If RFC-0004 instructs the objects to marked as stalled, then we'll probably need to add secret watches to NC and cascade stalling from Provider to all dependant Alerts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gunishmatta unfortunately, we will have to label these changes on hold until the #435 is merged, by which point we will need to review the implementation based on @darkowlzz comments above.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No Worries, Will keep following it and would focus on contributing to other issues at Flux and understanding the code in depth.

factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password)
sender, err := factory.Notifier(provider.Spec.Type)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ type EventServer struct {
logger logr.Logger
kubeClient client.Client
noCrossNamespaceRefs bool
supportHttpScheme bool
}

// NewEventServer returns an HTTP server that handles events
func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, noCrossNamespaceRefs bool) *EventServer {
func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, noCrossNamespaceRefs bool, supportHttpScheme bool) *EventServer {
return &EventServer{
port: port,
logger: logger.WithName("event-server"),
kubeClient: kubeClient,
noCrossNamespaceRefs: noCrossNamespaceRefs,
supportHttpScheme: supportHttpScheme,
}
}

Expand Down Expand Up @@ -104,7 +106,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
recorder := &statusRecorder{
ResponseWriter: w,
Status: 200,
Status: http.StatusOK,
}
h.ServeHTTP(recorder, r)

Expand Down
144 changes: 135 additions & 9 deletions internal/server/event_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,39 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/onsi/gomega"
. "github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/httplimit"
"github.com/sethvargo/go-limiter/memorystore"
"github.com/sethvargo/go-limiter/noopstore"
"github.com/slok/go-http-metrics/middleware"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubectl/pkg/scheme"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to get rid of this new dependency by using k8s.io/apimachinery/pkg/runtime to create a new scheme. Refer

"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"

notifyv1 "github.com/fluxcd/notification-controller/api/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/events"
)

func TestEventKeyFunc(t *testing.T) {
g := gomega.NewGomegaWithT(t)
g := NewWithT(t)

// Setup middleware
store, err := memorystore.New(&memorystore.Config{
Interval: 10 * time.Minute,
})
g.Expect(err).ShouldNot(gomega.HaveOccurred())
g.Expect(err).ShouldNot(HaveOccurred())
middleware, err := httplimit.NewMiddleware(store, eventKeyFunc)
g.Expect(err).ShouldNot(gomega.HaveOccurred())
g.Expect(err).ShouldNot(HaveOccurred())
handler := middleware.Handle(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
Expand Down Expand Up @@ -129,19 +139,135 @@ func TestEventKeyFunc(t *testing.T) {
Message: tt.message,
}
eventData, err := json.Marshal(event)
g.Expect(err).ShouldNot(gomega.HaveOccurred())
g.Expect(err).ShouldNot(HaveOccurred())

req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
g.Expect(err).ShouldNot(gomega.HaveOccurred())
g.Expect(err).ShouldNot(HaveOccurred())
res := httptest.NewRecorder()
handler.ServeHTTP(res, req)

if tt.rateLimit {
g.Expect(res.Code).Should(gomega.Equal(429))
g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(gomega.Equal("0"))
g.Expect(res.Code).Should(Equal(http.StatusTooManyRequests))
g.Expect(res.Header().Get("X-Ratelimit-Remaining")).Should(Equal("0"))
} else {
g.Expect(res.Code).Should(gomega.Equal(200))
g.Expect(res.Code).Should(Equal(http.StatusOK))
}
})
}
}

func TestBlockInsecureHTTP(t *testing.T) {
g := NewWithT(t)

var requestsReceived int
rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestsReceived++
io.Copy(io.Discard, r.Body)
w.WriteHeader(http.StatusOK)
}))
defer rcvServer.Close()

utilruntime.Must(notifyv1.AddToScheme(scheme.Scheme))

testNamespace := "test-ns"
providerKey := "provider"
client := fake.NewFakeClientWithScheme(scheme.Scheme,
&notifyv1.Provider{
ObjectMeta: metav1.ObjectMeta{
Name: providerKey,
Namespace: testNamespace,
},
Spec: notifyv1.ProviderSpec{
Type: "generic",
Address: rcvServer.URL,
},
},
&notifyv1.Alert{
ObjectMeta: metav1.ObjectMeta{
Name: "some-alert-name",
Namespace: testNamespace,
},
Spec: notifyv1.AlertSpec{
ProviderRef: meta.LocalObjectReference{
Name: providerKey,
},
EventSeverity: "info",
EventSources: []notifyv1.CrossNamespaceObjectReference{
{
Kind: "Bucket",
Name: "hyacinth",
Namespace: testNamespace,
},
},
},
Status: notifyv1.AlertStatus{
Conditions: []metav1.Condition{
{Type: meta.ReadyCondition, Status: metav1.ConditionTrue},
},
},
},
)

eventMdlw := middleware.New(middleware.Config{})

store, err := noopstore.New()
g.Expect(err).ToNot(HaveOccurred())

serverEndpoint := "127.0.0.1:56789"
eventServer := NewEventServer(serverEndpoint, logf.Log, client, true, true)
stopCh := make(chan struct{})
go eventServer.ListenAndServe(stopCh, eventMdlw, store)
defer close(stopCh)

event := events.Event{
InvolvedObject: corev1.ObjectReference{
Kind: "Bucket",
Name: "hyacinth",
Namespace: testNamespace,
},
Severity: "info",
Timestamp: metav1.Now(),
Message: "well that happened",
Reason: "event-happened",
ReportingController: "source-controller",
}

eventServerTests := []struct {
name string
isHttpEnabled bool
url string
wantRequest int
}{
{
name: "http scheme is disabled",
isHttpEnabled: false,
wantRequest: 0,
},
{
name: "http scheme is enabled",
isHttpEnabled: true,
wantRequest: 1,
},
}
for _, tt := range eventServerTests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
requestsReceived = 0 // reset counter

// Change the internal state instead of creating a new server.
eventServer.supportHttpScheme = tt.isHttpEnabled

buf := &bytes.Buffer{}
g.Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed())
res, err := http.Post("http://"+serverEndpoint, "application/json", buf)

g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.StatusCode).To(Equal(http.StatusAccepted))

// Requests happens async, so should the assertion.
g.Eventually(func() bool {
return requestsReceived == tt.wantRequest
}, 5*time.Second).Should(BeTrue())
})
}
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func main() {
leaderElectionOptions leaderelection.Options
aclOptions acl.Options
rateLimiterOptions helper.RateLimiterOptions
insecureAllowHTTP bool
)

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
Expand All @@ -82,6 +83,7 @@ func main() {
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.DurationVar(&rateLimitInterval, "rate-limit-interval", 5*time.Minute, "Interval in which rate limit has effect.")
flag.BoolVar(&insecureAllowHTTP, "insecure-allow-http", true, "Enable the use of HTTP Scheme (no HTTPS) across all controller level objects. This is not recommended for production environments")
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
leaderElectionOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -169,7 +171,7 @@ func main() {
Registry: crtlmetrics.Registry,
}),
})
eventServer := server.NewEventServer(eventsAddr, log, mgr.GetClient(), aclOptions.NoCrossNamespaceRefs)
eventServer := server.NewEventServer(eventsAddr, log, mgr.GetClient(), aclOptions.NoCrossNamespaceRefs, insecureAllowHTTP)
go eventServer.ListenAndServe(ctx.Done(), eventMdlw, store)

setupLog.Info("starting webhook receiver server", "addr", receiverAddr)
Expand Down