-
Notifications
You must be signed in to change notification settings - Fork 33
Scripts
Nikita Zheleztsov edited this page Aug 7, 2025
·
5 revisions
This page is intended for developers only! Please make sure you know what you're doing before running these scripts on a production cluster.
Full delete of the bucket with all its data (DANGEROUS!!!)
--
-- Collects the spaces which have an index named 'bucket_id' whose first
-- part has a numeric type ('unsigned', 'integer' or 'number').
--
-- Only numeric keys of `box.space` are visited, so every space is
-- considered once.
--
-- Returns an array of space objects.
--
function find_sharded_spaces()
    local numeric_types = {unsigned = true, integer = true, number = true}
    local result = {}
    for space_id, space in pairs(box.space) do
        local bucket_index = nil
        if type(space_id) == 'number' then
            bucket_index = space.index['bucket_id']
        end
        if bucket_index ~= nil and
           numeric_types[bucket_index.parts[1].type] then
            result[#result + 1] = space
        end
    end
    return result
end
--
-- Builds a key from a tuple: for each element of `parts`, in order, the
-- tuple field at `part.fieldno` is appended to the key.
--
-- Returns the key as an array of field values.
--
function tuple_extract_key(tuple, parts)
    local key = {}
    for _, key_part in ipairs(parts) do
        local field_value = tuple[key_part.fieldno]
        key[#key + 1] = field_value
    end
    return key
end
--
-- Deletes every tuple of bucket `bucket_id` from all the spaces returned by
-- find_sharded_spaces() and then force-drops the bucket itself.
--
-- Tuples are looked up through the 'bucket_id' index and deleted by the
-- key extracted with the parts of index 0.
--
function full_delete_bucket(bucket_id)
    for _, space in pairs(find_sharded_spaces()) do
        local primary_parts = space.index[0].parts
        local by_bucket = space.index['bucket_id']
        for _, tuple in by_bucket:pairs({bucket_id}) do
            local primary_key = tuple_extract_key(tuple, primary_parts)
            space:delete(primary_key)
        end
    end
    vshard.storage.bucket_force_drop(bucket_id)
end
Find all buckets with RW refs
-- Collect ids of the buckets whose info reports a `ref_rw` value.
-- The result is stored in the global `buckets` and returned.
buckets = {};
for _, info in pairs(vshard.storage.buckets_info()) do
    if info.ref_rw then
        buckets[#buckets + 1] = info.id
    end
end;
return buckets
Send N buckets to another replicaset, prioritizing buckets without refs or with the minimal number of refs
--
-- Sends N buckets from the current instance to replicaset `rs_uuid`.
-- Buckets without rw refs are picked first. With `allow_with_refs` the
-- shortage is covered by the buckets with the fewest rw refs.
--
-- Available options (fields of `opts`):
-- * map_eval_timeout - timeout for checking of the current replicaset for
--   refs, 15 seconds by default
-- * bucket_send_timeout - timeout for sending 1 bucket, 2 minutes by default
-- * allow_with_refs - allow sending buckets with refs, the buckets with min
--   number of refs will be chosen in that case.
--
-- Returns the number of sent buckets. Throws error in case of some problems.
-- When bucket_send() returns an error, the error is logged, the sending
-- stops and the number of buckets sent so far is returned. When
-- bucket_send() raises an error, the rw locks taken by this script are
-- dropped and the error is re-raised.
--
function send_buckets_to(number, rs_uuid, opts)
    local log = require('log')
    local json = require('json')
    local fiber = require('fiber')
    opts = opts or {}
    local LOG_PREFIX = 'BUCKET_SEND_SCRIPT: '
    local map_eval_timeout = opts.map_eval_timeout or 15
    local bucket_send_timeout = opts.bucket_send_timeout or 60 * 2
    local allow_with_refs = opts.allow_with_refs or false

    -- Format the message, log it with the script prefix and raise it.
    local function throw_error(errmsg, ...)
        errmsg = string.format(errmsg, ...)
        errmsg = LOG_PREFIX .. errmsg
        log.error(errmsg)
        error(errmsg)
    end

    -- Format the message and log it with the script prefix on `level`.
    local function log_msg(level, msg, ...)
        msg = string.format(msg, ...)
        msg = LOG_PREFIX .. msg
        log[level](msg)
    end

    --
    -- Evaluates `eval` asynchronously on all replicas of `replicaset`
    -- except the one with id `eval_opts.except`.
    -- Returns a map replica_id -> future result on success, or
    -- nil, error, id of the failed replica otherwise.
    --
    local function replicaset_map_eval(replicaset, eval, eval_opts)
        assert(eval_opts.timeout)
        local timeout = eval_opts.timeout
        local except = eval_opts.except
        local deadline = fiber.clock() + timeout
        -- All locals are declared before the first goto, so that the jumps
        -- to the labels below never enter a scope of a new local.
        local ok, res, err, err_id
        local futures = {}
        local map = {}
        local opts_eval = {is_async = true, timeout = timeout}
        timeout, err, err_id = replicaset:wait_connected_all({
            timeout = timeout,
            except = except,
        })
        if not timeout then
            goto fail
        end
        -- Send requests.
        for replica_id, replica in pairs(replicaset.replicas) do
            if replica_id == except then
                goto next_call
            end
            ok, res = pcall(replica.conn.eval, replica.conn, eval, nil,
                            opts_eval)
            if not ok then
                err = res
                err_id = replica_id
                goto fail
            end
            futures[replica_id] = res
            ::next_call::
        end
        -- Collect results.
        for replica_id, future in pairs(futures) do
            if timeout < 0 then
                err_id = replica_id
                err = box.error.new(box.error.TIMEOUT)
                goto fail
            end
            res, err = future:wait_result(timeout)
            if res == nil then
                err_id = replica_id
                goto fail
            end
            map[replica_id] = res
            timeout = deadline - fiber.clock()
        end
        do return map end
        ::fail::
        for _, f in pairs(futures) do
            f:discard()
        end
        return nil, err, err_id
    end

    --
    -- Takes and immediately releases an rw ref on the bucket, then returns
    -- the bucket's entry from vshard.storage.internal.bucket_refs.
    -- Returns nil, err when the ref can't be taken.
    --
    local function bucket_refrw_touch(bucket_id)
        local status, err = vshard.storage.bucket_refrw(bucket_id)
        if not status then
            return nil, err
        end
        vshard.storage.bucket_unrefrw(bucket_id)
        return vshard.storage.internal.bucket_refs[bucket_id]
    end

    if not vshard or not vshard.storage.internal.is_configured then
        throw_error('Vshard is not configured on that instance')
    end
    local consts = vshard.consts
    local internal = vshard.storage.internal

    -- Check args.
    if number <= 0 then
        -- Nothing to send.
        return 0
    end
    if not internal.replicasets[rs_uuid] then
        throw_error('Unknown replicaset %s', rs_uuid)
    end
    -- The instance is RW master.
    if not internal.is_master then
        throw_error('This instance is not a master. Use another one')
    end
    if box.info.ro then
        throw_error('This instance is master, but read-only')
    end
    -- Prev iteration is finished.
    local _status = box.space._bucket.index.status
    if _status:count({consts.BUCKET.SENDING}) ~= 0 then
        throw_error('Some buckets are being sent')
    end
    if _status:count({consts.BUCKET.RECEIVING}) ~= 0 then
        throw_error('Some bucket are being received')
    end

    -- Replicas don't have any RW refs, safe to pick any bucket
    -- from the current instance. The replication won't break.
    local check_rw_refs = [[
        for _, b in pairs(vshard.storage.buckets_info()) do
            if b.ref_rw then
                return false
            end
        end;
        return true
    ]]
    local replicaset_not_rw_refs, err, err_id = replicaset_map_eval(
        internal.this_replicaset, check_rw_refs, {
            timeout = map_eval_timeout,
            except = internal.this_replica.id
        }
    )
    if not replicaset_not_rw_refs then
        throw_error('Failed to check rw refs on replica %s: %s', err_id, err)
    end
    for r_id, value in pairs(replicaset_not_rw_refs) do
        -- The result of an async net.box eval is the table of the returned
        -- values, which is always truthy, so the flag has to be taken from
        -- its first element. A bare boolean is accepted as well.
        local no_rw_refs = value
        if type(value) == 'table' then
            no_rw_refs = value[1]
        end
        if not no_rw_refs then
            throw_error('Replica %s has rw refs', r_id)
        end
    end

    -- Try to pick buckets without rw refs.
    local buckets_to_send = {}
    local picked_buckets = 0
    for _, b in pairs(vshard.storage.buckets_info()) do
        if not internal.rebalancer_transfering_buckets[b.id] and
           not b.ref_rw then
            buckets_to_send[b.id] = true
            picked_buckets = picked_buckets + 1
            if picked_buckets == number then
                break
            end
        end
    end
    if picked_buckets ~= number and not allow_with_refs then
        throw_error('Found %d buckets without refs, needed %d',
                    picked_buckets, number)
    end
    -- Pick buckets with the minimum number of rw refs.
    if picked_buckets < number then
        -- Sort the buckets by the number of rw refs (ascending order)
        local sorted_buckets = {}
        for _, b in pairs(vshard.storage.buckets_info()) do
            if not internal.rebalancer_transfering_buckets[b.id] and
               not buckets_to_send[b.id] then
                table.insert(sorted_buckets, {id = b.id, refs = b.ref_rw})
            end
        end
        table.sort(sorted_buckets, function(a, b) return a.refs < b.refs end)
        -- Pick the buckets with the least rw refs until we have enough
        for _, b in ipairs(sorted_buckets) do
            buckets_to_send[b.id] = b.refs
            picked_buckets = picked_buckets + 1
            if picked_buckets == number then
                break
            end
        end
    end
    if picked_buckets ~= number then
        throw_error('Found %d buckets, needed %d', picked_buckets, number)
    end

    -- We've chosen the buckets and don't wanna new requests to interfere.
    log_msg('info', 'Prohibiting rw requests on buckets: %s',
            json.encode(buckets_to_send))
    local bucket_refs = {}
    for bid, _ in pairs(buckets_to_send) do
        local ref, err = bucket_refrw_touch(bid)
        if not ref then
            throw_error('Failed to get ref for bucket %d: %s', bid, err)
        end
        bucket_refs[bid] = ref
    end
    -- Locks are set only after all the refs are obtained, so the errors
    -- thrown above can't leave a locked bucket behind.
    for _, ref in pairs(bucket_refs) do
        ref.rw_lock = true
    end

    local sent_buckets = 0
    log_msg('info', 'Going to send %d buckets to %s: %s', picked_buckets,
            rs_uuid, json.encode(buckets_to_send))
    local bucket_send_opts = {timeout = bucket_send_timeout}
    -- Error raised (not returned) by bucket_send(), if any.
    local raised_err = nil
    for bid, _ in pairs(buckets_to_send) do
        log_msg('info', 'Sending bucket %d', bid)
        -- Allow bucket_send to take rw_lock.
        bucket_refs[bid].rw_lock = false
        -- pcall() is needed to be able to drop the rw locks of the not yet
        -- processed buckets when bucket_send() raises an error.
        local ok, ret, err = pcall(vshard.storage.bucket_send, bid, rs_uuid,
                                   bucket_send_opts)
        -- Do not drop rw_lock for buckets, processed with bucket_send.
        bucket_refs[bid] = nil
        if not ok then
            raised_err = ret
            break
        end
        if ret then
            sent_buckets = sent_buckets + 1
        else
            log_msg('error', 'Error during bucket sending %s', err)
            break
        end
    end
    -- Drop rw locks.
    if sent_buckets < number then
        for bid, ref in pairs(bucket_refs) do
            log_msg('info', 'Dropping rw lock for bucket %d', bid)
            ref.rw_lock = false
        end
    end
    if raised_err ~= nil then
        throw_error('Bucket sending raised an error: %s',
                    tostring(raised_err))
    end
    return sent_buckets
end
Find doubled buckets in the cluster
--
-- Finds the buckets reported by more than one replicaset.
--
-- Calls 'vshard.storage.buckets_info' via callro (5 seconds timeout) on
-- every replicaset returned by vshard.router.routeall() and groups the
-- answers by bucket id. Raises the error returned by callro, if any.
--
-- Returns a map bucket_id -> {count, info, uuids} containing only the
-- buckets with count > 1, where:
-- * count - number of replicasets which reported the bucket;
-- * info - array of the bucket descriptions, one per replicaset;
-- * uuids - array of the replicaset identifiers, in the same order.
--
function find_doubled()
    local all_buckets = {}
    local routes = vshard.router.routeall()
    for _, replicaset in pairs(routes) do
        local buckets, err = replicaset:callro('vshard.storage.buckets_info',
                                               {}, {timeout = 5})
        if err then
            error(err)
        end
        -- A replicaset may have no uuid but a name. Without the fallback
        -- a nil would be "inserted" and the owner silently lost.
        local rs_id = replicaset.uuid or replicaset.name
        for id, bucket in pairs(buckets) do
            -- Entries are created lazily: nothing is preallocated and an
            -- unexpected bucket id can't cause an indexing of nil.
            local entry = all_buckets[id]
            if entry == nil then
                entry = {
                    count = 0,
                    info = {},
                    uuids = {},
                }
                all_buckets[id] = entry
            end
            entry.count = entry.count + 1
            table.insert(entry.uuids, rs_id)
            table.insert(entry.info, bucket)
        end
    end
    local intersection = {}
    for id, entry in pairs(all_buckets) do
        if entry.count > 1 then
            intersection[id] = entry
        end
    end
    return intersection
end
-- Run the search right away.
find_doubled()
Find all rw refs on replicas in the cluster
-- For every replicaset collect the ids of the buckets which have rw refs
-- on at least one non-master replica. The results of all the non-master
-- replicas of a replicaset are merged (without duplicates) instead of
-- overwriting each other, so no replica's answer is lost.
-- The result is stored in the global `all_rw_refs` and returned.
all_rw_refs = {};
for rs_uuid, rs in pairs(vshard.router.internal.static_router.replicasets) do
    -- Bucket ids already added for this replicaset.
    local seen = {}
    for r_uuid, r in pairs(rs.replicas) do
        if r_uuid ~= rs.master.uuid then
            -- `local` keeps the remote instance free of a stray global.
            local refs = r.conn:eval('local refs = {}; for _, tuple in box.space._bucket:pairs() do if vshard.storage.internal.bucket_refs[tuple[1]] and vshard.storage.internal.bucket_refs[tuple[1]].rw > 0 then table.insert(refs, tuple[1]) end end; return refs')
            local merged = all_rw_refs[rs_uuid]
            if merged == nil then
                merged = {}
                all_rw_refs[rs_uuid] = merged
            end
            for _, bucket_id in ipairs(refs or {}) do
                if not seen[bucket_id] then
                    seen[bucket_id] = true
                    table.insert(merged, bucket_id)
                end
            end
        end
    end
end;
return all_rw_refs
Drop all rw refs on replicas in the cluster (DANGEROUS!)
-- On every non-master replica of every replicaset set the `rw` counter of
-- each existing bucket ref to 0.
for _, rs in pairs(vshard.router.internal.static_router.replicasets) do
    for replica_uuid, replica in pairs(rs.replicas) do
        local is_master = replica_uuid == rs.master.uuid
        if not is_master then
            local conn = replica.conn
            conn:eval('for _, tuple in box.space._bucket:pairs() do if vshard.storage.internal.bucket_refs[tuple[1]] then vshard.storage.internal.bucket_refs[tuple[1]].rw = 0 end end')
        end
    end
end
Find all non-active buckets in the cluster
-- For every replicaset ask its master for the ids of the buckets whose
-- status is not "active". The result is stored in the global
-- `buckets_no_active` (keyed like the router's replicasets table) and
-- returned.
buckets_no_active = {};
for rs_uuid, rs in pairs(vshard.router.internal.static_router.replicasets) do
    for replica_uuid, replica in pairs(rs.replicas) do
        local is_master = replica_uuid == rs.master.uuid
        if is_master then
            local not_active = replica.conn:eval('buckets = {}; for _, b in pairs(vshard.storage.buckets_info()) do if b.status ~= "active" then table.insert(buckets, b.id) end end; return buckets')
            buckets_no_active[rs_uuid] = not_active
        end
    end
end;
return buckets_no_active
Find all buckets with RW locks in the cluster
-- For every replicaset ask its master for the info of the buckets which
-- have `rw_lock` set. The result is stored in the global `buckets`, keyed
-- by the replicaset name (or uuid when there is no name), and returned.
buckets = {};
for _, rs in pairs(vshard.router.internal.static_router.replicasets) do
    local rs_key = rs.name or rs.uuid
    local master_conn = rs.master.conn
    local locked = master_conn:eval('ret = {}; for _, b in pairs(vshard.storage.buckets_info()) do if b.rw_lock then table.insert(ret, b) end end; return ret')
    buckets[rs_key] = locked
end;
return buckets
Find callrw requests that take more than 10 seconds (DANGEROUS!)
-- Measure latency of callrw
-- The original function is saved into a global to be restored later.
old_vshard_router_callrw = vshard.router.callrw
-- The wrapper logs a warning for each call which took more than 10 seconds
-- and passes through everything the original function returned, whatever
-- the number of the returned values is.
vshard.router.callrw = function(bucket_id, func, args, opts)
    local fiber = require('fiber')
    local log = require('log')
    local start_time = fiber.clock()
    -- Receives the results of the original call as varargs, so that they
    -- can be returned untouched after the latency is checked.
    local function report(...)
        local latency = fiber.clock() - start_time
        if latency > 10 then
            -- The second returned value, when not nil, is treated as an
            -- error of the call.
            local call_err = select(2, ...)
            if call_err ~= nil then
                log.warn('LATENCY: %.3f, request for %d bucket on %s func failed ' ..
                         'with error %s', latency, bucket_id, func, call_err)
            else
                log.warn('LATENCY: %.3f, request for %d bucket on %s func succeeded',
                         latency, bucket_id, func)
            end
        end
        return ...
    end
    return report(old_vshard_router_callrw(bucket_id, func, args, opts))
end
-- Restore the function back.
vshard.router.callrw = old_vshard_router_callrw
Find the fiber whose name starts with `a`
-- Return the info of the first encountered fiber whose name starts with
-- the prefix stored in the global `a`.
a = 'vshard.rebalancer_worker_';
for _, fiber_info in pairs(require('fiber').info()) do
    local name_prefix = string.sub(fiber_info.name, 1, #a)
    if name_prefix == a then
        return fiber_info
    end
end