prefect-community
  • g

    Gustavo de Paula

    09/08/2021, 3:31 PM
    Hi everyone! I have a scheduled flow that runs every day successfully, but when I try to quick run it, the last task fails with the following message: "Can't open lib 'ODBC Driver 17 for SQL Server': file not found". Is there a runtime difference between a scheduled run and a quick run?
    k
    10 replies · 2 participants
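    A quick way to narrow this down is to log which ODBC drivers the flow-run environment can actually see; if the quick run lands on a different agent or image than the scheduled run, the driver list will differ. A minimal diagnostic sketch (assuming pyodbc is installed where the flow runs):
    import prefect
    import pyodbc
    from prefect import task

    @task
    def log_odbc_drivers():
        # Logs the ODBC drivers visible at runtime; 'ODBC Driver 17 for SQL Server'
        # should appear here in whichever environment the run executes.
        logger = prefect.context.get("logger")
        logger.info("Available ODBC drivers: %s", pyodbc.drivers())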
  • f

    Filip Lindvall

    09/08/2021, 3:35 PM
    I'm trying to build a new generic task for working with airtable. When trying to do it, building a separate package where I have the task
    class ReadTable(Task):
        def __init__(
            self,
            # table: str = None,
            app: str = None,
            formula: str = None,
            api_key_name: str = "default_cred",
            project_id: str = "some_id",
            **kwargs: Any,
        ):
            # self.table = table
        self.app = app
            self.formula = formula
            self.api_key_name = api_key_name
            self.project_id = project_id
    
            super().__init__(**kwargs)
    
        @defaults_from_attrs("app", "formula", "api_key_name", "project_id")
        def run(
            self,
            table: str = None,
            app: str = None,
            formula: str = None,
            api_key_name: str = None,
            project_id: str = None,
        ) -> List[Any]:
            # some code
            return json.dumps(ret)
    Trying to call this passing table as a
    prefect.Parameter
    it does not "resolve" to a string. However wrapping the call in a
    readTable = ReadTable(app="some_app_space")
    
    @task
    def wrapper(table: str) -> List[Any]:
        return readTable.run(table=table)
    Then it works and
    table
    gets resolved correctly to its string value. Why is this? I've been looking all over for documentation.
    k
    9 replies · 2 participants
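    For context, the usual Prefect 1.x pattern is to instantiate the Task subclass once and then call the instance inside a Flow block; it is the call inside the flow that lets a Parameter be resolved to its value before run() sees it. A minimal sketch of that pattern using the ReadTable class above (flow and parameter names are placeholders):
    from prefect import Flow, Parameter

    read_table = ReadTable(app="some_app_space")  # init-time defaults

    with Flow("airtable-read") as flow:
        table = Parameter("table")
        # Calling the instance in the flow registers `table` as an upstream
        # dependency, so run() receives the resolved string, not the Parameter object.
        rows = read_table(table=table)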
  • a

    An Hoang

    09/08/2021, 4:10 PM
    super basic question but how do I access the result of a mapped task from the
    state = flow.run()
    object? I always get a
    KeyError
    when trying to do
    state.result[mapped_task].result
    but
    state.result[not_mapped_task].result
    works fine
    z
    m
    5 replies · 3 participants
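    For reference, indexing state.result with the reference returned by .map() inside the Flow block is what usually works; a minimal sketch of that pattern (task names invented for illustration):
    from prefect import Flow, task

    @task
    def get_numbers():
        return [1, 2, 3]

    @task
    def square(n):
        return n * n

    with Flow("mapped-result-demo") as flow:
        numbers = get_numbers()
        squared = square.map(numbers)  # keep a handle on the mapped task

    state = flow.run()
    # For a mapped task, the state's .result is typically the list of child results.
    print(state.result[squared].result)  # -> [1, 4, 9]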
  • k

    Kostas Chalikias

    09/08/2021, 4:21 PM
    Hi everyone, is anyone else in Europe using Prefect Cloud and finding the front end too slow to use? It takes me on the order of 1 min to log in, find a flow, open a run and look at the logs, with the majority of the time spent on pages loading.
    n
    1 reply · 2 participants
  • m

    Martim Lobao

    09/08/2021, 5:23 PM
    hey all, I’ve also got a couple questions related to some issues we’ve encountered during some recent flow runs, not sure if this is the right place to ask for help: 1. we have a flow which at one point has 3 tasks that are triggered in parallel when their dependent task finishes. for whatever reason, two of those tasks got triggered automatically as soon as the parent task finished, but the other one was stuck in a pending state. it eventually started, but only after the other 2 tasks finished. is there any reason why the 3 tasks did not run in parallel? 2. it can sometimes happen that we attempt to restart a flow but we have to click the restart button a couple of times before the actual flow restarts. is there any reason why this happens or is it just a prefect bug? this has happened on a couple of occasions now (here i clicked the restart button, waited 3 minutes and then clicked again, and didn’t change anything between attempts)
    k
    9 replies · 2 participants
  • a

    An Hoang

    09/08/2021, 6:49 PM
    Maybe I'm overthinking this, but I need to accomplish the following:
    #input : Have a list of X: [X1,X2, X3 ...etc]
    #input: total_times = 1_000_000
    #property: x.do_work(1_000_000) = 1000 * x.do_work(1000)
    #output needed: [X1.do_work(1_000_000), X2.do_work(1_000_000) ..etc]
    
    @task
    def long_running_task(x, n_times):
        result = x.do_work(n_times)
        return result
    I have hundreds of Dask workers and want to split this work into units of
    x.do_work(1000)
    to get the output as fast as possible through maximizing parallelism. How should I write my mapping functions to achieve this? Do I just generate a list of
    X_list = [x] *1000
    for each
    X
    and a list of
    iteration_list = [1000] * 1000
    , then do
    long_running_task.map(x=flatten(X_list), n_times = flatten(iteration_list))
    ?
    k
    14 replies · 2 participants
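    A sketch of one way to build the chunked argument lists before mapping, assuming do_work(1_000_000) really can be split into 1000 independent calls of do_work(1000) (names reuse the snippet above; a reduce step would still be needed afterwards to combine the per-chunk outputs per X):
    CHUNK_SIZE = 1000
    total_times = 1_000_000
    n_chunks = total_times // CHUNK_SIZE  # 1000 chunks per X

    # Repeat each x once per chunk, paired with a matching list of chunk sizes,
    # so each mapped child does exactly x.do_work(1000).
    x_chunks = [x for x in X_list for _ in range(n_chunks)]
    n_times_chunks = [CHUNK_SIZE] * len(x_chunks)

    with Flow("chunked-do-work") as flow:
        results = long_running_task.map(x=x_chunks, n_times=n_times_chunks)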
  • j

    Jose Chen

    09/08/2021, 7:19 PM
    Hi, has anyone run into issues with old versions of flows retaining their scheduled runs and not clearing them? The old version is archived and the schedule is off, but it will still have 2-4 jobs remaining that don't get canceled. This is causing us to run two jobs at the same time every now and then.
    k
    4 replies · 2 participants
  • d

    David Jenkins

    09/08/2021, 7:24 PM
    I'm trying to figure out how to set the output of a particular task in one flow as a parameter in another flow and I am just not getting it. I cannot find documentation or any code examples. This is my setup: 1. I have three dependent flows: flow1 -> flow2 -> flow3 2. flow1 has around 10 tasks. One of those tasks, say the 4th one, named GenerateRandomData, randomly generates a string based upon the data sent to it. So, something like this:
    random_string = GenerateRandomData(data=data)
    3. random_string from flow1 will then be a parameter sent to flow2, so the parameter is dynamically set at runtime since the value of random_string is different with each execution of the three flows. I learn best from reading code, so a basic example would be most helpful.
    k
    f
    15 replies · 3 participants
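    A minimal sketch of one way to wire this up with the create_flow_run task, where a task result from flow1 becomes a parameter of flow2 (project, flow and parameter names are placeholders, and this assumes flow2 declares a random_string Parameter):
    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import create_flow_run

    @task
    def generate_random_data(data):
        # stand-in for the real GenerateRandomData task
        return f"random-string-derived-from-{data}"

    with Flow("flow1") as flow1:
        data = Parameter("data")
        random_string = generate_random_data(data)
        # A task result inside the parameters dict is resolved at runtime,
        # so flow2 receives whatever string this run produced.
        flow2_run_id = create_flow_run(
            flow_name="flow2",
            project_name="my-project",
            parameters={"random_string": random_string},
        )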
  • e

    Erik Amundson

    09/08/2021, 8:49 PM
    Hi, I'm running into an issue with the dask distributed executor where the scheduler is quickly running out of memory. The entire workflow is :
    import prefect
    
    
    @prefect.task
    def do_nothing(n):
        pass
    
    
    with prefect.Flow("Dummy Flow") as flow:
        listy = list(range(200000))
        do_nothing.map(listy)
    The scheduler pod runs out of memory after around 300 tasks; the screenshot of the dask dashboard is attached. Has anyone run into this issue or have any ideas for a fix? We normally run prefect 0.14.16 but I've tried on both 0.14.16 and latest (0.15.5) with the same results.
    k
    7 replies · 2 participants
  • a

    Aric Huang

    09/08/2021, 11:44 PM
    Hi, I have a few questions about managing different versions of flows:
    • Is it possible to get the flow's numerical version number and project name during a flow run, either from the prefect.context object or elsewhere?
    • The flow ID used in the URL https://cloud.prefect.io/paravision/flow/<ID> seems to be different from the flow ID that's output when registering a flow. Is it possible to get that ID during a flow run through prefect.context? prefect.context.flow_id looks like the same one that's returned when registering the flow, but would like to also get this other flow ID.
    • When registering a flow, it increments the version number and archives the previous flow version. Is it possible to un-archive a previous version so it can be run again?
    👀 1
    k
    c
    +1
    32 replies · 4 participants
  • g

    Gaylord Cherencey

    09/09/2021, 5:48 AM
    Hello, I get a
    FileNotFoundError, No such file or directory
    while using Git storage to a private git server. I set Secret as ENV and specify
    git_token_secret_name
    . Does this mean that the agent can connect to the repository but don't find the Flow python file?
    k
    z
    13 replies · 3 participants
  • e

    Eddie Atkinson

    09/09/2021, 5:57 AM
    Does StartFlowRun support passing parameters as lists of dictionaries in Python? I am currently getting the following error when I try to do that:
    prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: https://api.prefect.io/
    This is likely caused by a poorly formatted GraphQL query or mutation but the response could not be parsed for more details
    I tried performing the same operation via the interactive GraphQL API and I needed to escape the { and " to get the query to work. Looking at the source it seems as though this doesn't happen and might be the cause of my problem.
    k
    k
    8 replies · 3 participants
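    For reference, the parameters handed to StartFlowRun are a JSON-serializable dict keyed by parameter name, so a list of dictionaries would normally be the value of a single named parameter; a hedged sketch (flow, project and parameter names are invented):
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    start_child = StartFlowRun(flow_name="child-flow", project_name="my-project")

    with Flow("parent-flow") as flow:
        # The whole list of dicts is one parameter value; it has to survive
        # JSON serialization (no sets, datetimes, custom objects, etc.).
        start_child(parameters={"records": [{"id": "a", "value": 1}, {"id": "b", "value": 2}]})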
  • b

    Ben Muller

    09/09/2021, 7:13 AM
    Hey peeps, what am I doing wrong here? I am trying to provide multiple clocks to my flow schedule (trying with one to start off with) but running into an error on the param. I have followed other examples, but it just isn't working here.
    schedule = Schedule(
        clocks=[
            CronClock(
                cron="*/7 * * * *",
                parameter_defaults=dict(account_identifier="inrun"),
                start_date=datetime.now(tz=timezone("Australia/Brisbane")),
            )
    )
    
    with Flow(
        name="betfair_flow",
        storage=Storage().in_a_s3_bucket(),
        run_config=RunConfig().fargate_on_ecs(cpu=512, memory=2048),
        schedule=schedule,
        executor=LocalDaskExecutor(scheduler="threads"),
    ) as flow:
        betfair_account_identifier = Parameter("account_identifier")
    Error in 🧵
    j
    k
    45 replies · 3 participants
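    One thing worth noting about the snippet as pasted: the clocks list is never closed, so Python would raise a syntax error before any Prefect validation runs. A version with the brackets balanced might look like this (same values as above):
    from datetime import datetime

    from pytz import timezone
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    schedule = Schedule(
        clocks=[
            CronClock(
                cron="*/7 * * * *",
                parameter_defaults=dict(account_identifier="inrun"),
                start_date=datetime.now(tz=timezone("Australia/Brisbane")),
            ),
        ]  # <- closing bracket for the clocks list
    )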
  • f

    Filip Lindvall

    09/09/2021, 8:57 AM
    If I want to map over a task that takes several arguments. e.g.
    def some_task(id:str, old: Dict, new: Dict)
    and I have data as
    [{"id":"some_id", "old":{...}, "new":{...}}, ...]
    is there some easy way to explode the dict into the tasks, or is it best to write some wrapper task whose only responsibility is to take the data and pass it to the intended task.
    m
    k
    3 replies · 3 participants
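    A small sketch of the wrapper-task approach, mapping over the list of dicts and unpacking each one into the underlying function (names mirror the example above):
    from typing import Dict
    from prefect import Flow, task

    def some_task(id: str, old: Dict, new: Dict):
        ...  # the real work

    @task
    def apply_row(row: Dict):
        # Explode one {"id": ..., "old": ..., "new": ...} dict into keyword arguments.
        return some_task(**row)

    with Flow("explode-dicts") as flow:
        rows = [{"id": "some_id", "old": {}, "new": {}}]
        results = apply_row.map(rows)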
  • i

    Issam Assafi

    09/09/2021, 11:48 AM
    Hello, If i have my python code on Machine 1, then make a flow and register it... then I launch an agent from Machine 2, would it be able to launch the initial flow? If no, what's the appropriate way to handle similar scenarios?
    m
    k
    6 replies · 3 participants
  • m

    Martim Lobao

    09/09/2021, 12:19 PM
    minor issue: just ran a flow which completed successfully, but the prefect slack app complained that the flow failed. any reason for this to happen / should I be concerned?
    Run attractive-bug (821098cf-c921-49ef-93d2-dd741b440aca) of flow example failed STARTED_NOT_FINISHED SLA (ee53f0cf-6d07-4dee-afb6-d1205baf141c) after 60 seconds. See the UI for more details.
    g
    2 replies · 2 participants
  • r

    Rafael

    09/09/2021, 12:29 PM
    Hello, I am starting to use Prefect. I am a Luigi user, so I am testing the "Result/Checkpoint/target" feature in Prefect. I tried the simple example:
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.results import LocalResult
    
    result = LocalResult(dir="./prefect-results", )
    
    @task
    def hello_task(named_params):
        logger = prefect.context.get("logger")
        logger.info(named_params)
        return 'asd'
    
    with Flow("hello-flow", result=result) as flow:
        named_params = Parameter("named_params",)
        hello_task(named_params)
    
    flow.run(named_params='param')
    I was expecting to find a file in the ./prefect-results folder. The folder is there but it is empty. I also tried
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.results import LocalResult
    
    @task(checkpoint=True, result=LocalResult(dir="./prefect-results"))
    def hello_task(named_params):
        logger = prefect.context.get("logger")
        logger.info(named_params)
        return 'asd'
    
    with Flow("hello-flow", ) as flow:
        named_params = Parameter("named_params",)
        hello_task(named_params)
    
    flow.run(named_params='param')
    Also empty. Am I doing something wrong, or have I misunderstood how this feature works?
    a
    m
    4 replies · 3 participants
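    For what it's worth, when a flow is executed with flow.run() outside of Cloud/Server, checkpointing is disabled by default, so results are never written; a hedged sketch of enabling it for a local test (the env var must be set before prefect loads its config):
    import os

    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # before importing prefect

    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.results import LocalResult

    @task(result=LocalResult(dir="./prefect-results"))
    def hello_task(named_params):
        prefect.context.get("logger").info(named_params)
        return "asd"

    with Flow("hello-flow") as flow:
        named_params = Parameter("named_params")
        hello_task(named_params)

    flow.run(named_params="param")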
  • a

    Abhishek

    09/09/2021, 12:54 PM
    Folks, I am porting some of my Airflow jobs to Prefect and wanted to know: is FunctionTask behaviour the same as PythonOperator (Airflow operator) behaviour?
    k
    4 replies · 2 participants
  • m

    Michael

    09/09/2021, 2:45 PM
    Hey all! I am looking for a way to use file targets for output caching, but parameterised by the upstream dependencies to a specific task. In other words, I don’t care exactly what the GCSResult file path looks like, but I need that 1) the filepath corresponds to that task’s upstream dependencies (such that one task could have many possible output results), and 2) all these different results could be cached at any one time. Has anyone done something like this? NB: what would be amazing is something like an
    {upstream}
    context option for a given task that somehow serializes the upstream dependencies into the result / target file name at run time. I know there is a
    {parameters}
    context value (so I could use this in the worst case), but not every task in a particular flow depends on all of that flow's parameters, so this would result in unnecessary computation. Right now I implement something like this inside the task's run logic itself, but I imagine it's a helluva lot slower than letting the orchestrator deal with skipping tasks rather than entering every task to perform a file existence check before exiting (and also a lot less clean). Cheers for any help!
    j
    b
    7 replies · 3 participants
  • s

    Sean Talia

    09/09/2021, 2:53 PM
    Hi all – I think I'm having a slow morning, but does anyone have a good heuristic around where to start debugging when you're trying to pass the result of one task to another, but your flow schematic is showing them as being unconnected? For example, I have something like:
    @task
    def custom_task_1(input) -> str:
      return f"Custom Input: {input}"
    
    with Flow(...) as flow: 
        custom_task_1_results = custom_task_1("Hello World!")
        custom_task_2 = CustomTask2(message=custom_task_1_results)
    but when I look at my flow's schematic, it doesn't show
    custom_task_1
    as being upstream of
    CustomTask2
    ...I don't remember ever having run into this, so I feel like there's something quite obvious that I must be missing. I appreciate any help!
    k
    3 replies · 2 participants
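    One pattern that produces exactly this symptom: if CustomTask2(message=...) is the Task subclass being instantiated with an init kwarg, rather than an instance being called inside the flow, no edge is added to the DAG. A hedged sketch of the call style that does create the dependency, reusing the names above (this assumes CustomTask2.run accepts a message argument):
    from prefect import Flow

    custom_task_2 = CustomTask2()  # instantiate the imperative task once

    with Flow("demo") as flow:
        custom_task_1_results = custom_task_1("Hello World!")
        # Calling the instance inside the flow binds the upstream result,
        # so the schematic shows custom_task_1 -> CustomTask2.
        custom_task_2_result = custom_task_2(message=custom_task_1_results)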
  • a

    Alex

    09/09/2021, 3:07 PM
    Hi all, I'm just getting started with Prefect and Docker as well. Basically, I'm trying to find out how I go from executing flow runs using the Local agent to using the Docker agent. I have a registered flow that's stored on Github. This runs fine using a Local agent in my local environment. Attempting to run the flow with a Docker agent fails with
    ModuleNotFoundError
    . To make this work, will I need to create a Docker image that includes all the dependencies? Also, does this mean that I will have to use another option for storage?
    k
    4 replies · 2 participants
  • j

    Jack Sundberg

    09/09/2021, 3:17 PM
    Is it possible to change the logging-level of parameter tasks? For flows that have many parameters (and most with default values), it'd be nice to remove them from the info-level logs, where they can be overwhelming.
    k
    13 replies · 2 participants
  • m

    Michael

    09/09/2021, 6:04 PM
    Hey there! I am trying to set up checkpointing / result saving / output caching, and it seems everything is now working correctly when I run flows locally (I set
    prefect.config.flows.checkpointing = True
    somewhere in the code). But I can’t for the life of me get it to work when I use my Docker agent that’s connected to Prefect cloud. I set
    --env PREFECT__FLOWS__CHECKPOINTING=true
    as an env variable when I run the agent, but it never uses any checkpointing logic. Any ideas here? I’m more than happy to provide more info as needed
    k
    26 replies · 2 participants
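    One thing that may be worth checking: the env var has to be visible inside the flow-run container itself, not just in the agent process. Pinning it on the flow's run config is one way to guarantee that, regardless of how the agent was started; a hedged sketch (assuming a DockerRun-based flow):
    from prefect.run_configs import DockerRun

    # Env vars on the run config are set on the flow-run container itself.
    flow.run_config = DockerRun(env={"PREFECT__FLOWS__CHECKPOINTING": "true"})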
  • j

    Jack Sundberg

    09/09/2021, 7:00 PM
    I'm seeing an odd bug when I try to reset a project (delete it and then recreate it). Here's an example of what I'm trying:
    client.create_project(project_name="Hello, World!")
    is_successful = client.delete_project(project_name="Hello, World!")
    client.create_project(project_name="Hello, World!")  # FAILS HERE
    The second call to create_project fails with a ClientError due to a uniqueness violation -- but in raising that error, the client is unable to tell which existing project causes the uniqueness violation (bc I just deleted it). Would anyone have insight as to what's going on here? I've tried sleeping between each line too so I don't think this is a race condition, but I'm not totally sure.
    k
    a
    10 replies · 3 participants
  • j

    joshua mclellan

    09/09/2021, 7:17 PM
    is there a trigger I can use to do some logging after a run is complete? I would love to write to a database when a flow finishes without having to add a task and muddle the logic of the DAG itself
    m
    3 replies · 2 participants
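    A flow-level state handler is one way to run logging code when a run finishes without adding a task to the DAG; a minimal sketch (the database write is a hypothetical helper):
    from prefect import Flow

    def record_finish(flow, old_state, new_state):
        # Called on every flow state change; only act when the run reaches a finished state.
        if new_state.is_finished():
            write_run_to_database(flow.name, type(new_state).__name__)  # hypothetical helper
        return new_state

    flow = Flow("my-flow", state_handlers=[record_finish])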
  • m

    Maikel Penz

    09/09/2021, 8:29 PM
    Hey! I'm running the Prefect agent on AWS EKS (Kubernetes) and when firing a job (which creates a new pod on the same cluster) I get the error
    NoCredentialsError('Unable to locate credentials')
    from the job execution. In the past I used the ECS agent and I was able to pass a
    task_role_arn
    to it. However I see that both the
    KubernetesAgent
    and the run config for Kubernetes
    KubernetesRun
    don't have a "role" parameter to set. My next thought was that either the role attached to the
    EKS cluster
    or the
    Fargate profile
    should do the work, but as a test I gave admin to both and I still get the credentials issue. What am I missing?
    k
    a
    14 replies · 3 participants
  • c

    Constantino Schillebeeckx

    09/09/2021, 8:47 PM
    I have a task that globs for files given a file path, in my flow I call that task with something like
    f"{os.path.dirname(os.path.realpath(__file__))}/layouts"
    , however this causes the following (when executing from Prefect cloud):
    Failed to load and execute Flow's environment: NameError("name '__file__' is not defined")
    how come that global isn't available?
    c
    12 replies · 2 participants
  • a

    Abhas P

    09/09/2021, 10:33 PM
    Hi! I am trying to run a flow with mapped tasks with a DaskExecutor on my machine, and I have a couple of queries: 1. Will the overall run time benefit if I run the same flow on Prefect Cloud? 2. Does Prefect Cloud manage spawning a Dask cluster resource?
    k
    1 reply · 2 participants
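    For context, Prefect Cloud orchestrates runs but does not itself provision Dask; the executor attached to the flow decides whether and where a cluster is created. A hedged sketch (worker counts are illustrative):
    from prefect.executors import DaskExecutor

    # With no address given, DaskExecutor spins up a temporary local cluster for the run;
    # cluster_class/cluster_kwargs can point at remote infrastructure instead, but that
    # infrastructure is yours to provide; Cloud only orchestrates the flow run.
    flow.executor = DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2})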
  • s

    sean williams

    09/09/2021, 10:38 PM
    Privacy Policy Question: I'm familiarizing myself with Prefect to see if we want to use it. One thing I'm curious about is that, despite disabling telemetry in my config.toml, it looks like my browser is still trying to block screen recording on localhost:8080, used by LogRocket. Looking at https://www.prefect.io/legal/privacy-policy/,
    The LogRocket software has a feature that allows for the “video-like” replay of a previous user session
    it looks like disabling telemetry might not disable LogRocket's session recording? Am I interpreting that correctly or am I missing a config to disable it? If anything, LogRocket seems more invasive than GA telemetry. Please let me know if this belongs in a different channel. Thanks
    n
    2 replies · 2 participants
  • j

    Jacq Crous

    09/10/2021, 7:23 AM
    Hi all, I'm having some trouble with retrieving data from a child flow. I have a child flow registered that sits in folder A; the child flow is an ETL process that downloads and formats data from an S3 bucket. I want to retrieve the transformed data from the child flow after it has run. I am using create_flow_run to start the child flow and it seems to run, however when I use get_task_run_result to get the data from the child flow it just gets stuck. I'm not sure if I'm not passing the correct task slug or if the problem is related to the context of the child flow changing. In principle I want to define an ETL process that I can use in other projects and just use the generated data from the child process as the data source in the new project I'm working on. Thank you in advance for the help!
    k
    12 replies · 2 participants
k

Kevin Kho

09/10/2021, 2:07 PM
Hey @Jacq Crous, you can get the slug if you go into the Flow page for the
create_flow_run
task. This should work, but why is the context of the child flow changing? Also if that interface is giving you problems, you could also use the KV Store to persist a location and then retrieve the location downstream to use it. KV Store Docs
j

Jacq Crous

09/13/2021, 3:33 PM
Hi Kevin, thanks for the response. I tried to look at the issue again, and I realised that the flow (when I run it in the folder) executes (it creates the right outputs) but it does not register that it ran on Prefect. This is the only flow that seems to do this, so the issue might be something other than create_flow_run. I guess it's a separate issue, but any ideas on what would cause that? I have a local agent registered that runs all my other flows, but this particular flow runs without appearing in the agent or Prefect dashboard's run histories. The flow itself is there but there is no run history recorded for it.
k

Kevin Kho

09/13/2021, 3:35 PM
Are you using flow.run()? flow.run() will not appear on Prefect Cloud. How are you running the flow?
j

Jacq Crous

09/14/2021, 6:27 AM
Yes I am using flow.run(). But when I use flow.run() for my other flows it does appear on Prefect cloud.
k

Kevin Kho

09/14/2021, 2:03 PM
It should not, as flow.run() is for local testing only. You may have a StartFlowRun or create_flow_run somewhere in the script? Would you like to show me the code?
j

Jacq Crous

09/15/2021, 1:06 PM
Hi Kevin, so in principle what I have looks like this: In Folder A I have a script for downloading data from an S3 bucket and then transforms the data. So the script is AWS_download.py and looks like:
import pandas as pd
import sqlite3
import os
from prefect import Flow, task, Parameter, unmapped
from prefect.tasks.aws.s3 import S3Download, S3List
from prefect.executors import LocalDaskExecutor
#from prefect.agent.local import LocalAgent
from prefect.engine.results import LocalResult
import re
from dotenv import dotenv_values

import sys

"""
# Extract:
@task
def get_aws_credentials():
    """
    Load aws credentials from a local .env file. 

    """
    return cred_dict

@task
def get_s3_data_list(bucket_name,source_prefix,cred_dict):
    """
    Get a list of available tables in the specified S3 bucket with 
    the source_prefix prefix
    """

    return table_list

@task()
def get_table_data(bucket_name,source_prefix,table_name,cred_dict):
    """
    Download specified table from the S3 bucket and convert data to a
    data frame. A payload dictionary is returned containing the constructed
    dataframe and the associated tablename to allow for mapping of this task.
    """
    return payload

# Transform:
def generate_sqlite_table_name(table_name,source_prefix):
    """
    Generate the sqlite table name by removing the db_name prefix from the 
    AWS S3 bucket name.
    """
    return sql_table_name

with Flow('AWS downloads') as flow:
    # Define parameters:
    source_prefix = Parameter('source_prefix',required=True,default=':memory')
    db_name = Parameter('db_name',required=True,default=':memory:')
    db_path = Parameter('db_path',default=os.getcwd())
    bucket_name = Parameter('bucket_name',required=True)

    # Define flow:
    cred_dict = get_aws_credentials()

    table_list = get_s3_data_list(bucket_name,source_prefix,cred_dict)

    payload_list = get_table_data.map(unmapped(bucket_name),unmapped(source_prefix),table_list,unmapped(cred_dict))

if len(sys.argv) > 1:
    if len(sys.argv) < 3:
        raise Exception('Three inputs are required to run pipeline: <aws bucket name> <source prefix> <sqlite_dbname>')

    flow.run(parameters={'bucket_name':sys.argv[1],'source_prefix':sys.argv[2],'db_name': sys.argv[3]})
    
if __name__=='__main__':
    """
    Bucket Name: AWS bucket name where data is stored.
    Source prefix: A prefixed used to search the storage bucket. The prefix is the name of the actual source file


    """
    flow_parameters = {
        'bucket_name':'bucket_name',
        'source_prefix': 'some_prefix',
        'db_name': 'db_name',
        }

    executor = LocalDaskExecutor(scheduler="threads")
    flow.register(project_name='AWS ETL Pipelines')
    flow.run(parameters=flow_parameters)
So currently when I run this flow it does not show anything on the cloud. The question related to the parent and child flow is to run something like the following in folder B:
import pandas as pd
from prefect import Flow, task
from prefect.run_configs import LocalRun
from prefect.core.task import Parameter
from prefect.tasks.prefect import StartFlowRun, create_flow_run, get_task_run_result
from prefect.backend import FlowRunView
from prefect.engine.results.s3_result import S3Result
import os

data_dict1 = {
    'bucket_name':'bucket_name',
    'source_prefix':'prefix_1'
    }

with Flow('parent_flow') as flow:
    child_flow_id = create_flow_run(
        project_name='AWS ETL Pipelines',
        flow_name='AWS downloads',
        parameters=data_dict1,
        run_name='test_run1'
        )

    child_flow_data = get_task_run_result(child_flow_id, task_slug='get_table_data-1')


flow.register(project_name='AWS ETL Pipelines')
flow.run()
So if I run the first flow directly in Folder A it does not show up on Prefect Cloud. If I run the second script the child flow runs, but when I try to retrieve the data I get the following error:
[2021-09-15 15:37:37+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Starting task run...
[2021-09-15 15:38:00+0200] ERROR - prefect.TaskRunner | Task 'get_task_run_result': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 207, in get_task_run_result
    task_run = flow_run.get_task_run(task_slug=task_slug, map_index=map_index)
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/flow_run.py", line 697, in get_task_run
    result = TaskRunView.from_task_slug(
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 283, in from_task_slug
    cls._query_for_task_run(
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 305, in _query_for_task_run
    task_runs = TaskRunView._query_for_task_runs(where=where, **kwargs)
  File "/Users/admin/opt/anaconda3/lib/python3.8/site-packages/prefect/backend/task_run.py", line 369, in _query_for_task_runs
    raise ValueError(
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get_table_data-1'}}, 'flow_run_id': {'_eq': '6553576c-cdf6-4306-9cc0-d848afc03203'}, 'map_index': {'_eq': 1}}
[2021-09-15 15:38:00+0200] INFO - prefect.TaskRunner | Task 'get_task_run_result': Finished task run for task with final state: 'Failed'
[2021-09-15 15:38:00+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
k

Kevin Kho

09/15/2021, 2:12 PM
So flow.run() does not trigger a flow run with a backend (Cloud or Server). flow.run() just runs your script, but the Prefect context is not filled in. It's the backend that injects things like the flow_run_name and flow_run_id. Only scheduled runs and runs triggered in the UI will appear in the UI; flow.run() is not a real flow run. I also don't suggest sys.argv because that's not going to be available when you are running on Prefect Cloud. Prefect will take care of passing the parameters for you. As long as you pass the Parameters in the schedule or through the UI, the flow run will use them. You also set them with required=True so there will be an error if they are not supplied. The reason flow A shows in the UI when you run flow B is because the create_flow_run task hits the GraphQL API to start a flow run. Runs on a schedule and runs in the UI also hit the GraphQL API; flow.run does not hit the API. This means that if you run flow B on a schedule, you will see all of flow A's runs in the UI as well. In order to get flow A to appear in the UI, you need to register it and run it from the UI or on a schedule.
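As a concrete illustration of that last point, a hedged sketch of creating a backend-visible run through the API instead of flow.run() (project name from the snippet above, parameter values invented; an agent still has to be running to pick the run up):
from prefect import Client

client = Client()
flow_id = flow.register(project_name="AWS ETL Pipelines")
# This goes through the GraphQL API, so the run shows up in the UI
# just like a scheduled or UI-triggered run.
client.create_flow_run(flow_id=flow_id, parameters={
    "bucket_name": "my-bucket",
    "source_prefix": "some_prefix",
    "db_name": "my.db",
})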
j

Jacq Crous

09/17/2021, 9:40 AM
Hi Kevin, thanks for the response. That makes sense; I forgot about that distinction. I still however have the same issue of retrieving the data from the child flow as before: Error during execution of task: ValueError("No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get_table_data-1'}}, 'flow_run_id': {'_eq': 'bdf0a323-0143-40c6-941d-08f65724c5a9'}, 'map_index': {'_eq': 1}}") I pulled this error from Prefect Cloud. Could you please advise on why this is happening?
k

Kevin Kho

09/17/2021, 2:30 PM
Was this a one-time thing or does it always happen? The set-up for flow_run_id works, so could you check that the task slug in the subflow is right? Did you remove the first flow.run() call when you registered?
j

Jacq Crous

09/20/2021, 6:15 AM
So this happens every time. I tried a couple of different task slugs based on some GraphQL queries in the flows, but none of them seem to work. I did remove the flow.run(); I'm running everything from the command line, so the script only contains the tasks and the flow.
k

Kevin Kho

09/20/2021, 9:23 PM
When you navigate to the flow run page, what does the slug show up as? The 'bdf0a323-0143-40c6-941d-08f65724c5a9'... Wait, sorry, is that task mapped? You might want map_index=-1? Not sure why yours says 1.
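For completeness, a hedged sketch of the adjusted call from the parent flow, following the map_index suggestion (task slug and the child_flow_id variable are reused from the snippets above):
from prefect.tasks.prefect import get_task_run_result

# map_index=-1 refers to the parent of a mapped task rather than one of its children.
child_flow_data = get_task_run_result(
    child_flow_id, task_slug="get_table_data-1", map_index=-1
)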