# ask-community
h
Hi everyone, I would like to migrate from Mage to Prefect on-prem. The migration went extremely smoothly because we could reuse the blocks and convert them 1:1 into tasks. Our flow contains multiple tasks, and a run takes about 20 seconds. Now we would like to wrap the execution with Flask so that we can trigger the flow and, once it is done, get the return value of the last task. It works with Flask, but if I trigger parallel curl requests I get the following error:
Copy code
22:47:32.802 | ERROR   | Task run 'task2-0' - Crash detected! Execution was cancelled by the runtime environment.
22:47:32.839 | ERROR   | Flow run 'garnet-bullfrog' - Crash detected! Execution was cancelled by the runtime environment
Here is an example of how our flow and tasks are written.
Copy code
from flask import Flask, request
from prefect import flow, get_run_logger, task
import time

app = Flask(__name__)


@task
def task1(payload):
    print(payload)
    time.sleep(5)


@task
def task2():
    time.sleep(5)


@flow(name="valentin_flask")
def process_request(payload):
    logger = get_run_logger()
    logger.info("Hello from Flask")
    print(payload)
    time.sleep(5)
    task1(payload)
    task2()
    return {"ok": True}

@app.route("/getConfiguration", methods=["POST"])
def push_event():
    payload = request.json
    return_msg = process_request(payload)
    return return_msg



if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True, port=8080)
I'm a bit stuck. Any help would be great. Thanks
i
Just IMO, I would not recommend mixing the web server and ETL orchestration in one repo. It will work better if you keep them separate and trigger the flows via the API.
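For example, from the web-server side it could look roughly like this (just a sketch, not tested; the API URL, deployment ID, and parameters are placeholders, and you could equally use run_deployment from prefect.deployments instead of calling the REST endpoint directly):
Copy code
import requests

# Placeholders: default local Prefect API URL and a made-up deployment UUID
PREFECT_API_URL = "http://127.0.0.1:4200/api"
DEPLOYMENT_ID = "00000000-0000-0000-0000-000000000000"

# Create a flow run for an existing deployment via the Prefect REST API
resp = requests.post(
    f"{PREFECT_API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
    json={"parameters": {"payload": {"hostname": "server-01"}}},
)
resp.raise_for_status()
print(resp.json()["id"])  # flow run ID, useful for checking its state later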
r
Hi @Ivan, do you have a boilerplate example of an API call that returns the result of a flow run? My understanding is that you persist the result somewhere and retrieve it later using the storage key, but I'm not sure about the Prefect way of doing this in the same call, like @Henrik is trying to do.
i
I'm not sure if this is possible with the API. But maybe you could write the logic inside a flow function in Prefect, where you can send any data with a POST request.
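Something like this, maybe (purely a sketch; the callback URL and the result shape are made up):
Copy code
import requests
from prefect import flow, task


@task
def compute(payload):
    return {"ok": True, "payload": payload}


@flow(name="push_result_example")
def process_and_callback(payload, callback_url):
    result = compute(payload)
    # let the flow itself push the final result back to whoever triggered it
    requests.post(callback_url, json=result, timeout=10)
    return result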
👀 1
h
@Romain Vincent Here is what I tried and it seems to work. Maybe it could be improved :)
Copy code
from threading import Thread
import bson
from flask import request, Flask, json
from prefect.deployments import run_deployment

app = Flask(__name__)


class Compute(Thread):
    def __init__(self, parameters):
        Thread.__init__(self)
        self.parameters = parameters

    def run(self):
        # trigger the deployment in the background so the HTTP request can return immediately
        flow_run = run_deployment(name="test/test", parameters=self.parameters,
                                  flow_run_name="test123")
        print(flow_run.id)


@app.route('/getTaskflow', methods=["POST"])
def myfunc():
    payload = request.json
    server_name = payload
    request_id = str(bson.ObjectId())
    body = {
        "id": request_id,
        "hostname": server_name,
        "payload": payload
    }

    # start the deployment run in a separate thread; the response does not contain the flow result
    thread = Compute(body)
    thread.start()

    response = app.response_class(
        response=json.dumps(body),
        status=200,
        mimetype='application/json'
    )
    return response


if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True, port=8000)
I did not manage to fetch the result directly. If you manage to do so please let me know.
👍 1
r
Yeah, I was going for a solution like:
Copy code
def run_and_wait_for_result(name):
    # check_run_status and get_result_from_storage are placeholders here
    run_id = run_deployment(name=name)
    while True:
        status = check_run_status(run_id)
        if status == 'DONE':
            return get_result_from_storage(run_id)
        time.sleep(1)  # avoid busy-waiting between polls
Something like that. But I'm not a huge fan of this solution.
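If run_deployment is left to block until the run finishes, the polling loop might not even be needed. Roughly (untested sketch; it assumes the flow persists its results, e.g. persist_result=True):
Copy code
from prefect.deployments import run_deployment


def run_and_wait_for_result(name, parameters=None):
    # with no timeout set, run_deployment blocks until the flow run reaches a final state
    flow_run = run_deployment(name=name, parameters=parameters)
    # reading the return value this way requires the flow to persist its results
    return flow_run.state.result()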
h
How does the storage behave with concurrent runs of deployments? Do you have an example of how to interact with the storage? I'm pretty new to Prefect.
r
I'm reading this at the moment. I think you can define an S3 bucket for storing serialized results and assign them unique keys. I suppose you can define a retention time with the caching logic, or flush them from time to time.
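Something like this is what I have in mind (just a sketch; the S3 block slug "s3/results" is made up and the block would need to be created first):
Copy code
from prefect import flow, task


@task(persist_result=True)
def task1(payload):
    return {"ok": True, "payload": payload}


@flow(persist_result=True, result_storage="s3/results", result_serializer="json")
def process_request(payload):
    # results are written to the S3 block under per-run keys, so concurrent runs don't collide
    return task1(payload)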
👀 1
I think our use case is pretty common; I wish there were a tutorial or something about it.
j
Thanks for the idea, Romain! I think that might go nicely as part of this proposed guide.
🙏 1