2024年重庆甘肃安徽等省职业院校技能大赛_大数据应用开发样题解析-模块C:实时数据处理-任务二:实时指标计算

环境说明

Flink 任务在Yarn 上用per job 模式(即Job 分离模式,不采用Session 模式),方便Yarn 回收资源;

建议使用gson 解析json 数据。

任务描述

编写Java 工程代码,使用Flink 消费Kafka 中dwd 层的Topic 数据,表结构与离线数据表结构相同,时间语义使用Processing Time。

任务分析

注意任务描述中的两个关键词:一是”编写Java工程代码”,二是”使用Processing Time”(即使用处理时间)。

本任务共有两个子任务组成:

  • 实时指标计算子任务1

  • 实时指标计算子任务2

子任务1

子任务1描述

1、使用Flink 消费kafka 中log_product_browse 主题的数据,统计商品的UV(浏览用户量)和PV(商品浏览量),将结果写入HBase 中的表ads:online_uv_pv 中。使用Hive cli(没写错)查询ads.pv_uv_result 表按照product_id 和pv 进行降序排序,查询出10 条数据,将结果截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下;

其中,表空间为:ads,rowkey 为:计算时的年月日时分秒+商品id,列族为:info,列名为:商品id,商品名称,uv,pv,modified_time

例:Rowkey:2022-10-24 16:47:38-13645

子任务1分析

根据题意,编写Flink程序,读取log_product_browse主题的数据,统计uv和pv值,写入HBase的ads:online_uv_pv表中;

注意,HBase中使用了命名空间ads,hbase表都位于该ads表空间中。

在该任务描述中,涉及到几个行业术语,解释如下:

  • (1) 商品的UV:UV是指独立访问用户数,即Unique Visitor。同一个用户多次浏览商品,UV数仍计为1。因此这里的UV指的是浏览商品的唯一用户量。00:00-24:00内相同的用户只被计算一次。

  • (2) 商品的PV:PV是指页面访问量,即Page View,用户每次对商品的浏览均被记录,用户对同一商品的多次访问要进行累计。因此这里的PV指的是用户对商品的浏览量的累计。

  • (3) 用Hive cli查询HBase,说明要为HBase映射Hive表(”没写错”这句话,有故弄玄虚之嫌。直接说明映射Hive表不好吗?在公司你这样写需求试试,喷不S你!)

注意,这个任务需求说明仍有几点容易产生歧义的地方:

  • (1) 关于行键的说明,”rowkey 为:计算时的年月日时分秒+商品id”,接着给出的示例为”例:Rowkey:2022-10-24 16:47:38-13645″,怎么理解其中的”+”就是”-“?

  • (2) 这是一个聚合统计,但没有给出统计周期,那也就是直接事件驱动了-来一个数据就统计一次,这样做好吗?

  • (3) 需求描述中是”统计商品的UV(浏览用户量)和PV(商品浏览量)”,但是存储到HBase表中的列又包含“商品id,商品名称,uv,pv,modified_time”,这岂不是要统计每个商品的uv和pv?你描写任务需求时说清楚点会s吗?

  • (4) 上面的问题(3)又带来了新的问题:如果写入到HBase表中包含商品名称,那么因为Kafka的log_product_browse主题中的数据只包含有商品id,并不包含商品名称,因此我们还需要引入一个商品信息维表。对于每条流式数据,可以关联该维表数据源,以便为实时计算提供数据关联查询(根据商品id获取商品名称)。

另外,当我们使用uv、pv这两个统计指标时,通常隐含(预设)一个场景的问题:按天统计 pv、uv。即统计每天从0点到当前时间的pv和uv,第二天0点重新开始计数第二天的。大家可以回忆一下(如果你曾经做过自媒体或电商行业,在后台查看平台统计你店铺的uv和pv),是不是这样?

亦或,因为比赛时间不会跨天(即在同一天内完成比赛),并且任务需求中说的是“时间语义使用Processing Time”使用处理时间,因此赛方隐含(预设)的场景是当天的uv、pv统计?所以不必考虑跨天的问题,也就不必考虑累加uv、pv清零问题和状态问题??

