prefect-community

    BK Lau

    01/19/2021, 5:44 PM
    Q: Can the Prefect Agent be set up to run Hashicorp Nomad as the executor instead of Dask? I want to use Prefect Cloud to drive a Nomad cluster installed on-premise.

    Matic Lubej

    01/19/2021, 8:18 PM
    Hi again! I'm trying to run a process on a Fargate cluster using the dask_cloudprovider API. Running the tutorials and sample code from dask works great, but for Prefect I created a dedicated Docker image which I provide to the cluster initializer. The cluster gets created, but as soon as the flow starts, after 10 s I get the following time-out error:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 203, in start
        with Client(self.address, **self.client_kwargs) as client:
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 748, in __init__
        self.start(timeout=timeout)
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 953, in start
        sync(self.loop, self._start, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 340, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 324, in f
        result[0] = yield future
      File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1043, in _start
        await self._ensure_connected(timeout=timeout)
      File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1100, in _ensure_connected
        comm = await connect(
      File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
        raise IOError(
    OSError: Timed out trying to connect to <tcp://172.31.40.184:8786> after 10 s
    [2021-01-19 20:14:19+0000] ERROR - prefect.Execute process | Unexpected error occured in FlowRunner: OSError('Timed out trying to connect to <tcp://172.31.40.184:8786> after 10 s')
    Traceback (most recent call last):
      File "s3_process_l2a_2019.py", line 114, in <module>
        assert status.is_successful()
    AssertionError
    Any ideas what is going on? Is the dask scheduler having issues connecting to the workers? Or might it be something else? Thanks!
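    The timeout usually means the flow-runner container cannot reach the Dask scheduler on port 8786 (the tcp://172.31.40.184:8786 address in the traceback), so security groups and networking are the first things to check. A minimal sketch of the setup, assuming Prefect 0.14's DaskExecutor and dask_cloudprovider's FargateCluster; the flow name, image, and security group are placeholders:
    from prefect import Flow
    from prefect.executors import DaskExecutor

    flow = Flow("fargate-flow")  # stands in for the real flow

    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": "my-registry/my-prefect-image:latest",  # placeholder image
            "security_groups": ["sg-0123456789abcdef0"],  # placeholder; must allow inbound 8786/8787
            "n_workers": 2,
        },
        client_kwargs={"timeout": 30},  # raise the 10 s default while debugging connectivity
    )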

    Billy McMonagle

    01/19/2021, 9:29 PM
    I have a flow with docker storage, running with the k8s agent. When I run my flow and look at the logs, it seems to be executing an old version of my flow. Details in thread...

    Chris Jordan

    01/19/2021, 10:13 PM
    Hey, I've got a quick question about persisting temporary files between tasks. It looks like if I pass a tmpfile object between tasks, it'll work as expected, as in
    @task(name="save records to a tmp file")
    def save_some_data(result=PrefectResult()):
        logger = prefect.context.get("logger")
        the_file = tempfile.NamedTemporaryFile()
        for i in range(5000000):
            the_file.write(b"lorem ipsum\n")
        the_file.seek(0)
        logger.info("wrote the file")
        return the_file
    
    @task(name="reread that data")
    def read_some_data(the_file, result=PrefectResult()):
        logger = prefect.context.get("logger")
        output = the_file.read()
        logger.info("read the file")
        logger.info(f"length of file is {len(output)}")
    
    
    with Flow("save_retrieve_file_flow") as flow:
        f = save_some_data()
        g = read_some_data(f)
    but passing the name of the file won't work - the file will not be found
    @task(name="save records to a tmp file")
    def save_some_data(result=PrefectResult()):
        logger = prefect.context.get("logger")
        the_file = tempfile.NamedTemporaryFile()
        for i in range(500000):
            the_file.write(b"lorem ipsum\n")
        the_file.seek(0)
        logger.info(f"wrote the file to {the_file.name}")
        return the_file.name
    
    @task(name="reread that data")
    def read_some_data(the_file_name, result=PrefectResult()):
        logger = prefect.context.get("logger")
        logger.info(f"trying to open {the_file_name}")
        
        with open(the_file_name, 'r') as the_file:
            output = the_file.read()
            logger.info("read the file")
            logger.info(f"length of file is {len(output)}")
    
    
    with Flow("save_retrieve_file_flow2", 
            state_handlers=[cloud_only_slack_handler]
           ) as flow:
        f = save_some_data()
        g = read_some_data(f)
    What's going on here? Is the file system being reset between tasks? Does Prefect clean up temporary files in the flow if they're not held in memory directly? Something else?
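    One likely explanation, for comparison: tempfile.NamedTemporaryFile removes the file as soon as its handle is closed or garbage-collected, so returning only .name leaves nothing on disk by the time the next task opens it (and with a remote executor the two tasks may not even share a filesystem). A sketch assuming both tasks run on the same machine, using delete=False so the file survives the first task:
    import tempfile

    import prefect
    from prefect import Flow, task


    @task(name="save records to a tmp file")
    def save_some_data():
        logger = prefect.context.get("logger")
        # delete=False keeps the file on disk after the handle is closed,
        # so the name stays valid for downstream tasks on the same machine
        with tempfile.NamedTemporaryFile(delete=False) as the_file:
            the_file.write(b"lorem ipsum\n")
            logger.info(f"wrote the file to {the_file.name}")
            return the_file.name


    @task(name="reread that data")
    def read_some_data(the_file_name):
        logger = prefect.context.get("logger")
        with open(the_file_name, "rb") as the_file:
            logger.info(f"length of file is {len(the_file.read())}")


    with Flow("tmpfile-by-name") as flow:
        f = save_some_data()
        g = read_some_data(f)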

    Hui Zheng

    01/20/2021, 12:58 AM
    Hello Prefect, one of our flow runs got a connection error at the start of the run, which we've never seen before. Do you know what caused it?
    HTTPSConnectionPool(host='10.124.0.1', port=443): Max retries exceeded with url: /apis/batch/v1/namespaces/default/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7fa7a7705668>: Failed to establish a new connection: [Errno 111] Connection refused',))

    Riley Hun

    01/20/2021, 1:06 AM
    Hi Prefect Team, I upgraded to Prefect version 0.14.4 and I can no longer run flow.environment.executor = executor; I get the error AttributeError: 'NoneType' object has no attribute 'executor'. Does anyone know how to attach an executor to a flow registered to Prefect Server using Docker storage in the latest version of Prefect? I had no issues with this in Prefect version 0.13.18. In the meantime, I'll downgrade to 0.13.8.
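    For reference, 0.14 removed the environment-based configuration, so there is no flow.environment to hang an executor on; the executor now lives on the flow itself. A minimal sketch, assuming Prefect 0.14.x; the scheduler address is a placeholder:
    from prefect import Flow
    from prefect.executors import DaskExecutor

    flow = Flow("example")  # stands in for the real flow

    # 0.13.x: flow.environment.executor = executor
    # 0.14.x: set the executor directly on the flow
    flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")  # placeholder address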

    SK

    01/20/2021, 2:28 AM
    I have set up Prefect on AWS EC2 and am able to register the flow, and the flow is running too, but the Prefect server page is not showing any flows and looks like the below. Please help.

    Aiden Price

    01/20/2021, 4:49 AM
    Hi Prefect people, I'm wondering how I can get the KubernetesRun to use a specific image tag that I provide dynamically (or at least whenever I reinstall the server and agent with Helm). The backstory is that I have a custom base image for my agent, Kubernetes jobs, and Dask cluster. When I launch a KubernetesRun there's a delay before the job starts, because I ask for the latest tag with an Always pull policy so I can be sure I'm getting the latest dependencies. I'd like to minimise the delay by switching to an IfNotPresent image pull policy, but then I'd need to pin the image tag to a specific version, and I'd want that specific tag to keep updating as I push new builds. Does anyone have any advice? Thanks.
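    One way to pin the tag per registration, sketched under the assumption that a CI variable (the IMAGE_TAG name here is hypothetical) carries the version: KubernetesRun accepts an image argument, so the tag can be resolved when the flow is registered rather than hard-coded to latest. The registry and project names are placeholders.
    import os

    from prefect import Flow
    from prefect.run_configs import KubernetesRun

    flow = Flow("example")  # stands in for the real flow

    # IMAGE_TAG is a hypothetical variable set by CI at registration time
    tag = os.environ.get("IMAGE_TAG", "latest")
    flow.run_config = KubernetesRun(image=f"myregistry/base:{tag}")
    flow.register(project_name="production")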

    Riley Hun

    01/20/2021, 5:33 AM
    Hi Prefect Team, just a quick (probably silly) question; please pardon my ignorance. I upgraded to Prefect 0.14.4. My flow, which uses Docker storage and Dask Gateway as the executor, is working fine after the transition. I finished reading the updated documentation and I'm trying to understand the concept of Run Configuration. I don't quite understand it, especially since my scheduled flow worked without it, so how do I know when a run config is applicable?

    Vitaly Shulgin

    01/20/2021, 1:31 PM
    Hello there, is there a way to register a flow from within another flow, or to clone an existing flow with a changed name? We need to re-schedule flows each night for the next day for different customers; the idea is to register a flow whose name includes the customer name, or to clone an existing flow under such a name. The problem is that registering a flow from within another running flow wasn't working in previous versions, and I'm not sure what the current status is. Cloning with another name and parameters would be preferable. Any ideas?
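    On the cloning idea, a sketch assuming Prefect core's Flow.copy(); the flow, customer, and project names are placeholders. Copy the flow object, rename it, and register the copy once per customer:
    from prefect import Flow

    base_flow = Flow("nightly-etl")  # stands in for the existing flow

    for customer in ["acme", "globex"]:  # placeholder customer names
        customer_flow = base_flow.copy()
        customer_flow.name = f"nightly-etl-{customer}"
        customer_flow.register(project_name="customers")  # assumes this project exists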

    Paul Vanetti

    01/20/2021, 1:35 PM
    Hi all, we are seeing the error "At least one upstream state has an unmappable result" for a failed task. A subsequent manual run of the same flow finishes without error. We think this might only happen when the upstream task has to retry several times before succeeding, but I've failed to reproduce this. Any idea what might be causing this?

    Marwan Sarieddine

    01/20/2021, 1:48 PM
    Hi folks, it seems I am facing a bug when trying to pass parameters returned from a task to StartFlowRun. I just want to make sure I am doing this correctly; please see the example in the thread.

    Vitaly Shulgin

    01/20/2021, 2:34 PM
    How do I re-schedule a flow which already exists? We have a scheduler job and a registered flow; is it possible to re-schedule it?
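    One approach, sketched under the assumption that re-registering is acceptable: attach a new schedule to the flow object and register it again, so the new version carries the new schedule. The flow and project names are placeholders.
    from datetime import timedelta

    from prefect import Flow
    from prefect.schedules import IntervalSchedule

    flow = Flow("nightly-job")  # stands in for the registered flow

    # replace the schedule, then re-register; the new version picks it up
    flow.schedule = IntervalSchedule(interval=timedelta(days=1))
    flow.register(project_name="production")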

    Matthew Blau

    01/20/2021, 3:23 PM
    Hello all, I seem to have come across a bug, or at the very least something interesting. I have been having issues getting the graphql service seen here to connect, and I discovered an oddity: I had a config.toml and a config.toml.bak, and as best I can tell Prefect attempted to parse both files, leading to issues that were resolved only after I removed the .bak file entirely. Has anyone else encountered this?

    Vipul

    01/20/2021, 4:32 PM
    I am planning to run a large flow with caching, which seems to be working fine. But lately I found that if the same flow is started at almost the same time, the caching might not work, since both runs execute simultaneously. I want to understand whether there is a way to control this by not triggering the second flow run when the first is already in a Running state (with the same parameters), using the Prefect API or the GraphQL query API.
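    A sketch of such a guard via the GraphQL API, assuming Prefect 0.14's Client; the flow ID is a placeholder, and matching on parameters is left out for brevity. Query for Running runs of the flow and only create a new run when none exist:
    from prefect import Client

    client = Client()

    # look for runs of this flow that are currently Running
    result = client.graphql(
        """
        query {
          flow_run(where: {flow_id: {_eq: "YOUR-FLOW-ID"}, state: {_eq: "Running"}}) {
            id
          }
        }
        """
    )
    if not result.data.flow_run:
        client.create_flow_run(flow_id="YOUR-FLOW-ID")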

    J. Martins

    01/20/2021, 5:19 PM
    Hello all. I have two tasks in a flow, execute_a and execute_b. Task execute_a uses raise LOOP to make successive calls to an API and returns a string when it finishes. When I simulate an error (e.g. the API is down), I notice that execute_a is set to a Retrying state and execute_b is also started and set to Pending (I was expecting execute_b to not even start). At this point the API becomes available again and eventually both tasks finish with success. My question is: how can we tell Prefect to only start execute_b once execute_a is finished?

    Luis Gallegos

    01/20/2021, 6:22 PM
    Hi there, I've been using Prefect for a few months. Can you help me with this question? I have the following flow:
    1. 300 CSVs (with different names and sizes) -> gzip -> EXASOL (DW) -> delete (csv, gz)
    2. A flow that processes 1 CSV per run
    My question is how Prefect handles the flows and server resources if I run all the CSVs at the same time, i.e. 300 flow runs at once. (Does it do some kind of smart scheduling so it doesn't collapse the server?)
    Environment info:
    # Server: 24 cores, 96 GB RAM
    # Single node
    # Total CSV size: ~150 GB
    # Prefect Core 0.14.0
    # Docker image: prefect core-0.14.0
    # Executor: LocalDaskExecutor (advice for DaskExecutor is welcome too)
    Note: I avoid passing a list of the CSVs in each task because of disk space; notice that I delete each file at the end of the flow. Thanks for your help 🙂
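    On resources: as far as I know, Prefect Core itself won't throttle 300 simultaneous flow runs on a single server, but within each flow the executor's worker count caps the parallelism. A sketch, assuming Prefect 0.14's LocalDaskExecutor; the flow name and worker count are placeholders:
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor

    flow = Flow("csv-loader")  # stands in for the real flow

    # cap in-flow parallelism so concurrent file tasks don't exhaust the box
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)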

    SK

    01/20/2021, 7:44 PM
    Hi, I am trying to access the registered flow from the AWS EC2 public DNS, something like http://ec2-xxxxxxxxxxxx.com:8080/default/flow/4dfbfb93-e00a-4ad4-a34d-fdd813f9a123, but the URL reverts back to http://ec2-xxxxxxxxxxxx.com:8080 and the dashboard is not showing the flows. The flow is running, though, and I can see the output being stored in my database. Am I missing something in the setup when the host is not localhost for the Prefect server? @nicholas can you please help with this?

    Zach

    01/20/2021, 8:37 PM
    I submitted a few hundred runs using the Prefect API that I want to cancel right now; how can I do that easily?

    Zach

    01/20/2021, 8:37 PM
    I want to cancel every currently scheduled run for one of my flows basically

    Zach

    01/20/2021, 8:37 PM
    One way of doing this is to register a new version of that flow, but that seems hacky
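    A less hacky route, sketched with Prefect's GraphQL client; the flow ID is a placeholder, and Client signatures may differ slightly by version. Query the Scheduled runs for the flow, then flip each one to Cancelled:
    from prefect import Client
    from prefect.engine.state import Cancelled

    client = Client()

    # fetch every Scheduled run of the flow, with the version needed for the state update
    result = client.graphql(
        """
        query {
          flow_run(where: {flow_id: {_eq: "YOUR-FLOW-ID"}, state: {_eq: "Scheduled"}}) {
            id
            version
          }
        }
        """
    )
    for run in result.data.flow_run:
        client.set_flow_run_state(
            flow_run_id=run.id, version=run.version, state=Cancelled("bulk cancel")
        )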

    BK Lau

    01/20/2021, 8:55 PM
    Q: If I have 100 on-prem worker nodes, say, do I have to install a Prefect Agent on each of them to enable them to be controlled by a Prefect Server?

    Alberto de Santos

    01/20/2021, 9:06 PM
    Hi community, do you have any idea how to do a task mapping with two parameters? For instance, parameter1 = [a, b, c] and parameter2 = [1, 2, 3]; how could I map tasks over all the pair possibilities: (a, 1), (a, 2), ...? Thanks in advance
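    Since map() pairs items positionally, one way to get the cross product is to build the pairs in an upstream task and map over those. A sketch:
    from itertools import product

    from prefect import Flow, Parameter, task


    @task
    def make_pairs(p1, p2):
        # build the cross product up front; map() alone zips element-wise
        return list(product(p1, p2))


    @task
    def process(pair):
        a, b = pair
        print(f"processing {a!r} with {b!r}")


    with Flow("cross-product-mapping") as flow:
        p1 = Parameter("parameter1", default=["a", "b", "c"])
        p2 = Parameter("parameter2", default=[1, 2, 3])
        process.map(make_pairs(p1, p2))

    flow.run()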

    Sean Talia

    01/20/2021, 9:15 PM
    Do any of the official Prefect Docker images have any of the extras pre-installed, or is it on you to manually install, for example, the aws extras?

    Vitaly Shulgin

    01/20/2021, 9:26 PM
    Hello there, what is the right way to pass results from one task to another as input for the run method, when tasks are dynamically added to an edge?
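    One way to wire this when building edges dynamically, sketched with the imperative API: Flow.add_edge takes a key argument naming the run() parameter that should receive the upstream result. The task classes below are illustrative.
    from prefect import Flow, Task


    class Produce(Task):
        def run(self):
            return 42


    class Consume(Task):
        def run(self, value):
            print(f"got {value}")


    flow = Flow("dynamic-edges")
    produce, consume = Produce(), Consume()

    # key= names the argument of the downstream run() that receives the upstream result
    flow.add_edge(upstream_task=produce, downstream_task=consume, key="value")
    flow.run()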

    Matic Lubej

    01/20/2021, 9:49 PM
    Hi! Quick question. I'm running a Prefect flow using the DaskExecutor, which is connected to a cluster. My checkpointing is turned on, but no results are being saved and no logs are being printed (DEBUG mode on). I get the results and the printout if I use the local executor or a local instance of the DaskExecutor. Any ideas what to do in this case?

    Sai Srikanth

    01/20/2021, 10:11 PM
    Hello all, I am trying to connect Prefect to AWS by storing secrets as ACCESS_KEY, SECRET_KEY, SECURITY_TOKEN. I am trying to connect as below but am unable to. Any suggestions? Please check the image for further reference:

    Luis Gallegos

    01/20/2021, 11:28 PM
    Hi, quick question: how can I use a parameter's value as a string outside a task? When I try it, I get a Parameter object: <Parameter: example>
    with Flow("example-flow") as flow:
    	example_param = Parameter('example')
    
    	print(example_param)
    Thanks for your help!!
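    For context: inside the with Flow block you are only building the DAG, so example_param is still the Parameter task itself; its value exists only at runtime, inside a task. A sketch:
    from prefect import Flow, Parameter, task


    @task
    def show(value):
        # at runtime the parameter has been resolved to the actual string
        print(value)


    with Flow("example-flow") as flow:
        example_param = Parameter("example", default="hello")
        print(example_param)  # build time: prints <Parameter: example>
        show(example_param)  # run time: prints the real value

    flow.run(parameters={"example": "world"})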

    Darshan

    01/20/2021, 11:37 PM
    Hello, I am evaluating Prefect for my work and running some toy flows to understand various aspects of it. I am struggling to find a good example of how to pass parameters to a task function using the imperative API. Basically I am trying to do something like the below; how can I pass a parameter to task_1 and task_3 using the imperative API?
    @task
    def task_1(some_param):
        # Do something
        pass

    @task
    def task_2():
        # Do something
        pass

    @task
    def task_3(some_other_param):
        # Do something
        pass


    flow.set_dependencies(task=task_1)
    flow.set_dependencies(task=task_2, upstream_tasks=[task_1])
    flow.set_dependencies(task=task_3, upstream_tasks=[task_2])

Michael Adkins

01/20/2021, 11:41 PM
I wouldn’t recommend starting with the imperative API, but here you want to pass keyword_tasks, e.g.
flow.set_dependencies(task=task_1, keyword_tasks={"some_param": Parameter("example_param")})
You can also use bind as shown in https://docs.prefect.io/core/getting_started/first-steps.html#imperative-api
Parameter("example_param") in my example can also be a task or a constant value, i.e. 10

Darshan

01/20/2021, 11:44 PM
Thanks @Michael Adkins will check this.

Michael Adkins

01/20/2021, 11:45 PM
I really would encourage you to use the functional API though — it’s much more intuitive. If you find yourself needing the imperative API for a specific thing then I’d switch to it in places 🤷 best of luck either way!

Darshan

01/20/2021, 11:46 PM
I understand the functional API should be used as much as possible, but one of the real-life workflows we have has tasks which do not return any data. For example, each task executes a SQL statement which creates a table in the database, and the subsequent task uses the table created by the previous task. I was struggling to build a flow with the functional API where the dependency between tasks needs to be maintained but the tasks themselves do not return any direct results. It would be great to see if the functional API can be used for this kind of workflow since, as you said, it is more intuitive for any developer.

Michael Adkins

01/20/2021, 11:50 PM
I would just write it out functionally, then use downstream_task.set_upstream(table_creation) where needed:
from prefect import task, Flow


@task(log_stdout=True)
def display(value):
    print(value)


@task
def create_table():
    pass


@task
def insert_data():
    pass


@task
def query_data():
    return "fake-data-from-table"


with Flow("non-data-dependencies") as flow:
    create = create_table()
    insert = insert_data()
    insert.set_upstream(create)
    data = query_data()
    data.set_upstream(insert)
    display(data)


flow.run()
Sorry that’s actually poorly written, it’ll run things more than once. Fixed now.
Slightly more succinct:
with Flow("non-data-dependencies") as flow:
    create = create_table()
    insert = insert_data().set_upstream(create)
    data = query_data().set_upstream(insert)
    display(data)

Darshan

01/21/2021, 12:15 AM
Very helpful, thanks
So here is the viz of a toy workflow I was trying to implement
and here is the code I was able to come up with to design this.
from prefect import Flow, task
import prefect

@task(name="create db_table_1")
def task_1():
    # Create db_table_1
    print("Create db_table_1")

@task(name="create db_table_2 using db_table_1")
def task_2():
    # Create db_table_2
    print("Create db_table_2")


@task(name="create db_table_3 using db_table_2")
def task_3():
    # Create db_table_3
    print("Create db_table_3")

@task(name="create db_table_4 using db_table_2")
def task_4():
    # Create db_table_4
    print("Create db_table_4")

@task(name="create db_table_5 using db_table_3 and db_table_4")
def task_5():
    # Create db_table_5
    print("Create db_table_5")


@task(name="query data from db_table_5 and display")
def query_data():
    # Query data and print
    print("result of query")

with Flow('ETL') as flow:

    task_2.set_upstream(task_1)
    task_3.set_upstream(task_2)
    task_4.set_upstream(task_2)
    
    task_5.set_dependencies(upstream_tasks=[task_3, task_4])
    
    query_data.set_upstream(task_5)

flow.run()
flow.visualize()