Skip to content

redis_client_pipeline 节点主从切换问题 #367

@41405465

Description

@41405465

目前使用 3.6.0 版本,采用两种方式进行 redis 写入。redis 集群出现节点故障、其中一个节点发生主从切换之后:
1、redis_client_cluster 方式:正常;
2、redis_client_pipeline 方式:异常。请问 redis_client_pipeline 在节点挂了或者节点主从切换之后,是否能够识别到节点的变更并重新设置?麻烦帮忙分析下,谢谢。

使用方式:
// Usage fragment: create the global pipeline wrapper and initialise it from the INI agent.
g_pRedisPipeline = new RedisPipeline();
nRet = g_pRedisPipeline->Init((UtilIniFileAgent*)pIniFileAgent);
// RedisPipeline::Init returns 0 on success; any other value makes the
// enclosing function return ERROR_CODE_COM_FAILED.
if (0 != nRet)
{
return ERROR_CODE_COM_FAILED;
}

void StatusRedisWriter::DealSaveRedisDataList(bool pipeline)
{
XLock(m_SaveRedisDataLock);
unsigned int nSize = XQueSize(m_qSaveRedisDataList);
XUnLock(m_SaveRedisDataLock);
if (nSize < m_nPipelineMax)
{
Sleep(50);
}

std::vector<RedisPipelineCmd*> vPipelineCmd;
for (int i = 0; i < m_nPipelineMax; ++i)
{
    XLock(m_SaveRedisDataLock);
    unsigned int nSize = XQueSize(m_qSaveRedisDataList);
    if (nSize == 0)
    {
        XUnLock(m_SaveRedisDataLock);
        break;
    }

    DCMDataSaveInfo stDataInfo;
    XQueFront(m_qSaveRedisDataList, stDataInfo);
    XQuePop(m_qSaveRedisDataList);
    XUnLock(m_SaveRedisDataLock);

    string strKey = "OBD:";
    strKey.append(stDataInfo.m_strDCMNo);

    std::vector<std::string> vCmdline;
    vCmdline.push_back("HMSET");
    vCmdline.push_back(strKey);
    vCmdline.push_back("UpLoadData");
    vCmdline.push_back(stDataInfo.m_pData);
    FreeSaveData(stDataInfo);

    RedisPipelineCmd* pCmd = g_pRedisPipeline->PushCommand(vCmdline);
    vPipelineCmd.push_back(pCmd);
}

if (vPipelineCmd.empty())
{
    LY_LOG_ERROR(g_LogHandle, "bug! m_qSaveRedisDataList size 0");
    return;
}

for (auto pCmd : vPipelineCmd)
{
    std::string strResult;
    auto acl_result = g_pRedisPipeline->WaitResult(pCmd, strResult);
    if (strResult != "OK\r\n")
    {
        LY_LOG_ERROR(g_LogHandle, "fail to write redis for key %s", pCmd->GetKey().c_str());
    }
    delete pCmd;
}

}

封装的RedisPipeline.cpp代码如下:

#include "RedisPipeline.h"

#include "Util/UtilIniStruct.hpp"
#include "Util/UtilString.h"

#include "acl/acl_cpp/lib_acl.hpp"
// #include "acl/acl_cpp/redis/redis.hpp"
// #include "acl/acl_cpp/redis/redis_client_pipeline.hpp"

// Loads the Redis configuration keys and then validates them via CheckIni().
// NOTE(review): LOAD_INI_KEY is a project macro not visible here; it presumably
// reads the named key from section pNodeName through pIniFileAgent into the
// member of the same name - confirm against the macro definition.
// Returns the result of CheckIni().
bool ConfigRedis::LoadIni(UtilIniFileAgent* pIniFileAgent, const char* pNodeName)
{
LOAD_INI_KEY(AddrAndPort);
LOAD_INI_KEY(Password);
LOAD_INI_KEY(PoolNum);
LOAD_INI_KEY(Cluster);
LOAD_INI_KEY(Pipeline);

return CheckIni();

}

bool onfigRedis::CheckIni()
{
return !AddrAndPort.empty() && !Password.empty();
}

// Default construction performs no work of its own; the pipeline is created
// later by one of the Init() overloads.
RedisPipeline::RedisPipeline() = default;

// Stops the pipeline thread and releases the acl pipeline object, if one
// was created.
RedisPipeline::~RedisPipeline()
{
    if (!m_pRedisPipeline)
    {
        return;
    }

    // Stop the worker thread before destroying the object it runs on.
    m_pRedisPipeline->stop_thread();
    delete m_pRedisPipeline;
    m_pRedisPipeline = nullptr;
}

// Loads the "Redis" section through pIniFileAgent and starts the pipeline.
// Returns -1 when the configuration cannot be loaded, otherwise the result
// of the parameterless Init().
int RedisPipeline::Init(UtilIniFileAgent* pIniFileAgent)
{
    const bool bLoaded = m_stConfig.LoadIni(pIniFileAgent, "Redis");
    if (bLoaded)
    {
        return Init();
    }

    LOG_ERROR("fial to load redis config");
    return -1;
}

// Stores a copy of the supplied configuration, then starts the pipeline.
// Returns the result of the parameterless Init().
int RedisPipeline::Init(const LYConfigRedis& stConfig)
{
    m_stConfig = stConfig;
    const int nRet = Init();
    return nRet;
}

