prefect-community
  • w

    Wolfgang Kerzendorf

    03/06/2020, 5:49 PM
I'm trying to map over a mapped result: the first mapped task returns a list per item, and the second task gathers these into a list of lists. I want the next task to work directly on the items of that list of lists. How do I do that?
    2 replies · 2 participants
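    A minimal sketch of one way to do this in Prefect of that era (task names are illustrative): insert a reduce task that flattens the list of lists, then map again over the flattened list.

    from prefect import Flow, task

    @task
    def get_items(group):
        # first mapped task: each call returns a list
        return [group, group + 1]

    @task
    def flatten(nested):
        # reduce step: turn the list of lists into a flat list
        return [item for sublist in nested for item in sublist]

    @task
    def process(item):
        # works directly on individual items of the flattened list
        return item * 10

    with Flow("flatten-then-map") as flow:
        nested = get_items.map([1, 10, 100])   # reduces to a list of lists
        flat = flatten(nested)
        results = process.map(flat)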
  • m

    Manuel Aristarán

    03/06/2020, 6:51 PM
Is there a limit to the number of `Parameter`s a `Flow` can have?
  • m

    Manuel Aristarán

    03/06/2020, 6:53 PM
`prefect` seems to ignore the 3rd declared `Parameter`:
    with Flow("Salesforce to Cassandra") as f:
        tap_config = Parameter("tap_config")
        tap_catalog = Parameter("tap_catalog")
        target_config = Parameter("target_config")
    
        salesforce_result = run_salesforce_tap(config=tap_config, catalog=tap_catalog)
        result = run_cassandra_target(config=target_config, input=salesforce_result)
  • m

    Manuel Aristarán

    03/06/2020, 6:54 PM
    this is the result of `print(f.parameters())`:
    {<Parameter: tap_catalog>, <Parameter: tap_config>}
    7 replies · 2 participants
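    For context: `flow.parameters()` only reports `Parameter` tasks that are actually part of the flow graph, so a `Parameter` whose value a task silently drops never gets wired in. A minimal sketch of forcing registration, assuming the 0.9-era `Flow.add_task` API:

    from prefect import Flow, Parameter

    with Flow("Salesforce to Cassandra") as f:
        target_config = Parameter("target_config")
        # explicit registration, independent of whether any task consumes it
        f.add_task(target_config)

    print(f.parameters())  # now includes <Parameter: target_config>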
  • m

    Manuel Aristarán

    03/06/2020, 9:24 PM
Is there a way to have a task return another task? My use case is creating a `docker.CreateContainer`, but I need to build the `command` parameter.
    2 replies · 2 participants
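    One way around returning a Task from a Task: compute the dynamic piece (the command string) in an upstream task and feed its result to the container task at flow-build time. A sketch, assuming the era's `prefect.tasks.docker.CreateContainer` accepts `command` at run time; the helper below is hypothetical.

    from prefect import Flow, task
    from prefect.tasks.docker import CreateContainer

    @task
    def build_command(date):
        # hypothetical helper assembling the container command from inputs
        return f"python etl.py --date {date}"

    create = CreateContainer(image_name="my-image:latest")

    with Flow("docker-flow") as flow:
        cmd = build_command("2020-03-06")
        container_id = create(command=cmd)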
  • m

    Manuel Aristarán

    03/06/2020, 11:48 PM
Is there a limit on the size of the parameters payload in the UI? I'm getting this when passing a ~800 KiB object
    5 replies · 2 participants
  • c

    Christian

    03/08/2020, 4:17 PM
Hi all 👋 New user here - so please forgive my noob question. I'm setting up a scheduled web harvester for some data crawling... This kind of works (I think I have some caching issues), but I was wondering whether I can trigger an out-of-schedule run via a RESTful API. Say my script does its thing daily at 8am: can I send a request to a REST API to trigger an execution on demand? Cheers, C
    3 replies · 2 participants
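    For archive readers: Prefect's API is GraphQL rather than REST, but an out-of-schedule run can be triggered programmatically. A sketch using the 0.9-era Python client (the flow ID is a placeholder); the same can be done with a raw `createFlowRun` GraphQL mutation against the API endpoint.

    from prefect import Client

    client = Client()  # picks up auth/endpoint from config or environment
    client.create_flow_run(flow_id="<your-flow-id>")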
  • b

    Bernhard

    03/09/2020, 10:41 AM
Hi all, I am a new user. I want to handle the use case of watching a directory for changes in zipfile metadata; for changed zipfiles, several tasks would then be run. The resulting flow would run every 24 hours. The coarse concept of the flow could be:
1) Initialize the Prefect cache with zipfile metadata (when the flow is started)
2) At midnight, get up-to-date zipfile metadata and compare it with the cached metadata
3) Refresh the cache for changed zipfile metadata
4) For changed zipfile metadata only, download the zipfiles and compute various derivatives
5) Wait for the next midnight
The part "download the zipfile and compute various derivatives" is already working nicely. I would like recommendations on designing a cache validator, and on how to initialize the complete cache after the flow is started. Thank you
    4 replies · 2 participants
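    A hedged sketch of the caching piece, using the 0.9-era `cache_for`/`cache_validator` task arguments: keying the cache on the task's inputs means the download-and-derive step re-runs only when the zipfile metadata it receives has changed.

    from datetime import timedelta
    from prefect import task
    from prefect.engine.cache_validators import all_inputs

    @task(cache_for=timedelta(hours=24), cache_validator=all_inputs)
    def download_and_derive(zip_meta):
        # re-executed only when zip_meta differs from the cached run's input
        ...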
  • j

    John Ramirez

    03/09/2020, 4:37 PM
Hey everyone - how do I perform a conditional split on a single input? (see the switch/ifelse sketch further down this page)
    9 replies · 2 participants
  • j

    John Ramirez

    03/09/2020, 6:30 PM
    Traceback (most recent call last):
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 186, in _validate_json
        json.dumps(value)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type datetime is not JSON serializable
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "main.py", line 176, in <module>
        main()
      File "main.py", line 163, in main
        labels=[args.env],
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/core/flow.py", line 1412, in register
        no_url=no_url,
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/client/client.py", line 616, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/core/flow.py", line 1209, in serialize
        serialized = schema(exclude=["storage"]).dump(self)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 556, in dump
        result = self._serialize(processed_obj, many=many)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 520, in _serialize
        value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/fields.py", line 316, in serialize
        return self._serialize(value, attr, obj, **kwargs)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 220, in _serialize
        return super()._serialize(value, attr, obj, **kwargs)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/fields.py", line 571, in _serialize
        return schema.dump(nested_obj, many=many)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 556, in dump
        result = self._serialize(processed_obj, many=many)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 516, in _serialize
        for d in typing.cast(typing.Iterable[_T], obj)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 516, in <listcomp>
        for d in typing.cast(typing.Iterable[_T], obj)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/schema.py", line 520, in _serialize
        value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/marshmallow/fields.py", line 316, in serialize
        return self._serialize(value, attr, obj, **kwargs)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 181, in _serialize
        self._validate_json(value)
      File "/Users/johnramirez/.local/share/virtualenvs/xignite-data-x4Gzfino/lib/python3.7/site-packages/prefect/utilities/serialization.py", line 188, in _validate_json
        raise ValidationError("Value is not JSON-compatible")
    marshmallow.exceptions.ValidationError: Value is not JSON-compatible
    8 replies · 3 participants
  • j

    John Ramirez

    03/09/2020, 6:30 PM
I'm getting this error when trying to register a workflow.
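    The `ValidationError` above usually means something non-JSON-serializable, here a `datetime`, made it into the flow's serialized metadata, most often as a `Parameter` default. A hedged sketch of the common workaround: pass the datetime as an ISO-8601 string and parse it inside a task.

    from datetime import datetime
    from prefect import Flow, Parameter, task

    @task
    def parse_ts(raw):
        # convert the JSON-safe string back into a datetime at run time
        return datetime.fromisoformat(raw)

    with Flow("register-safe") as flow:
        start = Parameter("start", default="2020-03-09T00:00:00")
        start_dt = parse_ts(start)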
  • l

    Laura Lorenz (she/her)

    03/09/2020, 10:09 PM
    Hey community! I have been noodling on obfuscating logs/exceptions/parameters to Cloud with some other contributors, and tried to consolidate the ideas in a PIN and would love some more community feedback. If you think you or a friend would find obfuscating your logs and exceptions useful, please let me know which parts of this PIN speak to you (positively or negatively): https://github.com/PrefectHQ/prefect/pull/2130 I’m especially interested if you are writing flows for a privacy/compliance focused industry!
    :upvote: 5
  • a

    Arsenii

    03/10/2020, 1:53 AM
Hi all! Kind of a basic question, but I don't seem to find relevant documentation on this. I understand that `DaskExecutor` can be used for parallelization between tasks inside a flow, but what about parallelization between flows themselves? I see that there's a `DaskKubernetes` environment that spawns pods for flows, each with a temporary Dask cluster inside, which makes sense to me on the surface, but Kubernetes is not currently an option for us. Would setting up something like `FargateEnvironment`/`Fargate Agent` bring significant improvements compared to, say, a regular `DockerAgent`? If a flow is run as a Fargate Task with a specified remote `DaskExecutor`, where does it actually "run" the flow? Does it make more sense to have a dedicated remote Dask cluster somewhere, or to start up a local one for each flow? Thanks again for all the help!
    5 replies · 3 participants
  • m

    Mark McDonald

    03/10/2020, 5:26 PM
Hi - I'm working on a flow wherein I have a task that I only want to run when a particular upstream task fails. I've read through the docs and I am trying to use triggers to do this. There are a lot of flavors of triggers: all_finished, all_successful, all_failed, any_successful, any_failed, some_failed, etc. However, it's not clear to me whether there is a way to trigger the downstream task only when one particular upstream task has failed. I don't want to use some_failed or any_failed because there is a good chance that the failure happened at a task elsewhere in the flow.
    3 replies · 2 participants
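    A sketch of one approach, assuming 0.9-era triggers: if the watcher task's only upstream edge is the task of interest, `all_failed` reduces to "run exactly when that task failed", regardless of failures elsewhere in the flow.

    from prefect import Flow, task
    from prefect.triggers import all_failed

    @task
    def fragile():
        raise ValueError("boom")

    @task(trigger=all_failed)
    def on_fragile_failure():
        # runs only when its sole upstream task, fragile, fails
        print("running cleanup")

    with Flow("watch-one-task") as flow:
        f = fragile()
        on_fragile_failure(upstream_tasks=[f])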
  • b

    Braun Reyes

    03/10/2020, 10:05 PM
Is it possible to trigger a task based on the value of a parameter? Like...
`@task(trigger=<boolean based off value of parameter>)`
    6 replies · 2 participants
  • b

    Braun Reyes

    03/10/2020, 10:05 PM
trying to tie the triggering of certain tasks to different schedules
  • b

    Braun Reyes

    03/10/2020, 10:06 PM
different clocks, that is, since we can pass default parameters per schedule
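    For archive readers: triggers in this era could not see `Parameter` values, but control flow could. A hedged sketch of routing on a per-clock parameter default with `ifelse`; names are illustrative.

    from prefect import Flow, Parameter, task
    from prefect.tasks.control_flow import ifelse

    @task
    def is_nightly(schedule_name):
        return schedule_name == "nightly"

    @task
    def nightly_only_work():
        ...

    @task
    def hourly_work():
        ...

    with Flow("per-clock-routing") as flow:
        schedule_name = Parameter("schedule_name", default="hourly")
        # the branch not taken ends up in a Skipped state
        ifelse(is_nightly(schedule_name), nightly_only_work(), hourly_work())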
  • a

    Amit Singh

    03/11/2020, 11:40 AM
@here Is there a way to send HTML emails using `prefect.tasks.notifications.email_task.EmailTask`?
    2 replies · 2 participants
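    A hedged fallback sketch if `EmailTask`'s body handling doesn't fit: a small custom task sending an HTML body via the standard library's `smtplib`. Server, credentials, and addresses below are placeholders.

    import smtplib
    from email.mime.text import MIMEText
    from prefect import task

    @task
    def send_html_email(html_body):
        msg = MIMEText(html_body, "html")  # "html" subtype renders the markup
        msg["Subject"] = "Report"
        msg["From"] = "me@example.com"
        msg["To"] = "you@example.com"
        with smtplib.SMTP_SSL("smtp.example.com", 465) as server:
            server.login("me@example.com", "app-password")
            server.send_message(msg)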
  • j

    Jeff Brainerd

    03/11/2020, 3:14 PM
Hey Prefect team, I would like to upvote the issue about providing the ability to run the Prefect flow container as non-root. This is definitely a problem for us. https://github.com/PrefectHQ/prefect/issues/2025 Does anyone have a workaround?
    4 replies · 3 participants
  • c

    Cab Maddux

    03/11/2020, 6:18 PM
Hi! When having the Kubernetes Agent trigger flows, I haven't been able to get the agent to create my job with a custom k8s config YAML (it creates the job, just with the default config). Shouldn't I be able to set `flow.environment = KubernetesJobEnvironment(job_spec_file='...')` prior to `flow.register()` and have the k8s agent use my `job_spec_file` to create the job? My job spec file is basically just the example here: https://docs.prefect.io/cloud/execution/k8s_job_environment.html#examples
    2 replies · 2 participants
  • j

    John Ramirez

    03/11/2020, 7:40 PM
Hey - does anyone have a code sample for control flow tasks such as `switch` and `ifelse`?
    8 replies · 2 participants
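    A minimal sketch of the 0.9-era control-flow helpers: `switch` routes on the value returned by a condition task, and `merge` recombines whichever branch actually ran (the other ends up Skipped).

    from prefect import Flow, task
    from prefect.tasks.control_flow import switch, merge

    @task
    def pick_branch():
        return "b"

    @task
    def handle_a():
        return "ran A"

    @task
    def handle_b():
        return "ran B"

    with Flow("switch-demo") as flow:
        a = handle_a()
        b = handle_b()
        switch(pick_branch(), {"a": a, "b": b})
        result = merge(a, b)  # takes the result of whichever branch ran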
  • s

    Scott Zelenka

    03/11/2020, 9:03 PM
Is it possible to make an `EnvVarSecret` "required", similar to how Parameters are required? https://docs.prefect.io/api/latest/tasks/secrets.html#secret
    1 reply · 1 participant
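    Per the docs of the era, `EnvVarSecret` takes a `raise_if_missing` flag that makes a missing environment variable fail the task rather than return None. A minimal sketch:

    from prefect import Flow, task
    from prefect.tasks.secrets import EnvVarSecret

    api_key = EnvVarSecret("API_KEY", raise_if_missing=True)

    @task
    def use_key(key):
        print(f"got a key of length {len(key)}")

    with Flow("required-secret") as flow:
        use_key(api_key)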
  • j

    Jeff Brainerd

    03/11/2020, 11:08 PM
Hi Prefecters, I'm struggling with a peculiarity of Django, but I'm wondering if the problem is more general. The basic issue is that I need to initialize Django before importing any of the ORM objects that many of my Prefect tasks use. When Dask is running a task, I'm able to use an initialization hook to call that Django setup function before the flow is deserialized with cloudpickle. The problem comes when building the Prefect Docker image: the healthcheck fails during the cloudpickle load. I've done some code spelunking and I can't see an obvious way to override or provide a hook to the healthcheck script, short of just hacking it (which I'm doing just to get it to pass). So I'm interested in whether there's an existing workaround, or whether this problem has come up before and anyone else would benefit from some mechanism for overriding, disabling, or providing a pre-run hook for the healthcheck script. Thanks all 🙏
    6 replies · 4 participants
  • m

    Mark Williams

    03/12/2020, 9:30 PM
Three questions:
1. Is there a way to do more with the visualizations that Prefect Core outputs, like appending a file name to a task?
2. Is there a way to split the files out dynamically? I adopted the visualization state handler from the examples, but when I run large numbers of loops through, it just makes a blank file.
3. How do I make the flow write to a log file? I have the flow's result_handler set to LocalResultHandler with a directory specified, but it does not write anything to it.
    3 replies · 2 participants
  • t

    Thomas La Piana

    03/13/2020, 9:56 AM
Can someone confirm that this feature is only available with Prefect Cloud? https://docs.prefect.io/cloud/execution/dask_k8s_environment.html#overview
    2 replies · 2 participants
  • b

    bardovv

    03/13/2020, 1:02 PM
    👋 I’m here! What’d I miss?
    🎉 1
    1 reply · 2 participants
  • k

    Kostas Chalikias

    03/13/2020, 1:10 PM
Hi team, I am trying to find which flow is eating up all my agent's resources, so I have a couple of questions:
• Is each run forked, i.e., will resources be reclaimed when the run finishes?
• Is there a way to get memory usage per flow run?
    16 replies · 2 participants
  • s

    Scott Zelenka

    03/13/2020, 2:29 PM
Hi team, I'm attempting to debug a flow locally that maps over a large list of objects. A single entry in that list fails to process in a given task, and it happens to be about 3/4 of the way through the list. So iterating on logic to correct the data is time-intensive, as the entire pipeline must be re-computed before reaching the failure point (again). The way I've debugged these types of problems in other tools (e.g., Luigi) allowed for a concept of `cache` or `checkpoint`, where you could specify how long to persist the output of a given step in the pipeline to disk. That way, when you re-run the entire pipeline locally while debugging, it simply reads in the cached data rather than re-computing each step that previously completed successfully. It was also nice in production, as it allowed commonly used tasks to share their output with other pipelines without being re-computed. It seems Prefect has a concept of output caching, but it only stores this in memory for local runs when debugging, which is useless for iterating on logic changes and re-running the entire pipeline. https://docs.prefect.io/core/concepts/persistence.html#output-caching There's mention in this Slack channel to "use Prefect Cloud", but I cannot find any tutorials or examples of how to accomplish this, so I'm looking for guidance. How would you use caching in Prefect Cloud to speed up the debugging iteration of a local flow?
    5 replies · 2 participants
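    A hedged workaround sketch, outside Prefect's own caching: memoize the expensive step's output to disk yourself so that local debug re-runs skip straight to the failure point. The cache directory and names are illustrative.

    import functools
    import os
    import pickle
    from prefect import task

    CACHE_DIR = ".debug_cache"

    def disk_cached(key):
        # simple decorator: reuse a pickled result from a previous local run
        def wrap(fn):
            @functools.wraps(fn)
            def inner(*args, **kwargs):
                os.makedirs(CACHE_DIR, exist_ok=True)
                path = os.path.join(CACHE_DIR, f"{key}.pkl")
                if os.path.exists(path):
                    with open(path, "rb") as f:
                        return pickle.load(f)
                out = fn(*args, **kwargs)
                with open(path, "wb") as f:
                    pickle.dump(out, f)
                return out
            return inner
        return wrap

    @task
    @disk_cached("expensive_step")
    def expensive_step(data):
        ...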
  • n

    Nathan Molby

    03/13/2020, 2:41 PM
    Is there any way to run a task inside of another task? It seems that a flow could quickly get incredibly complicated if this is not possible.
    1 reply · 2 participants
  • n

    Nathan Molby

    03/13/2020, 2:57 PM
Question 2 of the day haha: What is the recommended way of doing asynchronous HTTP requests (specifically in Prefect)?
    4 replies · 2 participants

Zachary Hughes

03/13/2020, 2:58 PM
I'm a big fan of `httpx`, but if you prefer vanilla `requests`, you can run that in a threadpool!
But if you're trying to run that in Prefect, it's worth noting that Prefect doesn't support async calls.

Nathan Molby

03/13/2020, 3:02 PM
Yeah, that's the problem I've been struggling with for a bit. I'm trying to wait on a bunch of API calls, which I've previously implemented using aiohttp, but Prefect doesn't support async. I'm currently trying to use the DaskExecutor to potentially map and then gather the results, but I'm worried that none of the libraries will work for that.

Zachary Hughes

03/13/2020, 3:08 PM
Yeah, given that use case, I think your best bet is likely using synchronous requests and mapping to parallelize as desired.
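A minimal sketch of that suggestion: keep each request synchronous and let mapping on a `DaskExecutor` supply the parallelism. URLs are placeholders; executors lived at `prefect.engine.executors` in this era.

import requests
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def fetch(url):
    # one synchronous request per mapped task run
    return requests.get(url, timeout=30).json()

with Flow("parallel-http") as flow:
    results = fetch.map([
        "https://api.example.com/a",
        "https://api.example.com/b",
    ])

flow.run(executor=DaskExecutor())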