prefect-community
  • m

    Mitchell Bregman

    12/16/2020, 4:32 AM
    Has anyone gotten this error from the Docker API when registering a flow to Cloud?
    docker.errors.APIError: 500 Server Error for <https://35.229.119.149:2376/v1.32/build?t=xxx-xxx.jfrog.io%2Fprefect%2Fprefect_qisvgvrv-compare_sqlserver_and_snowflake%3A0.0.0&q=False&nocache=False&rm=False&forcerm=True&pull=False&dockerfile=Dockerfile>: Internal Server Error ("Syntax error - can't find = in "Driver". Must be of the form: name=value")
    • 1
    • 1
  • а

    Алексей Филимонов

    12/16/2020, 8:21 AM
    Hi folks! Sometimes we face a problem: a flow starts a few times (according to the logs) even though the timeline shows only one run. The same goes for tasks inside a flow: sometimes they start a few times. Expected behavior: the flow (and its tasks) starts once, using one pid. Additional info: Prefect Server
    v 0.13.12
    , the flows don't contain map, and the tasks run without the restart option.
    a
    • 2
    • 2
  • v

    Vlad Koshelev

    12/16/2020, 8:46 AM
    Hi, everyone. I'm trying to build an ETL flow for a star-schema DB with references and facts tables. They are loaded from CSV files daily. The facts must be loaded after the references, so the facts ETL flow depends on the references already being loaded for the day. The problem is that the files arrive in unpredictable order, e.g. facts.csv first, references.csv next. Moreover, references.csv may be absent on some days; in that case the facts flow should wait for references for some time. The cases are:
    1. references.csv uploaded first, facts.csv uploaded last: load references -> load facts
    2. facts.csv first, references.csv last: wait for references -> load references -> load facts
    3. facts.csv and no references.csv: wait for references with timeout -> load facts
    Another problem is that I need to forbid parallel execution of facts loading when several fact files are uploaded (facts data come in sliding time ranges without primary keys, so before loading them to the DB I need to delete the previous data within the time range). I guess I need a shared state which tells the facts flow whether references are already loaded and whether some other facts-flow task is running. That state is already present in Prefect's DB of tasks. Is there an API to get Prefect tasks from the DB, and which criteria can I use to filter them (e.g. some custom, dynamically set tags)? Something like this:
    references_task = StartFlowRun(flow_name='references-etl', project_name='etl', wait=True)
    facts_task = StartFlowRun(flow_name='facts-etl', project_name='etl', wait=True)
    
    with Flow('main-etl') as main_flow:
        files = get_files()
        references_task.map(files)
        facts_task.map(files)
    
    
    @task
    def get_group(file):
        # get the file's "group" name from the file name (20200101-facts.csv -> group=20200101)
        ...
    
    @task
    def check_references_done(group, file):
        # get references tasks from the Prefect DB for the group and check they are done,
        # or check whether the wait timeout is reached (e.g. get the file creation time and check if now - created_at > timeout)
        ...
    
    @task
    def check_no_other_facts_running(group, file):
        # check that no "do_etl" tasks from the Prefect DB are in a "running" state
        ...
    
    @task
    def do_etl(group, file):
        ...
    
    with Flow('facts-etl') as facts_flow:
        file = Parameter('file')
        group = get_group(file)
        refs_done = check_references_done(group, file)
        no_other_facts = check_no_other_facts_running(group, file)
        do_etl(group, file, upstream_tasks=[refs_done, no_other_facts])
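    A hedged sketch of the query side, assuming a Prefect Server backend: the Hasura-generated API exposes a task_run table that prefect.Client.graphql can filter by state and task name, which would cover both checks above.
    from prefect import Client
    
    client = Client()
    resp = client.graphql(
        """
        query {
          task_run(
            where: {
              state: { _eq: "Running" }
              task: { name: { _eq: "do_etl" } }
            }
          ) {
            id
            state
            start_time
          }
        }
        """
    )
    running = resp.data.task_run  # an empty list means no other facts ETL is running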
    j
    • 2
    • 2
  • l

    Lukas N.

    12/16/2020, 9:55 AM
    Hi Prefect community 👋, I've got a problem with passing environment variables from the Kubernetes agent to DaskKubernetesEnvironment. Is anyone able to help? More in thread.
    v
    • 2
    • 2
  • w

    Will Milner

    12/16/2020, 2:30 PM
    Is there any extra config needed to have stdout from a ShellTask show up in the server logs? When I declare a task like this I see the output fine:
    @task(log_stdout=True)
    def sample_print_task():
        print("hello")
    When I declare a shell task like this
    task_shell = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)
    with Flow("test") as flow:
        print_test = task_shell(command="echo hi", task_args={"name": "hi"})
    I don't see anything printed after I register and run the flow. I have
    log_to_cloud
    set to True on the agent I am running
    j
    • 2
    • 8
  • v

    Vitaly Shulgin

    12/16/2020, 3:09 PM
    Hi Prefect community, how can I configure the agent to work for a specific tenant? There is this error in the agent log; is it possible to set the tenant via an env var?
    [2020-12-16 14:48:37,015] ERROR - agent | 400 Client Error: Bad Request for url: <http://prefect-apollo:4200/>
    
    The following error messages were provided by the GraphQL server:
    
        INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
            "input.tenant_id"; Expected non-nullable type UUID! not to be null.
    
    The GraphQL query was:
    
        mutation($input: get_runs_in_queue_input!) {
                get_runs_in_queue(input: $input) {
                    flow_run_ids
            }
        }
    
    The passed variables were:
    
        {"input": {"before": "2020-12-16T14:48:36.911186+00:00", "labels": ["talynuc", "any"], "tenant_id": null}}
    j
    • 2
    • 6
  • d

    dh

    12/16/2020, 4:04 PM
    Hello, Prefect community. Does Prefect have an idiomatic way to specify a) “please run this Task within this Docker image” and b) “please execute this Task on a container using x amount of cpu/memory + gpu”? I am curious about task-level specification, not flow-level.
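    A partial, hedged answer: per-task Docker images aren't supported as far as I know, but with a DaskExecutor, task tags of the form dask-resource:<NAME>=<AMOUNT> are translated into Dask resource restrictions, which gives task-level placement on workers started with matching --resources:
    from prefect import task
    
    # runs only on Dask workers started with e.g. --resources "GPU=1"
    @task(tags=["dask-resource:GPU=1"])
    def train_model():
        ...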
    j
    j
    • 3
    • 15
  • m

    Marc Lipoff

    12/16/2020, 7:45 PM
    Does Prefect create a repository on AWS ECR, if it does not exist? I am getting this error
    The repository with name 'staging-prefect-insurance_export' does not exist in the registry with id '524279393077'
    I'm running something like this:
    Docker(registry_url='<account_id>.dkr.ecr.us-west-2.amazonaws.com',
                              image_name="dev-prefect-<flow-name>",
                              python_dependencies=python_dependencies,
                              files={f: os.path.join('/modules/', os.path.basename(f))
                                                    for f in get_files_in_directory(current_module_directory)},
                              env_vars = {"PYTHONPATH": "$PYTHONPATH:modules/:modules/utils/"})
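    A hedged sketch, assuming the missing repository is the only problem: Prefect's Docker storage builds and pushes the image but does not create the ECR repository, so creating it ahead of registration with boto3 sidesteps the error:
    import boto3
    
    ecr = boto3.client("ecr", region_name="us-west-2")
    try:
        ecr.create_repository(repositoryName="staging-prefect-insurance_export")
    except ecr.exceptions.RepositoryAlreadyExistsException:
        pass  # already there; nothing to do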
    j
    • 2
    • 2
  • c

    Christian

    12/16/2020, 10:27 PM
    Hi. 👋 I want to access prefect.context.parameters from within a task state_handler but the value I get is always
    None
    . The idea is to send an email (EmailTask) on a task failure and I want to parameterise the email address that will be alerted. Is there another way to pass a variable into a state_handler? Am I going in the wrong direction? Thanks
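    One hedged workaround, sketched: read the address from prefect.context inside the handler at run time via a closure; the "alert_email" parameter name here is hypothetical.
    import prefect
    from prefect import task
    
    def make_on_failure(param_name="alert_email"):
        def on_failure(task, old_state, new_state):
            if new_state.is_failed():
                address = prefect.context.get("parameters", {}).get(param_name)
                # fire the alert here, e.g. EmailTask(...).run(email_to=address)
            return new_state
        return on_failure
    
    @task(state_handlers=[make_on_failure()])
    def risky_task():
        ...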
    j
    • 2
    • 5
  • l

    Lech Głowiak

    12/17/2020, 6:14 AM
    Hi! 👋 I admit my use-case isn't what Prefect was designed for, but here I am. I need a manual trigger to let the flow continue: these are the first production runs of this flow, and we want to manually check some state before going further. The problem is that after the pause a new container is created, so all state (both variables and temporary files stored in the previous container) is lost and missing after the pause. Is there some way around this obstacle?
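    A hedged partial workaround: persisting task outputs with a Result backend (bucket name hypothetical) lets the resumed run restore returned values even though the post-pause container is fresh; temporary files would still need to live on shared storage rather than on the container filesystem.
    from prefect import task
    from prefect.engine.results import S3Result
    
    @task(checkpoint=True, result=S3Result(bucket="my-flow-results"))
    def expensive_step():
        ...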
    m
    d
    • 3
    • 9
  • b

    Bob Primusanto

    12/17/2020, 9:23 AM
    Hello, I'm looking for a way to get the flow_id. I want to record this as a column in my table, to trace back to a particular task if I encounter an issue. Any ideas?
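    A hedged sketch: inside a running task, prefect.context carries run metadata; flow_run_id is populated on every backend run, and flow_id should be available on Cloud/Server runs too.
    import prefect
    from prefect import task
    
    @task
    def load_with_lineage():
        flow_run_id = prefect.context.get("flow_run_id")
        flow_id = prefect.context.get("flow_id")
        # write these into the table row for traceability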
    j
    • 2
    • 2
  • t

    Thomas Reynaud

    12/17/2020, 2:02 PM
    Hi! First, I'd like to thank you for building such a great product!! We are currently experimenting with Prefect, and we were wondering about the best practice for running a docker agent (or any agent, actually) on a remote server. Should you just start it in a screen session, detach, and leave it as a background process? Do you have any recommendations? It seems very dirty when running multiple agents on the same machine.
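    A hedged pointer: the Prefect docs lean toward a process supervisor rather than detached screens; one supervisord program entry (or systemd unit) per agent, each wrapping a command like prefect agent docker start --label my-server, keeps agents restartable and tidy on a shared machine.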
    j
    • 2
    • 2
  • d

    dh

    12/17/2020, 2:39 PM
    Hello community, could anyone help me understand if there's an easy way to get FlowRunner “NOT” to submit a Task to the executor if any of the task's upstream tasks are not ready (e.g. in Pending states)? I think the current design decision is to submit regardless of the upstream states and let the TaskRunner decide whether that task is runnable, as per this code [1]. Because of this behavior, I am experiencing a UX problem where non-runnable tasks repeatedly get submitted to our SLURM cluster, occupying resources for some time only to confirm those tasks are not actually ready yet due to upstream tasks. Would there be a clean solution / workaround for this? [1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/flow_runner.py#L593
    j
    • 2
    • 10
  • j

    Joël Luijmes

    12/17/2020, 2:42 PM
    Hey, how can I provide secrets to the Kubernetes Agent? I noticed that I can set some envs at
    KubernetesRun
    but these are only strings. Is there currently an easy way to provide secrets as envs? My intuition says I have to provide a custom job yaml then, is that correct, or am I missing an easier way?
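    A hedged sketch confirming that intuition: KubernetesRun accepts a custom job_template, so the standard Kubernetes secretKeyRef mechanism applies; the secret and env names below are hypothetical, and the container is named "flow" to match Prefect's default job spec.
    from prefect.run_configs import KubernetesRun
    
    job_template = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "flow",
                            "env": [
                                {
                                    "name": "MY_SECRET",
                                    "valueFrom": {
                                        "secretKeyRef": {"name": "my-secret", "key": "value"}
                                    },
                                }
                            ],
                        }
                    ]
                }
            }
        },
    }
    run_config = KubernetesRun(job_template=job_template)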
    ✅ 1
    j
    s
    +2
    • 5
    • 11
  • j

    Joseph

    12/17/2020, 5:23 PM
    I’m evaluating Prefect for some data engineering pipelines, but our organization is using Bamboo for CI/CD, which has some very obvious weaknesses (such as config via the web rather than in code), and I’d be interested in trying Prefect as a Bamboo replacement. I understand that Prefect was created for data engineering pipelines. Is anyone aware of any write-ups of people using Prefect for more traditional CI/CD tasks in addition to data engineering? Alternatively, are there obvious reasons why this is a bad idea?
    c
    • 2
    • 4
  • p

    Pedro Martins

    12/17/2020, 6:31 PM
    Hey! Following the above thread on secrets in the k8s agent: how can I ensure that the prefect jobs spawned by the agent contain the secret I specified? I'm running the Aircraft example from a notebook and connecting it to my server in the cluster. Simply passing image_pull_secrets to KubernetesRun does not work: I keep getting
    Error: ErrImagePull
    custom_confs = {
        "run_config": KubernetesRun(
            image="drtools/prefect:aircraft-etl", 
            image_pull_secrets=["regcred"], 
        ),   
        "storage": S3(bucket="dr-prefect"),
    } 
    
    with Flow("Aircraft-ETL", **custom_confs) as flow:
        airport = Parameter("airport", default = "IAD")
        radius = Parameter("radius", default = 200)
        
        reference_data = extract_reference_data()
        live_data = extract_live_data(airport, radius, reference_data)
    
        transformed_live_data = transform(live_data, reference_data)
    
        load_reference_data(reference_data)
        load_live_data(transformed_live_data)
    🙌 1
    👀 2
    d
    j
    +2
    • 5
    • 34
  • r

    Richard Hughes

    12/17/2020, 6:36 PM
    Hi, I am hoping to find some help writing a mutation to delete the flows returned by a query. Any guidance would be appreciated; this is as far as I can get with my limited understanding of GraphQL:
    mutation {
      delete_flow(input: 
      
        query {
      flow 
      (where: {name: {_eq: "MyFlowToDelete" }})
        {
    
        id
      }
    }
        ) {
        success,
        error
      }
    }
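    A hedged sketch of the usual two-step shape (GraphQL can't embed a sub-query as mutation input): query the ids first, then feed each one to delete_flow, which takes a flow_id on Prefect Server.
    from prefect import Client
    
    client = Client()
    resp = client.graphql(
        'query { flow(where: {name: {_eq: "MyFlowToDelete"}}) { id } }'
    )
    for f in resp.data.flow:
        client.graphql(
            """
            mutation($id: UUID!) {
              delete_flow(input: {flow_id: $id}) { success }
            }
            """,
            variables={"id": f.id},
        )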
    d
    • 2
    • 10
  • v

    Vitaly Shulgin

    12/17/2020, 7:25 PM
    Hi, I'm rewriting some ETL jobs from a bonobo-based implementation to Prefect. Question: does Prefect support functionality like FixedWindow, to limit the number of input elements?
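    Not out of the box as far as I know, but a hedged equivalent is easy to sketch: a chunking task plus map reproduces fixed-size windows.
    from prefect import Flow, task
    
    @task
    def fixed_windows(items, size):
        # split the input into consecutive windows of at most `size` elements
        return [items[i:i + size] for i in range(0, len(items), size)]
    
    @task
    def process(batch):
        print(len(batch))
    
    with Flow("windowed-etl") as flow:
        batches = fixed_windows(list(range(10)), size=3)
        process.map(batches)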
    d
    m
    • 3
    • 9
  • p

    Phil Glazer

    12/17/2020, 9:33 PM
    Any good guides for integrating/deploying with AWS? Apologies if I missed this elsewhere
    m
    d
    • 3
    • 2
  • h

    Henrik Väisänen

    12/17/2020, 9:53 PM
    Hey, I'm experimenting with prefect and dask workers running on multiple servers and trying to achieve the following: I run a flow from time to time that uses all the workers, and I would like to cache the results for future flow runs so that the servers can access each other's cache. The servers do not have a shared drive, and I cannot bind a task to a specific server either. Based on https://github.com/PrefectHQ/prefect/issues/2636, this kind of distributed cache is not currently possible out of the box with dask, or am I missing some crucial piece of Prefect knowledge?
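    A hedged alternative to a Dask-level cache: point task results at object storage and set a target, so any server skips work whose output object already exists (bucket and filename template are hypothetical).
    from prefect import task
    from prefect.engine.results import S3Result
    
    @task(
        result=S3Result(bucket="my-prefect-cache"),
        target="{task_name}-{today}.pkl",  # the run is skipped if this key already exists
    )
    def expensive():
        ...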
    👀 1
    m
    • 2
    • 5
  • m

    Marwan Sarieddine

    12/17/2020, 9:59 PM
    https://prefect-community.slack.com/archives/CL09KU1K7/p1608242318263800?thread_ts=1606496254.410900&cid=CL09KU1K7
  • d

    Diogo Munaro

    12/17/2020, 11:38 PM
    Hey guys, I'm Diogo, using Prefect for the first time and already in love with it ❤️. I'm building a flow that calls another registered flow with
    StartFlowRun
    , but I can't get results from that flow. Looking at the code,
    create_flow_run
    ignores the result arg: https://github.com/PrefectHQ/prefect/blob/96ef85470872593268c9498b57ac9f0b5a268e01/src/prefect/tasks/prefect/flow_run.py#L160 Do you know a way to get a flow's results? Here is a test code:
    from prefect.tasks.prefect import StartFlowRun
    from prefect import Flow, task
    from prefect.engine.results.local_result import LocalResult
    
    graph_building = StartFlowRun(
          flow_name="test_flow",
          project_name="test_project",
          wait=True,
          result=LocalResult(".")
    )
    with Flow("Call Flow") as flow:
        end_flow = graph_building()
    
    state = flow.run()
    state.result[end_flow].result  # nothing here
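    A hedged workaround rather than a confirmed API: given the child flow-run id (child_flow_run_id below is hypothetical), Client.get_flow_run_info exposes that run's task states, and the persisted result locations hang off those states, assuming the child flow checkpoints its results.
    from prefect import Client
    
    client = Client()
    info = client.get_flow_run_info(child_flow_run_id)  # id of the "test_flow" run
    for tr in info.task_runs:
        print(tr.task_slug, tr.state)  # inspect states / result locations here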
    d
    m
    • 3
    • 14
  • m

    Marc Lipoff

    12/18/2020, 12:43 AM
    I'm having an issue where the task definition created by the agent seems to be incorrect. The task definition for the flow looks like this (see screenshot below). But when I run my agent, I am using
    prefect agent ecs start --cluster arn:aws:ecs:us-west-2:xxx:cluster/staging-cluster --label staging --task-role-arn arn:aws:iam::xxx:role/staging-prefect-agent-flow --log-level DEBUG --launch-type FARGATE
    . So I would expect the compatibility to be
    FARGATE
    It seems this is the cause of the error I am getting when I submit my flow to the agent. The error is:
    [2020-12-18 00:34:44,559] ERROR - agent | Error while deploying flow: InvalidParameterException('An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.')
    Has anyone seen something like this before?
    a
    k
    • 3
    • 5
  • j

    jack

    12/18/2020, 2:20 AM
    Hey team, I tried running a flow and it gave me this error
    {'_schema': 'Invalid data type: None'}
    Does anyone know what this means?
    n
    • 2
    • 2
  • v

    Vitaly Shulgin

    12/18/2020, 9:11 AM
    Hello Team, I'm trying to run a test flow; the problem is that no task except the first one is called. Any ideas?
    n
    • 2
    • 11
  • v

    Vitaly Shulgin

    12/18/2020, 2:51 PM
    Is it possible to modify the context in one task, by adding a key to it, and get that key's value in another task? What I see now is that the context in the next task does not contain the key added in the previous task.
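    A hedged note: context mutations made inside one task aren't visible to other tasks; the supported way to hand a value onward is to return it and pass it as an argument.
    from prefect import Flow, task
    
    @task
    def produce():
        return {"key": "value"}
    
    @task
    def consume(data):
        print(data["key"])
    
    with Flow("pass-data") as flow:
        consume(produce())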
    n
    • 2
    • 9
  • s

    Sergiy Krutsenko

    12/18/2020, 5:32 PM
    Hi, I'm trying to create dependencies between 2 flows but have problems passing results from the first one to the next. Below is updated minimal code to reproduce. I use a LocalAgent to run the flows via Server. They all finish with success status, but the results from flow_a are not passed to flow_b. Here are the entries from the logs:
    Flow_a: Created 3 chunks
    Flow_b: Got result from previous flow None
    import prefect as pf
    from prefect import Client, task, Parameter, Flow
    from prefect.engine.results import LocalResult
    from prefect.environments.storage import Local
    from prefect.utilities.configuration import set_temporary_config
    from prefect.tasks.prefect import FlowRunTask
    from prefect.environments import LocalEnvironment
    import socket
    
    def get_server_config(server, port):
        return {
                    "cloud.api": "http://{}:{}".format(server, port),
                    "cloud.graphql": "http://{}:{}/graphql".format(server, port),
                    "backend": "server",
                }
    
    def get_logger():
        return pf.context.get('logger')
    
    @task()
    def create_chunks(inputs):
        logger = get_logger()
        chunks = ['a', 'b', 'c']
        logger.info('Created %d chunks', len(chunks))
        return chunks
    
    @task()
    def accept_results(result):
        logger = get_logger()
        logger.info('Got result from previous flow %s', result)
        return result
    
    def main():
        hostname = socket.gethostname()
        labels = [hostname]
        env = LocalEnvironment(labels=labels)
        with set_temporary_config(get_server_config('xxx', '4200')):
            with Flow('flow_a', storage=Local(), environment=env,
                      result=LocalResult(dir='c:/temp/flows/flow_a',
                                         location='{flow_run_id}_{task_name}_{map_index}.txt')
                      ) as flow_a:
                inputs = Parameter('inputs', required=True)
                create_chunks(inputs)
    
            with Flow('flow_b', storage=Local(), environment=env,
                      result=LocalResult(dir='c:/temp/flows/flow_b',
                                         location='{flow_run_id}_{task_name}_{map_index}.txt')
                      ) as flow_b:
                result = Parameter('result', required=True)
                accept_results(result)
    
            fa = FlowRunTask(flow_name="flow_a", project_name="Test", wait=True)
            fb = FlowRunTask(flow_name="flow_b", project_name="Test", wait=True)
    
            with Flow('flow_c', storage=Local(), environment=env,
                      result=LocalResult(dir='c:/temp/flows/flow_c',
                                         location='{flow_run_id}_{task_name}_{map_index}.txt')
                      ) as flow_c:
                a = fa(parameters={'inputs': {}})
                b = fb(upstream_tasks=[a], parameters={'result': a.result})
    
            client = Client()
            id_a = client.register(flow_a, project_name="Test")
            id_b = client.register(flow_b, project_name="Test")
            id_c = client.register(flow_c, project_name="Test")
            client.create_flow_run(flow_id=id_c)
    
    if __name__ == "__main__":
        main()
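    A hedged reading of why flow_b logs None: at build time a.result is the task's Result attribute (a LocalResult object), not the runtime return value, and FlowRunTask does not carry child-flow results back to the parent. One workaround is an agreed fixed location that both flows know about (path below hypothetical):
    from prefect.engine.results import LocalResult
    
    shared = LocalResult(dir='c:/temp/flows/shared')
    # flow_a: create_chunks(...) configured with result=shared, checkpoint=True, location='chunks.prefect'
    # flow_b: chunks = shared.read('chunks.prefect').value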
    👀 1
    n
    • 2
    • 18
  • i

    itay livni

    12/18/2020, 6:31 PM
    Hi - Did
    FlowRunTask
    change to
    StartFlowRun
    somewhere along the way? Thanks
    n
    • 2
    • 2
  • a

    Andrew Hannigan

    12/18/2020, 7:05 PM
    Extremely helpful warning messages! Not sure if this is a 0.14 thing or was always there, but... great stuff! Feels almost like a code-review note.
    /opt/project/dwt/extract/extract.py:43: UserWarning: A Task was passed as an argument to BookmarkEnd, you likely want to first initialize BookmarkEnd with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
    
      my_task = BookmarkEnd(...)  # static (non-Task) args go here
      res = my_task(...)  # dynamic (Task) args go here
    💯 5
    🤩 1
    j
    • 2
    • 1
  • c

    Chris White

    12/18/2020, 7:15 PM
    Hello everyone! For all those who are still curious about Run Configs, how to migrate to the new paradigm and avoid any
    DeprecationWarnings
    , or those who just want to learn more, please join @Laura Lorenz as she walks us through the changes and new functionality! The stream will begin at 1pm PT / 4pm ET at this link:

https://www.youtube.com/watch?v=yu1bD3KHmhA&feature=youtu.be

    See you all there!
    :hero: 1
    :thank-you: 2
    ❤️ 2
    🚀 1
    l
    • 2
    • 1

Laura Lorenz

12/18/2020, 7:23 PM
The demo is also all in AWS Fargate/ECS if you are a fan (or want to be one/see what’s up!)
👍 1
View count: 3