// Creates the acl pipeline from m_stConfig and starts its thread.
// Calling it again after a successful initialisation is a no-op.
// Always returns 0.
int RedisPipeline::Init()
{
    if (!m_bIsInited)
    {
        // For the pipeline only the first two configuration items are used:
        // the address and the password. The address is a single ip:port.
        const char* pszAddr = m_stConfig.AddrAndPort.c_str();
        const char* pszPassword = m_stConfig.Password.c_str();

        m_pRedisPipeline = new acl::redis_client_pipeline(pszAddr);
        m_pRedisPipeline->set_password(pszPassword);
        m_pRedisPipeline->start_thread();

        m_bIsInited = true;
    }
    return 0;
}

std::unique_ptracl::redis RedisPipeline::get_redis_cmd()
{
std::unique_ptracl::redis cmd(new acl::redis);

cmd->set_pipeline(m_pRedisPipeline);
return cmd;

}

// Splits a command line into tokens with SplitBySpace and queues it on the
// pipeline via the vector overload.
// Returns a heap-allocated command object that the caller must delete.
// (Template arguments restored: the pasted text had lost the angle brackets
// of std::vector<std::string>.)
RedisPipelineCmd* RedisPipeline::PushCommand(const std::string& strCmdline)
{
    std::vector<std::string> vCmdline;
    SplitBySpace(strCmdline, vCmdline);
    return PushCommand(vCmdline);
}

// Creates a command object for this pipeline and builds/queues the request
// from the already-tokenised command.
// vCmdline is taken by non-const reference because
// RedisPipelineCmd::BuildRequest swaps its contents into the command object.
// Returns a heap-allocated command object that the caller must delete.
// (Template arguments restored: the pasted text had lost the angle brackets
// of std::vector<std::string>.)
RedisPipelineCmd* RedisPipeline::PushCommand(std::vector<std::string>& vCmdline)
{
    RedisPipelineCmd* pCmd = new RedisPipelineCmd(this);
    pCmd->BuildRequest(vCmdline);
    return pCmd;
}

// Waits for the reply of a previously pushed command.
// pCmd:      command returned by PushCommand; a null pointer yields nullptr.
// strResult: receives the textual form of the reply; it is always reset first
//            so a reused string never carries a stale reply when no result
//            is available.
// Returns the raw acl result, or nullptr when no result is available.
const acl::redis_result* RedisPipeline::WaitResult(RedisPipelineCmd* pCmd, std::string& strResult)
{
    strResult.clear();
    if (pCmd == nullptr)
    {
        return nullptr;
    }

    const acl::redis_result* pResult = pCmd->WaitResult();
    if (pResult)
    {
        acl::string str;
        pResult->to_string(str);
        strResult.assign(str.c_str(), str.size());
    }
    return pResult;
}

/* ************************************************************ */

// Releases the owned acl::redis command object, if any.
RedisPipelineCmd::~RedisPipelineCmd()
{
    // Deleting a null pointer is a no-op, so no explicit check is needed.
    delete m_pRedisCommand;
    m_pRedisCommand = nullptr;
}

void RedisPipelineCmd::BuildRequest(const std::string& strCmdline)
{
std::vectorstd::string vCmdline;
SplitBySpace(strCmdline, vCmdline);
return BuildRequest(vCmdline);
}

void RedisPipelineCmd::BuildRequest(std::vectorstd::string& vCmdline)
{
ASSERT_RET(vCmdline.size() > 1);

if (m_pRedisCommand == nullptr)
{
    m_pRedisCommand = new acl::redis;
}

acl::redis_client_pipeline* pipeline = m_pRedisPipeline->get_acl_pipeline();
ASSERT_RET(pipeline);

m_pRedisCommand->set_pipeline(pipeline);
// 预设 slot 避免 redirect 开销
m_pRedisCommand->hash_slot(vCmdline[1].c_str());

m_vCmdline.clear();
m_vCmdline.swap(vCmdline);

size_t m_argc = m_vCmdline.size();
m_argv.resize(m_argc);
m_lens.resize(m_argc);
for (int i = 0; i < m_argc; ++i)
{
    m_argv[i] = m_vCmdline[i].c_str();
    m_lens[i] = m_vCmdline[i].size();
}

// pipeline 模式下,传入的 argv 指针须较长的生命周期
m_pRedisCommand->build_request(m_argc, &m_argv[0], &m_lens[0]);
acl::redis_pipeline_message& msg = m_pRedisCommand->get_pipeline_message();
pipeline->push(&msg);

}

// Waits on this command's pipeline message and returns the reply.
// Returns nullptr when no acl command object exists yet (BuildRequest was
// never called, or bailed out before creating it), instead of dereferencing
// a null pointer; otherwise returns whatever the message's wait() yields.
const acl::redis_result* RedisPipelineCmd::WaitResult()
{
    if (m_pRedisCommand == nullptr)
    {
        return nullptr;
    }

    acl::redis_pipeline_message& msg = m_pRedisCommand->get_pipeline_message();
    return msg.wait();
}

// Returns the key of the stored command (token 1), or an empty string when
// no key token is stored.
// The bound is size() > 1 to match BuildRequest, which accepts any command
// with at least two tokens and treats token 1 as the key; the previous
// size() > 2 check reported an empty key for two-token commands.
std::string RedisPipelineCmd::GetKey() const
{
    if (m_vCmdline.size() > 1)
    {
        return m_vCmdline[1];
    }
    return "";
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions