prefect-community

    Brett Naul

    10/09/2020, 6:57 PM
    @Jim Crist-Harif quick q about https://github.com/PrefectHQ/prefect/pull/3333: where would a callback like https://docs.prefect.io/orchestration/execution/overview.html#environment-callbacks go once environments are ✂️?

    Krzysztof Nawara

    10/09/2020, 7:26 PM
    Hi everyone! I'd like to ask for an explanation of how the built-in all_inputs cache validator is meant to work. A cache validator receives 3 arguments, but only 2 of those are relevant here:
    - state (State): a `Success` state from the last successful Task run that contains the cache
    - inputs (dict): a `dict` of inputs that were available on the last successful run of the cached Task
    Now my current understanding (almost certainly incorrect) is that they come from the same run. But then the logic of the validator wouldn't make any sense:
    elif getattr(state, "hashed_inputs", None) is not None:
        if state.hashed_inputs == {key: tokenize(val) for key, val in inputs.items()}:
            return True
        else:
            return False
    elif {key: res.value for key, res in state.cached_inputs.items()} == inputs:
        return True
    It just compares the inputs passed directly to the validator with the inputs extracted from the state. So it's pretty clear those 2 arguments can come from different runs, but I don't understand how that is possible. If someone could provide an explanation I'd be very grateful 🙂
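    (For reference, a minimal sketch of how all_inputs is typically attached to a task in Prefect 0.13; the task and values here are illustrative, not from this thread:)
    from datetime import timedelta

    from prefect import task
    from prefect.engine.cache_validators import all_inputs

    # Cache the result for an hour; all_inputs decides whether a later run
    # may reuse the cached Success state by comparing that run's inputs
    # against the inputs recorded with the cache.
    @task(cache_for=timedelta(hours=1), cache_validator=all_inputs)
    def transform(x, y):
        return x + y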

    Krzysztof Nawara

    10/09/2020, 8:06 PM
    Hello all 🙂 Is it possible to have dynamic cache keys? Currently they can be templated, so they are semi-dynamic, but I haven't found any way to make data that's passed through the pipeline part of the key. Use case: caching mapped tasks where the order can be non-deterministic, so prefect.context.map_index might not be enough.

    flavienbwk

    10/09/2020, 8:45 PM
    Hi again, I want to run my first flow linked to my local API and I am trying to run client.create_project() to create a project. But each time I get the following error:
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://xxx.xxx.xxx.xxx:4200/graphq
    My code:
    client = Client(api_server="http://xxx.xxx.xxx.xxx:4200/graphql")
    client.create_project(project_name="weather")
    I don't understand what port / URL I should put in Client(). Thank you for your help.

    flavienbwk

    10/10/2020, 12:03 AM
    Hi, I am trying to make a flow run on my API but I get the following error from the UI:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/root/'")
    I don't understand the problem as I don't have any relative import in my code and the latter is pretty simple : https://github.com/flavienbwk/prefect-docker-compose/blob/main/scripts/weather.py

    Tsang Yong

    10/10/2020, 4:20 AM
    Hi, is there a proper way to launch an on demand k8s dask cluster i.e. KubeCluster and then run prefect tasks on it using DaskExecutor? I tried doing it but getting a
    ERROR - prefect.FlowRunner | Unexpected error: TypeError('code() takes at most 15 arguments (16 given)')
    not sure if it’s a supported mode of operation.

    Alfie

    10/10/2020, 4:45 PM
    hi Team, I’m migrating Prefect core from 0.12.6 to 0.13.10, and get this error when I try to register my flow. Please help, thanks.
    File ".../.pyenv/versions/alert/lib/python3.8/site-packages/prefect/core/flow.py", line 1608, in register
        registered_flow = client.register(
      File ".../.pyenv/versions/alert/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File ".../.pyenv/versions/alert/lib/python3.8/site-packages/prefect/core/flow.py", line 1451, in serialize
        self.storage.add_flow(self)
      File ".../.pyenv/versions/alert/lib/python3.8/site-packages/prefect/environments/storage/local.py", line 140, in add_flow
        flow_location = flow.save(flow_location)
      File ".../.pyenv/versions/alert/lib/python3.8/site-packages/prefect/core/flow.py", line 1520, in save
        cloudpickle.dump(self, f)
      File ".../.pyenv/versions/alert/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 55, in dump
        CloudPickler(
      File ".../.pyenv/versions/alert/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    TypeError: 'NoneType' object is not callable

    flavienbwk

    10/10/2020, 9:25 PM
    Hi, I am trying the S3 storage option. I have two questions about it: • Does the S3 storage option use Docker (does it save images or just flow data?) • How can I specify the endpoint of my own S3 server (I am using Minio) and specify its credentials? (Am I forced to use Webhook storage with non-AWS S3 servers?) Thank you!

    Leo Meyerovich (Graphistry)

    10/10/2020, 10:15 PM
    Is there a way to get an IntervalSchedule to pass (a Parameter?) the interval to the resulting flow handler so it knows what time span it is responsible for?

    Leo Meyerovich (Graphistry)

    10/10/2020, 10:15 PM
    the docs show how to pass a fixed label of the Schedule itself (e.g., "Daily schedule"), but not the span ("10/1/2020--10/2/2020")

    Leo Meyerovich (Graphistry)

    10/10/2020, 10:17 PM
    I think the ideal is Parameter so we can reuse for other flow handlers, but right now looking for anything 🙂
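    (One possible workaround, sketched here for context rather than taken from this thread: if the interval is fixed and known to the flow code, a task can derive the span it is responsible for from the scheduled_start_time context key; the interval value below is illustrative:)
    from datetime import timedelta

    import prefect
    from prefect import task

    INTERVAL = timedelta(days=1)  # assumed fixed interval, matching the IntervalSchedule

    @task
    def compute_window():
        # scheduled_start_time is a standard Prefect context key
        end = prefect.context.get("scheduled_start_time")
        start = end - INTERVAL
        return start, end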

    Alfie

    10/11/2020, 3:56 AM
    Hi Team, I see GraphQL API changes between 0.12.6 and 0.13.10, such as schedule instead of schedules in flow attributes. The changes break some functions in my system, so a brief summary of the changes would help me fix them. Where can I see an overview of all the changes between the two versions? Thanks
    👍 1

    ale

    10/11/2020, 11:47 AM
    Hi folks! Are there any training courses or workshops to learn and master Prefect?
    👍 1

    flavienbwk

    10/11/2020, 2:26 PM
    Hi, I have a question about this error:
    ValueError: Interval can not be less than one minute when deploying to Prefect Cloud.
    Why was that limit set? Why can't I decide to schedule a job every second?

    flavienbwk

    10/11/2020, 4:10 PM
    Hi, about Prefect flows and agents: does 1 flow == 1 agent, or are tasks in flows parallelized over the available agents? Should I use Dask for that?

    Alfie

    10/12/2020, 3:51 AM
    hi Team, with the latest core 0.13.10, it seems I have to get a tenant and project ready before creating any flow. Is there any workaround to get the tenant and project ready easily instead of creating them manually?

    Billy McMonagle

    10/12/2020, 11:17 PM
    Hi there! I am trying to get started, and I'm wondering if there are any good architecture diagrams to get oriented within the Prefect ecosystem?

    Jackson Maxfield Brown

    10/13/2020, 12:56 AM
    Looking around in the GitHub issues, I don't see anything for it. Has there been any thought about adding something like:
    results, errors = my_task.safe_map(**iterables)

    Rob Fowler

    10/13/2020, 9:19 AM
    is there a way to parameterise the timeout of a flow somehow?

    Rob Fowler

    10/13/2020, 9:19 AM
    I have a task to grab some data, but depending on the size I want to specify on the command line how long the timeout should be

    Rob Fowler

    10/13/2020, 9:29 AM
    I tried to set the timeout before I run the task, for example flow.slow_task.timeout = 100 for:
    @task(timeout=9)
    def slow_task(opts, scripts):
        sleep(10)
        return 1

    Newskooler

    10/13/2020, 10:05 AM
    Does anyone know if this will be implemented in Prefect anytime soon? https://docs.prefect.io/core/PINs/PIN-14-Listener-Flows-2.html How do people currently handle event-based flows I wonder? 🤔
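    (For context, a hedged sketch of one common interim pattern, not anything stated in this thread: an external event handler kicks off an already-registered flow through the Prefect Client; the flow ID and parameter names are illustrative placeholders:)
    from prefect import Client

    def on_event(event):
        # triggered by whatever produces the event (webhook, queue consumer, ...)
        client = Client()
        client.create_flow_run(
            flow_id="<registered-flow-id>",   # illustrative placeholder
            parameters={"payload": event},
        )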

    Rob Fowler

    10/13/2020, 10:22 AM
    I think I need to do the class slow_task(Task): def run(self, opts, scripts): ... thing, but the example in the documentation only hints at this, without enough detail for me to work out how to do it.
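    (A minimal sketch of that class-based approach, assuming the timeout only needs to be an integer number of seconds chosen before the flow is built; the task names come from the snippets above, the rest is illustrative:)
    from time import sleep

    from prefect import Flow, Task

    class SlowTask(Task):
        def run(self, opts, scripts):
            sleep(10)
            return 1

    timeout_seconds = 100  # e.g. parsed from the command line before building the flow
    slow_task = SlowTask(timeout=timeout_seconds)  # timeout is a standard Task init kwarg (seconds)

    with Flow("slow-flow") as flow:
        result = slow_task(opts="opts", scripts="scripts")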

    Chirag

    10/13/2020, 10:59 AM
    Hi Guys, Is there a way to supply a MySQL connection to multiple tasks? Currently, I am getting this error: "TypeError: can't pickle _mysql_connector.MySQL objects"
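    (For context, a hedged sketch of one common way around the pickling error, not necessarily the answer given in this thread: open the connection inside each task instead of passing the connection object between tasks; host and credentials below are placeholders:)
    import mysql.connector
    from prefect import task

    @task
    def fetch_rows(query):
        # the connection never crosses a task boundary, so it never has to be pickled
        conn = mysql.connector.connect(
            host="db.example.com", user="app", password="***", database="mydb"
        )
        try:
            cur = conn.cursor()
            cur.execute(query)
            return cur.fetchall()
        finally:
            conn.close()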

    Newskooler

    10/13/2020, 11:36 AM
    Does anyone know if it’s possible to either disable or increase the time interval for the Lazarus process?
    👍 1

    Newskooler

    10/13/2020, 1:37 PM
    Hi 👋 Is mapping only possible with the functional API, and if not, how can something be mapped with the imperative API?
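    (A minimal sketch of mapping through the imperative API, assuming Prefect 0.13's Flow.set_dependencies and its mapped flag; the tasks here are illustrative:)
    from prefect import Flow, Task

    class Numbers(Task):
        def run(self):
            return [1, 2, 3]

    class AddOne(Task):
        def run(self, x):
            return x + 1

    numbers, add_one = Numbers(), AddOne()

    flow = Flow("imperative-mapping")
    flow.add_task(numbers)
    # mapped=True maps add_one over the keyword task's result,
    # mirroring add_one.map(x=numbers) in the functional API
    flow.set_dependencies(task=add_one, keyword_tasks=dict(x=numbers), mapped=True)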

    Vinod Sugur

    10/13/2020, 2:17 PM
    Hi, I am working on the FargateTask sample provided in the documentation "https://docs.prefect.io/orchestration/execution/fargate_task_environment.html". I am getting the following error:
    [2020-10-13 13:56:50,113] ERROR - agent | Error while deploying flow: ClientException("An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.")
    I have followed the sample given in the documentation and have defined all the attributes, including "memory" and "cpu". Please advise.

    Matias Godoy

    10/13/2020, 4:08 PM
    Hi guys! We have successfully deployed a Kubernetes Agent in Amazon EKS following your great tutorial on Medium. I executed some stress tests by starting around 30 flow runs for the new k8s agent, and I noticed that it picked 6 runs at a time (I don't know if 6 is a setting, or if the agent picks this number based on the node capacity) and queued the rest. That's not bad! But since our flow runs can take as long as 10 hours each, I'd now like to set up an autoscaler to avoid having a long queue of runs waiting for so long. As you might have already inferred, I'm new to Kubernetes. My guess is that HPA is not the right answer here. Maybe a cluster-autoscaler is the way. Could you guys give me a hint on where I should start for setting up an autoscaler on EKS that works with the Kubernetes Agent? Thanks!

    Newskooler

    10/13/2020, 4:31 PM
    Hi 👋 🙂 Does anyone have experience with the Dask Executor? I guess due to the nature of a flow, the DFE can be blocked sometimes… 🤔 I have detailed my question here: https://github.com/PrefectHQ/prefect/discussions/3491

    Emma Willemsma

    10/13/2020, 7:36 PM
    Hey friends, I'm looking for some guidance on versioning flows. I like the way flows are up-versioned when they get registered to Prefect Cloud and older versions are automatically archived. However, I want to be able to trace a flow version to its code in github and I'm not sure the best way to do that. We tag our commits with version strings, so I just need to be able to add that information to the flow somehow. Options I've thought of:
    • Can I add some kind of tag when registering my flow? I know there are labels, but as far as I understand the labels are just for linking flows with agents. I haven't seen anything about being able to add other types of tags.
    • Can I override the auto-incremented version number so it matches our version string? Again I haven't seen how to do this in the docs.
    • Can/should I add our version string to the flow name? It sounds like I can set a common version group id for each new version of my flow so that even though they're named differently, Prefect Cloud will still consider them to be versions of each other. However, the docs also say that version group ids are deprecated, so I'm not sure that's the right thing to use.
    How are other people handling this?
    👀 1

Jim Crist-Harif

10/13/2020, 7:55 PM
Hi Emma, this is definitely a common ask. For now the pattern we've been recommending is adding the version string you want to the storage you're using. For example, if using S3 storage, you could use the version as part of the key; for Docker storage it could be part of the image_tag, etc. This will show up in the UI and be stored with the flow as metadata.
👀 1
:upvote: 2
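(A minimal sketch of that pattern, assuming Prefect 0.13's Docker and S3 storage classes; registry, bucket, and version values are illustrative:)
from prefect import Flow
from prefect.environments.storage import S3, Docker

VERSION = "v1.4.2"  # e.g. the git tag for this build

# bake the version into the Docker image tag...
docker_storage = Docker(
    registry_url="registry.example.com",
    image_name="my-flow",
    image_tag=VERSION,
)

# ...or into the S3 object key
s3_storage = S3(bucket="my-flows", key=f"my-flow/{VERSION}.prefect")

flow = Flow("my-flow", storage=docker_storage)
flow.register(project_name="examples")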

Emma Willemsma

10/13/2020, 9:31 PM
Oh that works, thank you!