
Implement Apache Flink near-online data enrichment patterns


Stream data processing allows you to act on data in real time. Real-time data analytics can help you deliver timely, optimized responses while improving the overall customer experience.

Data streaming workloads often require data in the stream to be enriched via external sources (such as databases or other data streams). Pre-loading reference data provides low latency and high throughput. However, this pattern may not be suitable for certain types of workloads:

  • Reference data is updated with high frequency
  • The streaming application needs to make an external call to compute the business logic
  • Accuracy of the output is important and the application shouldn't use stale data
  • Cardinality of the reference data is very high, and the reference dataset is too big to be held in the state of the streaming application

For example, suppose you receive temperature data from a sensor network and need additional sensor metadata to analyze how the sensors map to physical geographic locations. In that case, you need to enrich the data with sensor metadata.

Apache Flink is a distributed computation framework that allows for stateful real-time data processing. It provides a single set of APIs for building batch and streaming jobs, making it easy for developers to work with bounded and unbounded data. Amazon Managed Service for Apache Flink (successor to Amazon Kinesis Data Analytics) is an AWS service that provides a serverless, fully managed infrastructure for running Apache Flink applications. Developers can build highly available, fault-tolerant, and scalable Apache Flink applications with ease. They don't need to become experts in building, configuring, and maintaining Apache Flink clusters on AWS.

You can use several approaches to enrich your real-time data in Amazon Managed Service for Apache Flink, depending on your use case and the Apache Flink abstraction level. Each method has different effects on throughput, network traffic, and CPU (or memory) utilization. For a general overview of data enrichment patterns, refer to Common streaming data enrichment patterns in Amazon Managed Service for Apache Flink.

This post covers how to implement data enrichment for near-online streaming events with Apache Flink and how to optimize performance. To compare the enrichment patterns, we ran performance tests based on synthetic data. The results are useful as a general reference. Keep in mind that the actual performance of your Flink workload depends on many factors, such as API latency, throughput, event size, and cache hit ratio.

We discuss three enrichment patterns, detailed in the following table.

| | Synchronous Enrichment | Asynchronous Enrichment | Synchronous Cached Enrichment |
|---|---|---|---|
| Enrichment approach | Synchronous, blocking per-record requests to the external endpoint | Non-blocking parallel requests to the external endpoint, using asynchronous I/O | Frequently accessed information is cached in the Flink application state, with a fixed TTL |
| Data freshness | Always up-to-date enrichment data | Always up-to-date enrichment data | Enrichment data may be stale, up to the TTL |
| Development complexity | Simple model | Harder to debug, due to multi-threading | Harder to debug, due to relying on Flink state |
| Error handling | Simple | More complex, using callbacks | Simple |
| Impact on enrichment API | Max: one request per message | Max: one request per message | Reduces I/O to the enrichment API (depends on cache TTL) |
| Application latency | Sensitive to enrichment API latency | Less sensitive to enrichment API latency | Reduces application latency (depends on cache hit ratio) |
| Other considerations | None | None | Customizable TTL; only a synchronous implementation is possible as of Flink 1.17 |
| Result of the comparative test (throughput) | ~350 events per second | ~2,000 events per second | ~28,000 events per second |

Resolution overview

For this post, we use the example of a temperature sensor network (component 1 in the following architecture diagram). The network emits sensor information such as temperature, sensor ID, status, and the timestamp at which the event was produced. These temperature events are ingested into Amazon Kinesis Data Streams (2). Downstream systems also require the brand and country code of each sensor in order to analyze, for example, reliability per brand and temperature per plant side.

Based on the sensor ID, we enrich this sensor information from the Sensor Data API (3), which provides the brand, location, and an image. The resulting enriched stream is sent to another Kinesis data stream and can then be analyzed in an Amazon Managed Service for Apache Flink Studio notebook (4).
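At its core, the enrichment step merges a reading with the metadata looked up for its sensor ID. The following is a minimal plain-Java sketch of that merge. It assumes hypothetical record types whose fields follow the attributes described above (sensor ID, temperature, status, timestamp, brand, country code). The repository's actual POJOs and its getEnrichedTemperature helper may look different.

```java
import java.time.Instant;

// Hypothetical event shapes; field names are assumptions, not the repository's POJOs.
record Temperature(String sensorId, double temperature, String status, Instant timestamp) {}

record SensorInfo(String sensorId, String brand, String countryCode) {}

record EnrichedTemperature(String sensorId, double temperature, String status,
                           Instant timestamp, String brand, String countryCode) {}

final class Enricher {
    private Enricher() {}

    // Merges a raw reading with the metadata that was looked up for the same sensor ID.
    static EnrichedTemperature enrich(Temperature t, SensorInfo info) {
        if (!t.sensorId().equals(info.sensorId())) {
            throw new IllegalArgumentException("Sensor info does not belong to sensor " + t.sensorId());
        }
        return new EnrichedTemperature(t.sensorId(), t.temperature(), t.status(),
                t.timestamp(), info.brand(), info.countryCode());
    }
}
```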

[Figure: Solution overview architecture diagram]

Prerequisites

To get started with implementing near-online data enrichment patterns, you can clone or download the code from the GitHub repository. This repository implements the Flink streaming application we described. The README.md file contains instructions for setting up Flink in either Amazon Managed Service for Apache Flink or other available Flink deployment options.

If you want to learn how these patterns are implemented and how to optimize performance for your Flink application, you can simply follow along with this post without deploying the samples.

Project overview

The project is structured as follows:

docs/                               -- Contains project documentation
src/
├── main/java/...                   -- Contains all the Flink application code
│   ├── ProcessTemperatureStream    -- Main class that decides on the enrichment strategy
│   ├── enrichment.                 -- Contains the different enrichment strategies (sync, async and cached)
│   ├── event.                      -- Event POJOs
│   ├── serialize.                  -- Utils for serialization
│   └── utils.                      -- Utils for parameter parsing
└── test/                           -- Contains all the Flink testing code

The main method in the ProcessTemperatureStream class sets up the execution environment. In a local environment, it takes the parameters from the command line; otherwise, it uses the application properties from Amazon Managed Service for Apache Flink. Based on the EnrichmentStrategy parameter, it picks one of three implementations: synchronous enrichment (the default), asynchronous enrichment, or cached enrichment based on the Flink concept of KeyedState.

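public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool parameter = ParameterToolUtils.getParameters(args, env);

    String strategy = parameter.get("EnrichmentStrategy", "SYNC");
    switch (strategy) {
        case "SYNC":   /* wire up synchronous enrichment */ break;
        case "ASYNC":  /* wire up asynchronous enrichment */ break;
        case "CACHED": /* wire up cached enrichment */ break;
        default:
            throw new IllegalArgumentException("Unknown EnrichmentStrategy: " + strategy + " (use SYNC, ASYNC or CACHED)");
    }
    // ...
}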
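The strategy value read in main can also be mapped to an enum, so that an unsupported value is rejected when the job starts rather than silently ignored. This sketch uses a hypothetical EnrichmentStrategy enum and a hypothetical fromParameter helper. Treating the value case-insensitively is an assumption and not necessarily how the repository parses it.

```java
import java.util.Locale;

// Hypothetical enum mirroring the EnrichmentStrategy values SYNC, ASYNC and CACHED.
enum EnrichmentStrategy {
    SYNC, ASYNC, CACHED;

    // Falls back to SYNC (the default) when the parameter is absent; rejects unknown values.
    static EnrichmentStrategy fromParameter(String value) {
        if (value == null || value.isBlank()) {
            return SYNC;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown EnrichmentStrategy '" + value + "' (expected SYNC, ASYNC or CACHED)", e);
        }
    }
}
```

The result of parameter.get("EnrichmentStrategy", "SYNC") could then be passed to fromParameter, and the branching written over the enum constants.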

We go over the three approaches in the following sections.

Synchronous data enrichment

When you want to enrich your data from an external provider, you can use a synchronous per-record lookup. When your Flink application processes an incoming event, it makes an external HTTP call. After sending each request, it has to wait until it receives the response.

Because Flink processes these events synchronously, the thread running the enrichment is blocked until it receives the HTTP response. As a result, the processor stays idle for a significant share of the processing time. On the other hand, the synchronous model is easier to design, debug, and trace, and it always gives you the latest data.
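A rough way to reason about the cost of blocking is that each subtask can complete at most one request per round-trip. The sketch below encodes this back-of-the-envelope bound. It also includes the corresponding bound for asynchronous I/O with a given number of in-flight requests per subtask. The class and method names are hypothetical, and the model ignores CPU, serialization, and API-side limits. Treat it as an upper bound, not a prediction.

```java
// Back-of-the-envelope throughput bounds (a simplifying model, not a measurement).
final class EnrichmentThroughputModel {
    private EnrichmentThroughputModel() {}

    // Blocking calls: each subtask waits one full round-trip per event.
    static double syncMaxEventsPerSecond(int parallelism, double latencyMillis) {
        return asyncMaxEventsPerSecond(parallelism, 1, latencyMillis);
    }

    // Async I/O: each subtask keeps up to `capacity` requests in flight at the same time.
    static double asyncMaxEventsPerSecond(int parallelism, int capacity, double latencyMillis) {
        if (parallelism <= 0 || capacity <= 0 || latencyMillis <= 0) {
            throw new IllegalArgumentException("All arguments must be positive");
        }
        return parallelism * capacity * (1000.0 / latencyMillis);
    }
}
```

For example, with a hypothetical API latency of 20 ms and 4 subtasks, syncMaxEventsPerSecond(4, 20) returns 200.0, and halving the latency doubles the bound. These numbers are illustrative and are not the latencies of the tests described in this post.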

It can be integrated into your streaming application as follows:

DataStream<EnrichedTemperature> enrichedTemperatureDataStream =
        temperatureDataStream
                .map(new SyncEnrichmentFunction(parameter.get("SensorApiUrl", DEFAULT_API_URL)));

The implementation of the enrichment function looks like the following code:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.asynchttpclient.Response;

public class SyncEnrichmentFunction extends RichMapFunction<Temperature, EnrichedTemperature> {

    // Setup of HTTP client and ObjectMapper

    @Override
    public EnrichedTemperature map(Temperature temperature) throws Exception {
        String url = this.getRequestUrl + temperature.getSensorId();

        // Retrieve response from sensor info API (blocks until the response arrives)
        Response response = client
                .prepareGet(url)
                .execute()
                .toCompletableFuture()
                .get();

        // Parse the sensor info
        SensorInfo sensorInfo = parseSensorInfo(response.getResponseBody());

        // Merge the temperature sensor data and sensor info data
        return getEnrichedTemperature(temperature, sensorInfo);
    }

    // ...
}

To optimize the performance of synchronous enrichment, you can use the KeepAlive flag, because the HTTP client is reused for multiple events.
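The point of the KeepAlive flag is to build the HTTP client once per subtask and let it reuse open connections across events, instead of opening a new connection for every record. The following sketch shows that pattern with the JDK's built-in java.net.http client. This is a swap for the AsyncHttpClient used in the sample code, and the SensorInfoHttp class and base URL are hypothetical. The JDK client pools and reuses HTTP/1.1 connections by default, so reusing a single instance is what matters.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// Sketch: one long-lived client per subtask (hypothetical helper, JDK client instead of AsyncHttpClient).
final class SensorInfoHttp {
    private final HttpClient client;
    private final String baseUrl; // hypothetical, e.g. "https://sensor-api.example.com/sensors/"

    SensorInfoHttp(String baseUrl) {
        this.baseUrl = baseUrl;
        // Built once (in a Flink function this would happen in open()), then reused for every event.
        this.client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(2))
                .build();
    }

    HttpClient client() {
        return client;
    }

    // Only the request changes per event; the pooled connections behind `client` are reused.
    HttpRequest requestFor(String sensorId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + sensorId))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }
}
```

A request built this way would be sent with client().send(request, HttpResponse.BodyHandlers.ofString()) for the blocking variant.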

For applications with I/O-bound operators (such as external data enrichment), it can also make sense to increase the application parallelism without increasing the resources dedicated to the application. You can do this by increasing the ParallelismPerKPU setting of the Amazon Managed Service for Apache Flink application. This setting defines the number of parallel subtasks an application can run per Kinesis Processing Unit (KPU), and a higher value can lead to full utilization of KPU resources. Keep in mind that increasing the parallelism doesn't help in all cases, for example when you consume from sources with few shards or partitions.
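The relationship between these settings is simple arithmetic. The sketch assumes the service allocates Parallelism divided by ParallelismPerKPU KPUs, rounded up. It computes the KPU count (the helper names are hypothetical) and shows why source subtasks beyond the number of shards don't help: a Kinesis source subtask with no shard assigned has nothing to read.

```java
// Sizing arithmetic sketch (hypothetical helpers), assuming KPUs = ceil(Parallelism / ParallelismPerKPU).
final class KpuSizing {
    private KpuSizing() {}

    static int kpusFor(int parallelism, int parallelismPerKpu) {
        if (parallelism <= 0 || parallelismPerKpu <= 0) {
            throw new IllegalArgumentException("Values must be positive");
        }
        return (parallelism + parallelismPerKpu - 1) / parallelismPerKpu; // integer ceiling
    }

    // Source subtasks that actually receive a shard; any extra source subtasks stay idle.
    static int busySourceSubtasks(int parallelism, int shardCount) {
        return Math.min(parallelism, shardCount);
    }
}
```

With ParallelismPerKPU set to 4, kpusFor(4, 4) returns 1, matching the single-KPU setup used in the tests below.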

In our synthetic testing with Amazon Managed Service for Apache Flink, we observed a throughput of approximately 350 events per second on a single KPU with a parallelism of 4 per KPU and the default settings.

[Figure: Synchronous enrichment performance]

Asynchronous data enrichment

Synchronous enrichment doesn't take full advantage of computing resources, because Flink sits idle while it waits for HTTP responses. Flink offers asynchronous I/O for external data access. With it, you can enrich stream events asynchronously: Flink can send requests for other elements in the stream while it waits for the response to the first element, and requests can be batched for higher efficiency.

[Figure: Sync I/O vs. async I/O]

With this pattern, you have to choose between two modes:

  • unorderedWait emits each result to the next operator as soon as its response is received, disregarding the order of the elements in the stream.
  • orderedWait emits results in the same order as the original elements were placed in the stream, buffering a completed result until all preceding in-flight requests have completed.

When your use case doesn't require event ordering, unorderedWait provides better throughput and less idle time. Refer to Enrich your data stream asynchronously using Amazon Managed Service for Apache Flink to learn more about this pattern.
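The difference between the two modes can be illustrated without Flink. The following plain-Java sketch is an analogy, not Flink's implementation. It issues three simulated lookups with different latencies. The ordered variant emits results in input order, so the slow first element holds back the others. The unordered variant emits each result as soon as its own request completes. The EmissionOrder class and its simulated lookup are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy (not Flink code) for orderedWait vs. unorderedWait emission.
final class EmissionOrder {
    private EmissionOrder() {}

    // Simulates an enrichment lookup whose response arrives after `delayMillis`.
    static CompletableFuture<String> lookup(String key, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> key + ":enriched",
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // orderedWait-style: emit in input order, so a slow element holds back the ones behind it.
    static List<String> ordered(List<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            out.add(f.join());
        }
        return out;
    }

    // unorderedWait-style: emit each result as soon as its own request completes.
    static List<String> unordered(List<CompletableFuture<String>> inFlight) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture.allOf(inFlight.stream()
                .map(f -> f.thenAccept(out::add))
                .toArray(CompletableFuture[]::new)).join();
        return out;
    }
}
```

When event time is used, Flink's unorderedWait additionally keeps records from overtaking watermarks, so ordering is relaxed only between watermarks.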

The asynchronous enrichment can be added as follows:

SingleOutputStreamOperator<EnrichedTemperature> asyncEnrichedTemperatureSingleOutputStream =
        AsyncDataStream
                .unorderedWait(
                        temperatureDataStream,
                        new AsyncEnrichmentFunction(parameter.get("SensorApiUrl", DEFAULT_API_URL)),
                        ASYNC_OPERATOR_TIMEOUT,
                        TimeUnit.MILLISECONDS,
                        ASYNC_OPERATOR_CAPACITY);
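The last two arguments bound the operator. ASYNC_OPERATOR_TIMEOUT is how long a single request may take before it is, by default, treated as a failure. ASYNC_OPERATOR_CAPACITY is the maximum number of requests in flight per subtask; beyond that, Flink applies backpressure. The following plain-Java sketch mimics both limits with a Semaphore and CompletableFuture.orTimeout. The BoundedAsyncCaller class is hypothetical and only an analogy to what the Flink operator does internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Plain-Java analogy (not Flink code) for the capacity and timeout arguments of unorderedWait.
final class BoundedAsyncCaller {
    private final Semaphore permits;   // capacity: maximum requests in flight
    private final long timeoutMillis;  // per-request timeout
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    BoundedAsyncCaller(int capacity, long timeoutMillis) {
        this.permits = new Semaphore(capacity);
        this.timeoutMillis = timeoutMillis;
    }

    // Blocks the calling thread while `capacity` requests are pending, comparable to backpressure.
    <I, O> CompletableFuture<O> call(I input, Function<I, CompletableFuture<O>> request) {
        permits.acquireUninterruptibly();
        maxObserved.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<O> pending;
        try {
            pending = request.apply(input);
        } catch (RuntimeException e) {
            release(); // don't leak the permit if the request could not even be started
            throw e;
        }
        return pending
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // fails with TimeoutException if too slow
                .whenComplete((result, error) -> release());
    }

    private void release() {
        inFlight.decrementAndGet();
        permits.release();
    }

    int maxObservedInFlight() {
        return maxObserved.get();
    }
}
```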

The enrichment function works similarly to the synchronous implementation. It first retrieves the sensor info as a Java Future, which represents the result of an asynchronous computation. As soon as the response is available, it parses the information and merges both objects into an EnrichedTemperature, which it hands back to Flink through the ResultFuture:

import java.util.Collections;
import java.util.concurrent.CompletionException;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichmentFunction extends RichAsyncFunction<Temperature, EnrichedTemperature> {

    // Setup of HTTP client and ObjectMapper

    @Override
    public void asyncInvoke(final Temperature temperature, final ResultFuture<EnrichedTemperature> resultFuture) {
        String url = this.getRequestUrl + temperature.getSensorId();

        // Send the request to the sensor info API without blocking the operator thread
        client.prepareGet(url)
                .execute()
                .toCompletableFuture()
                .thenApply(response -> {
                    try {
                        // Parse the sensor info as soon as the response is available
                        return parseSensorInfo(response.getResponseBody());
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                })
                .whenComplete((sensorInfo, error) -> {
                    if (error != null) {
                        // Report the failure to Flink instead of emitting a record without sensor info
                        resultFuture.completeExceptionally(error);
                    } else {
                        // Merge the temperature sensor data and sensor info data; complete() expects a collection
                        resultFuture.complete(Collections.singleton(getEnrichedTemperature(temperature, sensorInfo)));
                    }
                });
    }

    // ...
}

In our testing with Amazon Managed Service for Apache Flink, we observed a throughput of 2,000 events per second on a single KPU with a parallelism of 2 per KPU and the default settings.

[Figure: Async enrichment performance]

Synchronous cached data enrichment

Many operations in a data stream process each event independently, such as event parsing. Other operations retain information across multiple events. These operations, such as window operators, are called stateful because they maintain state.

Keyed state is stored in an embedded key-value store that is part of Flink's architecture. This state is partitioned and distributed together with the streams consumed by the stateful operators. As a result, key-value state can only be accessed on keyed streams, that is, after a keyed or partitioned data exchange. Access is also restricted to the values associated with the current event's key. For more information about these concepts, refer to Stateful Stream Processing.

You can use keyed state for frequently accessed information that doesn't change often, such as the sensor information. This reduces the load on downstream resources. It also makes your data enrichment more efficient, because no round-trip to an external resource is needed for keys that were already fetched and the information doesn't have to be recomputed. Keep in mind, however, that Amazon Managed Service for Apache Flink stores transient data in a RocksDB backend, which adds latency to retrieving the information. Because RocksDB is local to the node processing the data, this is still faster than reaching out to external resources, as you can see in the following example.

To use keyed streams, you have to partition your stream using the .keyBy(...) method. This ensures that events with the same key, in this case the sensor ID, are routed to the same worker. You can implement it as follows:

SingleOutputStreamOperator<EnrichedTemperature> cachedEnrichedTemperatureSingleOutputStream = temperatureDataStream
        .keyBy(Temperature::getSensorId)
        .process(new CachedEnrichmentFunction(
                parameter.get("SensorApiUrl", DEFAULT_API_URL),
                parameter.get("CachedItemsTTL", String.valueOf(CACHED_ITEMS_TTL))));

We use the sensor ID as the key to partition the stream and later enrich it. This way, we can cache the sensor information as part of the keyed state. When you select a partition key for your use case, choose one with high cardinality. This leads to an even distribution of events across workers.
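To see why cardinality matters, consider how keys are spread over subtasks. The sketch below uses a simplified hash-modulo assignment. Flink actually hashes keys into key groups and assigns ranges of key groups to subtasks, so the concrete assignment differs. The two properties the sketch demonstrates hold for both, though: a key always lands on the same subtask, and a stream with fewer distinct keys than subtasks leaves some subtasks idle. KeyRouting is a hypothetical helper.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified routing sketch: NOT Flink's key-group assignment, only an illustration of its properties.
final class KeyRouting {
    private KeyRouting() {}

    // Deterministic: the same key is always routed to the same subtask.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Number of subtasks that receive at least one of the given keys.
    static int busySubtasks(List<String> keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(subtaskFor(key, parallelism));
        }
        return used.size();
    }
}
```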

To store the sensor information, we use ValueState. To configure state management, we describe the state type using a TypeHint. We can also configure how long a state entry is cached by specifying its time-to-live (TTL). After the TTL expires, the entry is cleaned up and has to be retrieved or recomputed.

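import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

public class CachedEnrichmentFunction extends KeyedProcessFunction<String, Temperature, EnrichedTemperature> {

    // Setup of HTTP client and ObjectMapper...

    private transient ValueState<SensorInfo> cachedSensorInfoLight;

    @Override
    public void open(Configuration configuration) throws Exception {
        // Initialize HTTP client

        ValueStateDescriptor<SensorInfo> descriptor =
                new ValueStateDescriptor<>("sensorInfo", TypeInformation.of(new TypeHint<SensorInfo>() {}));

        // Entries expire `ttl` seconds after they were created or last written
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(this.ttl))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();
        descriptor.enableTimeToLive(ttlConfig);

        cachedSensorInfoLight = getRuntimeContext().getState(descriptor);
    }

    // ...
}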

As of Flink 1.17, state cannot be accessed in asynchronous functions, so the implementation must be synchronous.

The function first checks whether the sensor information for this particular key is already cached; if so, it uses it to enrich the event. Otherwise, it retrieves the sensor information, parses it, stores it in the state, and then merges both objects into an EnrichedTemperature:

public class CachedEnrichmentFunction extends KeyedProcessFunction<String, Temperature, EnrichedTemperature> {

    // Setup of HTTP client, ObjectMapper and ValueState

    @Override
    public void processElement(Temperature temperature, KeyedProcessFunction<String, Temperature, EnrichedTemperature>.Context ctx, Collector<EnrichedTemperature> out) throws Exception {
        SensorInfo sensorInfoCachedEntry = cachedSensorInfoLight.value();

        // Check if sensor info is cached
        if (sensorInfoCachedEntry != null) {
            out.collect(getEnrichedTemperature(temperature, sensorInfoCachedEntry));
        } else {
            String url = this.getRequestUrl + temperature.getSensorId();

            // Retrieve response from sensor info API
            Response response = client
                    .prepareGet(url)
                    .execute()
                    .toCompletableFuture()
                    .get();

            // Parse the sensor info
            SensorInfo sensorInfo = parseSensorInfo(response.getResponseBody());

            // Cache the sensor info in keyed state
            cachedSensorInfoLight.update(sensorInfo);

            // Merge the temperature sensor data and sensor info data
            out.collect(getEnrichedTemperature(temperature, sensorInfo));
        }
    }

    // ...
}
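The cache-aside logic above can be reproduced in plain Java so the TTL behavior is easy to test. The following single-threaded sketch uses a hypothetical TtlCache class with an injectable clock. Like UpdateType.OnCreateAndWrite, reads do not extend an entry's lifetime. Unlike StateVisibility.ReturnExpiredIfNotCleanedUp, it never returns an expired entry, which corresponds to NeverReturnExpired. It is an analogy for ValueState with a TTL, not Flink code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Single-threaded TTL cache-aside sketch: an analogy for ValueState with a TTL, not Flink code.
final class TtlCache<K, V> {
    private record Entry<T>(T value, long writtenAtMillis) {}

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so expiry can be tested deterministically
    private int loads;

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached value while it is younger than the TTL; otherwise loads and rewrites it.
    V getOrLoad(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Entry<V> cached = entries.get(key);
        if (cached != null && now - cached.writtenAtMillis() < ttlMillis) {
            return cached.value(); // hit: reads do not refresh the TTL (like OnCreateAndWrite)
        }
        V fresh = loader.apply(key); // miss or expired: one call to the external source
        loads++;
        entries.put(key, new Entry<>(fresh, now));
        return fresh;
    }

    int loads() {
        return loads;
    }
}
```

Comparing loads() with the number of lookups gives the cache hit ratio, which determines how many calls still reach the sensor API.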

In our synthetic testing with Amazon Managed Service for Apache Flink, we observed a throughput of 28,000 events per second on a single KPU with a parallelism of 4 per KPU and the default settings.

[Figure: Sync+Cached enrichment performance]

You can also see the reduced load on the downstream sensor API.

[Figure: Impact on enrichment API]

Test your workload on Amazon Managed Service for Apache Flink

This post compared different approaches to running an application on Amazon Managed Service for Apache Flink with 1 KPU. Testing with a single KPU provides a good performance baseline, so you can compare the enrichment patterns without generating a full-scale production workload.

The actual performance of the enrichment patterns depends on the specific workload and on the external systems the Flink application interacts with. For example, the performance of cached enrichment may vary with the cache hit ratio, and synchronous enrichment may behave differently depending on the response latency of the enrichment endpoint.
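Because the cache hit ratio depends on how many distinct sensor IDs appear in the test data, it is worth controlling that cardinality when you generate a synthetic workload. The following sketch is a hypothetical generator. It assumes a TTL longer than the test run, so entries never expire. It estimates the hit ratio for N events spread uniformly over K sensors. Only the first event per sensor misses, so the ratio is at least 1 - K/N.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical synthetic-workload helper: N events spread uniformly over K sensor IDs.
// Assumes the cache TTL outlives the test, so only the first event per sensor misses.
final class CacheHitEstimate {
    private CacheHitEstimate() {}

    static double hitRatio(int events, int distinctSensors, long seed) {
        if (events <= 0 || distinctSensors <= 0) {
            throw new IllegalArgumentException("events and distinctSensors must be positive");
        }
        Random random = new Random(seed); // fixed seed keeps test runs reproducible
        Set<String> cached = new HashSet<>();
        int hits = 0;
        for (int i = 0; i < events; i++) {
            String sensorId = "sensor-" + random.nextInt(distinctSensors);
            if (!cached.add(sensorId)) {
                hits++; // sensor info already in state: no API call
            }
        }
        return (double) hits / events;
    }
}
```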

To evaluate which approach best suits your workload, first perform scaled-down tests with 1 KPU and a limited throughput of realistic data, possibly experimenting with different values of ParallelismPerKPU. After you identify the best approach, test the implementation at full scale, with real data and real external systems, before moving to production.

Summary

This post explored different approaches to implementing near-online data enrichment with Flink, focusing on three communication patterns: synchronous enrichment, asynchronous enrichment, and caching with Flink KeyedState.

We compared the throughput achieved by each approach. In this particular experiment with synthetic data, caching with Flink KeyedState was up to 14 times faster than asynchronous I/O. We also looked at optimizing the performance of Apache Flink, specifically on Amazon Managed Service for Apache Flink. We discussed strategies and best practices for maximizing the performance of Flink applications in a managed environment, so you can take full advantage of Flink's capabilities for your near-online data enrichment needs.

Overall, this post offers insights into different data enrichment patterns, their performance characteristics, and optimization techniques for Apache Flink. It focuses in particular on near-online data enrichment scenarios and on Amazon Managed Service for Apache Flink.

We welcome your feedback. Please leave your thoughts and questions in the comments section.


About the authors

Luis Morales works as a Senior Solutions Architect with digital-native businesses, supporting them in constantly reinventing themselves in the cloud. He is passionate about software engineering, cloud-native distributed systems, test-driven development, and all things code and security.

Lorenzo Nicora works as a Senior Streaming Solution Architect, helping customers across EMEA. He has been building cloud-native, data-intensive systems for several years, working in the finance industry both through consultancies and for fintech product companies. He has leveraged open source technologies extensively and contributed to several projects, including Apache Flink.
