Real Time Is the Future? Stream Computing in the Eyes of a Small and Micro Enterprise

Posted Jun 28, 2020 · 15 min read

Abstract: This article is shared by Mr. Tang Duo of the Mozhi technical team. It walks through the whole process of introducing stream computing into the team, from the initial decision-making and trade-offs to the final rollout, together with the thinking, insights, and experience gathered along the way.

  1. First encounter with Flink
  2. Why Flink?
  3. A small example
  4. Summary

Tips:"Real time is the future" may be just a slogan in the eyes of many people, but in Mozhi, this is the story they created by themselves.

Hello everyone. We are Zhejiang Mozhi Information Technology Co., Ltd., a startup team that has been in business for three years. Our main business is e-commerce operation, and we are currently a four-star Taobao service provider.

Our core team has served well-known domestic brands across women's clothing, home appliances, mother-and-baby, men's clothing, children's clothing, jewelry, cosmetics, and more. We have rich experience in brand operation and management, and the brands we have served rank at the forefront of their industries.

Our main business focuses on internet-platform brand operation and network-wide brand promotion in the pan-fashion field (clothing, mother-and-baby, beauty, lifestyle, home, jewelry), covering comprehensive end-to-end services such as brand positioning and promotion, e-commerce operation, product planning and operation, visual design, marketing, customer service, and warehousing and logistics.

This article shares the story of Mozhi's relationship with stream computing.

01 First encounter with Flink

Our first contact with Flink stream computing was at the Yunqi Conference in September 2018, where Mr. Dasha introduced Flink to developers on site and online. The venue was packed, with audiences gathered several rows deep outside it. Although the talk was not long and we only half understood it, one feeling was very strong: "real time, that is the future."

After returning from Yunqi Town, I discussed it with the team, and everyone decided to go with Flink, but the difficulty ahead was something we did not anticipate. Study materials were scarce at the time; we went back and forth through a single "Flink Basic Course", the threshold for hands-on practice was high, and progress was far from satisfactory.

Figure 1 The stream computing breakout session at the Yunqi Conference

In March 2019, I was fortunate to attend a Flink user meetup held in Hangzhou. When I registered, I only intended to listen and learn, but I was shocked on arrival: the participants were not just deep users of Flink, and many of them came from large companies valued at more than 10 billion yuan. Both the content of the discussions and the backgrounds of the attendees made us feel out of our depth.

The day after we got back, all five of us who had attended went into the company to work overtime. Even if we could not fully articulate it, the impact of that meeting on everyone was huge, and it pushed us to a decision: however difficult it might be, we would use Flink.

A month later, our first Flink job, written in Java, went online. Even though the functionality it implemented was very simple, it was a solid step for us.

Figure 2 A photo widely circulated in the community

At the beginning of 2020, the epidemic raged and team members changed, and objective conditions forced us to give up everything we had written in Java and switch to Python. The decision was extremely difficult; we knew full well that everything would go back to the starting point.

But our relationship with Flink was not over. Just then we saw that the community had launched the PyFlink support plan, so we reached out by email and were fortunate to be selected. Over the following month, with the help of teachers such as Jinzhu, Fudian, and Duanchen, we migrated the original Flink job to PyFlink, using our real requirements as the way to learn PyFlink's features. That is also the opportunity behind sharing these learning results with you today.

02 Why Flink?

Speaking of which, some peers will surely ask: why does a small and micro enterprise need such "high-end" computing, and can it actually put it to use?

We are faced with a few serious facts:

  1. Growing headcount brings multiplied expenses. It took the company three years to expand the team to 150 people, which is no small feat in the small city of Jiaxing, and our main business, e-commerce operation, works a lot like project outsourcing in the software industry. Whenever outsourcing comes up, peers immediately think of staffing: in short, only projects can support people, and idle labor with no project behind it is a losing deal.
  2. Raising productivity per person is hard, and even strict KPIs hit bottlenecks. The first thing my colleagues do at work every day is publish the previous day's sales results, yet this small daily report takes half an hour, and its "T+1" freshness already lags behind.
  3. When running Through Train (Taobao's pay-per-click promotion), products that no longer needed paid promotion, or whose bids could have been lowered, kept burning money on the original plan because of a colleague's oversight. Manual monitoring struggles to catch these problems in time.

As the person in charge of IT planning, I have always hoped to build on the team's rich experience and trading ability in e-commerce management. The goal is very clear: to build our own real-time data decision-making platform.

Let's take "decision-making" apart for a moment: decisions and strategy. The team already has its own experience and judgment about how to do things, which we put on the strategy side; what we lack is the "decision ability". Decisions must weigh both accuracy and timeliness, and of course it is even better if the strategies can be incrementally optimized while decisions are being made. So we roughly planned the architecture in Figure 3. From bottom to top it consists of DataSource (data sources), Swarm (multi-source data collection platform), DW (data warehouse), NB (e-commerce offline data center), and Radical (e-commerce data decision platform). Data is collected, stored, calculated, displayed, and applied layer by layer, and Flink plays an important role in real-time computation across the data life cycle.

Remember the news stories of merchants being fleeced over pricing mistakes on e-commerce platforms?

At present, no e-commerce ERP has features designed for this. But if you could write a small plug-in based on Flink stream computing to monitor abnormal sales in real time, fetch the actual amount paid in each order, compare it with the product's recent price, then combine it with the latest inventory to reach a verdict and pop up an alarm in time, could such tragedies be avoided?
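To make the idea concrete, here is a minimal plain-Python sketch of such a check. Every name and threshold in it (check_order, the 50% under-price rule, the stock limit) is hypothetical and only illustrates the comparison described above; it is not the plug-in we actually run.

# Hypothetical sketch of the "abnormal sale" check described above.
def check_order(paid_amount: float, recent_price: float, stock_left: int) -> bool:
    """Return True if an order looks like an abnormal, under-priced sale."""
    underpriced = paid_amount < recent_price * 0.5   # paid far below the recent listed price
    stock_draining = stock_left < 10                 # inventory is also running out fast
    return underpriced and stock_draining

if check_order(paid_amount=9.9, recent_price=200.0, stock_left=5):
    print("ALERT: possible pricing mistake, pause the listing and notify the operator")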

Of course, there is no limit to the ideas you can come up with for applying real-time computing in e-commerce scenarios. And if such a system keeps iterating and improving, could it eventually replace some labor costs? If it does, that would be a whole new beginning.

Back to the project itself. In just three years, our small and micro enterprise has not accumulated that much data, essentially just the operational and order data of our stores. The data collection platform monitors the 15 stores we operate every second, with more than 60 monitoring points per store. Only by relying on Flink's stream computing can we get the data results we want and make the right decision as early as possible. The example I am sharing today is set against this background.

Figure 3 Architecture diagram and technology stack (direction of data flow)

03 A small example

Based on our own needs and Flink's characteristics, we built a real-time monitoring system on top of Flink stream computing to watch for abnormal situations. Below is a small example of real-time monitoring of online commodity prices, completed while we participated in the PyFlink support program. I hope it lets everyone feel how convenient PyFlink development is.

Project Background

The company runs a beauty-product distribution project: alongside the flagship store there are thousands of dealer stores. Business colleagues hope that, through technical means, the prices of goods in dealer stores can be monitored so that they are never lower than the flagship store's, to avoid hurting the flagship store's sales. So we came up with the following approach:

Figure 4 Problem-solving approach

Practice process

Based on the above approach, we first collected data samples like the following:

            {"shop_name":"Dealer 1",
             "item_name":"Condensation Essence",
             "item_url":"https://*****",
             "item_img":"https://*****",
             "item_price":200.00,
             "discount_info":"['Every 200 full minus 20, not capped']",
             "item_size":""},
            {"shop_name":"Dealer 2",
             "item_name":"Essence Oil 1",
             "item_url":"https://*****",
             "item_img":"https://",
             "item_price":200.00,
             "discount_info":"['Every 200 is full minus 15 and not capped']",
             "item_size":"125ml"}

Then, based on the data sample, we can write a method to register the Kafka source.

# Imports used by the job below (added for completeness; this assumes the
# PyFlink 1.10 Table API descriptors that the rest of the code relies on).
# The UDF objects defined later are assumed to be in the same script or importable.
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import (CsvTableSink, DataTypes, EnvironmentSettings,
                           StreamTableEnvironment)
from pyflink.table.descriptors import FileSystem, Json, Kafka, OldCsv, Schema


# register kafka_source
def register_rides_source_from_kafka(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
                .version("universal")
                .topic("cbj4")
                # .topic("user")
                .start_from_earliest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
        ) \
        .with_format(  # declare a format for this system
            Json()
                .fail_on_missing_field(True)
                .schema(DataTypes.ROW([
                    DataTypes.FIELD('shop_name', DataTypes.STRING()),
                    DataTypes.FIELD('item_name', DataTypes.STRING()),
                    DataTypes.FIELD('item_url', DataTypes.STRING()),
                    DataTypes.FIELD('item_img', DataTypes.STRING()),
                    DataTypes.FIELD('item_price', DataTypes.STRING()),
                    DataTypes.FIELD('discount_info', DataTypes.STRING()),
                    DataTypes.FIELD('item_size', DataTypes.STRING()),
                ]))) \
        .with_schema(  # declare the schema of the table
            Schema()
                .field("shop_name", DataTypes.STRING())
                .field("item_name", DataTypes.STRING())
                .field("item_url", DataTypes.STRING())
                .field("item_img", DataTypes.STRING())
                .field("item_price", DataTypes.STRING())
                .field('discount_info', DataTypes.STRING())
                .field("item_size", DataTypes.STRING())
        ) \
        .in_append_mode() \
        .register_table_source("KafkaSource")
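For local testing it helps to push a couple of the sample records above into the cbj4 topic by hand. Below is a minimal sketch using the third-party kafka-python package; this is purely our assumption for experimenting locally, the article does not describe how the collection platform actually writes to Kafka.

import json
from kafka import KafkaProducer  # assumption: kafka-python installed for local testing

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

sample = {"shop_name": "Dealer 2", "item_name": "Essence Oil 1",
          "item_url": "https://*****", "item_img": "https://*****",
          "item_price": 200.00,
          "discount_info": "['Every full 200 minus 15, not capped']",
          "item_size": "125ml"}

producer.send("cbj4", sample)  # same topic as the Kafka source registered above
producer.flush()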

The CSV file holding the reference commodity prices looks like this:

1,Essence Oil 1,125ml,**,************,whole-body essence oil suitable for sensitive skin,****,200,180
2,Essence Oil 1,200ml,**,************,brightens and whitens; fades scars,****,300,280
3,Massage Oil 1,125ml,**,************,effectively increases skin elasticity,****,200,180
4,Massage Oil 1,200ml,**,************,continuously moisturizes and nourishes the skin,****,300,280
5,Massage Oil 2,125ml,**,************,moisturizing and firming; deeply moisturizes the skin,****,300,280
6,Shower Gel,500ml,**,************,soothes and calms; moisturizes dry skin; prevents dryness,****,100,80
7,Condensation Essence,4x6ml,**,************,smooths fine lines; multi-effect antioxidant; firming and elastic,****,200,180
8,Essence Oil 2,30ml,**,************,improves fragile and sensitive dry skin; small-molecule essence penetrates deeply and hydrates intensively,****,200,180
9,Cleansing Gel,200ml,**,************,preferred cleanser for acne-prone skin,****,100,80

Based on this, we can write a method to register the CSV source.

# register csv_source
def register_rides_source_from_csv(st_env):
    # Data source file
    source_file = '/demo_job1/price control table.csv'

    # Create data source table
    st_env.connect(FileSystem().path(source_file)) \
        .with_format(OldCsv()
                     .field_delimiter(',')
                     .field('xh', DataTypes.STRING()) # serial number
                     .field('spmc', DataTypes.STRING()) # Commodity name
                     .field('rl', DataTypes.STRING()) # capacity
                     .field('xg', DataTypes.STRING()) # box gauge
                     .field('txm', DataTypes.STRING()) # barcode
                     .field('gx', DataTypes.STRING()) # efficacy
                     .field('myfs', DataTypes.STRING()) # trade method
                     .field('ztxsjg', DataTypes.STRING()) # The main image shows the price
                     .field('dpzddsj', DataTypes.STRING()) # lowest price per bottle
                    ) \
        .with_schema(Schema()
                     .field('xh', DataTypes.STRING()) # serial number
                     .field('spmc', DataTypes.STRING()) # Commodity name
                     .field('rl', DataTypes.STRING()) # capacity
                     .field('xg', DataTypes.STRING()) # box gauge
                     .field('txm', DataTypes.STRING()) # barcode
                     .field('gx', DataTypes.STRING()) # efficacy
                     .field('myfs', DataTypes.STRING()) # trade method
                     .field('ztxsjg', DataTypes.STRING()) # The main image shows the price
                     .field('dpzddsj', DataTypes.STRING()) # lowest price per bottle
                    ) \
        .register_table_source('CsvSource')

And here is the output style we want to write to the CSV:

Dealer 1, Massage Oil 1, https://**********, https://**********, 200.00, [], 125ml, 200.0, 3, Massage Oil 1, 125ml, **, ************, whole-body essence oil suitable for sensitive skin, ****, 200, 180
Dealer 2, Essence Oil 2, https://**********, https://**********, 190.00, [], 30ml, 190.0, 8, Essence Oil 2, 30ml, **, ************, improves fragile and sensitive dry skin; small-molecule essence penetrates deeply and hydrates intensively, ****, 200, 180
Dealer 3, Essence Oil 2, https://**********, https://**********, 200.00, [], 30ml, 200.0, 8, Essence Oil 2, 30ml, **, ************, improves fragile and sensitive dry skin; small-molecule essence penetrates deeply and hydrates intensively, ****, 200, 180
Dealer 1, Essence Oil 2, https://**********, https://**********, 200.00, ['Every full 200 minus 20; not capped'], 30ml, 180, 8, Essence Oil 2, 30ml, **, ************, improves fragile and sensitive dry skin; small-molecule essence penetrates deeply and hydrates intensively, ****, 200, 180
Dealer 1, Massage Oil 1, https://**********, https://**********, 200.00, ['Every full 200 minus 20; not capped'], 125ml, 180, 3, Massage Oil 1, 125ml, **, ************, effectively increases skin elasticity, ****, 200, 180
Dealer 3, Massage Oil 1, https://**********, https://**********, 200.00, ['Every full 200 minus 20; not capped'], 125ml, 180.0, 3, Massage Oil 1, 125ml, **, ************, effectively increases skin elasticity, ****, 200, 180
Dealer 2, Essence Oil 1, https://**********, https://**********, 200.00, ['Every full 200 minus 20; not capped'], 125ml, 180.0, 1, Essence Oil 1, 125ml, **, ************, whole-body essence oil suitable for sensitive skin, ****, 200, 180
Dealer 3, Essence Oil 1, https://**********, https://**********, 200.00, ['Every full 200 minus 20; not capped'], 125ml, 180.0, 1, Essence Oil 1, 125ml, **, ************, whole-body essence oil suitable for sensitive skin, ****, 200, 180
Dealer 1, Essence Oil 1, https://**********, https://**********, 300.00, ['Every full 200 minus 20; not capped'], 200ml, 280.0, 2, Essence Oil 1, 200ml, **, ************, brightens and whitens; fades scars, ****, 300, 280
Dealer 1, Essence Oil 1, https://**********, https://**********, 190.00, ['Every full 200 minus 20; not capped'], 125ml, 190.0, 1, Essence Oil 1, 125ml, **, ************, whole-body essence oil suitable for sensitive skin, ****, 200, 180

Based on the output style, let's write the method that registers the CSV sink.

# register csv sink
def register_sink(st_env):
    result_file = "./result.csv"
    sink_field = {
        "shop_name":DataTypes.STRING(),
        "item_name":DataTypes.STRING(),
        "item_url":DataTypes.STRING(),
        "item_img":DataTypes.STRING(),
        "item_price":DataTypes.STRING(),
        "discount_info":DataTypes.STRING(),
        # "discount_info":DataTypes.ARRAY(DataTypes.STRING()),
        "item_size":DataTypes.STRING(),
        "min_price":DataTypes.FLOAT(),
        "xh":DataTypes.STRING(),
        "spmc":DataTypes.STRING(),
        "rl":DataTypes.STRING(),
        "xg":DataTypes.STRING(),
        "txm":DataTypes.STRING(),
        "gx":DataTypes.STRING(),
        "myfs":DataTypes.STRING(),
        "ztxsjg":DataTypes.STRING(),
        "dpzddsj":DataTypes.STRING(),
    }

    st_env.register_table_sink("result_tab",
                               CsvTableSink(list(sink_field.keys()),
                                            list(sink_field.values()),
                                            result_file))

With both input and output in place, we can write the calculation and judgment logic our business colleagues need, namely that the prices of goods in dealer stores must not be lower than those in the flagship store.

When writing this business logic, the built-in operators in the Table API cannot cover every need, so we define several UDFs to process specific fields: matching the actual product by product name, identifying the product capacity from the product name and page price, calculating the discounted price, and formatting the coupon information.

# -*- coding:utf-8 -*-

import re
import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction


# Extend ScalarFunction
class IdentifyGoods(ScalarFunction):
    """ Identify the name of the product, corresponding to the standard product name """

    def eval(self, item_name):
        logging.info("Enter UDF")
        logging.info(item_name)

        # Standard product names (the character class below originally contained
        # the Chinese characters that make up these names; it is kept here in its
        # translated form)
        regexes = re.compile(r'[essential oil cleansing gel shower gel essence 12]')
        items = ["Essence Oil 1", "Massage Oil 1", "Massage Oil 2", "Essence Oil 2", "Cleansing Gel", "Shower Gel", "Condensation Essence"]
        items_set = []
        for index, value in enumerate(items):
            items_set.append(set(list(value))) # Turn a title into a set to facilitate intersection

        # First match characters other than the product name, then remove
        sub_str = re.sub(regexes, '', item_name)
        spbt = re.sub(repr([sub_str]), '', item_name)  # repr keeps the variable unescaped

        # Find the best matching product title, otherwise consider it as an unknown product
        intersection_len = 0
        items_index = None

        for index, value in enumerate(items_set):
            j = value & set(list(spbt)) # intersection
            j_len = len(list(j))
            if j_len > intersection_len:
                intersection_len = j_len
                items_index = index

        item_name = 'Unknown item' if items_index is None else items[items_index]

        logging.info(item_name)
        return item_name


identify_goods = udf(IdentifyGoods(), DataTypes.STRING(), DataTypes.STRING())


class IdentifyCapacity(ScalarFunction):
    """ Identify product capacity based on product name and product page price """

    def eval(self, item_name, price):

        # Initialize commodity price and specification
        price = 0 if len(price) == 0 else float(price)
        item_size = ''

        # Note: this judgment logic still needs to be revised and refactored
        if float(price) <= float(5):
            logging.info('This is a coupon!!!')
        elif item_name == "Essence Oil 1" and price > 200 and price <= 300:
            item_size = '200ml'
        elif item_name == "Essence Oil 1" and price <= 200:
            item_size = '125ml'
        elif item_name == "Essence Oil 1" and price >= 300:
            item_size = 'Essence Oil 1 Pack'
        elif item_name == "Massage Oil 1" and price > 200 and price <= 300:
            item_size = '200ml'
        elif item_name == "Massage Oil 1" and price <= 200:
            item_size = '125ml'
        elif item_name == "Massage Oil 1" and price >= 300:
            item_size = 'Massage Oil 1 Pack'
        elif item_name == "Massage Oil 2":
            item_size = '125ml'
        elif item_name == "Essence Oil 2":
            item_size = '30ml'
        elif item_name == "Cleansing Gel":
            item_size = '200ml'
        elif item_name == "Shower Gel":
            item_size = '500ml'
        elif item_name == "Condensation Essence":
            item_size = '4x6ml'
        return item_size


identify_capacity = udf(IdentifyCapacity(), [DataTypes.STRING(), DataTypes.STRING()], DataTypes.STRING())


# Named Function
@udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.FLOAT())
def get_min_price(price, discount_info):
    """ Calculate preferential prices on demand """
    price = 0 if len(price) == 0 else float(price)

    # Match all coupons. The patterns below are the English rendering of the
    # original Chinese coupon phrases (e.g. "full 200 minus 20").
    coupons = []
    for i in eval(discount_info):  # discount_info is assumed to be a trusted list-literal string
        regular_v1 = re.findall(r"full \d+ minus \d+", i)
        if len(regular_v1) != 0:
            coupons.append(regular_v1[0])

        regular_v2 = re.findall(r"Every full \d+ minus \d+", i)
        if len(regular_v2) != 0:
            coupons.append(regular_v2[0])

    # If there is coupon information, calculate the lowest price
    min_price = price
    maybe_price = []
    if len(coupons) > 0:
        regexes_v2 = re.compile(r'\d+')
        for i in coupons:
            a = re.findall(regexes_v2, i)
            cut_price = min(float(a[0]), float(a[1]))
            flag_price = max(float(a[0]), float(a[1]))
            if flag_price <= price:
                maybe_price.append(min_price - cut_price)

    if len(maybe_price) > 0:
        min_price = min(maybe_price)

    return min_price


# Callable Function
class FormatDiscountInfo(object):
    """ format coupon information to output .csv file using """

    def __call__(self, discount_str):
        discount_str = str(discount_str).replace(',', ";")
        return discount_str


format_discount_info = udf(f=FormatDiscountInfo(), input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
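Since the ScalarFunction subclasses and the callable class above are ordinary Python objects, their logic can be sanity-checked in a plain Python shell before wiring them into a job. The expected values below simply follow from the rules in the code; they are our own check, not output quoted from the article.

# Quick local sanity checks of the UDF logic (run outside of any Flink job)
print(IdentifyGoods().eval("Condensation Essence"))               # -> "Condensation Essence"
print(IdentifyCapacity().eval("Condensation Essence", "200.00"))  # -> "4x6ml"
print(FormatDiscountInfo()("['Every full 200 minus 20, not capped']"))  # commas become ";"
# By the coupon rule in get_min_price, a 200.00 item with "Every full 200 minus 20"
# works out to a lowest price of 180.0.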

Finally, we write the main method of the job:

# query
def calculate_func(st_env):
    # Coordinates are collected data tables
    left = st_env.from_path("KafkaSource") \
        .select("shop_name, "
                "identify_goods(item_name) as item_name, "# Identify the name of the product corresponding to the basic information table
                "item_url, "
                "item_img,"
                "item_price, "
                "discount_info, "
                "item_size"
               ) \
        .select("shop_name, "
                "item_name, "
                "item_url, "
                "item_img,"
                "item_price, "
                "discount_info, "
                "identify_capacity(item_name, item_price) as item_size, "# Identify product capacity based on product name and product price
                "get_min_price(item_price, discount_info) as min_price "# Calculate the lowest price based on the page price and discount information
               ) \
        .select("shop_name, "
                "item_name, "
                "item_url, "
                "item_img,"
                "item_price, "
                "format_discount_info(discount_info) as discount_info, "# Format discount information for easy storage in .csv file
                "item_size, "
                "min_price "
               )

    # The right table is the basic information table
    right = st_env.from_path("CsvSource") \
        .select("xh, spmc, rl, xg, txm, gx, myfs, ztxsjg, dpzddsj")

    result = left.join(right).where("item_name = spmc && item_size = rl") # Join the two tables by product name and product capacity

    result.insert_into("result_tab") # output join result to csv sink

# main function
def get_price_demo():
    # init env
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)  # set parallelism
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    st_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

    # register source
    register_rides_source_from_csv(st_env)
    register_rides_source_from_kafka(st_env)

    # register sink
    register_sink(st_env)

    # register function
    st_env.register_function("identify_goods", identify_goods)
    st_env.register_function("identify_capacity", identify_capacity)
    st_env.register_function("get_min_price", get_min_price)
    st_env.register_function("format_discount_info", format_discount_info)

    # query
    calculate_func(st_env)

    # execute
    print("Submit job")
    st_env.execute("item_got_price")
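The listing above stops at the job definition. To actually run it, an entry point is still needed; a minimal sketch, assuming everything above lives in a single script:

# Entry point (not shown in the original article): run the job when the
# script is executed directly, e.g. `python get_price_demo.py`.
if __name__ == "__main__":
    get_price_demo()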

04 Summary

After the project went online, with the data collection side continuously feeding data and stream computing doing the work, we identified 200 product links suspected of price violations, protecting both brand strength and pricing power and avoiding more than 400,000 yuan in lost flagship-store sales. And this is just one of our many monitoring Flink jobs.

Today, following the idea of "smaller and smaller enterprises, bigger and bigger markets", using IT technology to replace manual work is already the fastest path, even if small and micro enterprises like ours are not technology-led the way big companies are. As long as the output can be used by business colleagues, improves their efficiency, and is valued by the company's leadership, our work is meaningful and sustainable.

If you are like us, working mainly in Python, with development focused on data analysis and real-time decision-making, and you are eager to enjoy the accuracy, efficiency, and convenience that stream computing brings, then you are welcome to join the PyFlink ecosystem and help build its tomorrow with us. Flink 1.11 is also expected to be released in mid-to-late June, when PyFlink will ship with Pandas support.

Finally, thanks again to everyone who helped us in the support plan! In short: PyFlink, you deserve it.


If you are also interested in the PyFlink community support plan, you can fill out the questionnaire below and build the PyFlink ecosystem together with us.

https://survey.aliyun.com/apps/zhiliao/B5JOoruzY