所以,作为赛题,不应该这样不专业和这样的模糊不清,而应该明确业务场景问题。否则,评分规则该怎么定?或者我们怎么知道评分规则中是如何判断得分点呢?

面对这种需求描述模糊的问题,真的很让人抓狂。我们暂且认为出题方只考虑了比赛那个时间段的uv、pv计算(从“时间语义使用Processing Time”这句中推测),而不考虑真实的生产环境,那么也就不必使用窗口函数和触发器了,仅简单地基于事件驱动实时统计累加量即可。

数据源和数据格式分析

本子任务涉及到读取Kafka中log_product_browse 主题的数据。该数据来自于实时数据生成器,生成的初始数据格式大致如下:

   product_browse:(2730|18456|1|2023042739287771|'20230419230057');
  product_browse:(5438|11696|0|0|'20230419193256');
  product_browse:(6045|16691|1|2023042737193353|'20230419103857");
  product_browse:(9378|11940|1|2023042721662101|'20230419225958');
  product_browse:(6544|8248|1|2023042744627979|'20230419233957');
  product_browse:(13916|11696|0|0|'20230419174756');
  product_browse:(6085|18456|1|2023042739287771|'20230419200157');
  product_browse:(2726|8248|0|0|'20230420014757');
  product_browse:(7692|8248|1|2023042744627979|'20230420030757");
  product_browse:(6684|11696|1|2023042785336981|'20230419231056');
  product_browse:(2923|11696|0|0|'20230420002956');
  product_browse:(4753|18456|1|2023042739287771|'20230419221957');
  product_browse:(3235|8248|1|2023042744627979|'20230420014757');
  product_browse:(8008|11696|1|2023042785336981|'20230420010456');
  product_browse:(14426|16691|1|2023042737193353|'20230419235457');

观察以上初始数据,可以得知以下几点信息:

  • (1)不包含log_id字段。

  • (2)modified_time字段变成了字符串’yyyyMMddHHmmss’。

  • (3)各字段的含义如下:product_id|customer_id|gen_order|order_sn|modified_time。

然后经过上一个子任务的实时数据处理,写入Kafka的log_product_browse 主题。根据任务描述可知,Kafka的log_product_browse 主题的存储格式为json,每条json字符串的格式大致如下:

  {
    "log_id":"随机数(0-9)+MMddHHmmssSSS",
    "product_id":"2730",
    "customer_id":18456,
    "gen_order":1,
    "order_sn":"2023042739287771",
    "modified_time":"20230419230057"
}

不过请注意,不同省在比赛时这个格式也有可能会有调整,但方法是相同的。知晓原理,掌握方法,万变不离其踪,大家学会随机应变即可。

子任务1实现

要完成这样的表写入,使用Flink Table API更加简单和优雅 。因此本任务我们使用Flink Table API/Flink SQL来实现。

1)首先,在HBase中,创建命名空间ads(HBase有两个预定义命名空间: hbase和default)。打开hbase shell,执行语句如下:

hbase> create_namespace 'ads'

2)然后,在HBase中,创建要写入的数据表online_uv_pv。在hbase shell中,执行语句如下:

