• p

    Preston Marshall

    2 years ago
    This is my attempt:
    with Flow("mirror_sftp_to_gcs") as flow:
        password_secret_path = Parameter("password_secret_path")
        sftp_connection_info = Parameter("sftp_connection_info")
        password = GoogleSecretManagerSecret(password_secret_path)  # TODO: Cast function to convert to namedtuple
        host_path = Parameter('host_path')
        files = sftp_list_files(sftp_connection_info, password, host_path)
        downloaded_files = sftp_to_gcs.map(
            file_path=files,
            connection_info=unmapped(sftp_connection_info),
            password=unmapped(password),
            bucket=unmapped("snip")
        )
    I get an error:
    ValueError: Flow.run received the following unexpected parameters: password_secret_path
    Very odd, the other parameters work, I am specifying them in a parameters kwarg in flow.run() in another file.
  • p

    Preston Marshall

    2 years ago
    It appears that because the parameter is never used in a task, it doesn't ever expect it
    p
    Chris White
    +1
    35 replies
    Copy to Clipboard
  • p

    Preston Marshall

    2 years ago
    How do I throttle a task being mapped?
  • n

    Nate Atkins

    2 years ago
    I just started looking at the Cloud Scheduler yesterday and saw this on concurrency limits. https://docs.prefect.io/cloud/concepts/concurrency-limiting.html#task-concurrency-limiting Not sure what you do if you are running locally with Dask.
  • p

    Preston Marshall

    2 years ago
    That's unfortunate it can't be rate-limited without cloud, I hope that feature comes back.
  • Jeremiah

    Jeremiah

    2 years ago
    Within a single Dask cluster, you can use Dask resource limits to achieve a similar result. Global limiting requires some central broker of state.
  • p

    Preston Marshall

    2 years ago
    Do you have a link on that by chance? I feel like thats something that should be handled by the executor if the underlying system supports it. I totally get you need to fund development but if you are thinking of open source as a strategy to onboard people to the commercial offering it's difficult to put together a POC without this 🙂
  • Jeremiah

    Jeremiah

    2 years ago
    It is handled by the executor as the docs explain - https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor
  • p

    Preston Marshall

    2 years ago
    I meant that if the executor itself supports it, the Executor abstraction should facilitate it. Maybe I’m misunderstanding but it doesn’t seem like it does
  • i

    itay livni

    2 years ago
    Hi - Is this the correct way to handle a state in a flow that fails and skips all downstream task on failure. The result of the flow being a an empty list? Thanks again
    i
    Chris White
    3 replies
    Copy to Clipboard