Jerry Thomas
09/10/2019, 2:25 PM
```python
import json

import prefect
from kafka import KafkaConsumer, KafkaProducer
from prefect import Flow


def get_stream_data():
    # Drain the topic; iteration stops after 1 s with no new messages.
    consumer = KafkaConsumer("my-topic",
                             bootstrap_servers=['localhost:9092'],
                             value_deserializer=lambda m: json.loads(m.decode('ascii')),
                             consumer_timeout_ms=1000)
    messages = [message.value for message in consumer]
    if len(messages) == 0:
        # Skip downstream tasks when there is nothing to process.
        raise prefect.engine.signals.SKIP("No data received")
    return messages


def process_itemwise(data):
    data["result"] = 1
    return data


def publish_result(data):
    # KafkaProducer takes value_serializer (dict -> bytes), not value_deserializer.
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('ascii'))
    producer.send("results", data)
    producer.flush()


with Flow("custom") as flow:
    data = get_stream_data()
    result = process_itemwise.map(data)
    publish_result.map(result)

while True:
    flow.run()
```
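The consumer's `value_deserializer` turns raw Kafka bytes into Python objects, and kafka-python's `KafkaProducer` expects the inverse operation under the name `value_serializer`. A quick stdlib-only check that the two JSON callables round-trip might look like this; `serialize`, `deserialize` and the sample message are illustrative, and it assumes every message is JSON whose text is ASCII-escaped (the `json.dumps` default).

```python
import json
from typing import Any


def deserialize(raw: bytes) -> Any:
    """Bytes from Kafka -> Python object (same logic as the consumer's value_deserializer)."""
    return json.loads(raw.decode("ascii"))


def serialize(value: Any) -> bytes:
    """Python object -> bytes for Kafka (what a producer's value_serializer should do)."""
    # json.dumps escapes non-ASCII characters by default, so encoding to ASCII is safe.
    return json.dumps(value).encode("ascii")


if __name__ == "__main__":
    message = {"id": 7, "result": 1}
    wire = serialize(message)
    print(wire)  # b'{"id": 7, "result": 1}'
    assert deserialize(wire) == message
```

If the two callables ever disagree (for example, a different encoding on one side), the round-trip assertion is the first place it shows up.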
Chris White
09/10/2019, 3:59 PM
You'll need `@task` decorators, but otherwise this looks reasonable to me. Note that your `publish_result` mapped task will not begin running until `process_itemwise` has completed all of its processing.
Jerry Thomas
09/10/2019, 4:09 PM
…all of the processing.
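Chris's point about ordering can be pictured with a plain-Python model (no Prefect involved): each mapped stage finishes over every item before the next stage starts, so publishing waits for all of the processing. `run_mapped_breadth_first` and the stand-in task bodies below are hypothetical and only mimic the ordering, not Prefect's executor.

```python
from typing import Any, Callable, Dict, List, Tuple

Stage = Tuple[str, Callable[[Any], Any]]


# Hypothetical stand-ins for the two mapped tasks in the flow.
def process_itemwise(data: Dict[str, Any]) -> Dict[str, Any]:
    data["result"] = 1
    return data


def publish_result(data: Dict[str, Any]) -> Dict[str, Any]:
    # Real code would send to Kafka; here the item is just passed through.
    return data


def run_mapped_breadth_first(
    items: List[Any], stages: List[Stage]
) -> Tuple[List[Any], List[Tuple[str, int]]]:
    """Apply each stage to every item before the next stage starts.

    Returns the final results plus an execution log of (stage name, item index).
    """
    log: List[Tuple[str, int]] = []
    results = list(items)
    for name, fn in stages:
        stage_results = []
        for index, item in enumerate(results):
            log.append((name, index))
            stage_results.append(fn(item))
        # The next stage only ever sees a fully finished stage.
        results = stage_results
    return results, log


if __name__ == "__main__":
    _, order = run_mapped_breadth_first(
        [{"id": 1}, {"id": 2}, {"id": 3}],
        [("process_itemwise", process_itemwise), ("publish_result", publish_result)],
    )
    print(order)
```

In the printed log every `process_itemwise` entry precedes the first `publish_result` entry, which is the behavior Chris describes.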
On a related note, what would be the best way to run a parallel set of subtasks within `process_itemwise`? Should I create a flow within `process_itemwise` and call it?
Chris White
09/10/2019, 4:10 PM
Jerry Thomas
09/10/2019, 4:10 PM
…`Parameter` objects be better for performance?
Chris White
09/10/2019, 4:35 PM
…`generate_consumer` task that returns the Kafka consumer (Parameters are usually assumed to be JSON-compatible).
Jerry Thomas
09/11/2019, 10:48 AM
…`flow.run()`?
Chris White
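09/11/2019, 4:25 PM
```python
import prefect
from prefect import task
from prefect.engine.signals import LOOP
@task
def looper():
    if prefect.context.get("task_loop_count", 1) > 500:
        return "done"
    raise LOOP("looping")  # assumed completion: the rest of the snippet was lost
```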
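Prefect's `LOOP` signal asks the task runner to run the same task again, with `task_loop_count` in `prefect.context` going up by one on each pass, until the task returns normally. The sketch below models that control flow in plain Python; `LoopSignal`, `run_with_loop`, its `max_iterations` cap, and the count starting at 1 are illustrative assumptions, not Prefect's implementation.

```python
from typing import Any, Callable, Dict


class LoopSignal(Exception):
    """Stand-in for prefect.engine.signals.LOOP: 'run this task again'."""

    def __init__(self, result: Any = None) -> None:
        super().__init__("loop")
        self.result = result


def run_with_loop(task_fn: Callable[[Dict[str, Any]], Any], max_iterations: int = 10_000) -> Any:
    """Re-run task_fn until it returns, updating a context dict between passes."""
    context: Dict[str, Any] = {"task_loop_count": 1, "task_loop_result": None}
    while context["task_loop_count"] <= max_iterations:
        try:
            return task_fn(context)
        except LoopSignal as signal:
            # Carry the previous pass's result forward and bump the counter.
            context["task_loop_result"] = signal.result
            context["task_loop_count"] += 1
    raise RuntimeError("task kept looping past max_iterations")


def looper(context: Dict[str, Any]) -> str:
    # Mirrors the snippet above: stop once the loop count passes 500.
    if context["task_loop_count"] > 500:
        return "done"
    raise LoopSignal(result=context["task_loop_count"])
```

The `max_iterations` cap is only a safety net for this model; whether Prefect's real feature can loop indefinitely is the open question Chris raises, and this sketch does not answer it.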
To be honest, you might be able to run the loop indefinitely (and if so, I’d love to hear that it works!), but the feature itself wasn't intended to be used that way, so I just wanted to make sure to call that out.
Jerry Thomas
09/13/2019, 4:08 AM