Jerry Thomas

09/10/2019, 2:25 PM
I need to process streaming data from Kafka and then publish the results back to Kafka. I was able to achieve this with the following approach. I wanted to check whether this is the right way, or whether there is a better way to process streaming data:
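import json

import prefect.engine.signals
from kafka import KafkaConsumer, KafkaProducer
from prefect import Flow

def get_stream_data():
    # Read whatever arrives within 1 s; iteration stops after the timeout
    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=['localhost:9092'],
                             value_deserializer=lambda m: json.loads(m.decode('ascii')),
                             consumer_timeout_ms=1000)

    messages = [message.value for message in consumer]
    if len(messages) == 0:
        # Nothing to do: mark this task (and, by default, its downstream tasks) as skipped
        raise prefect.engine.signals.SKIP("No data received")
    return messages

def process_itemwise(data):
    data["result"] = 1
    return data

def publish_result(data):
    # Producers take a value_serializer (object -> bytes), not a deserializer
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('ascii'))
    producer.send("results", data)
    producer.flush()  # send() is asynchronous; block until the message is delivered

with Flow("custom") as flow:
    data = get_stream_data()
    result = process_itemwise.map(data)  # one task run per message
    publish_result.map(result)

# Poll Kafka forever: each flow run handles one batch of messages
while True:
    flow.run()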
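In kafka-python, KafkaProducer takes a `value_serializer` (object to bytes) and KafkaConsumer a `value_deserializer` (bytes to object), so the two lambdas need to be inverses of each other. A quick standalone check of the JSON/ASCII pair used above, with no broker required; the names `serialize` and `deserialize` are just illustrative:

```python
import json

# Same logic as the producer's value_serializer: Python object -> bytes.
def serialize(value):
    # json.dumps escapes non-ASCII characters by default (ensure_ascii=True),
    # so encoding the resulting string as ASCII cannot fail.
    return json.dumps(value).encode("ascii")

# Same logic as the consumer's value_deserializer: bytes -> Python object.
def deserialize(raw):
    return json.loads(raw.decode("ascii"))

message = {"id": 7, "city": "Zürich", "result": 1}
print(deserialize(serialize(message)) == message)  # True
```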

Chris White

09/10/2019, 3:59 PM
Hey @Jerry Thomas! Looks like your code snippet is missing the Prefect `@task` decorators, but otherwise this looks reasonable to me. Note that your `publish_result` mapped task will not begin running until `process_itemwise` has completed all of its processing.
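To make the ordering point concrete, here is a plain-Python model of two chained mapped stages. It does not use Prefect; `run_stagewise` and the event log are hypothetical names for illustration. Every item is processed before any result is published. With a parallel executor the items within one stage may run concurrently, but per the note above the boundary between the stages still holds.

```python
# Plain-Python model of two chained mapped stages (not Prefect's executor):
# the second stage only starts once the first has produced every result.
def run_stagewise(items, process, publish):
    results = [process(item) for item in items]  # stage 1 finishes completely...
    for result in results:                       # ...before stage 2 begins
        publish(result)

events = []

def process(x):
    events.append(("process", x))
    return x * 10

def publish(r):
    events.append(("publish", r))

run_stagewise([1, 2, 3], process, publish)
print(events)
# [('process', 1), ('process', 2), ('process', 3), ('publish', 10), ('publish', 20), ('publish', 30)]
```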

Jerry Thomas

09/10/2019, 4:09 PM
Thanks Chris. I missed the annotations while posting the question. I assume that if I need to perform the publish for each item, I should call the publish in the `process_itemwise` task so that it does not wait for all of the processing. On a related note, what would be the best way to run a parallel set of subtasks within `process_itemwise`? Should I create a flow within `process_itemwise` and call it?
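The "publish inside the processing task" idea can be sketched as a single fused function that transforms one message and sends it right away. In the flow this would be one `@task` mapped over `data` in place of the two separate mapped tasks. `process_and_publish` and `FakeProducer` are hypothetical; `FakeProducer` only models the `send(topic, value)` call of a KafkaProducer so the sketch runs without a broker.

```python
# Hypothetical stand-in for KafkaProducer: only send(topic, value) is modeled.
class FakeProducer:
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

# Hypothetical fused step: each message is processed and published at once,
# so publishing no longer waits for the whole batch to finish processing.
def process_and_publish(item, producer, topic="results"):
    item = dict(item, result=1)  # same transformation as process_itemwise
    producer.send(topic, item)
    return item

producer = FakeProducer()
for message in [{"id": 1}, {"id": 2}]:
    process_and_publish(message, producer)
print(producer.sent)
# [('results', {'id': 1, 'result': 1}), ('results', {'id': 2, 'result': 1})]
```

With a real KafkaProducer, `send()` is asynchronous, so the task would still need a `flush()` (or to wait on the returned future) before it finishes.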

Chris White

09/10/2019, 4:10 PM
Before I answer, let me ask a quick follow-up: will you only process a finite number of items off the Kafka stream, or do you want this flow to run indefinitely?

Jerry Thomas

09/10/2019, 4:10 PM
I do need this to run indefinitely.
Another item is that I am creating the Kafka consumer and producer objects in each task. Would creating those objects outside the tasks and passing them in as `Parameter` objects be better for performance?

Chris White

09/10/2019, 4:35 PM
Yeah, I’d probably recommend not using Parameters and instead just using a `generate_consumer` task that returns the Kafka consumer (Parameters are usually assumed to be JSON-compatible).
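A plain-Python sketch of the two points here: build the client once in a factory step and reuse it downstream, and note that a client object is not JSON-compatible, which is why it is a poor fit for a Parameter. `FakeConsumer`, `downstream_step` and `is_json_compatible` are hypothetical names; whether a live client can be handed between tasks may also depend on the executor (a distributed executor may need to serialize task results), so treat this as a sketch only.

```python
import json

class FakeConsumer:
    """Hypothetical stand-in for KafkaConsumer that counts how often one is built."""
    instances = 0

    def __init__(self, topic):
        FakeConsumer.instances += 1
        self.topic = topic

def generate_consumer(topic):
    # In the flow this would be a @task whose return value feeds downstream tasks.
    return FakeConsumer(topic)

def downstream_step(consumer):
    # Placeholder for any task that uses the shared consumer.
    return consumer.topic

def is_json_compatible(value):
    # Parameters are expected to hold values like this one.
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

consumer = generate_consumer("my-topic")                  # built once...
results = [downstream_step(consumer) for _ in range(3)]   # ...reused three times
print(FakeConsumer.instances)                     # 1
print(is_json_compatible({"topic": "my-topic"}))  # True
print(is_json_compatible(consumer))               # False
```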
So, you are probably aware that we have the concept of Long-Running Flows on our product roadmap, but just in case: https://docs.prefect.io/core/PINs/PIN-08-Listener-Flows.html
In the meantime, your current design makes sense, but you might also consider Task Looping:
- https://docs.prefect.io/core/PINs/PIN-11-Task-Loops.html
- https://docs.prefect.io/core/examples/task_looping.html
The only caveat here is that I’d highly recommend you finish the loop every few hundred iterations and then restart the flow, as task loops are not intended for indefinitely running processes.
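Roughly, task looping works like this: a task raises Prefect's `LOOP` signal to ask to be run again, and the payload it passes along is visible to the next iteration through `prefect.context` (`task_loop_count`, `task_loop_result`). The plain-Python sketch below only emulates that control flow; `Loop`, `run_looped` and `consume_batches` are hypothetical stand-ins, not Prefect's implementation.

```python
class Loop(Exception):
    """Hypothetical stand-in for prefect.engine.signals.LOOP: carries a payload forward."""
    def __init__(self, result):
        super().__init__("loop")
        self.result = result

def run_looped(fn):
    # Re-invoke fn until it returns normally, feeding it the loop count
    # (starting at 1) and the payload from the previous iteration.
    count, payload = 1, None
    while True:
        try:
            return fn(loop_count=count, loop_result=payload)
        except Loop as signal:
            count, payload = count + 1, signal.result

def consume_batches(loop_count, loop_result):
    total = (loop_result or 0) + 10  # pretend each iteration handled 10 messages
    if loop_count >= 5:              # stop after a bounded number of iterations
        return total
    raise Loop(result=total)

print(run_looped(consume_batches))  # 50
```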

Jerry Thomas

09/11/2019, 10:48 AM
Thanks a lot Chris. I don’t quite understand what you mean by finishing the loop every few hundred iterations. Do you mean that the task loop should run for at most a few hundred iterations and then be wrapped in an external infinite loop, like the one I have around `flow.run()`?

Chris White

09/11/2019, 4:25 PM
Yup, exactly! Something like:
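import prefect
from prefect.engine.signals import LOOP

@prefect.task
def looper():
    # ...process one batch of messages here...
    if prefect.context.get("task_loop_count", 1) > 500:  # default in case the key is unset on the first run
        return "done"
    raise LOOP(message="Running the next iteration")  # ask Prefect to run this task again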
To be honest, you might be able to run the loop indefinitely (and if so, I’d love to hear that it works!), but the feature itself wasn’t intended to be used that way, so I just wanted to make sure to call that out.
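The overall shape discussed here, a loop capped at a few hundred iterations inside an outer infinite loop that keeps calling `flow.run()`, can be modeled in plain Python. `run_bounded_loop` stands in for one flow run and `poll` for a single iteration of work; both names, and the `max_runs` knob that exists only to make the sketch testable, are hypothetical.

```python
MAX_ITERATIONS = 500  # "every few hundred iterations", per the thread

def run_bounded_loop(poll, max_iterations=MAX_ITERATIONS):
    # Plays the role of one flow.run(): iterate up to the cap, then finish.
    for _ in range(max_iterations):
        poll()
    return "done"

def run_forever(poll, max_iterations=MAX_ITERATIONS, max_runs=None):
    # Outer restart loop; max_runs=None means run forever.
    runs = 0
    while max_runs is None or runs < max_runs:
        run_bounded_loop(poll, max_iterations)
        runs += 1
    return runs

calls = []
print(run_forever(lambda: calls.append(1), max_iterations=3, max_runs=2))  # 2
print(len(calls))  # 6
```

Restarting keeps each individual run short-lived, which matches the caveat that task loops were not designed for indefinitely running processes.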

Jerry Thomas

09/13/2019, 4:08 AM
Thanks Chris. Will try it out and see how it works.