hbase> create 'ads:online_uv_pv','info'
  1. 接下来,创建一个Flink流处理程序Task04Job.java,编辑代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Task04Job {
  public static void main(String[] args) {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 表执行环境
      final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

      // 创建关联维表(来自mysql中的product_info表)
      String source_jdbc_table_create = "CREATE TABLE product_info_source (" +
              " product_id BIGINT," +
              " product_core STRING," +
              " product_name STRING," +
              " bar_code STRING," +
              " brand_id BIGINT," +
              " one_category_id INTEGER," +
              " two_category_id INTEGER," +
              " three_category_id INTEGER," +
              " supplier_id BIGINT," +
              " price DOUBLE," +
              " average_cost DOUBLE," +
              " publish_status INTEGER," +
              " audit_status INTEGER," +
              " weight DOUBLE," +
              " length DOUBLE," +
              " height DOUBLE," +
              " width DOUBLE," +
              " color_type INTEGER," +
              " production_date STRING," +
              " shelf_life BIGINT," +
              " descript STRING," +
              " indate STRING," +
              " modified_time STRING," +
              " PRIMARY KEY (product_id) NOT ENFORCED" +
              ") WITH (" +
              "   'connector'='jdbc'," +
              "   'username'='root'," +
              "   'password'='123456'," +
              "   'url'='jdbc:mysql://192.168.190.139:3306/ds_db01?useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true'," +
              "   'table-name'='product_info'" +
              ")";
      tableEnvironment.executeSql("DROP TABLE IF EXISTS product_info_source");
      tableEnvironment.executeSql(source_jdbc_table_create);

      // 只取product_id 和 product_name 两个字段
      Table product_dim_table = tableEnvironment.sqlQuery("select product_id, product_name from product_info_source");
      // 注册到一个临时表中
      tableEnvironment.createTemporaryView("product_dim_table", product_dim_table);

      // 创建源表
      String source_kafka_table_create =
              "CREATE TEMPORARY TABLE product_browse_source ( " +
                      "   log_id BIGINT," +
                      "   product_id STRING," +
                      "   customer_id BIGINT," +
                      "   gen_order BIGINT" +
                      "   order_sn STRING," +
                      "   modified_time STRING" +
                      ") WITH ('connector'='kafka'," +
                      "       'topic'='log_product_browse'," +
                      "       'properties.bootstrap.servers'='192.168.190.139:9092'," +
                      "       'properties.group.id'='group-test'," +
                      "       'format'='json'," +
                      "       'scan.startup.mode'='latest-offset'" +
                      ")";
      tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_source");
      tableEnvironment.executeSql(source_kafka_table_create);

      // 创建Sink表
      String sink_hbase_table_create =
              "CREATE TEMPORARY TABLE product_browse_sink (" +
                      "   row_key STRING," +
                      "   info ROW<log_id BIGINT," +
                      "   order_sn STRING," +
                      "   product_id BIGINT," +
                      "   customer_id STRING," +
                      "   gen_order BIGINT" +
                      "   modified_time STRING>," +
                      "   PRIMARY KEY (row_key) NOT ENFORCED" +
                      ") WITH ('connector' = 'hbase-2.2'," +
                      "       'table-name' = 'ads:online_uv_pv'," +
                      "       'zookeeper.quorum' = '192.168.190.139:2181'" +
                      " )";
      tableEnvironment.executeSql("DROP TABLE IF EXISTS product_browse_sink");
      tableEnvironment.executeSql(sink_hbase_table_create);

      // ETL管道
      // rowkey 为:计算时的年月日时分秒+商品id,列族为:info,列名为:商品id,商品名称,uv,pv,modified_time
      // 例:Rowkey:2022-10-24 16:47:38-13645
      // 加载源表,关联维表,获取每个商品id的商品名称,并计算UV(distinct+count)和PV(count)
      String querySql =
              "select concat(date_format(now(),'yyyy-MM-dd HH:mm:ss'), '-', product_id) as row_key," +
                      "     row(product_id, product_name, count(distinct(customer_id)), count(product_id)) as info " +
                      "from (select s.product_id, d.product_name, s.modified_time" +
                      "     from product_browse_source s join product_dim_table d" +
                      "     on s.product_id=d.product_id" +
                      "     ) tmp_tb " +
                      "group by product_id, product_name";
      tableEnvironment.sqlQuery(querySql).executeInsert("product_pv_uv_hbase"); // 将表写入HBase表

      // TableEnvironment 使用 executeSql() 方法已经执行了sql语句,不需要再使用execute()方法。
  }
}
  1. 使用mvn clean package命令(或任何其他自己熟悉的方式)将项目打包。

执行步骤

请首先确保已经启动了大数据运行环境(执行startall.sh脚本),然后按以下步骤执行。

将Flink流处理jar包部署到YARN集群上运行:

# cd /opt/module/flink-1.14.0

# ./bin/flink run -t yarn-per-job -c com.xueai8.ss2024.Task04Job xxx.jar

请将上述命令中的xxx.jar替换为你部署的jar包名称。

使用Hive cli查询ads.pv_uv_result 表

首先,创建Hive外表,关联到HBase的ads:online_uv_pv表。在hive cli中,执行如下SQL语句:

hive> CREATE DATABASE ads;

