# prefect-community
i
Morning, I'm trying to register a flow from another Python script (as described in https://github.com/PrefectHQ/prefect/discussions/4042). My code that does the registration:
```python
import os.path as p

from flows.xxx import testflow
from prefect.storage import S3

flows = [testflow]

root = p.dirname(p.realpath(__file__))

storage = S3(stored_as_script=True, key='testflow.py', bucket='test')

if __name__ == '__main__':

    for flow_file in flows:
        flow = flow_file.flow

        print(f"Registering flow {flow.name} from {flow_file}")

        storage.add_flow(flow)

        flow.register(
            project_name='test',
            idempotency_key=flow.serialized_hash()
        )
```
The flow itself:
```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

name = "testflow"
executor = LocalDaskExecutor()
storage = S3(stored_as_script=True, key='testflow.py', bucket='test')
run_config = KubernetesRun(
    job_template_path='https://XXX/job_template/k8s_job_template.yaml')

with Flow(name=name, executor=executor, storage=storage, run_config=run_config) as flow:
    ...
```
The error I get is the same as when you don't add storage to the DataFlow at all:
```
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
```
What am I missing?
a
If your end goal is to register all flows in a given directory, you don't need to loop over flows, you can use Prefect CLI:
```bash
prefect register --project xyz -p your_dir/
```
This will register all flows in `your_dir` without any custom registration script. Also, no need to call `storage.add_flow(flow)`; this is done by Prefect during registration. Are you uploading the S3 files yourself, or do you want Prefect to upload the flow file to S3 on registration? If the latter, you can use:
```python
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure the Flow script is uploaded to S3 during registration
    local_script_path=f"flows/{FLOW_NAME}.py",
)
```
Check the examples in this repo that contain s3 in the filename.
The idempotency key also likely won't be needed if you register from the CLI; check this topic for more info.
i
Thanks for your reply. We're doing the S3 upload manually now, but it's OK if Prefect does that for us. The only requirement is that the DataFlows on S3 should have human-readable names.
👍 1
Running
```bash
prefect register --project xyz -p your_dir/
```
gives an error:
```
botocore.exceptions.NoCredentialsError: Unable to locate credentials
```
Clearly I'm not asking Prefect to upload the file to S3 at all, since I'm not setting `local_script_path` in the storage definition:
```python
storage = S3(stored_as_script=True, key=f'flows/{project}/{name}.py', bucket='XXX')
```
a
I checked that and you are right. As long as you provide the `key` explicitly, Prefect will always try to upload your flow file to S3 during registration, even if `local_script_path` is left out or set to None. If you want to disable that behavior, you can disable the build by setting `build=False`:
```python
flow.register("community", build=False)
```
I could open an issue for tracking, but realistically I wouldn't expect it to be prioritized, given that Prefect 2.0 is the current priority. I would recommend disabling the build in Prefect 1.0 if manual upload to S3 is your preference. I can bring this use case up as a feature request for Prefect 2.0 if you are interested in this way of deploying your flows.
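For reference, a minimal sketch of the manual upload step that pairs with `build=False` (not from the thread; it assumes boto3, which the S3 storage uses under the hood, and the bucket, key, and local path are placeholders mirroring the examples above):
```python
# Hypothetical manual-upload step to pair with flow.register(..., build=False).
# Bucket, key, and local path are placeholders taken from the thread's examples.
import boto3

s3 = boto3.client("s3")
s3.upload_file(
    Filename="flows/testflow.py",  # local flow script
    Bucket="test",                 # same bucket as in S3(..., bucket="test")
    Key="testflow.py",             # same key as in S3(..., key="testflow.py")
)
```
With the script already sitting at that bucket/key, `flow.register(..., build=False)` should only record the flow's metadata and storage location without attempting another upload.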
i
I can use `build=False`, but then we're back to my original question, where I tried to register the flow programmatically:
```
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
```
I don't see an option to disable the build via the CLI.
a
that's true, you can only disable that from the script. You can do it this way:
```python
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import LocalRun


FLOW_NAME = "s3_local_run"
AGENT_LABEL = "dev"
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
)

RUN_CONFIG = LocalRun(
    labels=[AGENT_LABEL],
)


@task(log_stdout=True)
def hello_world():
    print("Hello world!")


with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=RUN_CONFIG,
) as flow:
    hw = hello_world()

if __name__ == "__main__":
    flow_id = flow.register("community", build=False)
```
i
We're going in circles. That approach works when you register from the same file where you define the flow. Going back to my original question: we're registering the flow from another script (see the first message), and when the flow runs it fails with
```
Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
```
a
Sorry to hear that, I understand your frustration, but I'm afraid this is currently not possible. You would need to choose one of the following options:
1. Change your strategy to `flow.register("project", build=False)` with manually uploaded files on S3, registering separately in each flow.
2. Let Prefect upload the files to S3 automatically upon registration (rather than uploading the files yourself) and use the CLI to bulk register all flows.
3. Contribute a fix to the S3 storage and/or the register CLI, since due to the current focus on Prefect 2.0 we don't have the bandwidth to contribute a fix ourselves.
Again, I can totally understand your issue, but those are the currently available options.
i
I understand; if we give Prefect control to upload the DataFlows, can we guarantee more or less human-readable names on S3?
a
as I showed before, you can use:
```python
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure the Flow script is uploaded to S3 during registration
    local_script_path=f"flows/{FLOW_NAME}.py",
)
```
Check the examples in this repo that contain s3 in the filename.
i
In relation to option 1: if we have to register each flow separately in the same file where the flow is defined, what is the point of writing any common registration script?
a
this is the S3 key: `key=f"flows/{FLOW_NAME}.py"`
No need for a registration script, really; the recommended approach is to use the CLI.
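To make the naming concrete, here is a small illustrative sketch (my assumption, not from the thread: the bucket, `project`, and `name` values are placeholders mirroring the key pattern used earlier) showing that the object name on S3 is exactly the `key` you choose, so it stays human-readable when Prefect uploads the script:
```python
# Hypothetical flow-file storage definition; the uploaded object lands at
# s3://XXX/flows/test/testflow.py, i.e. the name is exactly the key you set.
from prefect.storage import S3

project = "test"   # placeholder project name
name = "testflow"  # placeholder flow name

storage = S3(
    bucket="XXX",
    key=f"flows/{project}/{name}.py",      # human-readable object name on S3
    stored_as_script=True,
    local_script_path=f"flows/{name}.py",  # local script Prefect uploads at registration
)
```
Registering with `prefect register --project test -p flows/` should then upload each script to its key and register the flow in one step.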
i
Understood. In your example you don't provide aws_id and aws_secret to the S3 storage; how is access granted then?
a
through `aws configure`; you can authenticate your terminal with AWS before running it
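As a side note, a minimal sanity check (my assumption, not from the thread, is that the S3 storage resolves credentials through boto3's default chain, which also covers the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables) to confirm that the credentials from `aws configure` are visible:
```python
# Quick check that the default AWS credential chain (aws configure profile,
# environment variables, instance role, ...) can resolve credentials.
import boto3

session = boto3.Session()
credentials = session.get_credentials()
print("AWS credentials found:", credentials is not None)
```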