diff --git a/address_translators.go b/address_translators.go index 8c0f7780d..a259ec7db 100644 --- a/address_translators.go +++ b/address_translators.go @@ -36,8 +36,10 @@ type AddressTranslator interface { Translate(addr net.IP, port int) (net.IP, int) } +// AddressTranslatorFunc implements AddressTranslator interface. type AddressTranslatorFunc func(addr net.IP, port int) (net.IP, int) +// Translate translates address and port. func (fn AddressTranslatorFunc) Translate(addr net.IP, port int) (net.IP, int) { return fn(addr, port) } diff --git a/conn.go b/conn.go index d2f83d742..f6cbcd5a0 100644 --- a/conn.go +++ b/conn.go @@ -85,6 +85,8 @@ type PasswordAuthenticator struct { AllowedAuthenticators []string } +// Challenge creates challenge response for auth handshake. +// Returns an error if authenticator is not approved. func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) { if !approve(string(req), p.AllowedAuthenticators) { return nil, nil, fmt.Errorf("unexpected authenticator %q", req) @@ -97,6 +99,7 @@ func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, err return resp, nil, nil } +// Success used in case of success auth handshake. func (p PasswordAuthenticator) Success(data []byte) error { return nil } @@ -132,6 +135,7 @@ type SslOptions struct { EnableHostVerification bool } +// ConnConfig configures connection used by the driver. type ConnConfig struct { ProtoVersion int CQLVersion string @@ -322,6 +326,9 @@ func (c *Conn) init(ctx context.Context, dialedHost *DialedHost) error { return nil } +// Write writes p to the connection. +// It returns the number of bytes written from p (0 <= n <= len(p)) and any error that caused the write to stop +// early. func (c *Conn) Write(p []byte) (n int, err error) { return c.w.writeContext(context.Background(), p) } @@ -548,6 +555,7 @@ func (c *Conn) closeWithError(err error) { } } +// Close closes the connection. func (c *Conn) Close() { c.closeWithError(nil) } @@ -1689,6 +1697,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { } } +// Pick returns nil if connection is closed. func (c *Conn) Pick(qry *Query) *Conn { if c.Closed() { return nil @@ -1696,20 +1705,24 @@ func (c *Conn) Pick(qry *Query) *Conn { return c } +// Closed returns true if connection close process for the connection started. func (c *Conn) Closed() bool { c.mu.Lock() defer c.mu.Unlock() return c.closed } +// Address returns address used for the connection. func (c *Conn) Address() string { return c.addr } +// AvailableStreams returns the number of the available streams. func (c *Conn) AvailableStreams() int { return c.streams.Available() } +// UseKeyspace executes `USE ;` query and set keyspace as current. func (c *Conn) UseKeyspace(keyspace string) error { q := &writeQueryFrame{statement: `USE "` + keyspace + `"`} q.params.consistency = c.session.cons diff --git a/filters.go b/filters.go index 312bd0d1a..fb3951a9e 100644 --- a/filters.go +++ b/filters.go @@ -47,6 +47,7 @@ func AcceptAllFilter() HostFilter { }) } +// DenyAllFilter will deny all hosts func DenyAllFilter() HostFilter { return HostFilterFunc(func(host *HostInfo) bool { return false diff --git a/frame.go b/frame.go index 99b07e289..c04f43a7c 100644 --- a/frame.go +++ b/frame.go @@ -283,6 +283,8 @@ func (c Consistency) isSerial() bool { return c == Serial || c == LocalSerial } + +// ParseConsistency returns parsed consistency or panics in case of an error. func ParseConsistency(s string) Consistency { var c Consistency if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil { @@ -331,6 +333,7 @@ func (f frameHeader) Header() frameHeader { const defaultBufSize = 128 +// ObservedFrameHeader observe header of the frame. type ObservedFrameHeader struct { Version protoVersion Flags byte diff --git a/helpers.go b/helpers.go index 823c10689..daea91c66 100644 --- a/helpers.go +++ b/helpers.go @@ -35,6 +35,7 @@ import ( "gopkg.in/inf.v0" ) +// RowData contains values and column names of a single row. type RowData struct { Columns []string Values []interface{} diff --git a/host_source.go b/host_source.go index ffe54cf28..631a513eb 100644 --- a/host_source.go +++ b/host_source.go @@ -155,6 +155,7 @@ func (c cassVersion) nodeUpDelay() time.Duration { return 10 * time.Second } +// HostInfo holds information about the host (e.g. addresses and state). type HostInfo struct { // TODO(zariel): reduce locking maybe, not all values will change, but to ensure // that we are thread safe use a mutex to access all fields. @@ -193,6 +194,7 @@ func newHostInfo(addr net.IP, port int) (*HostInfo, error) { return host, nil } +// Equal returns true if hosts are equal of if connect addresses of the hosts are equal. func (h *HostInfo) Equal(host *HostInfo) bool { if h == host { // prevent rlock reentry @@ -202,6 +204,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool { return h.ConnectAddress().Equal(host.ConnectAddress()) } +// Peer returns hosts peer. func (h *HostInfo) Peer() net.IP { h.mu.RLock() defer h.mu.RUnlock() @@ -259,30 +262,35 @@ func (h *HostInfo) ConnectAddress() net.IP { return addr } +// BroadcastAddress returns the broadcast address of the host. func (h *HostInfo) BroadcastAddress() net.IP { h.mu.RLock() defer h.mu.RUnlock() return h.broadcastAddress } +// ListenAddress returns the address on which a host listens for incoming connections. func (h *HostInfo) ListenAddress() net.IP { h.mu.RLock() defer h.mu.RUnlock() return h.listenAddress } +// RPCAddress returns address on which host listens for RPC requests. func (h *HostInfo) RPCAddress() net.IP { h.mu.RLock() defer h.mu.RUnlock() return h.rpcAddress } +// PreferredIP returns the preferred IP of the host. func (h *HostInfo) PreferredIP() net.IP { h.mu.RLock() defer h.mu.RUnlock() return h.preferredIP } +// DataCenter returns the name of the host data center. func (h *HostInfo) DataCenter() string { h.mu.RLock() dc := h.dataCenter @@ -290,6 +298,7 @@ func (h *HostInfo) DataCenter() string { return dc } +// Rack returns the name of the host rack. func (h *HostInfo) Rack() string { h.mu.RLock() rack := h.rack @@ -297,54 +306,63 @@ func (h *HostInfo) Rack() string { return rack } +// HostID returns the host ID. func (h *HostInfo) HostID() string { h.mu.RLock() defer h.mu.RUnlock() return h.hostId } +// SetHostID sets the host ID. func (h *HostInfo) SetHostID(hostID string) { h.mu.Lock() defer h.mu.Unlock() h.hostId = hostID } +// WorkLoad returns the current workload of the host. func (h *HostInfo) WorkLoad() string { h.mu.RLock() defer h.mu.RUnlock() return h.workload } +// Graph returns true if graph mode is enabled for the DSE. func (h *HostInfo) Graph() bool { h.mu.RLock() defer h.mu.RUnlock() return h.graph } +// DSEVersion returns the version of DSE instance. func (h *HostInfo) DSEVersion() string { h.mu.RLock() defer h.mu.RUnlock() return h.dseVersion } +// Partitioner returns the partitioner kind. func (h *HostInfo) Partitioner() string { h.mu.RLock() defer h.mu.RUnlock() return h.partitioner } +// ClusterName returns name of the cluster. func (h *HostInfo) ClusterName() string { h.mu.RLock() defer h.mu.RUnlock() return h.clusterName } +// Version returns version of the Cassandra instance. func (h *HostInfo) Version() cassVersion { h.mu.RLock() defer h.mu.RUnlock() return h.version } +// State returns state of the node. func (h *HostInfo) State() nodeState { h.mu.RLock() defer h.mu.RUnlock() @@ -358,12 +376,14 @@ func (h *HostInfo) setState(state nodeState) *HostInfo { return h } +// Tokens returns slice of tokens. func (h *HostInfo) Tokens() []string { h.mu.RLock() defer h.mu.RUnlock() return h.tokens } +// Port returns port which used for the connection. func (h *HostInfo) Port() int { h.mu.RLock() defer h.mu.RUnlock() @@ -432,10 +452,13 @@ func (h *HostInfo) update(from *HostInfo) { } } +// IsUp return true if the host is not nil and if the host state is node NodeUp. func (h *HostInfo) IsUp() bool { return h != nil && h.State() == NodeUp } +// HostnameAndPort returns a network address of the form "host:port". +// If host contains a colon - "[host]:port" will be returned. func (h *HostInfo) HostnameAndPort() string { h.mu.Lock() defer h.mu.Unlock() @@ -446,6 +469,8 @@ func (h *HostInfo) HostnameAndPort() string { return net.JoinHostPort(h.hostname, strconv.Itoa(h.port)) } +// ConnectAddressAndPort returns a network address of the form "host:port". +// If connect address contains a colon - "[host]:port" will be returned. func (h *HostInfo) ConnectAddressAndPort() string { h.mu.Lock() defer h.mu.Unlock() diff --git a/marshal.go b/marshal.go index 719a62281..b51ead4c1 100644 --- a/marshal.go +++ b/marshal.go @@ -2469,6 +2469,7 @@ type TypeInfo interface { NewWithError() (interface{}, error) } +// NativeType describes a Cassandra native types type NativeType struct { proto byte typ Type @@ -2487,14 +2488,17 @@ func (t NativeType) NewWithError() (interface{}, error) { return reflect.New(typ).Interface(), nil } +// Type returns identifier of a Cassandra internal datatype. func (s NativeType) Type() Type { return s.typ } +// Version returns native protocol version of a type. func (s NativeType) Version() byte { return s.proto } +// Custom returns the name of custom class. func (s NativeType) Custom() string { return s.custom } @@ -2508,6 +2512,7 @@ func (s NativeType) String() string { } } +// CollectionType describes a Cassandra collection types. type CollectionType struct { NativeType Key TypeInfo // only used for TypeMap @@ -2535,6 +2540,7 @@ func (c CollectionType) String() string { } } +// TupleTypeInfo describes a Cassandra tuple types. type TupleTypeInfo struct { NativeType Elems []TypeInfo @@ -2564,6 +2570,7 @@ type UDTField struct { Type TypeInfo } +// UDTTypeInfo describes a Cassandra UDT types. type UDTTypeInfo struct { NativeType KeySpace string diff --git a/metadata.go b/metadata.go index 63e27aeb6..a54b63fae 100644 --- a/metadata.go +++ b/metadata.go @@ -136,6 +136,7 @@ type MaterializedViewMetadata struct { baseTableName string } +// UserTypeMetadata holds the metadata for user types. type UserTypeMetadata struct { Keyspace string Name string diff --git a/policies.go b/policies.go index ed0b02f3e..492ee2f95 100644 --- a/policies.go +++ b/policies.go @@ -166,6 +166,7 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool { return q.Attempts() <= s.NumRetries } +// GetRetryType returns RetryNextHost. func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType { return RetryNextHost } @@ -176,6 +177,7 @@ type ExponentialBackoffRetryPolicy struct { Min, Max time.Duration } +// Attempt returns false if number of attempts is bigger than number of retries. func (e *ExponentialBackoffRetryPolicy) Attempt(q RetryableQuery) bool { if q.Attempts() > e.NumRetries { return false @@ -202,11 +204,12 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time return time.Duration(napDuration) } +// GetRetryType returns RetryNextHost. func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { return RetryNextHost } -// DowngradingConsistencyRetryPolicy: Next retry will be with the next consistency level +// DowngradingConsistencyRetryPolicy is a retry policy where next retry will be with the next consistency level // provided in the slice // // On a read timeout: the operation is retried with the next provided consistency @@ -220,11 +223,12 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { // // On an unavailable exception: if at least one replica is alive, the // operation is retried with the next provided consistency level. - type DowngradingConsistencyRetryPolicy struct { ConsistencyLevelsToTry []Consistency } +// Attempt returns true if number of attempts is less then ConsistencyLevelsToTry. +// Downgrade consistency if number of attempts is bigger than 0. func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool { currentAttempt := q.Attempts() @@ -236,6 +240,7 @@ func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool { return true } +// GetRetryType returns retry type according to an error type. func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType { switch t := err.(type) { case *RequestErrUnavailable: @@ -775,10 +780,6 @@ func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get()) } -// RackAwareRoundRobinPolicy is a host selection policies which will prioritize and -// return hosts which are in the local rack, before hosts in the local datacenter but -// a different rack, before hosts in all other datacenters - type rackAwareRR struct { // lastUsedHostIdx keeps the index of the last used host. // It is accessed atomically and needs to be aligned to 64 bits, so we @@ -790,6 +791,9 @@ type rackAwareRR struct { hosts []cowHostList } +// RackAwareRoundRobinPolicy is a host selection policy which will prioritize and +// return hosts which are in the local rack, before hosts in the local datacenter but +// a different rack, before hosts in all other datacenters. func RackAwareRoundRobinPolicy(localDC string, localRack string) HostSelectionPolicy { hosts := make([]cowHostList, 3) return &rackAwareRR{localDC: localDC, localRack: localRack, hosts: hosts} @@ -919,10 +923,12 @@ type ConstantReconnectionPolicy struct { Interval time.Duration } +// GetInterval returns a time interval between connection retries. func (c *ConstantReconnectionPolicy) GetInterval(currentRetry int) time.Duration { return c.Interval } +// GetMaxRetries returns a max number of connect retries. func (c *ConstantReconnectionPolicy) GetMaxRetries() int { return c.MaxRetries } @@ -934,6 +940,7 @@ type ExponentialReconnectionPolicy struct { MaxInterval time.Duration } +// GetInterval returns a computed interval between connection retries. func (e *ExponentialReconnectionPolicy) GetInterval(currentRetry int) time.Duration { max := e.MaxInterval if max < e.InitialInterval { @@ -942,6 +949,7 @@ func (e *ExponentialReconnectionPolicy) GetInterval(currentRetry int) time.Durat return getExponentialTime(e.InitialInterval, max, currentRetry) } +// GetMaxRetries returns a max number of connect retries. func (e *ExponentialReconnectionPolicy) GetMaxRetries() int { return e.MaxRetries } diff --git a/session.go b/session.go index ed1a078d3..8bdb01a7a 100644 --- a/session.go +++ b/session.go @@ -528,6 +528,7 @@ func (s *Session) Close() { s.sessionStateMu.Unlock() } +// Closed returns true only after you call Session.Close(). func (s *Session) Closed() bool { s.sessionStateMu.RLock() closed := s.isClosed @@ -1023,6 +1024,7 @@ func (q *Query) Attempts() int { return q.metrics.attempts() } +// AddAttempts adds number of attempts i for given host. func (q *Query) AddAttempts(i int, host *HostInfo) { q.metrics.attempt(i, 0, host, false) } @@ -1032,6 +1034,7 @@ func (q *Query) Latency() int64 { return q.metrics.latency() } +// AddLatency adds latency for given host. func (q *Query) AddLatency(l int64, host *HostInfo) { q.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false) } @@ -1061,6 +1064,8 @@ func (q *Query) CustomPayload(customPayload map[string][]byte) *Query { return q } +// Context returns the context of the Query. +// Returns the context.Background() if Query.Context == nil. func (q *Query) Context() context.Context { if q.context == nil { return context.Background() @@ -1138,11 +1143,6 @@ func (q *Query) WithContext(ctx context.Context) *Query { return &q2 } -// Deprecate: does nothing, cancel the context passed to WithContext -func (q *Query) Cancel() { - // TODO: delete -} - func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { return conn.executeQuery(ctx, q) } @@ -1804,6 +1804,7 @@ func (n *nextIter) fetch() *Iter { return n.next } +// Batch represents a group of CQL statements that can be executed together. type Batch struct { Type BatchType Entries []BatchEntry @@ -1871,6 +1872,7 @@ func (b *Batch) Observer(observer BatchObserver) *Batch { return b } +// Keyspace returns the keyspace the batch will be executed against. func (b *Batch) Keyspace() string { return b.keyspace } @@ -1885,6 +1887,7 @@ func (b *Batch) Attempts() int { return b.metrics.attempts() } +// AddAttempts adds number of attempts i for given host. func (b *Batch) AddAttempts(i int, host *HostInfo) { b.metrics.attempt(i, 0, host, false) } @@ -1894,6 +1897,7 @@ func (b *Batch) Latency() int64 { return b.metrics.latency() } +// AddLatency adds latency for given host. func (b *Batch) AddLatency(l int64, host *HostInfo) { b.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false) } @@ -1910,6 +1914,8 @@ func (b *Batch) SetConsistency(c Consistency) { b.Cons = c } +// Context returns the context of the Batch. +// Returns the context.Background() if Batch.Context == nil. func (b *Batch) Context() context.Context { if b.context == nil { return context.Background() @@ -1917,6 +1923,7 @@ func (b *Batch) Context() context.Context { return b.context } +// IsIdempotent returns false if one of the Batch.Entries are not Idempotent. func (b *Batch) IsIdempotent() bool { for _, entry := range b.Entries { if !entry.Idempotent { @@ -1930,6 +1937,7 @@ func (b *Batch) speculativeExecutionPolicy() SpeculativeExecutionPolicy { return b.spec } +// SpeculativeExecutionPolicy sets SpeculativeExecutionPolicy and returns Batch func (b *Batch) SpeculativeExecutionPolicy(sp SpeculativeExecutionPolicy) *Batch { b.spec = sp return b @@ -1974,11 +1982,6 @@ func (b *Batch) WithContext(ctx context.Context) *Batch { return &b2 } -// Deprecate: does nothing, cancel the context passed to WithContext -func (*Batch) Cancel() { - // TODO: delete -} - // Size returns the number of batch statements to be executed by the batch operation. func (b *Batch) Size() int { return len(b.Entries) @@ -2052,6 +2055,12 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host }) } +// GetRoutingKey gets the routing key to use for routing this batch. If +// a routing key has not been explicitly set, then the routing key will +// be constructed if possible using the keyspace's schema and the batch +// info for this batch entry statement. If the routing key cannot be determined +// then nil will be returned with no error. On any error condition, +// an error description will be returned. func (b *Batch) GetRoutingKey() ([]byte, error) { if b.routingKey != nil { return b.routingKey, nil @@ -2153,6 +2162,7 @@ const ( CounterBatch BatchType = 2 ) +// BatchEntry represents a single query inside a batch operation. type BatchEntry struct { Stmt string Args []interface{} @@ -2282,6 +2292,7 @@ func (s *Session) GetHosts() []*HostInfo { return s.ring.allHosts() } +// ObservedQuery observes a single query. type ObservedQuery struct { Keyspace string Statement string @@ -2323,6 +2334,7 @@ type QueryObserver interface { ObserveQuery(context.Context, ObservedQuery) } +// ObservedBatch observes a single batch operation. type ObservedBatch struct { Keyspace string Statements []string diff --git a/uuid.go b/uuid.go index cc5f1c21f..47739e961 100644 --- a/uuid.go +++ b/uuid.go @@ -116,6 +116,7 @@ func UUIDFromBytes(input []byte) (UUID, error) { return u, nil } +// MustRandomUUID wraps RandomUUID but panics in case of an error. func MustRandomUUID() UUID { uuid, err := RandomUUID() if err != nil {