Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
930 views
in Technique[技术] by (71.8m points)

apache kafka - How to process a KStream in a batch of max size or fallback to a time window?

I would like to create a Kafka stream-based application that processes a topic and takes messages in batches of size X (i.e. 50) but if the stream has low flow, to give me whatever the stream has within Y seconds (i.e. 5).

So, instead of processing messages one by one, I process a List[Record] where the size of the list is 50 (or maybe less).

This is to make some I/O bound processing more efficient.

I know that this can be implemented with the classic Kafka API but was looking for a stream-based implementation that can also handle offset committing natively, taking errors/failures into account. I couldn't find anything related int he docs or by searching around and was wondering if anyone has a solution to this problem.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

@Matthias J. Sax answer is nice, I just want to add an example for this, I think it might be useful for someone. let's say we want to combine incoming values into the following type:

public class MultipleValues { private List<String> values; }

To collect messages into batches with max size, we need to create transformer:

public class MultipleValuesTransformer implements Transformer<String, String, KeyValue<String, MultipleValues>> {
    private ProcessorContext processorContext;
    private String stateStoreName;
    private KeyValueStore<String, MultipleValues> keyValueStore;
    private Cancellable scheduledPunctuator;

    public MultipleValuesTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        this.keyValueStore = (KeyValueStore) processorContext.getStateStore(stateStoreName);
        scheduledPunctuator = processorContext.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::doPunctuate);
    }

    @Override
    public KeyValue<String, MultipleValues> transform(String key, String value) {
        MultipleValues itemValueFromStore = keyValueStore.get(key);
        if (isNull(itemValueFromStore)) {
            itemValueFromStore = MultipleValues.builder().values(Collections.singletonList(value)).build();
        } else {
            List<String> values = new ArrayList<>(itemValueFromStore.getValues());
            values.add(value);
            itemValueFromStore = itemValueFromStore.toBuilder()
                    .values(values)
                    .build();
        }
        if (itemValueFromStore.getValues().size() >= 50) {
            processorContext.forward(key, itemValueFromStore);
            keyValueStore.put(key, null);
        } else {
            keyValueStore.put(key, itemValueFromStore);
        }
        return null;
    }

    private void doPunctuate(long timestamp) {
        KeyValueIterator<String, MultipleValues> valuesIterator = keyValueStore.all();
        while (valuesIterator.hasNext()) {
            KeyValue<String, MultipleValues> keyValue = valuesIterator.next();
            if (nonNull(keyValue.value)) {
                processorContext.forward(keyValue.key, keyValue.value);
                keyValueStore.put(keyValue.key, null);
            }
        }
    }

    @Override
    public void close() {
        scheduledPunctuator.cancel();
    }
}

and we need to create key-value store, add it to StreamsBuilder, and build KStream flow using transform method

Properties props = new Properties();
...
Serde<MultipleValues> multipleValuesSerge = Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(MultipleValues.class));
StreamsBuilder builder = new StreamsBuilder();
String storeName = "multipleValuesStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);
StoreBuilder<KeyValueStore<String, MultipleValues>> storeBuilder =
        Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), multipleValuesSerge);
builder.addStateStore(storeBuilder);

builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(() -> new MultipleValuesTransformer(storeName), storeName)
        .print(Printed.<String, MultipleValues>toSysOut().withLabel("transformedMultipleValues"));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();

with such approach we used the incoming key for which we did aggregation. if you need to collect messages not by key, but by some message's fields, you need the following flow to trigger rebalancing on KStream (by using intermediate topic):

.selectKey(..)
.through(intermediateTopicName)
.transform( ..)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

2.1m questions

2.1m answers

60 comments

56.6k users

...