prefect-community

    Steve Pamer

    07/30/2021, 7:43 PM
    Looking for help implementing a simple flow of flows. I am creating a parent flow from two registered flows, A and B, where A is upstream of B. Flows A and B have parameters, so I use Parameter tasks in the parent. Ultimately I want to pass results from A to B, but I'm still trying to get a simple A-then-B with params running. I am trying two solutions: one with StartFlowRun and the other with the new create_flow_run. With StartFlowRun I can create a parent that will run A then B using wait=True, but I don't understand how to pass parameters, or how to pass the result of A to B as a parameter. With create_flow_run, I am trying the following: create_flow_run A with parameters, wait_for_flow_run A, create_flow_run B with parameters, wait_for_flow_run B. When I create a parent flow in that order, it runs A and B at the same time. It looks like I am missing the way to specify that A is upstream of B and to wait until A is done before running B. I am also having trouble with get_task_run_result for task A using the id returned from the call to create_flow_run A. Any help appreciated.
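    A minimal sketch of the create_flow_run pattern (the project name "my-project" and the task slug "produce_data-1" in flow A are hypothetical): the data dependency on A's result, plus the upstream_tasks keyword, are what make B wait for A instead of running in parallel.
    from prefect import Flow, Parameter
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result
    
    with Flow("parent") as parent_flow:
        x = Parameter("x", default=1)
    
        a_run_id = create_flow_run(flow_name="A", project_name="my-project", parameters={"x": x})
        a_done = wait_for_flow_run(a_run_id)
        # pull a result out of A once it has finished; the slug is whichever task in A produced it
        a_result = get_task_run_result(a_run_id, task_slug="produce_data-1", upstream_tasks=[a_done])
    
        b_run_id = create_flow_run(flow_name="B", project_name="my-project", parameters={"y": a_result})
        wait_for_flow_run(b_run_id)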
    10 replies · 3 participants

    Michael Warnock

    07/30/2021, 8:50 PM
    A couple things: Is there some lag between editing
    config.toml
    and secrets added to it becoming available when doing local testing? That appears to be the case, since a missing local secret error went away seemingly magically, as I was preparing to ask about it. Alternatively, I had added the secret to cloud first, so maybe there was a (longer) lag before it found it there, and it's still not using my local-secrets? Also, SlackTask doesn't work when supplying 'message' at runtime (or directly to run); I get
    TypeError: run() got multiple values for argument 'message'
    - something to do with that defaults_from_attrs magic?
    6 replies · 2 participants

    Fina Silva-Santisteban

    07/30/2021, 9:02 PM
    Hi everyone! I have a testing question: How do I mock method calls inside a Task? This is my current setup: File 1:
    def do_something_1(params_1):
      (does something)
    File 2:
    from path_to_file.file_1 import do_something_1
    
    class SomethingTask(Task):
      def run(self, params_1):
        do_something_1(params_1)
        do_something_2
    File 3:
    import unittest
    from unittest.mock import patch
    from path_to_file.file_2.SomethingTask import SomethingTask
    
    @patch('path_to_file.file_1.do_something_1')
    class TestSomethingTask(unittest.TestCase):
      def test_1(self, mock_do_something_1):
        task = SomethingTask()
        task.run()
        self.assertTrue(mock_do_something_1.called)
    Instead of mocking
    do_something_1()
    it actually calls
    do_something_1()
    , and strangely the assertion returns False.
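    A sketch of the usual fix, assuming file_2 is an importable module as laid out above: unittest.mock patches a name where it is looked up, so the target should be the reference inside file_2 rather than the definition in file_1, and run() still needs its params_1 argument (the value below is illustrative).
    import unittest
    from unittest.mock import patch
    from path_to_file.file_2 import SomethingTask
    
    @patch('path_to_file.file_2.do_something_1')
    class TestSomethingTask(unittest.TestCase):
        def test_1(self, mock_do_something_1):
            SomethingTask().run(params_1='some-value')
            self.assertTrue(mock_do_something_1.called)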
    👀 1
    4 replies · 2 participants

    Kyle McChesney

    07/30/2021, 9:35 PM
    Has anyone seen the following error:
    An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
    I am running a flow via an ECS agent. It worked just fine until I specified a custom value for
    image
    when submitting the job via the UI. The image I specified was an ECR image.
    11 replies · 2 participants

    matta

    07/31/2021, 2:58 AM
    Hrm, trying to make a Prod vs Test project, with separate CI/CD pipelines. Ideally, people won't have to think about anything but making a feature branch - but I'm using Git storage, so for now I do have to tell it where the feature branch is. It's not THAT big of a deal, but is there any way to set it up so that it'll "know" to tell the storage to look in the feature branch that it's on? Thanks!
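    One possible sketch, assuming GitHub storage and a branch variable provided by the CI pipeline (the variable name, repo and path are hypothetical): register from CI and point ref at whatever branch is being built.
    import os
    from prefect import Flow
    from prefect.storage import GitHub
    
    with Flow("my-flow") as flow:   # stand-in for the real flow
        ...
    
    flow.storage = GitHub(
        repo="my-org/my-repo",
        path="flows/my_flow.py",
        ref=os.environ.get("CI_BRANCH", "main"),  # the feature branch the CI run is building
    )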
    3 replies · 2 participants

    Michael Warnock

    07/31/2021, 3:45 PM
    I'm trying to switch to S3 storage (from Docker, which was working). The flow shows up in cloud, but my local agent doesn't log anything about it. If I run a docker agent, it sees it, and complains about the storage type. I've reread all the docs on storage and agent types, and I don't know what I'm doing wrong.
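    A sketch of one common cause, offered as an assumption rather than a diagnosis: the flow's labels and the local agent's labels have to match before the agent will pick the run up (bucket and label names below are hypothetical).
    from prefect import Flow
    from prefect.storage import S3
    from prefect.run_configs import LocalRun
    
    with Flow("my-flow") as flow:   # stand-in for the real flow
        ...
    
    flow.storage = S3(bucket="my-flow-bucket")
    flow.run_config = LocalRun(labels=["dev"])
    # then start the agent with the same label:
    #   prefect agent local start --label dev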
    12 replies · 3 participants

    Irvin Tang

    08/02/2021, 12:24 AM
    hi, i have a flow that interacts with a library that my team built for storing/indexing data. i added some additional logs in that library, and in my
    pyproject.toml
    i have a git dependency pointing to that test branch. after building the image with this new version of the library and registering the flow with that image, running the flow doesn't display any of the additional logs. i have another project where i'm doing the same thing, and the flows in that project do log the additional messages. i'm not sure if this might be the way i configured prefect or how i'm registering the flows. has anyone encountered this problem or something similar? also please let me know if i was unclear about anything. thank you!
    4 replies · 2 participants

    Reece Hart

    08/02/2021, 1:03 AM
    Hi Community. I need some advice about the kinds of workflows that Prefect is good for or, conversely, not so good for. I'm considering using Prefect to coordinate the execution of genomics data processing. The pipeline consists of ~50 steps, each of which ranges from seconds to 2 hours. The steps are all command line tools written in a mix of compiled C and C++, Java, Python, shell scripts, and even some Perl for bad measure. The pipeline starts with 100-150GB of data, processes that into a series of similarly large files, then analyzes those data to produce files roughly 100MB or smaller. Processing takes 250-500 CPU-hours (e.g., 5-10 hours with 48 threads on an EC2 c5ad.12xlarge). The current pipeline is coordinated by a Makefile, which works surprisingly well. My primary concern (and question for the community) is whether Prefect is really suited to the tasks at hand. For example, one of the first steps looks roughly like this:
    seqtk (args) | fastp (args) | sentieon (args) | samtools (args) >out.tmp
    In our pipeline, most steps are wrapped in a script, which is what the Makefile calls. This step starts with approximately 100GB of data and dumps 150GB of data. Given the data volume, I would be reluctant to write intermediate files in lieu of the pipes. Given the nature of the workflow -- all command-line tools with file-based data -- I think adopting Prefect would amount to making most of our scripts into Prefect's ShellTasks. I wonder whether this is really worth the effort. The main drivers for choosing a workflow tool are to help with pipeline versioning, to schedule and track jobs, and to help orchestrate infrastructure scale up/down. Thanks for any guidance.
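    For what it's worth, a minimal sketch of what one such step might look like as a ShellTask, keeping the existing pipes and files exactly as they are (the command text is illustrative, and stream_output is assumed to be available in your Prefect version to forward the tools' output into Prefect logs).
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    align = ShellTask(name="align", stream_output=True)
    
    with Flow("genomics-pipeline") as flow:
        align(command="seqtk (args) | fastp (args) | sentieon (args) | samtools (args) > out.tmp")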
    11 replies · 3 participants

    Omar Sultan

    08/02/2021, 8:23 AM
    Hi Everyone, We are using Prefect's imperative API and want to use the GitHub storage option. All the tutorials refer to the functional API when registering and storing script-based Flows. If anyone can point me in the right direction it would be really appreciated.
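    A sketch of how this might look, assuming the imperative API with GitHub storage (the repo, path and project name are hypothetical); storage is attached to the flow object the same way regardless of which API built the flow.
    from prefect import Flow, Task
    from prefect.storage import GitHub
    
    class SayHello(Task):
        def run(self):
            print("hello")
    
    flow = Flow("imperative-flow")
    flow.set_dependencies(task=SayHello())
    
    flow.storage = GitHub(repo="my-org/my-repo", path="flows/imperative_flow.py")
    flow.register(project_name="my-project")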
    7 replies · 3 participants

    Samuel Tober

    08/02/2021, 9:33 AM
    Hi everyone! I am using .map() to run a task with a list as a parameter, but I do not get the desired behaviour. For example, if I specify a parameter as:
    markets = Parameter('markets', default=['stockholm', 'oslo'])
    And then run a function:
    total_df = load_data.map(
        city=markets,
        date_from=date_from
    )
    where I pass a single value parameter, date_from, and my list parameter markets. Everything runs without error, however, the function is only run for the first value in the markets list. What I want is to run for each value in markets, using the same value of date_from for each element in markets. How can I achieve this?
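    A sketch of the usual pattern for this (load_data below is a stand-in for the real task): wrap the non-mapped argument in unmapped so every mapped child gets the same date_from while city iterates over markets.
    from prefect import Flow, Parameter, task, unmapped
    
    @task
    def load_data(city, date_from):   # stand-in for the real task
        ...
    
    with Flow("load-markets") as flow:
        markets = Parameter('markets', default=['stockholm', 'oslo'])
        date_from = Parameter('date_from', default='2021-01-01')
        total_df = load_data.map(city=markets, date_from=unmapped(date_from))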
    4 replies · 2 participants

    Jai Deo

    08/02/2021, 9:51 AM
    Hi, I am trying to set up a flow with Azure storage and AKS for the agent and cloud server. The flow registers but when I try to quick run it I get the error 'Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'rstrip'") - not sure what I am doing wrong
    11 replies · 2 participants

    aman gupta

    08/02/2021, 12:07 PM
    Getting this error while deploying the flow on Prefect Cloud; executing it locally works fine. We are trying to save the Results to GCS_RESULTS in buckets. Can anyone please assist with this?
    Unexpected error: PicklingError('Pickling client objects is explicitly not supported.\nClients have non-trivial state that is local and unpickleable.')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 900, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 75, in write
        binary_data = new.serializer.serialize(new.value)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
      File "/usr/local/lib/python3.8/site-packages/google/cloud/client.py", line 166, in __getstate__
        raise PicklingError(
    _pickle.PicklingError: Pickling client objects is explicitly not supported. Clients have non-trivial state that is local and unpickleable.
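    A hedged sketch of the usual workaround: build the GCS client inside the task and return plain data, so no client object ever becomes a task result that the GCSResult serializer has to pickle (bucket/blob names are illustrative).
    from prefect import task
    from google.cloud import storage
    
    @task
    def upload_report(data: str, bucket_name: str, blob_name: str) -> str:
        # the client is created and discarded inside the task run, never pickled
        client = storage.Client()
        client.bucket(bucket_name).blob(blob_name).upload_from_string(data)
        return f"gs://{bucket_name}/{blob_name}"   # return plain data, not the client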
    11 replies · 2 participants

    Robert Hales

    08/02/2021, 1:59 PM
    Hi again, is it possible to set
    max_retries
    on a task at flow run time?
    11 replies · 4 participants

    Bouke Krom

    08/02/2021, 3:18 PM
    Hey all, I'm trying to include my
    Artifact
    (just a link in our case) in a custom Slack notification. The State Handler gets a
    Flow
    and
    State
    . I'm having trouble finding the
    Artifact
    somewhere in the
    Flow
    objects. I guess I should try and find a
    flow_run
    object of some sort?
    16 replies · 2 participants

    Mehdi Nazari

    08/02/2021, 5:12 PM
    Hey All, Can prefect’s secret retrieval tasks be done in a flow definition? I have a flow during which I’d want to retrieve a secret from Prefect Cloud and pass the value to a resident function (not decorated with task). I keep getting a
    Task<name>
    instance instead.
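    A sketch of the usual pattern, assuming the plain function is called my_resident_function (hypothetical): inside the flow block every call just returns a Task, so the real secret value only exists inside another task at run time, which is where the plain function has to be called.
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret
    
    @task
    def call_resident_function(secret_value):
        return my_resident_function(secret_value)   # plain, non-task function used here
    
    with Flow("secret-flow") as flow:
        secret = PrefectSecret("MY_SECRET")   # secret name is illustrative
        call_resident_function(secret)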
    14 replies · 3 participants

    Miguel Angel

    08/02/2021, 6:23 PM
    I was wondering if there is any pattern for exposing modules to be containerized in a pythonic way. The project I'm working on has the following structure:
    .my-package
    ├── __init__.py
    ├── _config.yml
    ├── flows
    │   ├── __init__.py
    │   ├── flow1.py
    │   ├── flow2.py
    │   └── flow3.py
    └── utils.py
    So I can expose my flows via storage object like this:
    from prefect.storage import Module
    
    storage = Module("my-package.flows")
    Each of the flows that belong to
    my-package
    have the following structure:
    from prefect import Parameter, task
    from prefect.utilities import logging as logging
    
    def core_function(**args)-> prefect.flow:
       # process flow
       return flow
    
    flow = core_function()
    Seems legit to me, since I haven't spotted any downside; do you have any recommended pattern or advice?
    16 replies · 2 participants

    Krapi Shah

    08/02/2021, 7:54 PM
    Hi all, Is there any automation to send alerts in case of late flow runs? I have currently set an alert for a job that does not start for 100 seconds, but this only triggers after it has been scheduled. If the flow schedule gets delayed for any reason, like the agent being down, there is no alert. Any help appreciated! Thanks
    19 replies · 3 participants

    Kyle McChesney

    08/02/2021, 8:01 PM
    Hello, Does anyone have a bit more in-depth description of how results work? Specifically S3Results? What should I expect to find in those files for a given task. Example
    @task(result=S3Result('bucket', location='example.out'))
    def example():
      return [1, 2, 3]
    Is it just a pickle file that, when loaded, recreates the list [1, 2, 3]? How does it work for more complicated returns, for example a task that returns a tuple or a pandas DataFrame?
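    A sketch of what reading one of those objects back might look like, assuming the default PickleSerializer: the file is just a cloudpickle blob, so loading it recreates whatever the task returned, whether that is a list, a tuple or a DataFrame.
    import boto3
    import cloudpickle
    
    obj = boto3.client("s3").get_object(Bucket="bucket", Key="example.out")
    value = cloudpickle.loads(obj["Body"].read())   # -> [1, 2, 3] for the task above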
    7 replies · 2 participants

    Madison Schott

    08/02/2021, 8:05 PM
    Hi all, does anyone know how using a Fivetran task changes the Fivetran sync schedule of your connectors? Not sure if it's a coincidence but I just noticed that the tables I was testing using Prefect haven't been synced on my Fivetran UI in 6 days. Normally I have the schedule set to sync every 24 hours.
    12 replies · 5 participants

    Philip MacMenamin

    08/02/2021, 8:45 PM
    Hey - I just ran a deploy of current, looks like 4200 is running, but 8080 is not.
    4 replies · 3 participants

    Harry Baker

    08/02/2021, 9:36 PM
    Is it appropriate to re-use a task across multiple different flows, similar to how you would import a python function? I am trying to organize my code base by DRYing it out, and I find myself reusing the same code defining tasks that instantiate and return a Google Sheets API client. Would I be able to define a task in a file that is then imported in multiple other files, to be used across multiple flows? Or does it not work like that?
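    A sketch of that layout, with a hypothetical shared module name; tasks are ordinary Python objects, so importing one task definition into several flow files generally works.
    # shared/sheets.py -- hypothetical shared module
    from prefect import task
    
    @task
    def get_sheets_client(credentials_path: str):
        ...   # build and return the Google Sheets API client
    
    # flow_a.py -- one of several flows reusing the task
    from prefect import Flow
    from shared.sheets import get_sheets_client
    
    with Flow("flow-a") as flow:
        client = get_sheets_client("creds.json")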
    2 replies · 2 participants

    Mehdi Nazari

    08/02/2021, 9:53 PM
    Hi again all, best practices question: what are practical ways to FAIL a flow when a single task fails? Also, what are good ways to send out a notification when that failure happens?
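    A sketch of one common approach, assuming a backend with a SLACK_WEBHOOK_URL secret configured: a flow run is marked Failed whenever any of its reference (terminal) tasks fail, and a state handler can send the notification.
    from prefect import Flow
    from prefect.engine.state import Failed
    from prefect.utilities.notifications import slack_notifier
    
    handler = slack_notifier(only_states=[Failed])   # posts to the SLACK_WEBHOOK_URL secret
    
    with Flow("notify-on-failure", state_handlers=[handler]) as flow:
        ...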
    5 replies · 2 participants

    Leon Kozlowski

    08/02/2021, 10:18 PM
    Hi all, have there been any thoughts to add a choice concept to
    Parameter
    ? Something like:
    choice_param = Parameter(
        name="choice_param",
        choices=["some", "choice", "params"],
        default="some"
    )
    2 replies · 2 participants

    Sumit Kumar Rai

    08/03/2021, 4:02 AM
    Are there any online resources that will help me create a development environment for Prefect projects?
    6 replies · 3 participants

    Scarlett King

    08/03/2021, 9:56 AM
    Hey, I'm trying to pass a date parameter to an unmapped variable. I'm having trouble reading the parameter value. Can someone help me out please?
    snapshot_date = Parameter('snapshot_date', default=dt.datetime.now().strftime('%Y-%m-%d'))
    run = apply_map(full_flow, params=params, snapshot_date=unmapped(snapshot_date))
    And inside full_flow:
    def full_flow(params, snapshot_date):
        # ..
        snapshot_date = dt.datetime.strptime(snapshot_date, '%Y-%m-%d')
        print(f'{snapshot_date:%Y%m%d}')
        # ..
    It keeps giving me an error because the Parameter object is passed instead of a string. How can I access just the parameter value?
    5 replies · 2 participants

    Samuel Kohlleffel

    08/03/2021, 2:48 PM
    Hi, I'm trying to build out a registration process that uses
    flow.serialized_hash()
    to only register the modified flows. However,
    flow.serialized_hash()
    is not returning a consistent hash value for flows that have not been modified. Why would this be the case? For context, I'm testing by registering the flows locally with
    flow.register()
    12 replies · 2 participants

    Kyle McChesney

    08/03/2021, 4:05 PM
    Has anyone worked extensively with ShellTask? Specifically I am looking to use
    .map
    and include the mapped value into either the command or the helper_script. For example:
    @task
    def files():
        return [
            '/opt/file1.txt',
            '/opt/file2.txt',
            '/opt/file3.txt',
            '/opt/file4.txt',
        ]
    
    rm_task = ShellTask(
        command='rm $file',
    )
    
    with Flow('shell') as flow:
        files_to_delete = files()
        rm_task.map(files_to_delete, helper_script='file="{mapped_value}"')
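    A sketch of one way to do this, assuming it is acceptable to build the full command per file: command is a run-time argument of ShellTask, so a mapped list of commands can be passed straight to .map.
    from prefect import Flow, task
    from prefect.tasks.shell import ShellTask
    
    rm_task = ShellTask()
    
    @task
    def files():
        return ['/opt/file1.txt', '/opt/file2.txt', '/opt/file3.txt', '/opt/file4.txt']
    
    @task
    def build_commands(paths):
        return [f"rm {p}" for p in paths]
    
    with Flow('shell') as flow:
        files_to_delete = files()
        rm_task.map(command=build_commands(files_to_delete))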
    9 replies · 2 participants

    Mehdi Nazari

    08/03/2021, 4:41 PM
    Hello All, Is there a way to define a placeholder on a defined Parameter from the Cloud UI interface? Basically, I'd want Prefect to fill in the placeholder at run time!
    9 replies · 2 participants

    Miguel Angel

    08/03/2021, 4:45 PM
    Has anyone worked with a local environment for testing in a, let's say, docker-compose fashion? Suppose that you have an AWS ECS cluster to execute your flows and Prefect Cloud as the backend.
    15 replies · 3 participants

    Philip MacMenamin

    08/03/2021, 4:51 PM
    I'm getting "Skipped (metadata unchanged)" upon registering a flow that I've updated (as in, the source code has changed and is being ignored).
    10 replies · 3 participants

Philip MacMenamin

08/03/2021, 4:51 PM
I'm getting "Skipped (metadata unchanged)" upon registering a flow that I've updated (as in, the source code has changed and is being ignored).
prefect register --project proj_name -p /path/flow.py -n mesh_to_glb

Michael Adkins

08/03/2021, 4:52 PM
Hey @Philip MacMenamin -- if the structure of the flow doesn't change, there is no change in the metadata that is sent to Cloud so it is 'skipped'

Kevin Kho

08/03/2021, 4:53 PM
Hey @Philip MacMenamin, adding to Michael's comments: if you change the executor from LocalExecutor to DaskExecutor, that is also not saved in the metadata.

Philip MacMenamin

08/03/2021, 4:55 PM
I'm running the flow on a server hosted by me (locally) - is this considered "Cloud"?

Kevin Kho

08/03/2021, 4:56 PM
For this purpose they would behave the same yep.

Philip MacMenamin

08/03/2021, 4:56 PM
I'm adding logging.info within tasks, and I'm not seeing these logging events; the version remains at 1
is this expected behaviour?
well, actually more than that, I've changed the functionality of the source. I guess I don't understand why this is considered to not have changed.

Kevin Kho

08/03/2021, 5:00 PM
It's about the change in
flow.serialized_hash
so if you add code to the task, it doesn't affect the serialized hash, because Prefect only stores the metadata and not the actual code. You can force registration though with:
prefect register --force ...

Philip MacMenamin

08/03/2021, 5:06 PM
ok -
--force
did the trick - thanks!
👍 1