prefect-community
  • k

    Kevin Kho

    12/15/2021, 5:15 PM
    Hi everyone, as part of our partnership program, we are exploring what official integrations to add to our task library in 2022. Feel free to chime in with the partnerships/integrations you'd like to see in this thread.
  • d

    Daniil Ponizov

    12/15/2021, 6:21 PM
    Hi! Is it possible to pass the source of a flow that is stored in Local or Docker storage on one machine to an agent on another machine?
  • f

    Frank Oplinger

    12/15/2021, 8:56 PM
    Hello! I am currently using an ECS service to run a prefect agent. When defining the service, I set a security group shared with an RDS instance. However, my flows launched from that agent are unable to connect to the RDS. Is there a way to define a security group for the task that the agent executes?
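A minimal sketch, assuming Prefect 1.x's `ECSRun` run config and its `run_task_kwargs` argument, which (as far as I know) is merged into the boto3 `run_task` call the agent makes for each flow run. The security group then has to be set on the flow-run task, not only on the agent's service. The subnet and security-group IDs are hypothetical placeholders, and `awsvpcConfiguration` only applies to tasks using the `awsvpc` network mode (e.g. Fargate).

```python
from typing import Any, Dict, List


def build_run_task_kwargs(
    subnets: List[str], security_groups: List[str], public_ip: bool = False
) -> Dict[str, Any]:
    """Build the networkConfiguration block accepted by boto3's ECS run_task."""
    return {
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": subnets,
                # Security groups attached to the flow-run task itself,
                # not to the service that runs the agent.
                "securityGroups": security_groups,
                "assignPublicIp": "ENABLED" if public_ip else "DISABLED",
            }
        }
    }


# Hypothetical IDs; use the security group shared with the RDS instance.
kwargs = build_run_task_kwargs(["subnet-0abc"], ["sg-0rds"])
# In the flow file this would then be passed as, e.g.:
#   flow.run_config = ECSRun(run_task_kwargs=kwargs)
```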
  • k

    Kevin Mullins

    12/15/2021, 10:02 PM
    I’m looking at the Prefect Cloud plans page, and honestly I’m a little confused by it.
    1. The front Standard card shows the feature `Role-based permissioning`; however, further down in the features list, when Standard is selected, RBAC and Custom Permissions are greyed out, which would make me think it doesn’t have role-based permissioning.
       a. Can I get clarification on what Role-based permissioning actually is compared to the features list, and which plan would be needed for each?
    2. From what I see, each feature has either a checkmark, an open circle, a dotted circle, a greyed-out filled circle, or a black filled circle, with no explanation of the differences, which makes them hard to understand without hovering over every one.
       a. This is just feedback from a user perspective: it might be hard to quickly figure out the features. Thanks!
  • k

    Kirk Quinbar

    12/15/2021, 10:10 PM
    I am working on a POC to use Prefect to help orchestrate an ELT process. I was able to get a flow and tasks to run locally and show up in the local server UI. Then I set up a Prefect Cloud account because I wanted to test task concurrency, and got it to kick off my flow from my local machine. My next step is to see what it takes to productionize the flow in Azure, so I have a few questions.
    1. I know there is an Azure Marketplace item to Create Prefect Agent. Is that the best way to get that part set up? Is Kubernetes the way to go over just a VM with Docker? I tried getting the Kubernetes version to install, but it failed on the last step with "The received access token is not valid: at least one of the claims 'puid' or 'altsecid' or 'oid' should be present. If you are accessing as application please make sure service principal is properly created in the tenant.". I have no idea what that means, and when I tried to run the install again, it gave me "A custom role with the same name already exists in this directory. Use a different name".
    2. Does anyone know of a complete walkthrough of setting up an agent in Azure (VM with Docker or Kubernetes)? I did find something, but it doesn't show anything about setting up the Kubernetes service etc.: https://infinitelambda.com/post/prefect-workflow-automation-azure-devops-aks/
  • t

    Tom Klein

    12/15/2021, 10:39 PM
    Assuming I’m running a Prefect agent on EC2 (for our PoC), is there any clear advantage to running a Docker agent vs. a local agent? Also, is it possible to use rootless mode for Docker (to increase security), or will it hinder Prefect in any way? https://docs.docker.com/engine/security/rootless/
  • k

    KhTan

    12/15/2021, 11:10 PM
    Hi, a basic question: what’s the recommended way to loop over parameters if .map() doesn’t work with a function that has multiple outputs? I have a pipeline like this:
    def pipe(dt):
        # get intermediary output
        output1, output2, output3 = step1(dt)
        # write to database
        step2(output1, output2, output3)
    Is it like this, or is there a better way?
    with Flow('multi-output func') as flow:
        dt = Parameter("urls", default=['dt1', 'dt2', 'dt3'])
        for dt in dt_range:
            pipe(dt)
    thank you
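A plain-Python sketch of one common pattern: instead of unpacking the three outputs at flow-build time, let `step1` return a single tuple and let `step2` accept that tuple, so each function takes one argument and can be mapped. In a Prefect 1.x flow the equivalent would presumably be `step2.map(step1.map(dt))` with both functions decorated with `@task`; the function bodies below are hypothetical stand-ins.

```python
from typing import List, Tuple


def step1(dt: str) -> Tuple[str, str, str]:
    # Hypothetical intermediary computation producing three outputs.
    return (f"{dt}-a", f"{dt}-b", f"{dt}-c")


def step2(outputs: Tuple[str, str, str]) -> str:
    # Unpack inside the task instead of at flow-build time.
    output1, output2, output3 = outputs
    # Stand-in for the database write.
    return f"wrote {output1},{output2},{output3}"


def run_pipeline(dts: List[str]) -> List[str]:
    # Mirrors step2.map(step1.map(dts)): one step1 and one step2 call per element.
    intermediate = [step1(dt) for dt in dts]
    return [step2(outputs) for outputs in intermediate]


print(run_pipeline(["dt1", "dt2", "dt3"]))
```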
  • d

    Danny Vilela

    12/15/2021, 11:31 PM
    Hi all! I’m trying (again…) to write an `HdfsResult` class (à la `luigi.contrib.hdfs.target.HdfsTarget`; docs), but I’m stuck on the patterns the prefect codebase uses for the `location` attribute. It seems like the `Result` base class implements an interface that allows `location` to be passed both at initialization time and when calling `exists(...)` or `read(...)`. I guess my question is: why? Is it not enough to restrict the user to passing `location` only at initialization time and use that value throughout the `exists` and `read` methods? Edit: follow-up question: why does the prefect codebase follow the pattern of creating a new `Result` instance during `Result.read(…)`, as opposed to updating the current instance's value (`self.value`)?
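A stdlib sketch of the pattern being asked about, using a hypothetical `LocalTextResult` class (not Prefect's `Result`) that writes to the local filesystem instead of HDFS. One plausible reason for the design, stated as an assumption: a single result object attached to a task can be reused across many task runs (for example, mapped children writing to different templated locations), so accepting `location` per call and returning a fresh copy from `read`/`write` avoids mutating that shared object.

```python
import copy
import os
from typing import Any, Optional


class LocalTextResult:
    """Hypothetical result type; the location can be set at init or per call."""

    def __init__(self, value: Any = None, location: Optional[str] = None):
        self.value = value
        self.location = location

    def _resolve(self, location: Optional[str]) -> str:
        # A per-call location wins over the one given at init.
        resolved = location or self.location
        if resolved is None:
            raise ValueError("no location given at init or call time")
        return resolved

    def exists(self, location: Optional[str] = None) -> bool:
        return os.path.exists(self._resolve(location))

    def write(self, value: Any, location: Optional[str] = None) -> "LocalTextResult":
        new = copy.copy(self)  # leave the shared instance untouched
        new.location = self._resolve(location)
        new.value = value
        with open(new.location, "w") as fh:
            fh.write(str(value))
        return new

    def read(self, location: Optional[str] = None) -> "LocalTextResult":
        new = copy.copy(self)
        new.location = self._resolve(location)
        with open(new.location) as fh:
            new.value = fh.read()
        return new
```

With this shape, two `write` calls on the same shared instance produce two independent results, while the shared instance's own `value` and `location` stay unset.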
  • g

    Gian Piero Izzo

    12/16/2021, 12:06 AM
    Hello, I was trying to set up a flow like this:
    from prefect import Flow, task, apply_map
    
    @task
    def task1(arg1):
        return arg1
    
    @task
    def task2(arg2):
        return sum(arg2)
    
    def jobMap(arg):
        print("branch"+str(arg))
        arg2 = task1.map(range(2))
        task2(arg2)
    
    with Flow("workflow") as flow:
        apply_map(jobMap, range(3))
            
    flow.run()
    but I receive the following error: `ValueError: Cannot set mapped=True when running from inside a mapped context`
    Is there any way to build a pipeline of mapping over mapping? Thanks in advance, Gian Piero
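The error indicates that Prefect 1.x doesn't allow calling `.map()` inside a function passed to `apply_map`. One workaround, sketched here in plain Python, is to flatten the two levels up front into a single list of `(branch, item)` pairs, map once over that flat list, and then group the results per branch before the reduce step. In a flow, the pair-building and grouping would presumably each become their own task; `run_flattened` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def task1(arg1: int) -> int:
    return arg1


def task2(arg2: List[int]) -> int:
    return sum(arg2)


def run_flattened(n_branches: int = 3, n_items: int = 2) -> Dict[int, int]:
    # One flat list replaces the nested "map inside apply_map".
    pairs: List[Tuple[int, int]] = [
        (branch, item) for branch in range(n_branches) for item in range(n_items)
    ]
    mapped = [(branch, task1(item)) for branch, item in pairs]  # single-level map
    grouped: Dict[int, List[int]] = defaultdict(list)
    for branch, value in mapped:
        grouped[branch].append(value)
    # Reduce step, once per original branch.
    return {branch: task2(values) for branch, values in grouped.items()}


print(run_flattened())  # {0: 1, 1: 1, 2: 1}
```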
  • t

    Tom Klein

    12/16/2021, 2:23 AM
    Hi again, I’m trying to get the Snowflake query task to work, but I'm getting an error 😕
  • t

    Tom Klein

    12/16/2021, 4:15 AM
    I’m trying to create a Docker container and then run the container using the task library Docker tasks, but I think I’m running into an issue
  • a

    Adam Everington

    12/16/2021, 10:29 AM
    OK, not sure whether this is a Prefect or a SQL Server issue, but... SQL Server only allows IDENTITY_INSERT to be ON for one table at a time per session, so when I run, say, 5 tasks that each set IDENTITY_INSERT ON, insert the records, and then set IDENTITY_INSERT back OFF using a LocalDaskExecutor, it understandably fails. However, even when I set upstream tasks so that they essentially execute one after the other, it still fails. If I use a LocalExecutor, it works fine. Thoughts?
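Prefect 1.x's `LocalDaskExecutor` uses a thread pool by default, so if several tasks share one SQL Server session, their `SET IDENTITY_INSERT ... ON`/`OFF` pairs can interleave. A minimal sketch of one way to rule that out, assuming a single shared connection: guard the whole ON/insert/OFF sequence with a lock so it always runs as one unit. `FakeConnection`, `load_table`, and the table names are hypothetical stand-ins for a real driver connection and tables.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List

_identity_lock = threading.Lock()


class FakeConnection:
    """Hypothetical stand-in that just records the SQL it receives."""

    def __init__(self) -> None:
        self.statements: List[str] = []
        self._append_lock = threading.Lock()

    def execute(self, sql: str) -> None:
        with self._append_lock:
            self.statements.append(sql)


def load_table(conn: FakeConnection, table: str) -> None:
    # Only one table may have IDENTITY_INSERT ON per session,
    # so the whole ON -> INSERT -> OFF sequence is serialized.
    with _identity_lock:
        conn.execute(f"SET IDENTITY_INSERT {table} ON")
        conn.execute(f"INSERT INTO {table} VALUES (...)")
        conn.execute(f"SET IDENTITY_INSERT {table} OFF")


conn = FakeConnection()
with ThreadPoolExecutor(max_workers=5) as pool:
    for name in ["t1", "t2", "t3", "t4", "t5"]:
        pool.submit(load_table, conn, name)
```

If the failure persists even with strictly sequential tasks, the lock won't help, and the next thing to check would be whether each task opens its own session.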
  • b

    Brett Naul

    12/16/2021, 1:12 PM
    Very minor request: could someone help me delete an old agent in the UI that refuses to die (Remove and Clean Up both just spin for a while and then nothing happens)? The ID is `d6962d8d-f4b0-4e04-8217-82d41400a043`
  • i

    Ifeanyi Okwuchi

    12/16/2021, 4:29 PM
    Hello, I'm trying to run a Prefect flow locally where the flow is within a module. The path to the flow file is `repo_name/projects/npa_project/npa_project_module/npa_flow.py`, and the module is `npa_project_module`. When I run `prefect run -m npa_project_module.npa_flow` in the CLI, I get an error from Prefect saying `No module named "npa_project_module"`. This happens both when I cd to the `npa_project` directory and when I cd to the `npa_project_module` directory before running the command. Any ideas what could be wrong?
  • v

    Vadym Dytyniak

    12/16/2021, 4:41 PM
    Hi. Is there any difference between these two task-calling styles?
    from prefect import Task

    class SetupStorageTask(Task):
        def run(self, mode) -> Storage:
            return Storage(mode=mode)

    setup_storage_task = SetupStorageTask()
    storage = setup_storage_task(mode='rw')
    and
    class SetupStorageTask(Task):

        def __init__(self, mode, **kwargs):
            self.mode = mode
            # Task subclasses still need to initialize the base Task
            super().__init__(**kwargs)

        def run(self) -> Storage:
            return Storage(mode=self.mode)

    setup_storage_task = SetupStorageTask(mode='rw')
    storage = setup_storage_task()

    Or can it be just:
    storage = SetupStorageTask(mode='rw')
  • s

    Seth Birkholz

    12/16/2021, 4:53 PM
    Has anyone tried to unit test `@resource_manager`? I don’t see any documentation on that. I can test normal flows, but this one has me stumped.
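One approach, sketched under assumptions: keep the `setup`/`cleanup` logic in a plain class and unit-test those methods directly, separately from any flow run; in Prefect 1.x the `@resource_manager` decorator would then be applied to that class (or a thin subclass) for use in flows. `TempDirResource` is a hypothetical example resource, not something from the thread.

```python
import os
import shutil
import tempfile
import unittest


class TempDirResource:
    """Hypothetical resource with the setup/cleanup pair a resource manager uses."""

    def __init__(self, prefix: str = "flow-") -> None:
        self.prefix = prefix

    def setup(self) -> str:
        # Acquire the resource; the return value is what tasks would receive.
        return tempfile.mkdtemp(prefix=self.prefix)

    def cleanup(self, path: str) -> None:
        # Release whatever setup() created.
        shutil.rmtree(path, ignore_errors=True)


class TestTempDirResource(unittest.TestCase):
    def test_setup_then_cleanup(self) -> None:
        resource = TempDirResource(prefix="test-")
        path = resource.setup()
        self.assertTrue(os.path.isdir(path))
        self.assertTrue(os.path.basename(path).startswith("test-"))
        resource.cleanup(path)
        self.assertFalse(os.path.exists(path))
```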
  • p

    Pedro Machado

    12/16/2021, 5:00 PM
    Hi. I am using the LocalExecutor for a flow that has mapped tasks. The idea is for it to run tasks one at a time. However, when a mapped task fails, it blocks until that task is retried and it ultimately succeeds or fails. Is it possible to configure another executor so that tasks are run one at a time but the flow does not get stuck waiting on failed tasks that will be retried? Thanks
  • t

    Tom Klein

    12/16/2021, 5:06 PM
    I have an idea and would be glad for feedback on how much it makes sense (or not). While fiddling with the Docker tasks, I realized that this kind of "subflow", where I basically have some ready-made image (or an image I'd like to build) that I'd like to run as a Docker container, would always have a very similar structure, namely:
    • take a ready-made image (or build one from a provided Dockerfile)
    • check if the dedicated container for it exists; if it does, remove it
    • create a new container for it
    • run the container with some command
    • wait for the run to finish
    • log the results
    • remove the container
    Ideally, I'd like to reach a point where I could just point Prefect at a local or remote code repository that contains a Dockerfile and have it turn that into a single task (or, alternatively, point it at a pre-built image on ECR and the rest would be the same). Do you think it would make more sense to turn this into a custom task, or into a parameterized generic flow that I can incorporate into other flows using create_flow_run? My issues with the generic flow are that it can't easily be shared with the community, it requires registering such a flow, and it requires naming all the tasks dynamically to make it clear (from the outside) what it's doing (and even then, I can only name the tasks dynamically, not the flow). E.g., if this image does some data preprocessing for some model, the flow name would still be something like "run docker", which doesn't do much to indicate that it's actually running this specific Docker image. The problem with creating a custom task is that I couldn't reuse the existing task-library Docker tasks... Thoughts? 🤔
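A sketch of the generic lifecycle from the list above, expressed as the Docker CLI commands it roughly corresponds to. It only builds the argument lists (nothing is executed), and the image, container, and command names are hypothetical; either a custom task or a generic flow could wrap the same sequence.

```python
from typing import List, Optional


def container_lifecycle(
    image: str,
    container: str,
    command: List[str],
    build_context: Optional[str] = None,
) -> List[List[str]]:
    """Return the docker CLI steps for one build/run/cleanup cycle."""
    steps: List[List[str]] = []
    if build_context is not None:
        # Build the image from the Dockerfile in build_context.
        steps.append(["docker", "build", "-t", image, build_context])
    # Remove any leftover container with the same name (-f also stops it).
    steps.append(["docker", "rm", "-f", container])
    steps.append(["docker", "create", "--name", container, image, *command])
    # -a attaches, so this blocks until the container exits and streams its output.
    steps.append(["docker", "start", "-a", container])
    steps.append(["docker", "logs", container])
    steps.append(["docker", "rm", container])
    return steps
```

Each entry could be handed to `subprocess.run(step, check=True)`, except probably the `docker rm -f` step, which may error when no such container exists and so would be run with `check=False`.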
  • p

    Philip MacMenamin

    12/16/2021, 7:56 PM
    Hi, I'm struggling to create a curl POST that kicks off a create_flow_run on Prefect Cloud with parameters. Is there an example somewhere?
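A stdlib sketch of the request such a curl POST would carry, assuming Prefect Cloud's 1.x GraphQL endpoint at `https://api.prefect.io`, a bearer API key, and a `create_flow_run` mutation whose input (assumed type name `create_flow_run_input`) takes `flow_id` and `parameters`. The type name, flow ID, and key are assumptions or placeholders, and the request is only built here, not sent.

```python
import json
import urllib.request
from typing import Any, Dict

API_URL = "https://api.prefect.io"  # assumed Prefect Cloud (1.x) GraphQL endpoint

CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) {
    id
  }
}
"""


def build_request(api_key: str, flow_id: str, parameters: Dict[str, Any]) -> urllib.request.Request:
    body = {
        "query": CREATE_FLOW_RUN,
        # Parameters travel as a JSON object inside the GraphQL variables.
        "variables": {"input": {"flow_id": flow_id, "parameters": parameters}},
    }
    return urllib.request.Request(
        API_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
        method="POST",
    )
```

The same JSON body could be sent with curl via `--data` plus the two `-H` headers; `urllib.request.urlopen(req)` would actually send it.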
  • a

    Austen Bouza

    12/16/2021, 8:46 PM
    👋 is there a built-in/idiomatic way of marking a run as SKIP/FAILED if it has been running for longer than a set duration? This is in the case where a task hasn’t returned a success or fail state within a reasonable amount of time and it prevents the rest of the flow from completing.
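Prefect 1.x tasks accept a `timeout` argument in seconds, e.g. `@task(timeout=600)`, after which, as far as I know, the task run ends in a failed (`TimedOut`) state. The stdlib sketch below shows the same idea outside Prefect: wait on a worker for at most N seconds and report a failure state if it hasn't finished. `run_with_timeout` is a hypothetical helper, and since a Python thread can't be forcibly killed, the abandoned worker keeps running in the background.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Any, Callable, Tuple


def run_with_timeout(fn: Callable[[], Any], seconds: float) -> Tuple[str, Any]:
    """Return ("Success", value), or ("Failed", reason) if fn exceeds `seconds`."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return ("Success", future.result(timeout=seconds))
    except FutureTimeout:
        return ("Failed", f"timed out after {seconds}s")
    finally:
        # Don't block on a stuck worker; it is abandoned, not killed.
        pool.shutdown(wait=False)


print(run_with_timeout(lambda: 42, seconds=1))  # ('Success', 42)
print(run_with_timeout(lambda: time.sleep(0.5), seconds=0.05))  # ('Failed', 'timed out after 0.05s')
```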
  • t

    Tilak Maddy

    12/16/2021, 9:03 PM
    Hey there! I'm trying to query the total number of flow runs in a Success state, but the where clause doesn't seem to affect the returned count. Why?
    query {
      flow(
        where: {name: {_eq: "YYY"}, project: {name: {_eq: "XXX"}}, flow_runs: {state: { _eq: "Success"}}}
      ) {
        flow_runs_aggregate{
          aggregate {
            count
          }
        }
      }
    }
    Can you help me query the total number of successful flow runs of a flow from the beginning of time?
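In a Hasura-style API like Prefect's 1.x GraphQL, the `where` on `flow` only selects which flows are returned; it doesn't filter the nested `flow_runs_aggregate`, which is why the count ignores the state condition. A sketch, assuming the nested aggregate accepts its own `where` argument, that moves the state filter onto the aggregate and builds the request body in Python. It also assumes each registered version of a flow is a separate `flow` row, so counting "from the beginning of time" means summing across rows.

```python
from typing import Any, Dict

SUCCESS_COUNT_QUERY = """
query($flow: String!, $project: String!) {
  flow(where: {name: {_eq: $flow}, project: {name: {_eq: $project}}}) {
    # Filtering here restricts the aggregate, not just the flows returned.
    flow_runs_aggregate(where: {state: {_eq: "Success"}}) {
      aggregate {
        count
      }
    }
  }
}
"""


def success_count_payload(flow_name: str, project_name: str) -> Dict[str, Any]:
    # JSON body for a POST to the GraphQL endpoint.
    return {"query": SUCCESS_COUNT_QUERY, "variables": {"flow": flow_name, "project": project_name}}


def total_successes(response: Dict[str, Any]) -> int:
    # One row per registered flow version, so sum the per-row counts.
    return sum(row["flow_runs_aggregate"]["aggregate"]["count"] for row in response["data"]["flow"])
```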
  • d

    Danny Vilela

    12/16/2021, 9:17 PM
    Hi all! I’m getting close to a new `HdfsSparkDataFrameResult` implementation and I’m now looking to test it. I’ve been successful in writing unit tests (similar to those in the prefect test suite), but now I want to test whether the result works in a Flow. Namely, I’d want to be sure that if some task is decorated with `@task(result=HdfsSparkDataFrameResult(…))`, the Flow doesn’t re-run the task if the result already exists. Has anyone done this for their own custom `Result` (sub-)type? I’m trying to follow the test suite examples under `TestOutputCaching`, but I’m not sure I’m testing the right thing. I’ll add some more code in a thread here, but any advice/experience would be very welcome!
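As far as I know, in Prefect 1.x skipping a task because its output already exists is driven by the task's `target` (e.g. `@task(result=..., target="{task_name}.parquet", checkpoint=True)`), which the task runner checks via the result's `exists(...)`; checkpointing also has to be enabled for local runs (e.g. `PREFECT__FLOWS__CHECKPOINTING=true`). A test can therefore count how often the task body actually executes. The toy runner and in-memory result below are hypothetical stand-ins that only model that check, not Prefect's task runner.

```python
from typing import Any, Callable, Dict


class InMemoryResult:
    """Hypothetical stand-in for a Result subclass keyed by location."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}

    def exists(self, location: str) -> bool:
        return location in self.store

    def read(self, location: str) -> Any:
        return self.store[location]

    def write(self, location: str, value: Any) -> None:
        self.store[location] = value


def run_with_target(fn: Callable[[], Any], result: InMemoryResult, target: str) -> Any:
    # Models the cache check: reuse the stored output if the target exists.
    if result.exists(target):
        return result.read(target)
    value = fn()
    result.write(target, value)
    return value


calls = {"n": 0}


def expensive() -> str:
    calls["n"] += 1
    return "dataframe"


shared = InMemoryResult()
run_with_target(expensive, shared, "model-features")
run_with_target(expensive, shared, "model-features")
assert calls["n"] == 1  # the second run was served from the stored result
```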
  • d

    Danny Vilela

    12/16/2021, 10:22 PM
    Separate question: has anyone mapped a prefect task over the product of iterables? E.g., say I have a task `EvaluateModelOnMetric(model: Model = …, metric: Metric = …)` that evaluates a single model against a single metric, and lists `models: List[Model] = [model_a, model_b, model_c]` and `metrics: List[Metric] = [precision, recall, f1_score]`. It’s pretty simple to design a task `EvaluateModelsOnMetrics(models: List[Model] = …, metrics: List[Metric] = …)` that uses `itertools.product` internally and delegates to an `EvaluateModelOnMetric`, but I’m wondering if prefect has something like this out of the box 🙂
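Prefect 1.x's `.map()` pairs its mapped arguments element-wise (zip-style) rather than taking their product, so one approach is to expand the product first, for example in a small upstream task, into two equal-length lists and then map over both. A plain-Python sketch; `expand_product` and `evaluate_model_on_metric` are hypothetical stand-ins, the latter for `EvaluateModelOnMetric`.

```python
from itertools import product
from typing import List, Tuple


def expand_product(models: List[str], metrics: List[str]) -> Tuple[List[str], List[str]]:
    # Every (model, metric) combination, split into two aligned lists.
    pairs = list(product(models, metrics))
    return [m for m, _ in pairs], [k for _, k in pairs]


def evaluate_model_on_metric(model: str, metric: str) -> str:
    return f"{model}:{metric}"


model_args, metric_args = expand_product(
    ["model_a", "model_b", "model_c"], ["precision", "recall", "f1_score"]
)
# Roughly, in a flow: EvaluateModelOnMetric().map(model=model_args, metric=metric_args)
results = list(map(evaluate_model_on_metric, model_args, metric_args))
print(len(results), results[0])  # 9 model_a:precision
```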
  • l

    Leon Kozlowski

    12/17/2021, 12:27 AM
    Is there any way to see the docker image digest for the flow image an agent is using?
  • m

    Matt Alhonte

    12/17/2021, 12:36 AM
    So, we launched an `ECSAgent` and I'm trying to get a basic "hello world" flow running on it. When I don't have a `capacityProviderStrategy` defined, I get this:
    No Container Instances were found in your cluster.
    But when I took out the `LaunchType` keyword, it still gives me this:
    An error occurred (InvalidParameterException) when calling the RunTask operation: You may choose a capacity provider or a launch type but not both.
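The second error says boto3's `run_task` received both `launchType` and `capacityProviderStrategy`. A hedged sketch: if the agent or run config still contributes a default launch type (an assumption about this setup), the kwargs need to be reconciled so that only one of the two is sent. `merge_run_task_kwargs` is a hypothetical helper, not part of Prefect; the cluster name is a placeholder, and `FARGATE_SPOT` is used only as an example provider.

```python
from typing import Any, Dict


def merge_run_task_kwargs(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge run_task kwargs, keeping launchType and capacityProviderStrategy exclusive."""
    merged = {**defaults, **overrides}
    if "capacityProviderStrategy" in overrides:
        # ECS rejects requests that set both, so the explicit strategy wins.
        merged.pop("launchType", None)
    return merged


kwargs = merge_run_task_kwargs(
    {"launchType": "FARGATE", "cluster": "my-cluster"},
    {"capacityProviderStrategy": [{"capacityProvider": "FARGATE_SPOT", "weight": 1}]},
)
print(sorted(kwargs))  # ['capacityProviderStrategy', 'cluster']
```

The first error ("No Container Instances were found") typically points at an EC2-backed launch with no registered container instances in the cluster, which is a separate issue from the conflict above.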
  • y

    Yash

    12/17/2021, 7:51 AM
    Hi, I'm trying to figure out how to parameterize a workflow so that a user can select options from a drop-down on the run screen while scheduling.
  • p

    Paul Hughes

    12/17/2021, 9:16 AM
    💃 Just arrived!
  • s

    Stefano Cascavilla

    12/17/2021, 10:46 AM
    Hi, I've upgraded my local Prefect server to 0.15.9 to try out the Airbyte task. However, when I'm building the Docker image of the flow, it doesn't find the `prefect.tasks.airbyte` module. This is the error:
    #14 1.604 Traceback (most recent call last):
    #14 1.604   File "flow/etl.py", line 7, in <module>
    #14 1.604     from prefect.tasks.airbyte import AirbyteConnectionTask
    #14 1.604 ModuleNotFoundError: No module named 'prefect.tasks.airbyte'
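Since the error appears while building the flow's Docker image, one thing worth checking (an assumption, not confirmed in the thread) is whether the Prefect version installed inside the image matches the upgraded local one; the base image may ship an older release without `prefect.tasks.airbyte`. A small stdlib helper that could be run inside the image to report this; `installed_version` and `module_available` are hypothetical names.

```python
import importlib.util
from importlib import metadata
from typing import Optional


def installed_version(dist: str) -> Optional[str]:
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


def module_available(name: str) -> bool:
    # find_spec imports parent packages, so guard against a missing parent.
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        return False


print("prefect", installed_version("prefect"))
print("airbyte tasks available:", module_available("prefect.tasks.airbyte"))
```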
  • l

    Lucian Rosu

    12/17/2021, 12:41 PM
    I'm having this issue and I'm not sure if it's a problem with the Prefect Cloud UI or with my code. I'm trying to schedule the flow to run daily at a specific hour, but when I add filters to the schedule, the UI doesn't show upcoming runs anymore. I tried setting the filters both ways shown below (one is commented out). Has anybody else had this issue? Prefect version 0.15.9
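    from datetime import time, timedelta
    import pendulum
    from prefect.schedules import Schedule, clocks, filters
    schedule = Schedule(
        clocks=[clocks.IntervalClock(timedelta(days=1))],
        # filters=[filters.at_time(time(10))],
        or_filters=[filters.between_times(pendulum.time(9), pendulum.time(9))],
    )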
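A likely explanation, offered as an assumption: a daily `IntervalClock` emits times at the same time of day as its start date, so unless that start date falls at 9:00, no emitted time ever passes a 9:00-to-9:00 filter and the schedule has no upcoming runs. The stdlib simulation below shows this with hypothetical helpers; anchoring the clock itself at the desired hour (a start date at 10:00, or a cron clock such as `CronClock("0 10 * * *")`) would avoid needing filters at all.

```python
from datetime import datetime, time, timedelta
from typing import List


def interval_times(start: datetime, every: timedelta, count: int) -> List[datetime]:
    # Mimics an interval clock: start, start + every, start + 2 * every, ...
    return [start + i * every for i in range(count)]


def between_times(dt: datetime, start: time, end: time) -> bool:
    # Inclusive time-of-day window, like a between-times filter.
    return start <= dt.time() <= end


def upcoming(start: datetime, count: int = 10) -> List[datetime]:
    times = interval_times(start, timedelta(days=1), count)
    return [t for t in times if between_times(t, time(9), time(9))]


print(len(upcoming(datetime(2021, 12, 17, 0, 0))))  # 0: midnight never matches 9:00
print(len(upcoming(datetime(2021, 12, 17, 9, 0))))  # 10: every emitted time is 9:00
```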
  • d

    Daniil Ponizov

    12/17/2021, 12:55 PM
    Hi! Will the agent see the configuration (env vars, working dirs, etc.) of a flow that was registered on a different machine with LocalRun?
a

Anna Geller

12/17/2021, 1:13 PM
The flow run page in the UI will show you the run_config, including things like env variables, regardless of which machine registered the flow.
However, when using the default storage, you may run into trouble when you register from one machine and try to run from another, because Prefect won’t be able to find your flow file (i.e. your flow Storage), which is on a completely different machine. You could use some other storage class, e.g. S3 or GitHub, for such use cases.
d

Daniil Ponizov

12/17/2021, 1:36 PM
@Anna Geller I use Git storage. By the way, I get this error when I try to run the flow:
Failed to load and execute Flow's environment: TypeError("__init__() missing 1 required positional argument: 'flow_path'",)
but I do pass the flow_path argument when I attach storage to the flow.
a

Anna Geller

12/17/2021, 1:39 PM
Can you share your flow definition, especially the storage
and run_config?
d

Daniil Ponizov

12/17/2021, 1:45 PM
@Anna Geller something like this:
import os
from prefect.run_configs import LocalRun
from prefect.storage import Git
f.run_config = LocalRun(env={"VAR": os.environ["VAR"]})
f.storage = Git(repo="path/to/repository",
        repo_host="my.host.com", use_ssh=True, flow_path="main.py")
a

Anna Geller

12/17/2021, 1:52 PM
Did you commit your changes? Once you commit and push, this should work (provided that SSH etc. is properly set up).
Additionally, you would need Git and an SSH client installed on your local agent too. How do you register your flow? Do you use the “prefect register” CLI?
d

Daniil Ponizov

12/17/2021, 1:57 PM
@Anna Geller sorry, my bad. I'd pushed the wrong version of the flow.
Now it works.