Skip to content

Scripts

Nikita Zheleztsov edited this page Aug 7, 2025 · 5 revisions

Disclaimer

This page is intended for developers only! Please make sure you know what you're doing before running these scripts on a production cluster.

Storage

Full delete of the bucket with all its data (DANGEROUS!!!)
-- Collect every space that has a numeric `bucket_id` index.
--
-- `box.space` is walked with `pairs` and only the entries stored under
-- numeric keys are considered. A space qualifies when it has an index named
-- `bucket_id` whose first part has the type `unsigned`, `integer` or
-- `number`.
--
-- Returns an array of the matching space objects.
function find_sharded_spaces()
    local numeric_types = {unsigned = true, integer = true, number = true}
    local index_name = 'bucket_id'
    local result = {}
    for space_key, candidate in pairs(box.space) do
        if type(space_key) == 'number' then
            local bucket_index = candidate.index[index_name]
            if bucket_index ~= nil and
               numeric_types[bucket_index.parts[1].type] then
                table.insert(result, candidate)
            end
        end
    end
    return result
end

-- Build a key from a tuple according to a list of index parts.
--
-- For each entry of `parts` (visited with `ipairs`) the tuple field at
-- `part.fieldno` is appended to the result.
--
-- Returns a new array with the selected field values.
function tuple_extract_key(tuple, parts)
    local extracted = {}
    for _, key_part in ipairs(parts) do
        extracted[#extracted + 1] = tuple[key_part.fieldno]
    end
    return extracted
end

-- Delete every tuple that belongs to `bucket_id` from all the spaces returned
-- by find_sharded_spaces(), then force-drop the bucket itself.
--
-- Tuples are found through the `bucket_id` index of each space and deleted by
-- a key built from the parts of the index with id 0.
-- DANGEROUS: this is a destructive operation.
function full_delete_bucket(bucket_id)
    for _, sharded_space in pairs(find_sharded_spaces()) do
        local primary_parts = sharded_space.index[0].parts
        local by_bucket = sharded_space.index['bucket_id']
        for _, row in by_bucket:pairs({bucket_id}) do
            local primary_key = tuple_extract_key(row, primary_parts)
            sharded_space:delete(primary_key)
        end
    end

    vshard.storage.bucket_force_drop(bucket_id)
end
Find all buckets with RW refs
-- Gather into the global `buckets` the ids of the buckets for which
-- vshard.storage.buckets_info() reports a `ref_rw` value, and return them.
buckets = {};
for _, info in pairs(vshard.storage.buckets_info()) do
    if info.ref_rw then
        buckets[#buckets + 1] = info.id
    end
end;
return buckets
Send N buckets to another replicaset, prioritizing buckets without refs or with the minimum number of refs
--
-- Sends N buckets from the current instance to replicaset `rs_uuid`.
--
-- Arguments:
--   * number  - how many buckets to send; nothing is done when it is <= 0
--   * rs_uuid - key of the destination in `vshard.storage.internal.replicasets`
--   * opts    - optional table of options, see below
--
-- Available options:
--   * map_eval_timeout - timeout for checking of the current replicaset for
--                        refs, 15 seconds by default
--   * bucket_send_timeout - timeout for sending 1 bucket, 2 minutes by default
--   * allow_with_refs - allow sending buckets with refs, the buckets with min
--                       number of refs will be chosen in that case.
--
-- Returns the number of sent buckets. Throws error in case of some problems.
-- A failure of a particular bucket_send() is not thrown: it is logged, the
-- sending stops and the number of buckets sent so far is returned.
--
function send_buckets_to(number, rs_uuid, opts)
    local log = require('log')
    local json = require('json')
    local fiber = require('fiber')

    opts = opts or {}
    local LOG_PREFIX = 'BUCKET_SEND_SCRIPT: '
    local map_eval_timeout = opts.map_eval_timeout or 15
    local bucket_send_timeout = opts.bucket_send_timeout or 60 * 2
    local allow_with_refs = opts.allow_with_refs or false

    -- Format the message, log it with the script prefix and raise it.
    local function throw_error(errmsg, ...)
        errmsg = string.format(errmsg, ...)
        errmsg = LOG_PREFIX .. errmsg
        log.error(errmsg)
        error(errmsg)
    end

    -- Format the message and log it with the given level ('info', ...).
    local function log_msg(level, msg, ...)
        msg = string.format(msg, ...)
        msg = LOG_PREFIX .. msg
        log[level](msg)
    end

    -- Evaluate `eval` on all the replicas of `replicaset` except the one
    -- with the id `opts.except`. `opts.timeout` is mandatory and bounds the
    -- whole operation.
    --
    -- Returns a map {replica_id = result of the async request} on success,
    -- or nil, error, id of the failed replica. Note, that the result of an
    -- async request is stored as is, without unpacking.
    local function replicaset_map_eval(replicaset, eval, opts)
        assert(opts.timeout)
        local timeout = opts.timeout
        local except = opts.except
        local deadline = fiber.clock() + timeout
        -- All the locals are declared before the first `goto fail`, so the
        -- jumps never cross a local declaration.
        local ok, res, err, err_id
        local futures = {}
        local map = {}
        local opts_eval = {is_async = true, timeout = timeout}

        timeout, err, err_id = replicaset:wait_connected_all({
            timeout = timeout,
            except = except,
        })
        if not timeout then
            goto fail
        end

        -- Send requests.
        for replica_id, replica in pairs(replicaset.replicas) do
            if replica_id == except then
                goto next_call
            end
            ok, res = pcall(replica.conn.eval, replica.conn, eval, nil, opts_eval)
            if not ok then
                err = res
                err_id = replica_id
                goto fail
            end
            futures[replica_id] = res
            ::next_call::
        end

        -- Collect results.
        for replica_id, future in pairs(futures) do
            if timeout < 0 then
                err_id = replica_id
                err = box.error.new(box.error.TIMEOUT)
                goto fail
            end
            res, err = future:wait_result(timeout)
            if res == nil then
                err_id = replica_id
                goto fail
            end
            map[replica_id] = res
            timeout = deadline - fiber.clock()
        end
        do return map end

        ::fail::
        -- Do not leave pending requests behind.
        for _, f in pairs(futures) do
            f:discard()
        end
        return nil, err, err_id
    end

    -- Take and immediately release an rw ref on the bucket, then return the
    -- bucket's entry of `vshard.storage.internal.bucket_refs`.
    -- Returns nil, error when the ref can't be taken.
    local function bucket_refrw_touch(bucket_id)
        local status, err = vshard.storage.bucket_refrw(bucket_id)
        if not status then
            return nil, err
        end
        vshard.storage.bucket_unrefrw(bucket_id)
        return vshard.storage.internal.bucket_refs[bucket_id]
    end

    if not vshard or not vshard.storage.internal.is_configured then
        throw_error('Vshard is not configured on that instance')
    end

    local consts = vshard.consts
    local internal = vshard.storage.internal

    -- Check args.
    if number <= 0 then
        -- Nothing to send.
        return 0
    end
    if not internal.replicasets[rs_uuid] then
        throw_error('Unknown replicaset %s', rs_uuid)
    end

    -- The instance is RW master.
    if not internal.is_master then
        throw_error('This instance is not a master. Use another one')
    end
    if box.info.ro then
        throw_error('This instance is master, but read-only')
    end

    -- Prev iteration is finished.
    local _status = box.space._bucket.index.status
    if _status:count({consts.BUCKET.SENDING}) ~= 0 then
        throw_error('Some buckets are being sent')
    end
    if _status:count({consts.BUCKET.RECEIVING}) ~= 0 then
        throw_error('Some bucket are being received')
    end

    -- Replicas don't have any RW refs, safe to pick any bucket
    -- from the current instance. The replication won't break.
    local check_rw_refs = [[
        for _, b in pairs(vshard.storage.buckets_info()) do
            if b.ref_rw then
                return false
            end
        end;
        return true
    ]]
    local replicaset_not_rw_refs, err, err_id = replicaset_map_eval(
        internal.this_replicaset, check_rw_refs, {
            timeout = map_eval_timeout,
            except = internal.this_replica.id
        }
    )
    if not replicaset_not_rw_refs then
        throw_error('Failed to check rw refs on replica %s: %s', err_id, err)
    end
    for r_id, value in pairs(replicaset_not_rw_refs) do
        -- The result of an async request is a table of the values returned
        -- by the remote script, so the boolean of `check_rw_refs` is its
        -- first element. Checking the table itself would always pass,
        -- since a table is truthy even when it holds `false`.
        local no_rw_refs = value
        if type(value) == 'table' then
            no_rw_refs = value[1]
        end
        if not no_rw_refs then
            throw_error('Replica %s has rw refs', r_id)
        end
    end

    -- Try to pick buckets without rw refs.
    local buckets_to_send = {}
    local picked_buckets = 0
    for _, b in pairs(vshard.storage.buckets_info()) do
        if not internal.rebalancer_transfering_buckets[b.id] and not b.ref_rw then
            buckets_to_send[b.id] = true
            picked_buckets = picked_buckets + 1
            if picked_buckets == number then
                break
            end
        end
    end
    if picked_buckets ~= number and not allow_with_refs then
        throw_error('Found %d buckets without refs, needed %d',
                    picked_buckets, number)
    end
    -- Pick buckets with the minimum number of rw refs.
    if picked_buckets < number then
        -- Sort the buckets by the number of rw refs (ascending order)
        local sorted_buckets = {}
        for _, b in pairs(vshard.storage.buckets_info()) do
            if not internal.rebalancer_transfering_buckets[b.id] and
               not buckets_to_send[b.id] then
                table.insert(sorted_buckets, {id = b.id, refs = b.ref_rw})
            end
        end
        table.sort(sorted_buckets, function(a, b) return a.refs < b.refs end)
        -- Pick the buckets with the least rw refs until we have enough
        for _, b in ipairs(sorted_buckets) do
            buckets_to_send[b.id] = b.refs
            picked_buckets = picked_buckets + 1
            if picked_buckets == number then
                break
            end
        end
    end

    if picked_buckets ~= number then
        throw_error('Found %d buckets, needed %d', picked_buckets, number)
    end

    -- We've chosen the buckets and don't wanna new requests to interfere.
    log_msg('info', 'Prohibiting rw requests on buckets: %s',
            json.encode(buckets_to_send))
    local bucket_refs = {}
    -- Collect all the ref objects first and only then set the locks, so a
    -- failure to get a ref throws before any lock is taken.
    for bid, _ in pairs(buckets_to_send) do
        local ref, err = bucket_refrw_touch(bid)
        if not ref then
            throw_error('Failed to get ref for bucket %d: %s', bid, err)
        end
        bucket_refs[bid] = ref
    end
    for _, ref in pairs(bucket_refs) do
        ref.rw_lock = true
    end

    local sent_buckets = 0
    log_msg('info', 'Going to send %d buckets to %s: %s', picked_buckets, rs_uuid,
            json.encode(buckets_to_send))
    local bucket_send_opts = {timeout = bucket_send_timeout}
    for bid, _ in pairs(buckets_to_send) do
        log_msg('info', 'Sending bucket %d', bid)
        -- Allow bucket_send to take rw_lock.
        bucket_refs[bid].rw_lock = false
        local ret, err = vshard.storage.bucket_send(bid, rs_uuid, bucket_send_opts)
        -- Do not drop rw_lock for buckets, processed with bucket_send.
        bucket_refs[bid] = nil
        if ret then
            sent_buckets = sent_buckets + 1
        else
            log_msg('error', 'Error during bucket sending %s', err)
            break
        end
    end

    -- Drop rw locks.
    if sent_buckets < number then
        -- Only the buckets, which were not passed to bucket_send, are left
        -- in `bucket_refs` at this point.
        for bid, ref in pairs(bucket_refs) do
            log_msg('info', 'Dropping rw lock for bucket %d', bid)
            ref.rw_lock = false
        end
    end

    return sent_buckets
end

Router

Find doubled buckets in the cluster
-- Look for buckets that are reported by more than one replicaset.
--
-- Every replicaset returned by vshard.router.routeall() is asked via callro
-- (5 seconds timeout) for vshard.storage.buckets_info(). If callro returns
-- an error, it is raised.
--
-- Returns a table keyed by bucket id, which contains only the buckets
-- reported more than once. Each value has the fields:
--   * count - how many replicasets reported the bucket;
--   * uuids - the `uuid` fields of those replicasets;
--   * info  - the bucket info entries they returned.
function find_doubled()
  local total = vshard.router.bucket_count()
  local seen = {}
  for bucket_id = 1, total do
    seen[bucket_id] = {count = 0, info = {}, uuids = {}}
  end

  for _, rs in pairs(vshard.router.routeall()) do
    local reported, call_err = rs:callro('vshard.storage.buckets_info',
                                         {}, {timeout = 5})
    if call_err then
      error(call_err)
    end

    for bucket_id, bucket_info in pairs(reported) do
      local entry = seen[bucket_id]
      entry.count = entry.count + 1
      table.insert(entry.uuids, rs.uuid)
      table.insert(entry.info, bucket_info)
    end
  end

  local doubled = {}
  for bucket_id = 1, total do
    local entry = seen[bucket_id]
    if entry.count > 1 then
      doubled[bucket_id] = entry
    end
  end

  return doubled
end

-- Run the search; the call evaluates to the table of doubled buckets.
find_doubled()
Find all rw refs on replicas in the cluster
-- Collect the ids of the buckets, which have rw refs on non-master replicas.
--
-- The result is stored in the global `all_rw_refs` and returned:
--   {[replicaset key] = {[replica key] = {bucket ids}}}
-- where the keys are the ones of the router's `replicasets` and `replicas`
-- tables. The answers are kept per replica, so a replicaset with several
-- replicas doesn't end up with only the answer of the last visited one.
all_rw_refs = {};
for rs_uuid, rs in pairs(vshard.router.internal.static_router.replicasets) do
    if rs.master == nil then
        error(string.format('Master of replicaset %s is not known',
                            tostring(rs_uuid)))
    end
    all_rw_refs[rs_uuid] = {}
    for r_uuid, r in pairs(rs.replicas) do
        -- Compare the replica objects, not the table key with the master's
        -- uuid: this doesn't depend on what the `replicas` table is keyed by.
        if r ~= rs.master then
            -- The remote script uses only locals, so it doesn't leave
            -- globals behind on the replica.
            all_rw_refs[rs_uuid][r_uuid] = r.conn:eval([[
                local refs = {}
                local bucket_refs = vshard.storage.internal.bucket_refs
                for _, tuple in box.space._bucket:pairs() do
                    local ref = bucket_refs[tuple[1]]
                    if ref and ref.rw > 0 then
                        table.insert(refs, tuple[1])
                    end
                end
                return refs
            ]])
        end
    end
end;
return all_rw_refs
Drop all rw refs on replicas in the cluster (DANGEROUS!)
-- Reset the rw ref counter of every bucket on all non-master replicas.
-- DANGEROUS: the counters are overwritten with 0 unconditionally.
for _, rs in pairs(vshard.router.internal.static_router.replicasets) do
    -- Without a known master it is impossible to tell the replicas from it,
    -- and the refs must not be dropped on the master.
    if rs.master == nil then
        error(string.format('Master of replicaset %s is not known',
                            tostring(rs.name or rs.uuid)))
    end
    for _, r in pairs(rs.replicas) do
        -- Compare the replica objects, not the table key with the master's
        -- uuid: this doesn't depend on what the `replicas` table is keyed by.
        if r ~= rs.master then
            r.conn:eval('for _, tuple in box.space._bucket:pairs() do if vshard.storage.internal.bucket_refs[tuple[1]] then vshard.storage.internal.bucket_refs[tuple[1]].rw = 0 end end')
        end
    end
end
Find all non-active buckets in the cluster
-- Collect the ids of the buckets, whose status is not "active", from the
-- master of every replicaset.
--
-- The result is stored in the global `buckets_no_active` and returned:
--   {[replicaset key] = {bucket ids}}
buckets_no_active = {};
for rs_uuid, rs in pairs(vshard.router.internal.static_router.replicasets) do
    -- Use the master object directly instead of searching for it among the
    -- replicas by uuid. The remote script uses only locals, so it doesn't
    -- leave a global `buckets` behind on the storage.
    buckets_no_active[rs_uuid] = rs.master.conn:eval([[
        local buckets = {}
        for _, b in pairs(vshard.storage.buckets_info()) do
            if b.status ~= "active" then
                table.insert(buckets, b.id)
            end
        end
        return buckets
    ]])
end;
return buckets_no_active
Find all buckets with RW locks in the cluster
-- Collect the info of the buckets, which have `rw_lock` set, from the master
-- of every replicaset.
--
-- The result is stored in the global `buckets` and returned:
--   {[replicaset name or uuid] = {bucket infos}}
buckets = {};
for _, rs in pairs(vshard.router.internal.static_router.replicasets) do
    -- The remote script uses only locals, so it doesn't leave a global
    -- `ret` behind on the storage.
    buckets[rs.name or rs.uuid] = rs.master.conn:eval([[
        local ret = {}
        for _, b in pairs(vshard.storage.buckets_info()) do
            if b.rw_lock then
                table.insert(ret, b)
            end
        end
        return ret
    ]])
end;
return buckets
Find callrw requests that take more than 10 seconds (DANGEROUS!)
-- Measure latency of callrw.
--
-- Replaces vshard.router.callrw with a wrapper, which calls the original
-- function and logs a warning when the call took more than 10 seconds.
-- The results of the original function are returned to the caller.
--
-- The original function is kept in the global `old_vshard_router_callrw`,
-- the installed wrapper - in `vshard_router_callrw_latency_wrapper`.
-- The original is saved only when callrw is not the wrapper already.
-- Otherwise running this snippet twice would save the wrapper as the
-- "original" and make it call itself recursively.
if vshard.router.callrw ~= vshard_router_callrw_latency_wrapper then
    old_vshard_router_callrw = vshard.router.callrw
end
vshard_router_callrw_latency_wrapper = function(bucket_id, func, args, opts)
    local fiber = require('fiber')
    local log = require('log')
    -- Calls slower than that (in seconds) are reported.
    local latency_threshold = 10

    local start_time = fiber.clock()
    local call_status, call_err =
        old_vshard_router_callrw(bucket_id, func, args, opts)
    local latency = fiber.clock() - start_time

    if call_err ~= nil then
        if latency > latency_threshold then
            log.warn('LATENCY: %.3f, request for %d bucket on %s func failed ' ..
                     'with error %s', latency, bucket_id, func, call_err)
        end
        return call_status, call_err
    end
    if latency > latency_threshold then
        log.warn('LATENCY: %.3f, request for %d bucket on %s func succeeded',
                  latency, bucket_id, func)
    end
    return call_status
end
vshard.router.callrw = vshard_router_callrw_latency_wrapper

-- Restore the function back. Nothing is done when the wrapper was never
-- installed, otherwise callrw would be replaced with nil.
if old_vshard_router_callrw ~= nil then
    vshard.router.callrw = old_vshard_router_callrw
end

Any

Find the fiber whose name starts with the prefix stored in `a`
-- Return the first entry of fiber.info(), whose name begins with the prefix
-- stored in the global `a`.
a = 'vshard.rebalancer_worker_';
for _, fiber_info in pairs(require('fiber').info()) do
    local name_head = string.sub(fiber_info.name, 1, #a)
    if name_head == a then
        return fiber_info
    end
end
Clone this wiki locally