prefect-community
  • s

    Salohy

    04/28/2021, 1:24 PM
    Hello everyone, this is my example flow that I want to run with Docker as storage, KubernetesRun as run_config, and DaskExecutor on an existing cluster as executor.
    import prefect
    from prefect import task, Flow
    from prefect.storage import Docker
    from prefect.run_configs import KubernetesRun
    from prefect.executors import DaskExecutor
    
    STORAGE = Docker(registry_url="prefect.azurecr.io", image_name="prefect/test")
    
    RUN_CONFIG = KubernetesRun(image="prefect.azurecr.io/prefect/test")
    
    EXECUTOR = DaskExecutor(address="tcp://my-ip:8786")
    
    
    @task
    def hello():
      logger = prefect.context.get('logger')
      logger.info("Hello!")
    
    
    with Flow("changeme", storage=STORAGE, run_config = RUN_CONFIG, executor = EXECUTOR) as flow:
      hello()
    When I run this flow, it is submitted successfully but never gets executed, and its status stays pending forever. Am I missing something? Do I need to specify
    flow.run()
    in my code? Many thanks already for helping me 🙂🙏
    e
    f
    +2
    • 5
    • 35
  • b

    Ben Collier

    04/28/2021, 3:58 PM
    Hi all, just reposting what I had in the server channel. I hope someone can help. We’re deploying an ECS agent as a container into a Fargate ECS cluster. As far as I can tell, the load balancer etc are all configured sensibly - they use the same settings as other containers which work. Prior to the agent starting, we can see logs on Cloudwatch (for instance, if we send a broken command), but once it’s launched, there is nothing. The container health fails, and the task goes into a health check loop. We’re seeing lots of messages telling us that a task failed to load the image, but again, we know the image is accessible and is being loaded initially, because we can see logs from it when we change the command in the Dockerfile. I want to understand more about what’s going on here. We have no NAT or internet gateway configured, due to security restrictions, which we know will prevent the agent from communicating with Prefect Cloud, but we would expect some sort of message to tell us so. Could someone explain why we’re getting the messages about ECR? Is the agent trying to load additional instances of itself into the cluster?
    k
    • 2
    • 2
  • v

    Vikram Thirumalai

    04/28/2021, 4:36 PM
    Hi all, I am trying to set up Slack notifications from Prefect to a channel I created. I have the SLACK_WEBHOOK_URL saved, but it doesn't seem to be working. I'm getting this error:
    Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002C254411C10>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
    k
    • 2
    • 35
  • j

    Jacob Wilson

    04/28/2021, 5:11 PM
    Hi all, I have written a Flow to perform an ETL process. Throughout the course of the Flow thousands of API requests are being made (using the .map function). Consequently, my Flow takes over 24 hours to make the API calls and parse all the data. Also, the Flow is using a local Dask executor and it’s running on an ECS container. Does anyone have any ideas on how I might be able to shave some time off my Flow execution?
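One general angle on the question above, independent of Prefect: when the API calls are I/O-bound, running them on a thread pool usually shortens wall-clock time. The following is a minimal sketch under stated assumptions. `fetch_record` is a hypothetical stand-in for the real API request, and the worker count is illustrative; neither comes from the original message.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_record(record_id: int) -> dict:
    """Hypothetical stand-in for one API request (assumption, not from the thread)."""
    time.sleep(0.01)  # simulate network latency
    return {"id": record_id, "ok": True}


def fetch_all(record_ids, max_workers: int = 16) -> list:
    """Run the I/O-bound calls concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch_record, record_ids))


if __name__ == "__main__":
    results = fetch_all(range(100))
    print(len(results))
```

Whether this helps in practice depends on the API's rate limits and on how many workers the executor in the flow is already using.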
    k
    • 2
    • 8
  • r

    Ryan Baker

    04/28/2021, 8:15 PM
    Quick question about the prefect CLI. It seems that when I run a command like
    prefect describe flows --name foo-bar
    I get output like this, which includes information about the storage and parameters but contains no information about the run configuration (I’m using Kubernetes):
    {
      "archived": false,
      "created": "2021-04-20T19:01:08.156447+00:00",
      "description": null,
      "environment": null,
      "name": "inference-alpha-poc",
      "parameters": [
        .......
      ],
      "project": {
        "name": "recursion-mlops-poc"
      },
      "storage": {
        "__version__": "0.14.16",
        "access_token_secret": "GITHUB_ACCESS_TOKEN",
        "base_url": null,
        "flows": {
          "inference-alpha-poc": "prefect_cloud/inference_flow.py"
        },
        "path": "prefect_cloud/inference_flow.py",
        "ref": "prefect-cloud",
        "repo": "recursionpharma/inference-alpha",
        "secrets": [],
        "type": "GitHub"
      },
      "version": 9
    }
    Is this expected, or a bug? Why isn’t the run configuration shown in this output?
    k
    m
    • 3
    • 3
  • n

    Nathan Atkins

    04/28/2021, 8:43 PM
    I have:
    • task1 mapped over list1 and task2 mapped over list2
    • task2 isn't mapped over the output of task1
    • I don't want task2 to start until after task1 has completed.
    I tried adding
    upstream_tasks=[task1]
    to task2. My gut told me this probably wasn't going to do what I wanted, but it was worth a try. My gut was right: task2 only processes as many elements of list2 as there are elements in list1.
    import prefect
    from prefect import Flow, task
    
    
    @task
    def mapped(value: int) -> int:
        prefect.context.get("logger").info(f"{value}")
        return value
    
    
    if __name__ == "__main__":
        with Flow("mapped_upstream") as flow:
            list1 = [1, 2]
            task1 = mapped.map(value=list1)
    
            list2 = [10, 20, 30]
            task2 = mapped.map(value=list2, upstream_tasks=[task1])
        flow.run()
    What is the correct way to get mapped task2 to have an upstream dependency on task1?
    [2021-04-28 14:38:31-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'mapped_upstream'
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
    [2021-04-28 14:38:31-0600] INFO - prefect.mapped[0] | 1
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
    [2021-04-28 14:38:31-0600] INFO - prefect.mapped[1] | 2
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
    [2021-04-28 14:38:31-0600] INFO - prefect.TaskRunner | Task 'mapped': Starting task run...
    [2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped': Finished task run for task with final state: 'Mapped'
    [2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Starting task run...
    [2021-04-28 14:38:32-0600] INFO - prefect.mapped[0] | 10
    [2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[0]': Finished task run for task with final state: 'Success'
    [2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Starting task run...
    [2021-04-28 14:38:32-0600] INFO - prefect.mapped[1] | 20
    [2021-04-28 14:38:32-0600] INFO - prefect.TaskRunner | Task 'mapped[1]': Finished task run for task with final state: 'Success'
    [2021-04-28 14:38:32-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    k
    • 2
    • 10
  • e

    Enda Peng

    04/28/2021, 8:51 PM
    Is there a natural way to work with Parameters? E.g., the example below will throw an exception because for_date has type Parameter:
    with Flow("Demo") as flow:
        for_date = Parameter('for_date', required=True, default='20200101')
        for_date_str = dt.datetime.strptime(for_date, "%Y%m%d")
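The exception above arises because `for_date` is a `Parameter` task object while the flow is being built; its string value only exists at run time. A minimal sketch of the parsing step as a plain function, assuming it would be wrapped as a Prefect task and called inside the `Flow` block. That wiring is shown only in comments, and `parse_for_date` is a hypothetical name, not something from the thread.

```python
import datetime as dt


def parse_for_date(for_date: str) -> dt.datetime:
    """Parse a YYYYMMDD string; meant to run at flow run time, not build time."""
    return dt.datetime.strptime(for_date, "%Y%m%d")


# Assumed Prefect 0.14-style wiring (illustrative, not executed here):
#   parse_task = task(parse_for_date)
#   with Flow("Demo") as flow:
#       for_date = Parameter("for_date", default="20200101")
#       for_date_dt = parse_task(for_date)

if __name__ == "__main__":
    print(parse_for_date("20200101"))
```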
    k
    j
    • 3
    • 11
  • f

    flavienbwk

    04/28/2021, 11:46 PM
    Hi guys, is there a "Made with Prefect" GitHub badge to display on repos?
    💯 3
    c
    • 2
    • 1
  • j

    jars

    04/28/2021, 11:54 PM
    Hi @Kyle Moon-Wright, just following up from the email. You mentioned the interval kwarg. I know where this is in the code, but, was there a specific part of the UI you were seeing this odd value? Or, where did you see it?
    k
    • 2
    • 2
  • k

    kevin

    04/29/2021, 2:02 AM
    Hey guys, is this a Prefect-compliant way of defining a task?
    task(
       fn=lambda a, b: a+b,
       name='some_task'
    )(some_a, some_b)
    ✅ 1
    c
    • 2
    • 3
  • m

    Mahesh

    04/29/2021, 7:21 AM
    Hello Team, I have a query regarding upstream tasks,
    e
    k
    • 3
    • 9
  • r

    Romain

    04/29/2021, 9:21 AM
    Hello team, Is there a bug in "apply_map" working with flatten? I tried to modify the example from the doc (https://docs.prefect.io/core/concepts/mapping.html#flat-mapping) to use apply_map:
    from prefect import Flow, task, flatten, apply_map
    @task
    def A():
        return [1, 2, 3]
    
    @task
    def B(x):
        return list(range(x))
    
    @task
    def C(y):
        return y + 100
        
    def foo(y):
        return C(y)
    
    with Flow('flat map') as f:
        a = A() # [1, 2, 3]
        b = B.map(x=a) # [[0], [0, 1], [0, 1, 2]]
        c = apply_map(foo, y=flatten(b)) # [100, 100, 101, 100, 101, 102]
    When running it, I get: "TypeError: can only concatenate list (not "int") to list". Am I missing something?
    z
    m
    • 3
    • 4
  • j

    Johan Lundin

    04/29/2021, 9:30 AM
    Hey Team, I have a Kubernetes config question: I set up a k8s Prefect agent running in a few clusters. It is pretty straightforward, and the prefect command-line tool will spit out the YAML to kubectl to create a k8s Deployment running the agent. The agent then polls the cloud instance for new jobs and spins up new k8s Jobs/Pods that run the tasks. Given that I have a number of different AWS VPCs, which all have AWS VPC Endpoints pointing to the same services but with different IPs mapped to them, I would like to assign different HostAlias mappings so that the same name in multiple clusters can be assigned to different IP addresses. I know it is possible, since I have a cluster configured this way, but the guy who set it up has since left. Example output from kubectl is in the attached file, with ******* marking the place of interest and descriptions. Any pointers are of great interest!!!
    kubectl-prefect.txt
    k
    • 2
    • 6
  • s

    Sumit Kumar Rai

    04/29/2021, 10:11 AM
    Hello, I'm new to Prefect and I'm wondering if I can use Prefect for my use case. I have a Python script that extracts data from Google Sheets and loads it into Snowflake. A cron job schedules it to run daily. Similarly, there are Singer taps and targets orchestrated by PipelineWise to load data from databases into Snowflake. They are also scheduled by cron jobs. The loaded data is then transformed using dbt, which is scheduled much later than the above cron jobs. Most of the examples I see on the Internet are workflows running Python functions with the task decorator.
    k
    • 2
    • 21
  • t

    Talha

    04/29/2021, 11:06 AM
    Hi, I am still pretty confused. I am new to Prefect. I need to run an ETL job such that the extract step is done by one agent and the transform/load steps are done by another agent. How can I achieve that? How can I run two agents? I know I can use labels with agents, but can you point me to an example? It would help me understand better. Please guide.
    k
    • 2
    • 8
  • n

    Nikola Lusic

    04/29/2021, 12:53 PM
    Hey all, While trying to run a flow via ECS agent, I get the following error:
    Failed to load and execute Flow's environment: StorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\n This may be due to one of the following version mismatches between the flow build and execution environments: \n - python: (flow build with '3.8.5', currently running with '3.7.10')")
    I'm running prefect server, prefect agent and registration environment on 3 different Ubuntu machines, all running:
    $ python3
    Python 3.8.5 (default, Jan 27 2021, 15:41:15) 
    [GCC 9.3.0] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import sys
    >>> import prefect
    >>> prefect.__version__
    '0.14.17'
    >>> sys.version_info
    sys.version_info(major=3, minor=8, micro=5, releaselevel='final', serial=0)
    I don't understand where the agent picks up Python 3.7.10, or why the task is executed in that environment (Python 3.7 is not even installed in the server or agent environments). Is there a way to set the Python version of the execution environment, or am I missing something?
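The error message above already names both interpreter versions. A small sketch, using only the standard library, that extracts them and checks whether they share the same major.minor release. The function name and regular expression are illustrative assumptions, not part of Prefect.

```python
import re

# Excerpt of the error text quoted in the message above.
ERROR = "python: (flow build with '3.8.5', currently running with '3.7.10')"


def python_versions_match(message: str) -> bool:
    """Return True when build and runtime share the same major.minor version."""
    pattern = r"flow build with '([\d.]+)', currently running with '([\d.]+)'"
    found = re.search(pattern, message)
    if found is None:
        raise ValueError("no version pair found in message")
    # Compare only the first two components, e.g. ['3', '8'] vs ['3', '7'].
    build, runtime = (version.split(".")[:2] for version in found.groups())
    return build == runtime


if __name__ == "__main__":
    print(python_versions_match(ERROR))
```

A mismatch like this often points at the Python version inside the image the flow run executes in, rather than at the machines listed above; that reading is an assumption, since the thread's replies are not shown here.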
    s
    k
    • 3
    • 6
  • s

    Sean Talia

    04/29/2021, 1:35 PM
    Did the url for the documentation on Cloud Hooks change recently? I'm getting a 404 when I go to this page that I had bookmarked not too long ago
    j
    r
    • 3
    • 9
  • r

    Robin Cole

    04/29/2021, 3:40 PM
    Quite an AWS-specific question. I have a batch job (created before I started using Prefect) which calls a Lambda function approximately 1000 times, each time with slightly different args (it is processing images on S3). The Lambdas all happen to request the same file on S3 (used as a reference image) and I was hitting S3 rate limits, so taking the lazy route I just throttled the Lambdas by running them serially using
    RequestResponse
    . I also log the responses from the Lambdas to a db, currently at the end of the batch job. My question is whether I am better off sticking with running the batch job from Prefect, or using Prefect to call the 1000 Lambdas in parallel using Lambda
    Event
    mode, then using Prefect's ability to handle retries to manage the show? Also, a related question: when is the best time to log to the db? Thanks in advance! PS: loving Prefect way more than Airflow
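The trade-off described above (parallel calls, a cap on concurrency to stay under rate limits, and retries) can be pictured with a standard-library sketch. It makes no AWS or Prefect calls: `invoke_lambda` is a hypothetical stand-in for one Lambda invocation, and the attempt count, delay, and worker limit are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def call_with_retries(func, arg, attempts: int = 3, base_delay: float = 0.01):
    """Call func(arg), retrying with exponential backoff on any exception."""
    for attempt in range(attempts):
        try:
            return func(arg)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)


def run_throttled(func, args, max_workers: int = 4) -> list:
    """Run calls concurrently, with at most max_workers in flight at once."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda a: call_with_retries(func, a), args))


def invoke_lambda(image_key: str) -> str:
    """Hypothetical stand-in for one Lambda invocation (assumption)."""
    return f"processed:{image_key}"


if __name__ == "__main__":
    print(run_throttled(invoke_lambda, ["a.png", "b.png"]))
```

Because `run_throttled` returns results in input order, logging to the db could happen either per call inside `func` or once over the returned list.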
    k
    • 2
    • 4
  • d

    Danielle Dalton

    04/29/2021, 5:56 PM
    Hi All, We have a handy little Task class called 
    api_data_to_s3
     that sends json data to S3. In other flows, we import this class and pass it two arguments, the vendor name and the api_endpoint (ex. 
    api_data_to_s3(vendor="google", api_endpoint="keywords")
    ). The issue is that some vendors have multiple endpoints, and when we try to parameterize the api_endpoint we get some errors. I'll post some code in the thread below, but I'd love to know the recommended way to solve this.
    k
    • 2
    • 4
  • j

    Joe

    04/29/2021, 6:09 PM
    I'm currently working on using the prefect API through an iPython environment (Jupyter Notebook atm). Because my environment is heavily containerized, I'm looking at using the Webhook Storage, since that'll let me reuse a bunch of existing infrastructure. Am I correct in my reading of the source that if I want to store the flow as a script (i.e. not have it pickled), I would need to write a file out temporarily to local storage, if the flow is some python functions defined in a cell?
    f
    k
    • 3
    • 11
  • k

    kevin

    04/29/2021, 6:25 PM
    Will Prefect allow me to access the built-in methods of the data type being returned by the Task, or does the Task object occlude some of those calls? Specifically for dictionaries, can I do something like this:
    d_task = task(lambda: some_dict)
    m_task = task(lambda x: do_map_stuff(x))
    
    with Flow('flow') as f:
      some_data = d_task()
      mapped_result = m_task.map(some_data.values())
    k
    • 2
    • 3
  • d

    Diogo Munaro

    04/29/2021, 10:05 PM
    Hey guys, I build modules with tasks that I'm using in my flow, but when I update the flow version, these modules aren't updated. What are the best practices?
    k
    • 2
    • 17
  • m

    Matthew Blau

    04/30/2021, 12:28 AM
    Hi all, for an instance of Prefect Server, what are the recommended specs for a system to run Prefect effectively?
    k
    t
    • 3
    • 7
  • t

    Trevor Kramer

    04/30/2021, 1:28 AM
    I am getting this warning and I don't understand what it means - UserWarning: Task <Task: pack_ds_data> has retry settings but some upstream dependencies do not have result types.
    c
    • 2
    • 1
  • t

    Talha

    04/30/2021, 11:47 AM
    Hi, can I run two Local Agents on the same machine? I am trying to test my code and I want to assign my flows to different agents, so I am trying to run two agents on my machine.
    f
    k
    • 3
    • 2
  • e

    emre

    04/30/2021, 2:49 PM
    Hi everyone, I just realized something about
    merge
    , and I find it kind of unexpected. Here goes: Usually, when using something like
    ifelse
    and
    merge
    , the flow goes like:
    skip_if_true -> actual_task_for_false -> merge
    skip_if_false -> actual_task_for_true -> merge
    This works as expected. However, I tried to directly merge tasks that may or may not
    SKIP
    :
    skippable_1 = do_i_skip(skip=True)
    skippable_2 = do_i_skip(skip=False)
    merge(skippable_1, skippable_2)
    merge
    results in the
    SKIP
    signal raised in skippable_1, rather than skippable_2. That is unexpected, at least for me. Is this expected behavior? Can I somehow connect these explicit
    SKIP
    ping tasks with a merge directly?
    k
    j
    • 3
    • 15
  • j

    Josselin Girault

    04/30/2021, 3:10 PM
    Hello, having an issue with passing parameters/context from flow to flow:
    @task
    def log_scheduled_start_time():
        """Print scheduled_start_time in logger."""
        logger = prefect.context.get("logger")
        scheduled_start_time = prefect.context.get("scheduled_start_time")
        flow_name = prefect.context.get("flow_name")
        logger.info(f"{flow_name}: {scheduled_start_time}")
    
    
    with Flow("subflow") as subflow:
        log_scheduled_start_time()
    
    subflow.register(project_name="test_schedule", labels=["test"])
    
    with Flow("mainflow") as mainflow:
        log_scheduled_start_time()
        StartFlowRun(
            flow_name="subflow", project_name="test_schedule", wait=True,
            scheduled_start_time=prefect.context.get("scheduled_start_time")
        )
    
    mainflow.register(project_name="test_schedule", labels=["test"])
    Expectation: mainflow's
    scheduled_start_time
    is passed down to subflow, both flows log the same time. Reality: subflow starts without a
    scheduled_start_time
    , defaults to
    now()
    , logged times are different. Use case: Subflow depends on
    scheduled_start_time
    to query some database, the agent that was supposed to run mainflow is down, mainflow and subflow are run a day later, subflow's query is then incorrect. Bonus question: I can't seem to find documentation on how to pass parameters from one flow to another/use a flow's results as parameters for another 🙇
    ✅ 1
    c
    • 2
    • 2
  • m

    Milton Tavares Neto

    04/30/2021, 3:14 PM
    Hey everyone. I'm running into some trouble running a prefect flow. It's a very simple one (converting json files to parquet). The problem is that when I try to run the flow (flow.run(...)) the flow doesn't work. I get to see the "Saving status ..." log, but df.to_parquet never starts processing (looking at the dask dashboard), and the run finally finishes with the exception below. But when I run the task directly (to_parquet.run(...)), setting up the local dask cluster myself, it works perfectly. It's probably a rookie mistake, but do you have any ideas what I'm doing wrong here? I'm using prefect 0.14.7 and dask 2014.4.1
    j
    k
    • 3
    • 46
  • f

    Frederick Thomas

    04/30/2021, 4:01 PM
    Greetings, I'm having an issue with passing a Parameter from the command line to my flow. If I'm correct, on the server the Parameter can be passed in the UI, but for testing I need to pass in sys.argv[2]. Here's the relevant code:
    def register__flow():
        with Flow("FlowName", schedule=None, ) as flow:
            file_name = Parameter(name='file_name',default=None)
            params = get_params()
            p = get_file_blob(
                            file_name=f"notebooks/{file_name}",
                            con_string=params["uploads_blob"],
                            container="uploads"
            )
            flow.add_edge(file_name,params)
            flow.add_edge(params, p)
        if len(sys.argv) > 1 and sys.argv[1] == "register":
            flow.register(project_name="project_x")
        elif len(sys.argv) > 1 and sys.argv[1] == "test" and len(sys.argv[2]) > 1:
            flow.run(parameters={'file_name':sys.argv[2]})
        else:
            print("Please use a quoted string of the file name...")
    if __name__ == "__main__":
        register__flow()
    Thanks
    k
    • 2
    • 16
  • s

    Sean Talia

    04/30/2021, 4:03 PM
    Is there a canonical way of using a
    PrefectSecret
    to add a new item to the
    prefect.context
    ? I see that you can add to the global context by doing something like:
    with prefect.context(key='abc'):
        flow.run()
    but it's not clear to me how to actually supply this value to the runtime context when my agents are deploying the flows and I'm not explicitly calling
    flow.run()
    . For context, I'm reading through the documentation on adding Slack handlers and I see that all you need to do is set a
    SLACK_WEBHOOK_URL
    secret in order for the built-in
    slack_notifier
    to be able to post to your channel.
    k
    • 2
    • 18
k

Kevin Kho

04/30/2021, 4:09 PM
Hey @Sean Talia, you can pass it through spinning up the agent as an environment variable. Have you seen this on how to load the secret?
s

Sean Talia

04/30/2021, 4:18 PM
I'm not sure what you mean by "spinning up the agent as an environment variable" but I suppose it would be possible to set the env var
PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL
at runtime via a task?
or is it not even possible to modify the context on the fly like that and the context is only read once when the flow is launched
k

Kevin Kho

04/30/2021, 4:21 PM
oh sorry i meant “with” an environment variable like
prefect local agent start --env var1
If you want to set it on runtime, you would pass it through the RunConfig.
DockerRun(env={"SOME_VAR": "value"})
Actually let me double check if you can use a secret with runconfig
s

Sean Talia

04/30/2021, 4:24 PM
oh yeah yeah I've done that before locally, I was hoping to avoid doing that in production 1) because I didn't want to have
var1
stashed on the host where I'm running the agent but also 2) I'd like for different teams at my org to notify different channels as they need and I was hoping that they could make those adjustments in their flow code rather than my having to configure a pattern for agents/labels to determine that
Oh yes if that could work I'd love to be able to pass that in (although similar to the question I posted the other day I'm wondering if this would result in the secret url getting loaded at registration-time and then supplied to the cloud in the flow metadata as plaintext)
k

Kevin Kho

04/30/2021, 4:26 PM
Ok my bad. We can’t pass secrets to the runconfig…
😭 1
Yes this is tough. I believe you mentioned the only way to modify context at runtime already. Sounds like we can’t have it stashed on the host, especially cuz it’s dynamic. We can pull it with a Prefect secret…but how do we pass it to the slack_notifier? Let me think lol
👍 1
s

Sean Talia

04/30/2021, 4:33 PM
okay well at the very least I'm encouraged by the fact that the solution isn't obvious so my understanding of how prefect works can't be that terrible
k

Kevin Kho

04/30/2021, 4:35 PM
Maybe you can pass the PrefectSecret directly to the
slack_notifier
. Check
slack_notifier
in the docs. It takes in a
webhook_secret
s

Sean Talia

04/30/2021, 5:46 PM
alright I will take a look at this!
oh yeah wow this totally seems like it should work...
alright i'll try and report back!
(thanks, I should have looked at the source for this to start with)
k

Kevin Kho

04/30/2021, 6:09 PM
Nah man all good. Happy to help 🙂
s

Sean Talia

04/30/2021, 6:34 PM
okay, that worked like a charm! I might have to refactor my code a little bit if I want to turn this into a parameter but that was super easy, thanks for your help
👍 1