prefect-community
  • r

    Robin

    11/05/2020, 8:57 PM
    Dear all, while the flow was running perfectly before and still runs locally, I suddenly get an error that seems to be related to pickle when running it on cloud:
    TypeError: can't pickle _thread.lock objects
    See full error message below. Any ideas, what could be the reason?
    n
    • 2
    • 9
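A minimal sketch of how this kind of error typically arises, using only the standard library: an object that holds a `threading.Lock` (directly, or through a client or connection it wraps) cannot be pickled. The `Connection` classes below are hypothetical and not taken from the flow in question; whether a lock held by some task attribute is the actual cause there is an assumption.

```python
import pickle
import threading


class Connection:
    """Hypothetical stand-in for a client object that keeps a lock internally."""

    def __init__(self, dsn):
        self.dsn = dsn
        self._lock = threading.Lock()


class PicklableConnection(Connection):
    """Same object, but the lock is dropped on pickling and rebuilt on load."""

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_lock"]  # locks cannot be serialized
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.Lock()  # recreate a fresh lock after unpickling


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False if pickling fails."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False
```

Calling `can_pickle(Connection("db://example"))` returns `False` because of the lock attribute. A common way around it is to create such objects inside the task's run function instead of storing them on something that gets serialized.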
  • r

    Rob Fowler

    11/05/2020, 10:25 PM
    Is there a way to use a simple function in a flow, for example when I am going to call the function many times. I can easily make it work by making my function a 'task', but I can't see why it should have all the scheduling overhead:
    @task
    def get_host_from_kv(opts):
        hid = Schema({
            'host': str,
            Optional('identity', default=0): int
        }).validate(opts.kvopts)
        return HostParam(identity=hid['identity'], host=hid['host'])
    
    
    with Flow("Query a single host") as flow:
        opts = Parameter('opts')
    
        result = winrm_powershell(host=get_host_from_kv(opts),
                                  command=load_script(opts, 'win_ps_version.ps1'),
                                  opts=opts)
    n
    • 2
    • 20
  • s

    Saatvik Ramisetty

    11/05/2020, 11:54 PM
    Hi Prefect Community, I have prefect server running with GraphQL. I also have an agent registered. How do I deploy my script as a flow to this server? I do have an Agent ID from the server. A little confused around this. TIA!
    n
    • 2
    • 6
  • j

    Jeff Brainerd

    11/06/2020, 2:08 AM
    Hi :prefect: team, a question about current best practices running Dask on Fargate…
    d
    • 2
    • 4
  • j

    Joël Luijmes

    11/06/2020, 9:12 AM
    I'm not getting a response on GH, so I'm bringing it up here again: https://github.com/PrefectHQ/prefect/issues/3603 TLDR: parsing secrets with newlines in a JSON value (as env var) requires escaping. Is this the intended behavior, or can I adjust it so that it works as expected?
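The behaviour described in the issue can be reproduced with plain `json` from the standard library. This is a sketch of the general JSON rule, not of Prefect's secret-loading code: a raw newline inside a JSON string is an invalid control character, so it has to be escaped as `\n` in the text, or the parser has to run in non-strict mode. The key name `private_key` and the helper `parse_secret` are made up for illustration.

```python
import json

# A JSON document whose string value contains a real newline character.
raw = '{"private_key": "line1\nline2"}'

# The same document with the newline escaped as the two characters \ and n.
escaped = '{"private_key": "line1\\nline2"}'


def parse_secret(text, strict=True):
    """Parse a JSON secret; return None if the text is not valid JSON."""
    try:
        return json.loads(text, strict=strict)
    except json.JSONDecodeError:
        return None


strict_raw = parse_secret(raw)                 # rejected: control character inside a string
from_escaped = parse_secret(escaped)           # accepted: the newline was escaped
lenient_raw = parse_secret(raw, strict=False)  # accepted: strict=False allows control characters
```

Both accepted variants decode to the same value, with a real newline between `line1` and `line2`.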
  • c

    Carlo

    11/06/2020, 1:28 PM
    👋 I'm running into issues setting up Fargate tasks. The agent successfully launches the ECS task, but the task fails at the validation stage after it loads the flow from S3. It thinks the task and container definitions are different, but neither has changed since I registered the flow and the agent scheduled it. I wonder if it's because the flow is using S3 storage? I didn't use Docker because I couldn't figure out how to use it with ECR and provide credentials. Thoughts?
    2020-11-06T07:56:19.166-05:00
    
    containerDefinition.flow-container.command -> Given: ['/bin/sh', '-c', "python -c 'import prefect; prefect.environments.execution.load_and_run_flow()'"], Expected: ['/bin/sh', '-c', 'prefect execute flow-run']
    a
    s
    • 3
    • 21
  • k

    Kamil Okáč

    11/06/2020, 1:56 PM
    Hi, I'm experimenting with "Non-Docker Storage for Containerized Environments" using AWS S3, but I haven't found a way to provide a list of external packages on which the flow is dependent (an equivalent for python_dependencies option available in Docker storage). Is there a way to achieve this?
    m
    • 2
    • 2
  • r

    Robin

    11/06/2020, 2:05 PM
    Dear prefect people, we get plenty of "No heartbeat detected from the remote task; marking the run as failed." errors. 💔 (e.g. https://cloud.prefect.io/accure/task-run/8e44bd11-e7c3-4982-bcf2-7711bc1ca4a9?logId=83475348-2531-4efb-9663-33748515908e) This is particularly unexpected, as most of those tasks seem to have already almost finished... 🤔 How do you approach these errors and debug them? 🙂
    m
    d
    • 3
    • 29
  • s

    Sergey Maslov

    11/06/2020, 2:43 PM
    Hi everyone, I need to build a service which runs a couple of the same workflows for about 10 000 b2b clients on a daily basis. It will take and process data from their APIs on a schedule according to the clients' off-hours. So I want to write those workflows with a CLIENT_ID parameter and get the same control and functionality (e.g. UI, parallelism, logging) as if there were a personal workflow for every client. It seems there is no such possibility in Airflow. Is there one in Prefect?
    d
    • 2
    • 3
  • m

    Mike Marinaccio

    11/06/2020, 3:33 PM
    Hi everyone! I'm exploring ideas around dynamically configuring a Fargate instance/cluster based on a flow parameter / input. In short, I have an hourly job that migrates data for a group of clients based on their timezone. For some hours of the day, there will be more clients and thus more work. Ideally, I want my flow to scale resources based on the number of clients queried for a timezone. Does anyone have ideas for a way to do this? Since flow environments are set at registration time, I'm struggling to find a feasible approach. I've also started to explore the new ECR Task and run_config recently added, which sound like a potential solution. Thanks for the input!
    j
    • 2
    • 2
  • m

    Maura Drabik

    11/06/2020, 3:50 PM
    Hello! I am running into an interesting problem. I am using a parameter called "lifecycle" in my flow (Dev and Prod) which determines which secret the flow should be using. The secrets are set using the following code:
    lifecycle = Parameter("lifecycle")
    true_branch = PrefectSecret("SCM_AUTOMATION_PROD")
    false_branch = PrefectSecret("SCM_AUTOMATION_DEV")
    ifelse(tasks.get_lifecycle(lifecycle), true_branch, false_branch)

    secrets = merge(true_branch, false_branch)
    When I run this flow locally with the lifecycle parameter set to DEV, it works as expected. However, on the cloud, it is only using the PROD secret. Any guidance on why this might be happening? I am positive the cloud secrets are all configured correctly, and the issue seems to be with the ifelse / merge. Thanks in advance!
    d
    • 2
    • 3
  • z

    Zach

    11/06/2020, 5:11 PM
    Is there some sort of standard practice with event driven prefect flows? Say I have some sort of data-cleaning-flow in prefect that takes in a data file, cleans it, and stores the new file in cloud storage. I have two other flows, A and B, that I want to have triggered by the completion of the data-cleaning-flow. I want them to more-or-less subscribe to the data-cleaning-flow. Is there a prefect-esque way of doing this, or is my only option to create some external machinery that detects the successful completion of the data-cleaning-flow and keeps track of all the flows that I want to have triggered by that flow?
    d
    • 2
    • 3
  • z

    Zach

    11/06/2020, 5:11 PM
    I could combine the data-cleaning-flow logic into flow A and flow B, and not have this problem, but then I would be repeating work
  • z

    Zach

    11/06/2020, 5:33 PM
    Another way of phrasing it: Suppose I have an E-T-L flow. And one day I decide I want to have another L be triggered from the data produced by the T. Maybe one day I have five Ls that need to be triggered from the data produced by the T. I don't want to start adding more Ls to the same flow because that would get messy after a while. Also, maybe I want a flow that just does the T-L and not the E, because I already have the data output from E from a previous flow and E takes a while and I don't want to do it again.
  • d

    DJ Erraballi

    11/06/2020, 5:54 PM
    Have a similar question to the above ^^ what's the easiest way to trigger downstream flows from a completed flow? Use the Prefect API to trigger them directly?
    d
    • 2
    • 14
  • d

    DJ Erraballi

    11/06/2020, 5:56 PM
    Also just some questions on "all_finished" behaviour: if a -> b -> c -> d and d is triggered on "all_finished", and a fails so b and c are skipped, will d still run? (Looking for a finally block.)
    j
    • 2
    • 3
  • j

    Jasono

    11/06/2020, 7:06 PM
    Is there a way to quickly start running an existing flow starting with a task in the middle of the dependency graph? (treating like all its dependencies have already been met)
    d
    • 2
    • 14
  • a

    Andrew Hannigan

    11/06/2020, 9:51 PM
    Suppose I have a class called ReadTable which extends Task. This class just SELECTs data from a big SQL table. I want to perform this SELECT in parallel with say n sub-Tasks. So the ReadTable(n) object should spawn n sub-Tasks when it runs. How would this work within Prefect's OOP paradigm, where ReadTable inherits Task?
    m
    • 2
    • 1
  • c

    Cab Maddux

    11/06/2020, 10:25 PM
    Hi! Quick question, we had flow concurrency for a specific label set to 5, we scheduled 20 flows and the first 5 ran for 12 hours (and are still running). We updated flow concurrency for that label to 20 but the remaining 15 flows have remained in the scheduled state. We've confirmed that we can run more than 5 flows concurrently by triggering a brand new flow which gets submitted and run immediately, but we can't get those original 15 to submit. Any suggestions?
    d
    • 2
    • 15
  • d

    Dor Menachem

    11/07/2020, 12:27 AM
    Hello Prefect Community, for a reason I don't understand, my file can't recognize the prefect import 'from prefect import task, Flow', although it works. How can I fix that?
  • a

    Alexander

    11/07/2020, 2:57 PM
    Do all tables in the DB created by prefect have proper FK relations, so that if I remove, let's say, a flow via GraphQL, all connected flow runs and logs will be removed too?
    j
    • 2
    • 1
  • j

    Jasono

    11/07/2020, 4:50 PM
    Hi, some of my tasks import modules from a distant path using sys.path.append(). This seems to work fine when I run the flow locally with flow.run(), but not when running it from the Prefect Cloud/Server. (The error says the module can't be found) Does it sound right? If so, why would it be the case?
    s
    • 2
    • 6
  • o

    Omar Sultan

    11/08/2020, 8:33 AM
    Hi Everyone, I'm using the imperative API, and was wondering if there are any tutorials or samples for the use of Parameters with the imperative API
    d
    • 2
    • 2
  • a

    Amanda Wee

    11/09/2020, 2:17 AM
    Hello! Would it be possible to add to the Prefect CLI and/or the Core client an option to create projects only if they don't already exist? My use case is that I intend to create projects and register flows when an ECS task starts up, and then the ECS task will go on to serve as a Prefect agent. The new idempotency_key option and serialized_hash() method for flow registration make this really nice, but I'll still get an error for the project creation when the ECS task restarts. I could ignore this error, but then that could be a problem if somehow the error is not due to the project already existing, but a genuine failure to create a new project.
    j
    • 2
    • 1
  • j

    JC Garcia

    11/09/2020, 6:30 AM
    Hello guys, I am seeing the following error when trying to build a flow in an azure devops pipeline:
    WARNING: Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7f74c98d0710>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',)': /simple/cryptography/
    ERROR: Could not find a version that satisfies the requirement cryptography (from versions: none)
    ERROR: No matching distribution found for cryptography
    
    Removing intermediate container f73d8367b237
    The command '/bin/sh -c pip install requests cryptography snowflake-connector-python wheel' returned a non-zero code: 1
    Traceback (most recent call last):
      File "./flows/frieze-viewing-room-flow/flow.py", line 113, in <module>
        flow.storage.build()
      File "/usr/local/lib/python3.6/dist-packages/prefect/environments/storage/docker.py", line 351, in build
        self._build_image(push=push)
      File "/usr/local/lib/python3.6/dist-packages/prefect/environments/storage/docker.py", line 418, in _build_image
        "Your docker image failed to build!  Your flow might have "
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    ##[error]Bash exited with code '1'.
    any ideas on why this might be happening? it looks like it cannot connect to pypi for some odd reason. ideas/thoughts welcome. thanks!
  • d

    deltikron

    11/09/2020, 9:41 AM
    Hi! I'm running prefect server and my flows keep getting stuck on submitted. I originally started with a kubernetes agent and thought it was a configuration issue between prefect and kubernetes, but I get the same behavior with a docker agent. The agents pull the correct docker image and submit it, but the flows don't enter a running state. I'd be grateful for any ideas or pointers! Cheers
    a
    r
    • 3
    • 14
  • p

    Philip Bennett

    11/09/2020, 3:40 PM
    Hey all. I'm trying to deploy all our flows automatically from Google Cloud Build or GitHub actions in our Prod environment. We use Docker storage for our flows. Has anyone had any luck in doing this? I can foresee a few issues: 1. We will need to run Docker within the Cloud Build container as the flow.register() method uses Docker to build the image. I'm not sure if this is possible. 2. We will need to create firewall rules that allow Google Cloud Build to communicate with Prefect server running as a Compute VM. I'd be interested how people are automatically deploying flows from any CI/CD platform.
    k
    a
    • 3
    • 5
  • d

    DJ Erraballi

    11/09/2020, 7:21 PM
    Trying to wrap my head around the imperative API. I feel like I want a little more fine-grained control over the graph than what the functional API provides, but it's really hard for me to understand how I can pass data between tasks. Ex.
    Task A: 
      run(self, x, y):
    
    Task B: 
      run(self, a_result, x, y):
    
    
    with Flow() as flow: 
        x_parameter = Parameter("x", required=True)
        y_parameter = Parameter("y",  required=True)
        # question is how do I set up the parameter passing from the parameters -> A -> B
        # my attempt, but I still don't know how to use keyword_tasks in this scenario to pass along results from A
        flow.add_task(x_parameter) 
        flow.add_task(y_parameter)
        a = A() 
        flow.set_dependencies(a , upstream_tasks=[x_parameter, y_parameter], keyword_tasks=??? )
        flow.set_dependencies(B(), upstream_tasks=[a, x_parameter, y_parameter], keyword_tasks=????)
    k
    • 2
    • 5
  • d

    DJ Erraballi

    11/09/2020, 7:28 PM
    also is there a common pattern for chaining tasks post mapping? I.e. if i map a list of ints to a task that multiplies them by 2, and then want each of those to pass their value down to a parametrized StartFlowRun task (basically a task that will do some computation over a map of values, and then trigger n downstream flows for each value)
    k
    j
    • 3
    • 10
  • j

    Jakub Hettler

    11/09/2020, 11:10 PM
    Hi everyone, is there a way to rename flow runs before running the flow itself? I can rename it with the RenameFlowRun task, but this changes the name when the task is running, and we would like to have meaningful flow run names before running it. Thank you! cc @Radek Tomsej
    c
    m
    a
    • 4
    • 9
c

Chris White

11/09/2020, 11:14 PM
Hi Jakub! Yes you definitely can; we haven't exposed the ability to edit the name directly in the UI yet (here's an issue you can comment on to get this back on our radar: https://github.com/PrefectHQ/ui/issues/250), but in the meantime you can use the GraphQL API / Interactive API for this:
mutation{
  set_flow_run_name(input: {flow_run_id: "e475058b-5777-4033-a2b5-36960b6335d4", name: "my-new-name"}) {
    success
  }
}
or use the Python client:
from prefect import Client

client = Client()
client.set_flow_run_name(flow_run_id, flow_run_name)
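For renaming several runs at once, the mutation text can be generated per run ID. The sketch below only builds the GraphQL string shown above; `build_rename_mutation` is a hypothetical helper, and using `json.dumps` for quoting assumes the values need nothing beyond JSON-style string escaping. Sending the result to the API is left out, since that needs a running server.

```python
import json


def build_rename_mutation(flow_run_id, name):
    """Return the set_flow_run_name mutation for one flow run as a string."""
    # json.dumps adds the double quotes and escapes any quotes in the values.
    return (
        "mutation {\n"
        f"  set_flow_run_name(input: {{flow_run_id: {json.dumps(flow_run_id)}, "
        f"name: {json.dumps(name)}}}) {{\n"
        "    success\n"
        "  }\n"
        "}"
    )


# Map of flow run IDs to the names they should get (ID taken from the example above).
new_names = {"e475058b-5777-4033-a2b5-36960b6335d4": "my-new-name"}

mutations = [build_rename_mutation(run_id, name) for run_id, name in new_names.items()]
print(mutations[0])
```

Each generated string can then be pasted into the Interactive API or submitted through whatever GraphQL client is in use.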
j

Jakub Hettler

11/09/2020, 11:40 PM
Perfect, thanks @Chris White !
👍 1
c

Chris White

11/10/2020, 4:42 AM
@Marvin archive "How to update a Flow Run name?"
m

Marvin

11/10/2020, 4:42 AM
https://github.com/PrefectHQ/prefect/issues/3634
j

Jakub Hettler

11/10/2020, 7:16 AM
Hi @Chris White, one more question. In the case you mentioned above, I need to somehow access the flow run(s), get their flow_run_ids, and update them. Is it possible to access them somehow from the functional API in Python, something like on line 58 below?
Untitled
a

Alexander

11/12/2020, 6:11 AM
Yeah i was looking forward to this too. All these oversized-capibara flow runs just confuse folks :)
But still this only renames the current flow run. How to completely change flow run names? I think an ability for a schedule class to also emit the required flow run name would be great: this way we get more fine-grained control over flow run names.
Schedule already emits Events which already have assorted metadata attached like labels and parameter defaults. Can I add a templated flow_run_name field to it?
j

Jakub Hettler

11/12/2020, 10:02 AM
Same here 🙂