批量用户离线处理容器——绿萝

最近工作中涉及到两个对批量用户进行离线处理的工作:

  1. 对若干用户进行打标记。
  2. 对若干用户进行消息推送。

联想到之前也有很多这种类似的工作,索性把其中共用的部分抽离出来做成了框架——取名叫绿萝。

通过使用该容器,使用者编写处理业务相关的代码,业务无关的部分交给容器来解决。好比 Servlet和Tomcat之间的关系,绿萝作为一个容器来运行业务代码。

1、有什么用

具体来说,绿萝主要为使用者处理以下逻辑:

  1. 从文件中读取用户列表进行后续处理
  2. 把读取的用户列表分发给若干现成进行进行处理,线程数可指定
  3. 可以通过参数对消费的QPS进行限制,防止影响线上系统
  4. 对所有用户的处理结果进行返回码的统计
  5. 把业务参数解析成Map供使用者进行使用

2、怎么用

绿萝使用起来很简单,需要以下几步:

2.1 安装lvluo到本地repo

在lvluo目录下执行

mvn clean install -Dmaven.test.skip

2.2、引入依赖

        <dependency>
            <groupId>com.netease</groupId>
            <artifactId>lvluo-worker</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

2.3、实现接口

写一个类继承AbstractConsumerTask,并添加@Component注解。
(注:该类需要放置在com.netease路径下)

例如,要对1亿用户进行消息推送,那么编写以下类即可:

@Component
public class PushConsumerTask extends AbstractConsumerTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(OneConsumerTask.class);
    private static final Logger SUCCESS_LOGGER = LoggerFactory.getLogger("successLogger");
    private static final Logger FAIL_LOGGER = LoggerFactory.getLogger("failLogger");

    private AtomicLong successCount = new AtomicLong(0);
    private AtomicLong failCount = new AtomicLong(0);

    @Override
    public int doService(String username, Map<String, String> requestMap) {

        //获取请求参数,进行参数校验
        String message = requestMap.get("message");
        if (StringUtils.isEmpty(indexStr)) {
            System.out.println("message 参数为空");
            System.exit(-1)
        }

        //业务逻辑处理
        int retCode = pushMessage(username, message);

        //打印需要的结果到日志
        if (retCode == 0) {
            SUCCESS_LOGGER.info(username);
            LOGGER.info(MessageFormat.format("成功:第{0}个逻辑处理完成, 用户名为{1}", successCount.addAndGet(1), username));
        } else {
            FAIL_LOGGER.info(username);
            LOGGER.info(MessageFormat.format("失败:第{0}个逻辑处理完成, 用户名为{1}", failCount.addAndGet(1), username));
        }

        return retCode;
    }

2.4、打包

pom.xml中加入以下代码:
其中finalName要指定成想要的名字。


    <build>
        <finalName>demo</finalName>
        <sourceDirectory>src/main/java</sourceDirectory>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.netease.Main</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.5、运行

java -jar demo.jar 消费任务 文件路径 业务参数 系统参数
  • 消费任务 - 编写的业务类的名称,首字母替换成小写
  • 文件路径 - 用户名文件对应的文件路径
  • 业务参数 - 你的业务中需要用的参数 (如果有特殊字符或者中文需要进行urlencode)
  • 系统参数 - 如threadCount 指定线程数,qps指定限制的频率

如,

java -jar demo.jar pushConsumerTask usernameList.txt message=xx threadCount=10&qps=100

2.6、输出结果

控制台输出:

10:32:05,096  INFO DemoConsumerTask:51 - 失败:第504个逻辑处理完成, 用户名为urstest_czz994
10:32:05,096  INFO DemoConsumerTask:48 - 成功:第492个逻辑处理完成, 用户名为urstest_czz995
10:32:05,096  INFO AbstractConsumerTask:92 - 本次请求返回码1,用户名为urstest_czz994。总共请求995次,当前返回码统计结果:{0:491,1:504}
10:32:05,097  INFO AbstractConsumerTask:92 - 本次请求返回码0,用户名为urstest_czz995。总共请求996次,当前返回码统计结果:{0:492,1:504}

logs/success.log:

urstest_czz3
urstest_czz4
urstest_czz6
urstest_czz9
urstest_czz10
urstest_czz11

logs/fail.log

urstest_czz0
urstest_czz1
urstest_czz2
urstest_czz5
urstest_czz7

3、注意事项

  1. ConsumerTask类放在com.netease路径下
  2. Spring版本用4及以上版本,推荐4.2.9.RELEASE

4、代码

如果对代码有兴趣欢迎围观吐槽~
https://github.com/czzshr/lvluo

工程中包含了两个module:

  • lvluo-demo 是lvluo的使用demo
  • lvluo-worker 是lvluo的具体实现
/** * RECOMMENDED CONFIGURATION VARIABLES: EDIT AND UNCOMMENT THE SECTION BELOW TO INSERT DYNAMIC VALUES FROM YOUR PLATFORM OR CMS. * LEARN WHY DEFINING THESE VARIABLES IS IMPORTANT: https://disqus.com/admin/universalcode/#configuration-variables*/ /* var disqus_config = function () { this.page.url = PAGE_URL; // Replace PAGE_URL with your page's canonical URL variable this.page.identifier = PAGE_IDENTIFIER; // Replace PAGE_IDENTIFIER with your page's unique identifier variable }; */ (function() { // DON'T EDIT BELOW THIS LINE var d = document, s = d.createElement('script'); s.src = 'https://chenzz.disqus.com/embed.js'; s.setAttribute('data-timestamp', +new Date()); (d.head || d.body).appendChild(s); })();