prefect-community

    David Ojeda

    04/28/2020, 4:37 PM
    Hello! Is there a way to add custom states to the GraphQL service? We have a custom state called `GracefulFail`, which extends `Success` (we use it for tasks that did not succeed but should not stop the rest of the downstream tasks). Unfortunately, using these on Prefect Server gives a 500, leaving the task out of date in the UI…
    Failed to set task state with error: ClientError([{'message': '{\'_schema\': "Invalid data type: [None, {\'_schema\': \'Unsupported object type: GracefulFail\'}]"}', 'locations': [{'line': 6, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 115, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1082, in set_task_run_state
        result = self.graphql(
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 225, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'message': '{\'_schema\': "Invalid data type: [None, {\'_schema\': \'Unsupported object type: GracefulFail\'}]"}', 'locations': [{'line': 6, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
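    A minimal sketch of the kind of custom state described above (assuming Prefect 0.x's state API; the class body is a reconstruction, not the user's actual code):
    from prefect.engine.state import Success

    class GracefulFail(Success):
        # terminal "success-like" state: the task did not succeed,
        # but downstream tasks should still run
        pass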

    alvin goh

    04/28/2020, 5:13 PM
    Hi, is it possible to include a kwarg in flow.visualize to select the filetype of the saved visualisation? JPG/PNG would be good… I am collecting these flow visualisations in MLflow, which cannot display PDFs natively in its interface…
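    (A possible workaround sketch, assuming flow.visualize() returns the underlying graphviz.Digraph when no filename is given, as in Prefect 0.x:)
    g = flow.visualize()
    g.format = 'png'  # graphviz also supports 'jpg', 'svg', ...
    g.render('flow_diagram', cleanup=True)  # writes flow_diagram.png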

    Kevin Systrom

    04/28/2020, 9:06 PM
    Hey folks, two questions: (1) What is the proper way to manually clear a task from the cache between runs? I have a task mapped over many values, and sometimes I want to knock out specific values... (2) I'm passing dataframes around between tasks and I'd like to use an inputs cache_validator; however, this doesn't work because it seems the validator 'can't evaluate the truth of a dataframe'. Is there a better approach, so that if I pass in a different dataframe the cache for that task is invalidated?
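    (For question 2, one hedged workaround is a custom cache validator that compares DataFrames with .equals(); the sketch below assumes the Prefect 0.x validator signature (state, inputs, parameters):)
    from datetime import timedelta

    import pandas as pd
    from prefect import task

    def dataframe_safe_inputs(state, inputs, parameters):
        # cache is valid only if every input matches the cached inputs;
        # DataFrames are compared with .equals() instead of ==, which is
        # what the built-in all_inputs validator trips over
        cached = state.cached_inputs or {}
        if set(cached) != set(inputs):
            return False
        for name, new in inputs.items():
            old = cached[name]
            # depending on the Prefect version, cached inputs may be Result objects
            old = getattr(old, 'value', old)
            new = getattr(new, 'value', new)
            if isinstance(new, pd.DataFrame) or isinstance(old, pd.DataFrame):
                if not (isinstance(new, pd.DataFrame)
                        and isinstance(old, pd.DataFrame)
                        and new.equals(old)):
                    return False
            elif new != old:
                return False
        return True

    @task(cache_for=timedelta(hours=1), cache_validator=dataframe_safe_inputs)
    def my_task(df):
        ...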

    Matthew Maldonado

    04/28/2020, 11:31 PM
    Can the ShellTask run Windows commands?

    Kevin Systrom

    04/29/2020, 12:49 AM
    Another question: I can't seem to get the cache to work when I run a task via map. So for instance, if my_task.map([1,2,3]) is called again with my_task.map([2,3,4]), I'd assume that for 2 and 3 my_task would be cached, given cache_for=timedelta(hours=1) and cache_validator=cache_validators.all_inputs -- am I missing something obvious? Thanks!

    Kevin Systrom

    04/29/2020, 12:52 AM
    import prefect
    from prefect.engine import cache_validators
    from prefect.engine import signals
    from prefect import task, Flow
    
    import pandas as pd
    
    @task(cache_for=pd.Timedelta(minutes=2),
          cache_validator=cache_validators.all_inputs)
    def task_b(n):
        print(f'Running task b: {n}')
    
    with Flow('CacheTests') as flow:
        task_b.map([1,2,3])
        
    with prefect.context():
        flow.run()

    Kevin Systrom

    04/29/2020, 12:52 AM
    If I run the flow.run block it seems to miss the cache every time... I'm v confused!

    Sanjay Patel

    04/29/2020, 1:08 AM
    Hi, can I please get some assistance with my flow runs? I have a process where I need to run a series of mapped tasks. The first section of mapped tasks all need to complete before running my final 'summarize' task. My problem is that I'm writing results from the mapped tasks back to a database, and then the summarize task reads from that database, so there's no dependency that can be deciphered from the flow code. Should I be restructuring my tasks, or have a dummy variable as output from the last mapped task feeding into the final summarize task, or are there other simple things that I am overlooking? Example code is here:
    simids = [1, 2, 3]
    with Flow('example') as flow:
        x = task1.map(simids)
        y = task2.map(x)
        z = task3.map(x, y)

        # start a DB session
        write_session = create_db_session.map()
        task_write_to_db.map(write_session, x, z)

        # summarize task, after completion of all mapped results;
        # this part executes too early, before all mapped tasks are complete
        read_session = create_db_session()
        PostProcessResults()
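    (One way to make the hidden database dependency explicit, sketched with the task names above and Prefect 0.x's upstream_tasks kwarg:)
    writes = task_write_to_db.map(write_session, x, z)
    # run the summarize step only after every mapped write has finished
    read_session = create_db_session(upstream_tasks=[writes])
    PostProcessResults(upstream_tasks=[read_session])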

    Matthew Maldonado

    04/29/2020, 2:20 AM
    Is there any way to add the output of a command to the Prefect logs?
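    (One hedged sketch: capture the command's output yourself and hand it to the task logger from prefect.context, the documented 0.x pattern:)
    import subprocess

    import prefect
    from prefect import task

    @task
    def run_and_log(cmd: str) -> str:
        logger = prefect.context.get('logger')
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
        logger.info(result.stdout)  # the command's output now shows up in the Prefect logs
        return result.stdout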

    alvin goh

    04/29/2020, 3:59 AM
    Is there a way to use FlowRunner on a schedule? I would like to return only the last task instead of all tasks, but return_tasks is only available on FlowRunner and not in flow.run 😞

    Aakarshi Goel

    04/29/2020, 6:36 AM
    Hi, is there any way to send HTML emails using Prefect?

    Aakarshi Goel

    04/29/2020, 6:39 AM
    @here

    David Ojeda

    04/29/2020, 9:20 AM
    Hello all! I wanted to introduce a small discussion concerning logging / formatting. Python formatting is one of those things that does not follow its own Python Zen: `There should be one-- and preferably only one --obvious way to do it`… We have had `%`-interpolation, `.format`, and f-strings. I personally avoid `%`-interpolation; `.format` is very popular, but I almost always use f-strings. The `logging` module is the sole exception, in my opinion, where `%`-interpolation should be used, because it gives lazy evaluation:
    logger.debug('My object is %s', some_object)
    In that example, the `some_object.__str__` method will not be called if the logger level (or is it the handler?) is not low enough to process debug messages. This is different from
    logger.debug('My object is %s' % my_object)
    or
    logger.debug('My object is {}'.format(my_object))
    or
    logger.debug(f'My object is {my_object}')
    where there is no lazy evaluation: the string is formatted first, then sent to the logger. In a Prefect Server setting, where logs are sent to the server, each log record needs to be serializable. This is achieved with `json.dumps` on the `record` received by `prefect.utilities.logging.CloudHandler.emit`. This `record` has a field `args` with the args passed to the `logger.log` call, which can hold non-serializable objects. What I have seen is that the server logs have many critical messages saying:
    Failed to write log with error: TypeError: Object of type X is not JSON serializable
    I can change my logging calls to do
    logging.debug('My object is %s', str(my_object))
    but I really feel like this is a workaround for a bug in `CloudHandler`: perhaps the `args` should not be sent to the server, or unserializable args objects could be redacted. What do you guys think is the appropriate fix here?
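    (A small self-contained illustration of the lazy-evaluation point above, using only the standard library:)
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    class Expensive:
        def __str__(self):
            print('__str__ called!')  # side effect to show when formatting happens
            return 'expensive'

    obj = Expensive()

    # lazy: this record is below the INFO threshold, so logging never formats
    # the message and __str__ never runs
    logger.debug('My object is %s', obj)

    # eager: the f-string is built before logging sees it, so __str__ runs
    # even though nothing is emitted
    logger.debug(f'My object is {obj}')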

    Thomas La Piana

    04/29/2020, 12:48 PM
    Would anyone be willing to give me some insight into how they productionize their flows? I've got a flow written and I've got Prefect Server up and running in k8s, but how do I go about getting my local flows to execute on the k8s agent? More in the thread.

    Sanjay Patel

    04/29/2020, 3:03 PM
    Hi! Can I please get some guidance on how to start diagnosing why my flow, once registered on Prefect Core Server, won't run? When I call flow.run() it executes successfully. I then call flow.register() on the same machine and it registers without any issues. I then open up the Prefect Core Server UI, start a local agent, and try to run the registered flow, and I get the error below ('wf' is the name of one of the folders in our structure, using relative imports):
    'Last State Message
    [10:44am]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'wf'",)
    I should note that I did have this working a couple of weeks ago, but after some recent code / environment / dependency changes I can no longer reproduce a successful run. I can successfully get a simple example flow to run on Prefect Core Server, so my problem here is specific to our code base, I'm sure. But I'm not sure how to start diagnosing why the flow runs when I test it with flow.run() and then has a problem on Prefect Core Server, with everything still running on my local host. I've also tried calling flow.serialize(); it successfully prints out a big dictionary, but I don't know exactly what I'm looking for. An alternate option is to start containerizing with Docker, which I'll likely need to do anyway to move off my host machine (as directed by the Prefect developers). Therefore my second question would be: will manually creating this dockerized container approach still work locally as well? Still in the initial functionality and testing phase, so just trying to get some examples working with our code base. Thanks so much!

    Lukas

    04/29/2020, 4:41 PM
    Hi, new to Prefect and totally in love, really great job. I'm running into an issue with my flow: I have multiple tasks, all running fine, whose results are `pd.DataFrame` objects. At the end I have one task "gathering" those DataFrame results to write them all into a Postgres DB (the reason to have it all in one task is that I want either all results to be uploaded or none of them, in case something fails). The function that is supposed to write these results into the DB takes a `dict` argument with `tablename` keys and `DataFrame` values, so in the function I loop over the dict and write each data frame into the corresponding table. Here is where something goes wrong: my keys and values get mixed up, and the function ends up trying to write the dataframes into the wrong tables, which obviously causes errors. My flow looks somewhat like this:
    task_result_1 = task1()
    
    task_result_2 = task2()
    
    task_result_3 = task3()
    
    upload_dfs(
        {
            "table1": task_result_1,
            "table2": task_result_2,
            "table3": task_result_3,
        },
    )
    In the flow, Prefect automatically creates a `List` and a `Dict` task. Is the confusion in my keys / values somehow related to the sorting of items in the `List` class? https://github.com/PrefectHQ/prefect/blob/eb59918d98b15ba6e14c0a406cee885c0e44ea8b/src/prefect/tasks/core/collections.py#L73 Thanks a lot! 🙂
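    (One hedged workaround sketch, assuming the mix-up comes from the auto-generated Dict collection task: pass the table names and results as two parallel arguments and zip them inside the task, where both lists are concrete. upload_dfs is the user's own task:)
    @task
    def upload_dfs(tables, dfs):
        for table, df in zip(tables, dfs):
            ...  # write df into its matching table, all in one transaction

    upload_dfs(
        tables=['table1', 'table2', 'table3'],
        dfs=[task_result_1, task_result_2, task_result_3],
    )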

    Troy Sankey

    04/29/2020, 5:45 PM
    I have a basic misunderstanding about how the KubernetesJobEnvironment creates jobs. I keep trying to add items to my job_spec_file, which gets passed into KubernetesJobEnvironment, but they don't seem to be picked up. For example, when I try to add spec.template.spec.serviceAccount and spec.template.spec.serviceAccountName, the containers that get created don't mount the token for the specified service account (they just use the `default` one instead). Also, more fundamentally, the Prefect code is hard-coded to use the following string for the container args: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/execution/k8s/job.py#L276
    "python -c 'import prefect; prefect.Flow.load(prefect.context.flow_file_path).environment.run_flow()'"
    but actually the job spec ends up using a different string!
    $ kubectl --namespace=prefect get job prefect-job-e4b101a5 -o yaml | grep -A1 args
          - args:
            - prefect execute cloud-flow
    I checked the version of Prefect on my laptop (which I use to deploy the flow) and the Prefect agent image deployed to my cluster; it's all 0.10.x.

    Zach

    04/29/2020, 10:39 PM
    Trying to get a conditional workflow written and having trouble. At three points in the workflow, I have a task that queries the database for a URI. If the task finds a URI, then there are 3 subsequent tasks that need to run, and each of those tasks has output that the following task needs as input. I was looking at the `ifelse` flow conditional, but it doesn't really make sense how I would use it when I have multiple "true tasks" whose outputs feed into each other.

    Zach

    04/29/2020, 10:41 PM
    Also, how does the `ifelse` flow conditional work when I only need the "if" and not the "else"?
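    (A hedged sketch for both questions, using Prefect 0.x control flow: gate only the first task of the chain, since skips propagate downstream by default, and use switch with a single {True: ...} case to get an "if" with no "else". uri_found and step1/2/3 are hypothetical tasks:)
    from prefect import Flow
    from prefect.tasks.control_flow import switch

    with Flow('conditional-chain') as flow:
        found = uri_found()   # returns True / False
        a = step1()
        b = step2(a)
        c = step3(b)
        # when found is False, a is skipped and b, c skip along with it
        switch(found, {True: a})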

    Joe Schmid

    04/30/2020, 12:26 AM
    Hi folks, does anyone have a recipe for building a multi-flow docker image on your own, i.e. without calling `storage.build()` or `flow.register(build=True)`? I'm calling `flow.save()` and `flow.register(build=False)` in my Dockerfile, and the registered flows look correct (Storage, Flow Locations, etc. look good in the Cloud UI), but my flow runs fail with:
    Unexpected error while running flow: KeyError('a60e3215-88c7-499c-b525-83ba87b817f2')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 241, in run
        parameters=parameters,
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 185, in initialize_run
        task = tasks[task_run.task_slug]
    KeyError: 'a60e3215-88c7-499c-b525-83ba87b817f2'
    (I do see this doc: https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html and that's close but not quite what I'm looking for)

    Rubens Sôto

    04/30/2020, 12:39 AM
    Hi guys, I'm very curious about Prefect. We are building our big data architecture; right now we use AWS Glue with PySpark, but we want to move to AWS EMR, so we are looking for an orchestration tool. Can Prefect submit Spark jobs to EMR? Thank you

    Hendrik Petzer

    04/30/2020, 7:48 AM
    Hi guys, firstly, very cool tool! We're looking to improve our current ETL / data processing / business process flow by using Prefect as an orchestration tool. Our heavy lifting is done by R code, which is exposed via a Plumber API on Kubernetes, plus a legacy processing engine (also converted to run in a container). Do you think Prefect will be a good fit? Basically I'm envisioning stitching everything together with Python/Prefect, but which tasks should I use: Python REST calls, or should I rather use ShellTasks and call the R code directly? (Apologies in advance if this type of question was already asked in the group.)

    Kotra Pali

    04/30/2020, 12:53 PM
    Hey 👋. Thanks for a great product, I really like it! I would like to ask if our use case is a good fit for Prefect. Basically, we are processing tables via a few simple rules parametrized by the user. Imagine ~10 transformation funcs like:
    Rename(source='cola', target='colb')
    Transpose(source=[...])
    Map(colname='a', values={3: 4, 5: 7})
    Assign(condition='colb>2', target='cola', value=100)
    ...
    We have about 100k of these per single processing run, and there are some dependencies between commands (e.g. you can see that `cola` depends on the values of `colb` in the last `Assign` command, and we are able to parse these dependencies in advance). What I am thinking about is to create `prefect.Task`s from these transformation funcs and build a graph. That means a DAG with around 100k nodes and at least some edges (but each node very lightweight). Do you think it's a good fit/idea for Prefect? Can it handle such huge DAGs, or is it rather designed for smaller ones?

    Avi A

    04/30/2020, 1:11 PM
    Hey there! I've started implementing our flow with Prefect and it was super fun! It's currently a basic ETL (actually EL) that reads data from Elastic to a local filesystem. The flow is basically a mapped extraction for each day (input: day to extract, output: data for that day) followed by a load for each such extract (also a simple map). Now, one of the tasks is running out of memory, even though it read only 250k records, which shouldn't cause an OOM error. I'm using a local setup (UI + one agent). Any ideas on how to start debugging?

    Avi A

    04/30/2020, 2:21 PM
    On a different note, what's the best practice for knowing that a task was already executed, and skipping its execution if that is the case? E.g., if a specific data partition was already loaded, skip the extract+load phases related to it.
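    (One hedged sketch: check your own bookkeeping inside the task and raise Prefect's SKIP signal; downstream tasks skip by default when an upstream skips, so the load phase would be skipped too. partition_already_loaded is a hypothetical helper:)
    from prefect import task
    from prefect.engine import signals

    @task
    def extract(day):
        if partition_already_loaded(day):
            raise signals.SKIP(f'partition {day} already loaded')
        ...  # otherwise do the actual extraction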

    Jacques

    04/30/2020, 2:51 PM
    Hi all, I have a question about logging. We provide a trace_id Parameter to flows, and I'd like to include that trace_id in all log events inside tasks. Is this possible?
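    (A straightforward, if manual, sketch: pass the trace_id Parameter into each task and stamp it on every log call via the context logger; as far as we know, 0.x does not inject Parameters into log records automatically:)
    import prefect
    from prefect import task

    @task
    def do_work(trace_id, payload):
        logger = prefect.context.get('logger')
        logger.info('[trace_id=%s] processing %s', trace_id, payload)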

    Amit Singh

    04/30/2020, 3:09 PM
    @here I'm trying to iterate through a list received as a response from a task, but I'm getting this error. How do I resolve it? Thanks
    TypeError: 'GetItem' object is not iterable
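    (A sketch of the usual fix: inside `with Flow(...)` you are only building the DAG, so indexing a task's result yields a GetItem task rather than a real list. Iterate at runtime instead, e.g. by mapping over the whole result; process_item and list_task_output are hypothetical:)
    processed = process_item.map(list_task_output)
    # ...or consume the list inside a downstream task, where it is a concrete value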

    Avi A

    04/30/2020, 3:12 PM
    Using dataclasses as inputs/outputs: I'm trying to pass Python dataclasses between tasks, but I'm getting JSON serialization errors (naturally, because they are… well… not JSON serializable). Any ideas for how to tackle this issue? Ideally, if there's a way to supply custom ser/deser functions, that'd be perfect.
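    (A hedged workaround sketch: convert to plain dicts at the task boundary so the JSON-based serialization only ever sees built-in types. MyRecord is a hypothetical dataclass:)
    from dataclasses import asdict

    from prefect import task

    @task
    def produce() -> dict:
        return asdict(MyRecord(field1=1, field2='x'))

    @task
    def consume(payload: dict):
        record = MyRecord(**payload)  # rehydrate on the other side
        ...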

    itay livni

    04/30/2020, 4:31 PM
    @Laura Lorenz (she/her) do you have a link to the PyDenver talk you gave?

    Mark McDonald

    04/30/2020, 5:35 PM
    I'm trying to make better use of the Cloud Hooks on my flows. Ideally, I wouldn't have to set these up through the Cloud UI, but rather when I'm deploying my flows. It doesn't look like there is a way to do this from Core; will someone confirm? It'd be cool if this were part of Flow.register(). I'm guessing it is possible to do this from the GraphQL API? I'm not seeing much about it in the docs.

Dylan

04/30/2020, 5:40 PM
Hi @Mark McDonald! This is definitely interesting feedback. Would you mind opening an issue for this feature request? Are you changing the hooks that are set between versions of a flow? Cloud Hooks should persist across versions once they're set. It's possible to set Cloud Hooks using the GraphQL API; try exploring the Interactive API for details.
Here’s an example:
mutation {
  create_cloud_hook(input: {type: EMAIL, name: "Example", version_group_id: "abc", states: ["Running"], config: {}}) {
    id
  }
}

Mark McDonald

04/30/2020, 5:52 PM
Thanks for the feedback, Dylan. It's good to know that these hooks persist across versions. Ideally I would have the recipient of the cloud hook notifications under some version control, as part of the flow code. We are using Prefect in a self-service fashion, so while it works for me to set up my own cloud hooks through the UI, I'd like to have them set programmatically for others who might forget this (important) step. In Airflow, you set this in the default_args of the DAG file (see below). I would probably try to do something like this, if it were possible to do during Flow.register(). The GraphQL API should work fine; I'd just have to do it after flow deployment, because for new flows, I wouldn't yet have the version_group_id to work off of.
default_args = {
    'owner': 'mark',
    'start_date': datetime(2020, 4, 30),
    'email': ['mmcdonald@clearcover.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retry_exponential_backoff': True,
    'retry_delay': timedelta(seconds=300),
    'retries': 3,
}
I will open this feature request. In the meantime, I'll work with the GraphQL API; that may be sufficient. Thanks again, Dylan.

Dylan

04/30/2020, 5:55 PM
Thank you!