prefect-community
  • j

    Joseph Haaga

    10/23/2020, 7:39 PM
    I don’t understand this example from the Docs; isn’t `flow` only defined inside that context manager/`with` statement?
    from prefect import task, Flow
    
    @task
    def say_hello():
        print("Hello, world!")
    
    with Flow("Run Me") as flow:
        h = say_hello()
    
    flow.run() # prints "Hello, world!"
    I’m getting the following
    NameError: name 'flow' is not defined
    a
    b
    3 replies · 3 participants
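On the scoping question above: in Python a `with` block does not open a new scope, so the name bound by `as` stays defined after the block ends. The sketch below shows this with a hypothetical stand-in context manager (`build` is not a Prefect API). If a `NameError` still appears, one possible cause is that the `with` block was never executed in the same session or scope as the later call.

```python
from contextlib import contextmanager


@contextmanager
def build(name):
    # Hypothetical stand-in for a context manager such as Flow("Run Me").
    obj = {"name": name, "steps": []}
    yield obj


with build("Run Me") as flow:
    flow["steps"].append("say_hello")

# `with` does not create a new scope: `flow` is still bound here.
print(flow["name"], flow["steps"])
```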
  • a

    Alexander

    10/23/2020, 7:54 PM
    For some reason my tasks are not running in parallel. The logs say `Using executor type DaskExecutor` (I use a local one).
    import time
    
    from prefect import task, Flow
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment
    
    
    @task
    def wait_for(seconds):
        time.sleep(seconds)
    
    
    with Flow('_Concurrency_') as flow:
        head = wait_for(5)
        leaf1 = wait_for(30)
        leaf2 = wait_for(15)
        tail = wait_for(3)
    
        head.set_dependencies(downstream_tasks=[leaf1, leaf2])
        leaf1.set_downstream(tail)
        leaf2.set_downstream(tail)
    
    flow.environment = LocalEnvironment(executor=DaskExecutor())
    flow.executor = DaskExecutor()
    I expect leaf1 and leaf2 to run in parallel, but they don't. In the Gantt chart I see them running sequentially. If I run them locally, they run in parallel. BTW, what's the difference between the environment executor and the flow executor? Flows are run by a Docker agent.
    n
    a
    5 replies · 3 participants
  • h

    Hui Zheng

    10/23/2020, 11:35 PM
    Hello, our flow run had failed with this error this afternoon.
    Failed to set task state with error: HTTPError('400 Client Error: Bad Request for url: https://api.prefect.io/graphql')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 124, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1399, in set_task_run_state
        result = self.graphql(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 275, in graphql
        result = self.post(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 230, in post
        response = self._request(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 400, in _request
        response = self._send_request(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 333, in _send_request
        response.raise_for_status()
      File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://api.prefect.io/graphql
    Is it a prefect.io cloud API error? Anything I could do to avoid it or re-try it in a better way?
    c
    1 reply · 2 participants
  • j

    Jeff Friesen

    10/24/2020, 1:38 AM
    I don’t want to be negative or take away from anything you’re doing (which so far is great), but hosting your official blog on Medium seems counterproductive to me. I would have to sign up and pay to read this: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a. I realize there is a free trial, but still.
  • j

    Jeff Friesen

    10/24/2020, 1:40 AM
    Seems weird to put your company blog behind a paywall
    j
    j
    3 replies · 3 participants
  • s

    Scott Asher

    10/24/2020, 4:23 AM
    I have a custom executor/environment (PipelineExecutor/PipelineEnvironment) to execute code using my on-prem scheduler. I’m seeing the following error, which I’m trying to debug:
    execute() missing 1 required positional argument: 'flow_location'
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 11, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 34, in flow_run
        return _execute_flow_run()
      File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
        raise exc
      File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 93, in _execute_flow_run
        environment.execute(flow)
    TypeError: execute() missing 1 required positional argument: 'flow_location'
    [2020-10-24 04:14:03,330] INFO - agent | Process PID 23114 returned non-zero exit code
    This error confuses me, primarily because I don’t see my custom executor anywhere in the stack, nor do I understand what was supposed to be passing around location. The code runs fine when run locally using `flow.run()`:
    [2020-10-24 04:01:44] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
    [2020-10-24 04:01:44] INFO - prefect.TaskRunner | Task 'random_number': Starting task run...
    [2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'random_number': finished task run for task with final state: 'Success'
    [2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': Starting task run...
    [2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': finished task run for task with final state: 'Success'
    [2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': Starting task run...
    [2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': finished task run for task with final state: 'Success'
    [2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': Starting task run...
    [2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': finished task run for task with final state: 'Success'
    [2020-10-24 04:01:50] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    Code is (imports removed to save space):
    @task(name="random_number")
    def random_number():
        time.sleep(5)
        return random.randint(0, 100)
    
    @task(name="plus_one")
    def plus_one(x):
        return x + 1
    
    @task(name="plus_two")
    def plus_two(x):
        return x + 2
    
    @task(name="plus_three")
    def plus_three(x):
        return x + 3
    
    #with Flow('test_flow', environment=LocalEnvironment(executor=PipelineExecutor())) as flow:
    flow = Flow('test_flow', storage=Local(), environment=PipelineEnvironment())
    plus_three.bind(plus_one, flow=flow)
    plus_two.set_upstream(plus_one, flow=flow)
    plus_two.bind(plus_one, flow=flow)
    plus_one.set_upstream(random_number, flow=flow)
    plus_one.bind(random_number, flow=flow)
  • s

    Scott Asher

    10/24/2020, 4:24 AM
    I registered the flow with Cloud via CLI:
    xxxx@xxxx:~/xxxx/xxxxxxxx$%>) % prefect register flow --project="Daily DAG" --file /tmp/testprefect.py
  • s

    Scott Asher

    10/24/2020, 4:25 AM
    Any thoughts or help are much appreciated.
  • a

    ale

    10/24/2020, 10:49 AM
    Hi folks, I’m reading about custom logging here: https://docs.prefect.io/core/advanced_tutorials/custom-logs.html I see that the log level and formatter can be specified in the Prefect user config file. Do these logging settings also apply to Prefect Server if supplied in `config.toml`?
  • d

    Darragh

    10/24/2020, 3:10 PM
    Hey guys, I’m upgrading from 0.12.2 to 0.13.12 and having some problems. Previously I was able to set env variables to override the graphql endpoint, and a bunch of other things, but it doesn’t seem to accept the format I had anymore. Previous variable: `PREFECT__SERVER__UI__GRAPHQL_URL`. It is not working with 0.13.12. The docs seem to suggest that `graphql_url` has changed to `apollo_url`, so I tried to update the env var to `PREFECT__SERVER__UI__APOLLO_URL` or even `PREFECT__SERVER__SERVER__UI__APOLLO_URL`, but no luck. Any suggestions? I also tried `PREFECT__TELEMETRY__SERVER__TELEMETRY__ENABLED=false` to disable telemetry, but no luck there either.
    n
    29 replies · 2 participants
  • m

    Marley

    10/24/2020, 8:49 PM
    I’d like my Cloud Flow to run and register from a Docker setup with a Prefect auth token that is at the team level. The setup right now is that I have a user token stored in a kube secret, but ideally that token wouldn’t be tied to a particular user. I cannot run a flow from a TENANT or RUNNER token, and I’d prefer not to pass a User token into the env variables of the image. Is there a way around using User tokens?
    c
    2 replies · 2 participants
  • a

    Anish Chhaparwal

    10/24/2020, 11:34 PM
    I have a Python program with a large number of input parameters, say around 35+. We are currently using argparse to set default values, load some of them from config files, and pass all the args to functions. I'd like to convert the program into a flow. Is there a way to use the existing args in the flow? I have illustrated a sample of what I'd like to achieve, but this fails when I try to run it from the UI (refer to the attached image too).
    import argparse
    from prefect import Flow, Parameter, task
    import prefect
    from prefect.environments import LocalEnvironment

    @task
    def testing(args):
        logger = prefect.context.get("logger")
        logger.info(f"agrs value for name is : {args.name}")
        logger.info(f"agrs value for worker is : {args.workers}")

    with Flow("args_trial") as flow:
        opts = Parameter("opts")
        testing(args=(opts))

    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="test pywinrm")
        parser.add_argument("--workers", type=int, default=10)
        parser.add_argument("--sleep_time", type=int, default=2)
        parser.add_argument("--name", type=str, default="args_tester")
        opts = parser.parse_args()
        # flow.run(opts=opts)
        flow.environment = LocalEnvironment(labels=["qure9", "pipelines"])
        flow.register(project_name="Test")
    I can't seem to figure out how to pass the default args while registering. Also, if I have to convert each of them into Prefect parameters, I'd like to pass all parameters to a function using something like `def func_1(args)` instead of specifying 15-20 for each function. Is that possible?
    c
    3 replies · 2 participants
  • s

    Scott Asher

    10/25/2020, 9:03 PM
    Running into problem - registering flow created in non `__main__` (moved code blocks into thread)
    c
    7 replies · 2 participants
  • n

    Narasimhan Ramaswamy

    10/25/2020, 10:04 PM
    from prefect import task, case, apply_map,Flow,unmapped
    import prefect
    from prefect.tasks.control_flow import merge
    from prefect.tasks.shell import ShellTask
    from prefect.engine.results import LocalResult
    from prefect.tasks.control_flow import case
    from prefect.environments.storage import Docker
    import docker 
    
    def test2(name):
        logger = prefect.context.get("logger")
        logger.info("Inside Task 2")
        a = ShellTask(name=name, command="ls", return_all=True, log_stderr=True)
        return a
    
    @task(log_stdout=True)
    def inc_if_even(x):
        logger = prefect.context.get("logger")
    
        logger.info("----Start----")
        name = r'Shell' + str(x)
        b = test2(name).run()
        logger.info(f"{b}")
        logger.info("----End----")
        return 1
    
    @task
    def reduce(x):
        z = sum(x)
        if z > 1:
            return True
        else:
            return False
    
    @task
    def return_num(x):
        return x + 10
    
    @task
    def logger_st(status):
        logger = prefect.context.get("logger")
        logger.info(f"Task Failed - {status}")
        return 1
    
    from prefect.engine.executors import DaskExecutor,LocalDaskExecutor
    from prefect.environments import LocalEnvironment
    
    with Flow("test-flow",environment=LocalEnvironment(executor=DaskExecutor())) as flow:
        logger = prefect.context.get("logger")
        return_nums = return_num.map([1,2,3,4,5,6,7])
        result = inc_if_even.map(return_nums)
        reduced = reduce(result)
    Hi all, just a quick question on DaskExecutor(): when running with flow.run, the mapped tasks run in parallel, but when running on Cloud, they run in sequence. My setup is an agent running on Kubernetes. Can you please help with what I should be doing?
    👀 2
    j
    2 replies · 2 participants
  • s

    Scott Asher

    10/26/2020, 1:45 AM
    I’m still seeing this error, even with my flow entirely created in the main script:
    File "/usr/scratch/sasher/pyenvs/prefect/lib/python3.6/site-packages/prefect/utilities/storage.py", line 85, in extract_flow_from_file
        raise ValueError("No flow found in file.")
    ValueError: No flow found in file.
    m
    7 replies · 2 participants
  • s

    Scott Asher

    10/26/2020, 3:47 AM
    Second question - and I assume this is in the docs somewhere, but i just can’t find it. In terms of using the Imperative API, you guys say:
    Prefect’s imperative API allows more fine-grained control. Its main advantage over the functional API is that it allows tasks to be set as upstream or downstream dependencies without passing their results. This allows you to create a strict ordering of tasks through state dependencies without also creating data dependencies.
    And then you give an example where you force a “PlusOne” task to run after a task that simply prints something. My question is - what if you WANT a data dependency? Can you do this through the imperative API? More code blocks to follow in thread so I don’t clutter up the main thread.
    m
    4 replies · 2 participants
  • l

    Leon Hao

    10/26/2020, 8:04 AM
    Hi Team, in the "why not Airflow" blog it says: "More confusingly, the `execution_date` is not interpreted by Airflow as the start time of the DAG, but rather the end of an interval capped by the DAG’s start time." I believe execution_date in Airflow is the start time of the schedule interval?
    k
    1 reply · 2 participants
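The arithmetic behind the question above can be sketched as follows, assuming the Airflow behaviour of that era, where `execution_date` marks the start of a schedule interval and the run is triggered once the interval has ended. `earliest_trigger_time` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def earliest_trigger_time(execution_date, schedule_interval):
    # Assumption: the run labelled `execution_date` covers the interval
    # [execution_date, execution_date + schedule_interval) and is only
    # triggered after that interval has closed.
    return execution_date + schedule_interval


execution_date = datetime(2020, 10, 25)
interval = timedelta(days=1)

print(earliest_trigger_time(execution_date, interval))  # 2020-10-26 00:00:00
```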
  • h

    Hagai Arad

    10/26/2020, 9:49 AM
    Hello 👋 I’ve been trying to find the best way to set my prefect environment totally containerised and can’t manage to do it at the moment. I’m currently trying to use “docker in docker”: I’ve created an image that includes docker, docker-compose and prefect. I create a container of this image and run the prefect commands to initialise prefect server and ui. It all seems to work fine, but when launching localhost:8080 it shows me the home page and asks me to create a tenant. When running the create-tenant command (both in python and cli) from within the docker container I get an error (attached as a reply). When installing prefect on my host machine and running the create-tenant command the ui is working, but this is the exact thing I want to avoid (to install prefect on the host machine). Would love to get any tips how to solve it OR how to design it in another containerised way. Thanks!
    k
    3 replies · 2 participants
  • t

    tsar

    10/26/2020, 10:10 AM
    hoi, I'm trying to run prefect server start and I get the below error messages :
    ERROR: Invalid interpolation format for "apollo" option in service "services": "${GRAPHQL_HOST_PORT:-4201}"
    Exception caught; killing services (press ctrl-C to force)
    ERROR: Invalid interpolation format for "apollo" option in service "services": "${GRAPHQL_HOST_PORT:-4201}"
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/cli/server.py", line 332, in start
        ["docker-compose", "pull"], cwd=compose_dir_path, env=env
      File "/usr/lib64/python3.6/subprocess.py", line 311, in check_call
        raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
    2 replies · 1 participant
  • b

    banditelol

    10/26/2020, 11:29 AM
    Hello, I'm quite new to using Prefect, but I've got some experience using Airflow. I've got some questions on the "best practice" for using Prefect Server. Currently I have a server instance running Prefect Server, and I put all my flows inside a git repository (GitHub). My usual workflow is:
    1. Work on the flow on my local machine, test it using `flow.run()` to run it locally, and change it to `flow.register()` when I'm done.
    2. Push the changes to the git repo.
    3. Pull them from the server.
    4. Run the modified/created flow so that it's registered on Prefect Server (btw I have one agent running in the background on the server).
    5. Activate the flow from the UI.
    I feel that there's clearly a better way to do this, but I haven't found anything yet from googling. I'd really appreciate it if anyone could give me a clue about this. Thanks 🙂
    e
    2 replies · 2 participants
  • r

    Ralph Willgoss

    10/26/2020, 2:32 PM
    Hi, Thanks for all the help over the past few weeks/months. I've been evaluating prefect for a use case and right now it looks like it doesn't add much for what we do. I wanted to check with you guys, to see if I'm missing something or if I'm on the right track. I have a python model that is very disk intensive. We generate and move around lots of intermediate data, about 180GB, which is going to increase too. I currently run the model using LocalDaskExecutor on a single AWS EC2 instance with about 92 cores and 196GB RAM. While prefect gives me the ability to scale horizontally, if I were to spread the work horizontally, moving the intermediary data between instances is going to be slower than referencing off disk. So in summary, while I can just scale vertically our model seems limited by disk access so using something like prefect to go horizontal appears to add additional overhead. Thoughts, questions, opinions all welcome.
    r
    a
    12 replies · 3 participants
  • m

    Marwan Sarieddine

    10/26/2020, 2:38 PM
    Hi Folks , I am wondering if anyone has encountered this error before:
    Unexpected error: AttributeError("'NoneType' object has no attribute 'is_finished'")
    1 reply · 1 participant
  • m

    Marwan Sarieddine

    10/26/2020, 3:36 PM
    Hi again, what would be the easiest way to programmatically get the tasks that failed and those that trigger failed, given a flow run id?
    a
    k
    10 replies · 3 participants
  • t

    tsar

    10/26/2020, 3:57 PM
    there is currently no documentation on how to scale prefect on multiple hosts ? is this on purpose so people only use cloud ?
    r
    c
    13 replies · 3 participants
  • m

    Marley

    10/26/2020, 4:33 PM
    I’m trying to add two custom tasks to the end of my flow: one that notifies that the flow finished successfully (`trigger=all_successful`), and one that only notifies if something failed (`trigger=any_failed`). I’ve added a custom state handler to trigger failed (see code block in thread) that should `SKIP` if `TriggerFailed`. Raising that `SKIP` is causing the Task to fail in Prefect Cloud. I previously tried to leverage the Flow `on_failure` but for some reason it wasn’t sending my notifications. Am I missing something re: raising a `SKIP` signal on the last Task? Is there something special happening in a Flow’s `on_failure` preventing it from sending a Slack notification?
    k
    12 replies · 2 participants
  • n

    Nuno

    10/26/2020, 4:39 PM
    Hello there. What’s the best way to re-run a task automatically? My use case: I’m fetching lots of data, but I can’t put it all in memory, so I need to re-run the task until I’ve accessed all the data. Thank you
    k
    11 replies · 2 participants
  • i

    Isaac Brodsky

    10/26/2020, 7:12 PM
    Anyone familiar with flows failing because of `Unexpected error: KeyError('lz4')`? Seems the flow itself rather than a task is failing. This is using `Docker` storage, `LocalEnvironment`/`DaskExecutor` with Dask running on Kubernetes. Seems like somehow `lz4` is not present where the job is started? I do install `pyarrow` using `python_dependencies` in the `Docker` storage so I’d expect `lz4` to be there. I’m not sure where else `lz4` could be missing.
    k
    5 replies · 2 participants
  • j

    james.lamb

    10/26/2020, 10:31 PM
    👋 hello from Chicago! I have a question that I haven't been able to answer from the docs or looking through the `prefect` source code. `help(LocalResult)` shows the following:
    LocalResult(dir: str = None, validate_dir: bool = True, **kwargs: Any) -> None
    Result that is written to and retrieved from the local file system.
    Is it fair to say that "local" in this case means "local to where the task is physically run" and not "local to wherever `flow.run()` is called from"?
    k
    b
    10 replies · 3 participants
  • w

    wesley gabriel

    10/27/2020, 12:33 AM
    This might be a "dumb/obvious" question but what is the difference between "task_run_version" and "flow_run_version" and which of them should represent the version I see going up at each flow register?
    k
    m
    +1
    4 replies · 4 participants
  • d

    Dean Magee

    10/27/2020, 3:36 AM
    👋 hello from Melbourne Australia!! I'm trying to have my state handler output some more useful information other than just "task failed". I would like it to output the parameters of that specific flow run. Is there a way I can access these within my state handler?
    👋 1
    c
    4 replies · 2 participants
c

Chris White

10/27/2020, 3:39 AM
Hello @Dean Magee! Yes, you should be able to access flow run parameters via `prefect.context`:
prefect.context.parameters # a dictionary of parameter name -> value
d

Dean Magee

10/27/2020, 4:06 AM
do you mean something like....
def alert_failed(obj, old_state, new_state):
    if new_state.is_failed():
        print(prefect.context.parameters)
        
    return new_state
c

Chris White

10/27/2020, 4:08 AM
yup yup, exactly
🙌 1
d

Dean Magee

10/28/2020, 3:26 AM
Great. Thanks!
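A slightly fuller version of the handler discussed in this thread could look like the sketch below. It keeps the `(obj, old_state, new_state)` signature and the `is_failed()` check from the messages above, but takes the parameter lookup as a callable so the example runs without Prefect installed. `make_alert_failed`, `format_parameters`, and `FakeState` are hypothetical names used only for illustration; they are not part of Prefect.

```python
def format_parameters(parameters):
    # Sort by name so the message is stable from run to run.
    pairs = ", ".join(
        f"{name}={value!r}" for name, value in sorted(parameters.items())
    )
    return f"Run failed with parameters: {pairs}"


def make_alert_failed(get_parameters, notify=print):
    # get_parameters is a zero-argument callable; with Prefect it could be
    # something like `lambda: prefect.context.parameters` (per the thread).
    def alert_failed(obj, old_state, new_state):
        if new_state.is_failed():
            notify(format_parameters(get_parameters()))
        return new_state

    return alert_failed


class FakeState:
    """Hypothetical stand-in for a Prefect state; only is_failed() is modelled."""

    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed


messages = []
handler = make_alert_failed(
    lambda: {"region": "au", "limit": 10}, notify=messages.append
)
failed = FakeState(failed=True)
result = handler(None, FakeState(failed=False), failed)
print(messages[0])
```

Passing the lookup in as a callable means the parameters are read when the handler fires, not when it is defined.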