• Jérémy Trudel

    1 year ago
    Hey everyone! I'm trying to log some parameters in a Prefect flow. After launching a quick run, I can't find any mention of my log in Cloud. It looks a bit like this:
    @task
    def extract_copy_history(cursor, schema_table):
      logger = prefect.context.get("logger")
      logger.info(f"Schema table name is {schema_table}.")
    Now when I do a quick run on Prefect Cloud, no mention of it appears in my logs, despite the level being set to "Showing logs for all log levels". I see the log for the task itself (extract_copy_history) and all other tasks, just not my custom log.
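    For reference, a runnable version of that pattern (a minimal sketch, assuming Prefect 1.x; the flow name and arguments are hypothetical, and log_stdout is optional):
    import prefect
    from prefect import task, Flow

    @task(log_stdout=True)  # optional: also forwards print() output to the Prefect logger
    def extract_copy_history(cursor, schema_table):
        # prefect.context.get("logger") is the supported way to emit custom task logs in 1.x
        logger = prefect.context.get("logger")
        logger.info(f"Schema table name is {schema_table}.")

    with Flow("copy-history") as flow:  # hypothetical flow name
        extract_copy_history(None, "my_schema.my_table")  # hypothetical arguments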
    Jérémy Trudel
    Kevin Kho
    17 replies
  • Satheesh K

    1 year ago
    Hello everyone! What is the best way to access actual mapped task results? I want to get only part of the result, e.g. (see the sketch after this snippet):
    intermediate_result = 0
    with Flow("flow") as flow:
        param1 = Parameter("list1")
        mapped_list = create_map(param1)
        results = task1.map(mapped_list)
        intermediate_result = results[1]
        results2 = task2(results[1])
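    One way this works in Prefect 1.x (a sketch with hypothetical bodies for create_map, task1, and task2): indexing a task result inside the with Flow(...) block creates a downstream GetItem task, so the selected element is resolved at run time:
    from prefect import Flow, Parameter, task

    @task
    def create_map(lst):
        return lst  # hypothetical: build the list to map over

    @task
    def task1(x):
        return x * 2  # hypothetical mapped work

    @task
    def task2(x):
        return x + 1  # hypothetical downstream work

    with Flow("flow") as flow:
        param1 = Parameter("list1")
        mapped_list = create_map(param1)
        results = task1.map(mapped_list)
        intermediate_result = results[1]  # GetItem task: picks one mapped result at run time
        results2 = task2(intermediate_result)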
    Satheesh K
    Kevin Kho
    23 replies
  • Greg Roche

    1 year ago
    Hi folks, a question about re-registering flows after the flow logic has changed. We have a local agent (running inside a Docker container) which executes flows that are stored in S3. Each flow is usually split across multiple Python files, usually with a main.py file that imports code from these other files and defines the actual flow logic. If main.py is updated, a simple re-registration of the flow seems to be enough to allow the agent to execute the updated flow, because the updated logic is stored on S3 and is then downloaded by the agent during the next execution. However, if one of the other files (which main.py imports) is changed, re-registration alone isn't enough to allow the agent to execute the updated flow, seemingly because only the content of main.py is stored on S3 at registration. Practically this means that almost every time we make any change to any of our flows, we need to rebuild our Docker image with the updated logic, redeploy it, and replace the old agent with the new one before re-registering the flow. Is there some way for us to register a flow so that all of the flow's code, not just the code in the file that defines the flow, is stored in S3, so we don't need to constantly rebuild and redeploy the agent's image for almost every change? Or is there a cleaner approach to solving this issue which has worked for anybody here? Thanks in advance.
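    One workaround to consider (a sketch, not necessarily what the thread settled on; the paths and registry are hypothetical): switch the flow to Docker storage and copy the helper modules into the image at registration, so the agent's own image no longer needs rebuilding. The files and env_vars arguments are standard Docker storage options in Prefect 1.x:
    from prefect.storage import Docker

    flow.storage = Docker(
        registry_url="my-registry.example.com",  # hypothetical registry
        image_name="my-flow",
        files={
            # local path -> path inside the image (hypothetical paths)
            "/repo/flows/helpers.py": "/modules/helpers.py",
        },
        env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
    )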
    Greg Roche
    Michael Adkins
    5 replies
  • Robin

    1 year ago
    Dear Prefect people, we have some trouble running our dbt models with Prefect. Using dbt run, we are able to run the models locally; in Prefect, however, we only get the following error:
    ERROR - prefect.DbtShellTask | Command failed with exit code 1
    Any thoughts on what could be wrong or how to get further information and debug the flow are appreciated! 🙂
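    To surface dbt's actual output in the Prefect logs, one option (a sketch, assuming Prefect 1.x; the profiles directory is hypothetical) is to enable return_all and log_stderr, both standard ShellTask options inherited by DbtShellTask:
    from prefect import Flow
    from prefect.tasks.dbt import DbtShellTask

    dbt_run = DbtShellTask(
        command="dbt run",
        profiles_dir=".",  # hypothetical: wherever profiles.yml lives
        return_all=True,   # return every output line, not just the last one
        log_stderr=True,   # forward stderr to the Prefect logger on failure
    )

    with Flow("dbt-flow") as flow:
        dbt_run()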
    Robin
    Kevin Kho
    +1
    60 replies
  • Joseph Loss

    1 year ago
    Quick question, has anyone implemented .NET tasks using Prefect?
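    There is no .NET task in the Prefect 1.x task library, but one pattern worth sketching (the DLL path is hypothetical) is to shell out to the dotnet CLI from a ShellTask:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # Hypothetical: run a compiled .NET console app as a flow task
    run_dotnet = ShellTask(command="dotnet /opt/app/MyJob.dll", return_all=True)

    with Flow("dotnet-flow") as flow:
        run_dotnet()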
    Joseph Loss
    Kevin Kho
    4 replies
  • Carter Kwon

    1 year ago
    Hello, I have a question around the registration and execution of flows. I have an ETL flow that looks something like this:
    <task functions... >
    
    with Flow("ETL Flow", schedule=schedule, storage=Docker(registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME")), run_config=ECSRun(task_role_arn=os.getenv("TASK_ROLE_ARN"), execution_role_arn=os.getenv("EXECUTION_ROLE_ARN"))) as flow:
        DAYS_AGO = 5
        TARGET_DATE = (datetime.now() - timedelta(days=DAYS_AGO)).strftime('%Y-%m-%d')
    
        <use TARGET_DATE to make API calls inside tasks... >
    We have a CI/CD process in place that registers our flows after they've been pushed to git. For this particular flow, TARGET_DATE should equal today's date - 5 days, because the API needs a few days for the analytics to be available. I've noticed that TARGET_DATE actually ends up being the date of flow registration - 5 days. Is there a way to have this code executed every time the flow is run, instead of once at registration, so TARGET_DATE changes every day?
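    Code at the flow-block level runs once when the flow is built and registered, so one fix (a sketch, assuming Prefect 1.x; get_target_date is a hypothetical helper) is to compute the date inside a task, where prefect.context.scheduled_start_time reflects the current run:
    from datetime import timedelta
    import prefect
    from prefect import task, Flow

    @task
    def get_target_date(days_ago=5):
        # Evaluated at run time, not at registration time
        run_time = prefect.context.scheduled_start_time
        return (run_time - timedelta(days=days_ago)).strftime("%Y-%m-%d")

    with Flow("ETL Flow") as flow:
        target_date = get_target_date()
        # ...pass target_date into the API-calling tasks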
    Carter Kwon
    Zach Angell
    2 replies
  • Julio Venegas

    1 year ago
    Hi community! I’m creating my own instance of a Task class that returns multiple values. I added the Tuple return-type annotation, but I’m still getting the error `TypeError: Task is not iterable. If your task returns multiple results, pass nout to the task decorator/constructor, or provide a Tuple return-type annotation to your task.` when I instantiate the Task with nout=2. Class in the thread. Any suggestions?
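    For reference, a minimal class-based pattern that unpacks cleanly in Prefect 1.x (a sketch; SplitTask is hypothetical, the asker's actual class is in the thread):
    from typing import Tuple
    from prefect import Flow, Task

    class SplitTask(Task):
        def run(self, value: int) -> Tuple[int, int]:
            return value, value * 2

    split = SplitTask(nout=2)  # nout goes on the constructor, not the call

    with Flow("unpack-demo") as flow:
        first, second = split(value=3)  # iterable unpacking works once nout is set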
    Julio Venegas
    Kevin Kho
    26 replies
  • Ryan Baker

    1 year ago
    If I build my own Docker image for use with a Prefect flow in Prefect Cloud, my experience has been that if I run the flow, then update the Docker image, then run the flow again, it does not pull the new image; it keeps using the cached previous image. Is there a way I can clear the cache? Or am I forced to specify a new Docker image in the run configuration, such as with a git-hash tag on the image?
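    The git-hash idea from the question, sketched out (the registry, image name, and DockerRun run config are hypothetical): tagging each build with the current commit makes every image reference immutable, so a stale cached image can never be reused:
    import subprocess
    from prefect.run_configs import DockerRun

    # Hypothetical: tag the image with the current commit SHA at build/registration time
    sha = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode().strip()

    flow.run_config = DockerRun(image=f"my-registry.example.com/my-flow:{sha}")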
    Ryan Baker
    Kevin Kho
    9 replies
  • jack

    1 year ago
    Hey all! We were testing out our local Prefect agent spun up on our EC2 instance, and we were able to run most flows except for one type: flows registered with the Docker storage type. I believe Docker-storage flows aren't supported by local agents... Are there any good alternatives to Docker storage as a containerized flow-storage method for more sophisticated flows? We would like to use the local agent for most things and don't want to move to a Fargate agent or similar anytime soon before trying other flow storage methods.
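    One combination to consider (a sketch; the bucket and image names are hypothetical): keep the flow code in S3 storage and pair it with a DockerRun run config and a Docker agent, so execution still happens in a container without using Docker storage:
    from prefect.storage import S3
    from prefect.run_configs import DockerRun

    flow.storage = S3(bucket="my-flow-bucket")                        # hypothetical bucket
    flow.run_config = DockerRun(image="my-registry/flow-env:latest")  # hypothetical image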
    jack
    Michael Adkins
    5 replies
  • Vincent

    1 year ago
    Hi All, I was wondering if someone could help me identify why some of my tasks are pending. I have the following flow running on Prefect Cloud with a Dask backend. For some reason, the task scheduler has not started 2 of the 4 tasks. Thanks for any advice!
    Vincent
    Kevin Kho
    +1
    10 replies