hive> CREATE EXTERNAL TABLE ads.pv_uv_result(
    rowkey STRING,
    product_id STRING,
    product_name STRING,
    uv BIGINT,
    pv BIGINT,
    modified_time STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':rowkey,info:product_id,info:product_name,info:uv,info:pv,info:modified_time')
tblproperties('hbase.table.name'='ads:online_uv_pv','hbase.table.default.storage.type'='binary');

在Hive中执行Hive QL查询语句,查询写入HBase中的数据(按照product_id 和pv 进行降序排序,查询出10 条数据):

hive> select * from ads.pv_uv_result order by product_id desc, pv desc  limit 10;

请注意,如果执行中遇到疑问或错误,请参考这个教程的子任务1部分

附:pom.xml文件参考

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.14.0</flink.version>
      <target.java.version>1.8</target.java.version>
      <scala.binary.version>2.12</scala.binary.version>
      <scala.version>2.12.11</scala.version>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <log4j.version>2.12.1</log4j.version>
  </properties>

  <dependencies>
      <!-- Apache Flink dependencies -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <!-- 如果使用的是scala语言... -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!-- Scala Library, provided by Flink as well. -->
      <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
          <scope>provided</scope>
      </dependency>

      <!-- Add connector dependencies here. They must be in the default scope (compile). -->
      <!-- Example:-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink.version}</version>
      </dependency>

      <!--HBase -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-hbase-base_2.12</artifactId>
          <version>1.14.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-hbase-2.2_2.12</artifactId>
          <version>1.14.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>2.2.3</version>
      </dependency>

      <!--JDBC连接器-->
      <!-->flink-connector-jdbc flink版本需在1.11.0之后<!-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.48</version>
      </dependency>

      <!-- Add logging framework, to produce console output when running in the IDE. -->
      <!-- These dependencies are excluded from the application JAR by default. -->
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-slf4j-impl</artifactId>
          <version>${log4j.version}</version>
          <scope>runtime</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-api</artifactId>
          <version>${log4j.version}</version>
          <scope>runtime</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-core</artifactId>
          <version>${log4j.version}</version>
          <scope>runtime</scope>
      </dependency>
  </dependencies>

子任务2

子任务2描述

2、使用Flink 消费kafka 中fact_order_detail 主题的数据,统计商城每分钟的GMV(结果四舍五入保留两位小数),将结果存入redis 中(value 为字符串格式,仅存GMV),key 为store_gmv,使用redis cli 以get key 方式获取store_gmv 值,将每次截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(每分钟查询一次,至少查询3 次)。

(GMV:所有订单金额,购买商品单价*购买商品数量,包括已下单未付款)

子任务2分析

术语GMV:

GMV(全称Gross Merchandise Volume),即商品交易总额 ,是成交总额(一定时间段内)的意思。GMV在电商中是一个非常重要的指标,但不同的电商对于GMV的算法完全不一样;算法不同,计算出的数据也千差万别。

GMV和实际交易额的区别:

  • GMV:那些只要下单付款的订单金额,就会被计入到GMV当中,管你取消不取消订单、拒不拒收货物、退不退货呢;

  • 实际交易额:只有买家收到货并确认收货,事后也不退款的订单金额,才能被计入到实际交易额中。

任务分析:

  • (1) 统计“每分钟”的GMV,意味着使用1分钟大小的滚动窗口(处理时间)。

  • (2) 实际上,本任务直接使用订单表order_master进行统计即可。不太理解为什么要用order_detail表。

根据任务书中的描述,kafka 中fact_order_detail 主题的数据以json格式存储,大致如下:

{
        "order_detail_id":1,
        "order_sn":"2022111496083548",
        "product_id":5380,
        "product_name":"无线5.0蓝牙耳机双耳入耳式迷你跑步运动",
        "product_cnt":3,
        "product_price":702.53,
        "average_cost":0.00,
        "weight":4.89577,
        "fee_money":25.80,
        "w_id":1291,
        "create_time":"20221109153354",
        "modified_time":"2022-11-10 04:55:54"
    }

完成本任务,只涉及其中的product_cnt和product_price字段。

子任务2实现

