Ievgenii Martynenko

05/09/2022, 8:50 AM
Morning, I'm trying to register a flow from another Python script (as described in https://github.com/PrefectHQ/prefect/discussions/4042). My code that does the registration:
import os.path as p

from flows.xxx import testflow
from prefect.storage import S3

flows = [testflow]

root = p.dirname(p.realpath(__file__))

storage = S3(stored_as_script=True, key='testflow.py', bucket='test')

if __name__ == '__main__':

    for flow_file in flows:
        flow = flow_file.flow

        print(f"Registering flow {flow.name} from {flow_file}")

        storage.add_flow(flow)

        flow.register(
            project_name='test',
            idempotency_key=flow.serialized_hash()
        )
Flow itself:
name = "testflow"
executor = LocalDaskExecutor()
storage = S3(stored_as_script=True, key='testflow.py', bucket='test')
run_config = KubernetesRun(
    job_template_path='https://XXX/job_template/k8s_job_template.yaml')

with Flow(name=name, executor=executor, storage=storage, run_config=run_config) as flow:
	....
The error I get is the same as when no storage is added to the flow:
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
What am I missing?

Anna Geller

05/09/2022, 9:28 AM
If your end goal is to register all flows in a given directory, you don't need to loop over flows, you can use Prefect CLI:
prefect register --project xyz -p your_dir/
This will register all flows in
your_dir
without any custom registration script. Also, no need to do
storage.add_flow(flow)
, this is done during registration by Prefect. Are you uploading the S3 files yourself, or do you want Prefect to upload the flow file to S3 on registration? If the latter, you can use:
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"flows/{FLOW_NAME}.py",
)
check examples in this repo containing s3 in the filename
the idempotency key also likely won't be needed if you register from CLI - check this topic for more info
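For illustration, here is a minimal sketch of how a flow file could be laid out so that the CLI picks it up and Prefect uploads the script to S3 itself (file name, bucket, and task are hypothetical placeholders):
# your_dir/testflow.py
from prefect import Flow, task
from prefect.storage import S3

FLOW_NAME = "testflow"

STORAGE = S3(
    bucket="test",  # assumed bucket name
    key=f"flows/{FLOW_NAME}.py",  # human-readable S3 key
    stored_as_script=True,
    # lets Prefect upload this local file to S3 during registration
    local_script_path=f"your_dir/{FLOW_NAME}.py",
)


@task(log_stdout=True)
def say_hello():
    print("hello from testflow")


with Flow(FLOW_NAME, storage=STORAGE) as flow:
    say_hello()
Running prefect register --project xyz -p your_dir/ would then register every flow found in your_dir/ without any manual storage.add_flow call.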

Ievgenii Martynenko

05/09/2022, 9:35 AM
Thanks for your reply. We're doing the S3 upload manually now, but it's fine if Prefect does that for us. The only requirement is that the flow files on S3 should have human-readable names.
Running
prefect register --project xyz -p your_dir/
gives an error:
botocore.exceptions.NoCredentialsError: Unable to locate credentials
Clearly I'm not asking Prefect to upload the file to S3 at all, since I'm not using the 'local_script_path' variable in the storage definition:
storage = S3(stored_as_script=True, key=f'flows/{project}/{name}.py', bucket='XXX')

Anna Geller

05/09/2022, 3:39 PM
I checked that and you are right. As long as you provide the
key
explicitly, Prefect will always try to upload your flow file to S3 during registration, even if the
local_script_path
is left out or set to None. If you want to disable that behavior, you can set build to False:
flow.register("community", build=False)
I could open an issue for tracking, but realistically I wouldn't expect it to be prioritized given that Prefect 2.0 is the current focus. I would recommend disabling the build in Prefect 1.0 if manual upload to S3 is your preference. I can bring this use case up as a feature request for Prefect 2.0 if you are interested in this way of deploying your flows.

Ievgenii Martynenko

05/09/2022, 4:09 PM
I can use "build = False", but then we're back to my original question where I tried to register flow programmatically.
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
I don't see an option to disable the build via the CLI.

Anna Geller

05/09/2022, 5:06 PM
That's true, you can only disable it from a script. You can do it this way:
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import LocalRun


FLOW_NAME = "s3_local_run"
AGENT_LABEL = "dev"
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
)

RUN_CONFIG = LocalRun(
    labels=[AGENT_LABEL],
)


@task(log_stdout=True)
def hello_world():
    print("Hello world!")


with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=RUN_CONFIG,
) as flow:
    hw = hello_world()

if __name__ == "__main__":
    flow_id = flow.register("community", build=False)

Ievgenii Martynenko

05/10/2022, 7:40 AM
We're going in circles.
This approach works when you register from the same file where the flow is defined. Going back to my original question: we're registering the flow from another script (see the first message), and when the flow runs it fails with
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')

Anna Geller

05/10/2022, 12:32 PM
Sorry to hear that, I understand your frustration, but I'm afraid this is currently not possible. You would need to choose one of the following options:
1. Change your strategy to flow.register("project", build=False) with manually uploaded files to S3, registering separately in each flow (see the sketch below).
2. Let Prefect upload the files to S3 automatically upon registration (rather than uploading the files yourself) and use the CLI to bulk register all flows.
3. Contribute a fix to the S3 storage and/or the register CLI, since due to the current focus on Prefect 2.0 we don't have the bandwidth to contribute a fix ourselves.
Again, I can totally understand your issue, but those are the currently available options.
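As a rough sketch for option 1 (bucket name, file list, and paths are hypothetical), the manual upload could be done with boto3 while keeping human-readable keys that match the key set on each flow's S3 storage:
import boto3

BUCKET = "test"  # assumed bucket name
FLOW_FILES = ["flows/testflow.py"]  # hypothetical list of flow scripts

# credentials are resolved by boto3 from the environment / aws configure
s3 = boto3.client("s3")

for path in FLOW_FILES:
    # upload under the same key that the flow's S3 storage uses,
    # e.g. flows/testflow.py, so the S3 object name stays human-readable
    s3.upload_file(path, BUCKET, path)
Each flow would then call flow.register("project", build=False) from its own file.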

Ievgenii Martynenko

05/10/2022, 12:41 PM
I understand; if we hand control to Prefect to upload the flows, can we guarantee more or less human-readable names on S3?

Anna Geller

05/10/2022, 12:43 PM
as I showed before, you can use:
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"flows/{FLOW_NAME}.py",
)
check examples in this repo containing s3 in the filename

Ievgenii Martynenko

05/10/2022, 12:43 PM
In relation to option 1: if we have to register each flow separately in the same file where the flow is defined, what is the point of writing any common registration script?

Anna Geller

05/10/2022, 12:43 PM
the name is controlled by the S3 key: key=f"flows/{FLOW_NAME}.py"
no need for a registration script really, the recommended approach is to use the CLI

Ievgenii Martynenko

05/10/2022, 1:16 PM
Understood. In your example you don't provide aws_id and aws_secret to the S3 storage, so how is access granted?

Anna Geller

05/11/2022, 1:55 PM
through
aws configure
- you can authenticate your terminal with AWS before running it
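Concretely, the S3 storage uses boto3 under the hood, so credentials are resolved from the standard AWS sources; a small illustrative sketch (placeholder values, nothing Prefect-specific):
import os

# boto3 looks for credentials in the usual order, roughly:
#   1. environment variables
#   2. the shared credentials file written by `aws configure` (~/.aws/credentials)
#   3. an IAM role when running on AWS infrastructure (e.g. the Kubernetes nodes)
os.environ["AWS_ACCESS_KEY_ID"] = "<your-access-key-id>"          # placeholder
os.environ["AWS_SECRET_ACCESS_KEY"] = "<your-secret-access-key>"  # placeholder
os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"                    # hypothetical region
With any one of those sources in place, the S3 storage works without passing aws_id/aws_secret explicitly.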