另创建一个Flink流处理程序Task05Job.java,编辑代码如下:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Task05Job {

  // 输入订单事件类型(只取订单状态和支付金额)
  public static class OrderDetailEvent{
      int productCnt;         // 购买商品数量
      double productPrice;   // 购买商品单价

      public OrderDetailEvent() {}

      public OrderDetailEvent(int productCnt,double productPrice) {
          this.productCnt = productCnt;
          this.productPrice = productPrice;
      }

      @Override
      public String toString() {
          return "OrderDetailEvent{" +
                  "productCnt=" + productCnt +
                  ", productPrice=" + productPrice +
                  '}';
      }
  }

  public static void main(String[] args) throws Exception {
      // 获得流执行环境
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // kafka source
      KafkaSource<String> source = KafkaSource.<String>builder()
              .setBootstrapServers("192.168.190.139:9092") // 注意,如果是kafka集群,请使用broker列表
              .setTopics("fact_order_detail")   // kafka topic
              .setGroupId("group-test")
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setValueOnlyDeserializer(new SimpleStringSchema())
              .build();

      // 定义redis sink的配置 (默认端口号6379)
      FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
              .setMaxTotal(1000)
              .setMaxIdle(32)
              .setTimeout(10*1000)
              .setHost("192.168.190.139")
              .setPort(6379)
              .build();

      // 流处理管道
      DataStream<Double> orderStream = env
              // 指定Kafka数据源
              .fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Kafka Source")
              .map(new MapFunction<String, OrderDetailEvent>() {
                  @Override
                  public OrderDetailEvent map(String line) throws Exception {
                      // 转换为OrderDetailEvent对象
                      // 注意,比赛时根据接收到的json数据自行进行调整
                      JsonObject jsonObj = JsonParser.parseString(line).getAsJsonObject();
                      int productCnt = jsonObj.get("product_cnt").getAsInt();     // 商品数量
                      double productPrice = jsonObj.get("product_price").getAsDouble();   // 商品金额
                      // System.out.println("product_cnt:" + productCnt + ", product_price:" + productPrice);
                      return new OrderDetailEvent(productCnt,productPrice);
                  }
              })
              // 转换为keyed stream
              .keyBy(new KeySelector<OrderDetailEvent, Integer>() {
                  @Override
                  public Integer getKey(OrderDetailEvent value) throws Exception {
                      return 0;
                  }
              })
              // 指定窗口
              .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
              // 执行窗口聚合函数
              .aggregate(new AggGmvTemp());

      orderStream.print();

      // data sink
      orderStream.addSink(new RedisSink<Double>(conf, new RedisSinkMapper()));

      // execute program
      env.execute("Flink Streaming");
  }

  // 窗口增量聚合函数: IN, ACC, OUT
  static class AggGmvTemp implements AggregateFunction<OrderDetailEvent, Double, Double> {

      // 创建初始ACC
      @Override
      public Double createAccumulator() {
          return 0.00;
      }

      // 累加每个订单项的支付金额
      @Override
      public Double add(OrderDetailEvent value, Double accumulator) {
          return accumulator + value.productCnt * value.productPrice;
      }

      // 分区合并
      @Override
      public Double getResult(Double accumulator) {
          return accumulator;
      }

      // 返回已下单订单的总支付金额
      @Override
      public Double merge(Double acc1, Double acc2) {
          return acc1 + acc2;
      }
  }

  // redisMap接口,设置key和value
  // Redis Sink 核心类是 RedisMappe 接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法
  static class RedisSinkMapper implements RedisMapper<Double> {
      // getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
      @Override
      public RedisCommandDescription getCommandDescription() {
          // RedisCommand.SET 指定存储类型
          return new RedisCommandDescription(RedisCommand.SET);
      }

      /**
        * 获取 value值 value的数据是键值对
        *
        * @param event
        * @return
        */
      //指定key
      // 查看所有key:keys *             查看指定key:get store_gmv
      @Override
      public String getKeyFromData(Double event) {
          return "store_gmv";
      }

      // 指定value
      @Override
      public String getValueFromData(Double event) {
          return String.valueOf(event);
      }
  }
}
© 版权声明
THE END
喜欢就支持一下吧
点赞272赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

夸夸
夸夸
还有吗!没看够!
取消
昵称表情代码图片

    暂无评论内容