Xavier Babu
09/23/2022, 11:31 PM
Iuliia Volkova
09/24/2022, 6:53 AM
Tadej Svetina
09/24/2022, 1:54 PM
Oliver Mannion
09/24/2022, 1:57 PM
Alex Turek
09/24/2022, 7:24 PM
Failed, or skip retries when it's marked as Succeeded, by my code
Georgi Yanev
09/24/2022, 9:29 PM
flavienbwk
09/25/2022, 3:10 PM
@flow(name="get_paris_weather")
def get_paris_weather(
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_use_ssl: bool,
    bucket_name: str,
):
    create_bucket(
        minio_endpoint,
        minio_access_key,
        minio_secret_key,
        minio_use_ssl,
        bucket_name,
    )
    city_coordinates = get_city_coordinates("Paris")
    return get_weather(city_coordinates[0], city_coordinates[1])

# --- Deployment definition
if __name__ == "__main__":
    bucket_name = os.environ.get("MINIO_PREFECT_FLOWS_BUCKET_NAME")
    minio_endpoint = os.environ.get("MINIO_ENDPOINT")
    minio_use_ssl = os.environ.get("MINIO_USE_SSL") == "true"
    minio_scheme = "https" if minio_use_ssl else "http"
    minio_access_key = os.environ.get("MINIO_ACCESS_KEY")
    minio_secret_key = os.environ.get("MINIO_SECRET_KEY")

    flow_identifier = uuid.uuid4()
    block_storage = RemoteFileSystem(
        basepath=f"s3://{bucket_name}/{flow_identifier}",
        key_type="hash",
        settings=dict(
            use_ssl=minio_use_ssl,
            key=minio_access_key,
            secret=minio_secret_key,
            client_kwargs=dict(endpoint_url=f"{minio_scheme}://{minio_endpoint}"),
        ),
    )
    block_storage.save("s3-storage", overwrite=True)

    deployment = Deployment.build_from_flow(
        name="get_weather_s3_example",
        flow=get_paris_weather,
        storage=RemoteFileSystem.load("s3-storage"),
        work_queue_name="flows-example-queue",
        parameters={
            minio_endpoint: minio_endpoint,
            minio_access_key: minio_access_key,
            minio_secret_key: minio_secret_key,
            minio_use_ssl: minio_use_ssl,
            bucket_name: bucket_name,
        },
    )
    deployment.apply()
But the error I get is:
prefect.exceptions.SignatureMismatchError: Function expects parameters ['minio_endpoint', 'minio_access_key', 'minio_secret_key', 'minio_use_ssl', 'bucket_name'] but was provided with parameters ['False', 'minio', 'minio123', 'prefect-flows', '172.17.0.1:9000']
Could you explain to me how I can pass parameters to my flow?
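(A minimal sketch of one likely fix, given the SignatureMismatchError above: the parameters dict in the deployment code uses the variable values as keys, so Prefect sees "minio", "minio123", etc. as parameter names. Keying the dict by the flow's parameter names as strings should resolve it.)
        parameters={
            "minio_endpoint": minio_endpoint,
            "minio_access_key": minio_access_key,
            "minio_secret_key": minio_secret_key,
            "minio_use_ssl": minio_use_ssl,
            "bucket_name": bucket_name,
        },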
flavienbwk
09/25/2022, 6:40 PM
Deepanshu Aggarwal
09/25/2022, 8:10 PM
张强
09/26/2022, 3:41 AM
future = client.submit(func, *args, workers=['Alice'],
                       allow_other_workers=True)
JV
09/26/2022, 5:00 AM
prefect.exceptions.ParameterTypeError: Flow run received invalid parameters:
- run_name: none is not an allowed value
in jobs_runs_submit_and_wait_for_completion
jobs_runs_state, jobs_runs_metadata = await jobs_runs_wait_for_completion(
ValueError: too many values to unpack (expected 2)
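(On the first error: "none is not an allowed value" is pydantic's way of saying a non-optional flow parameter received None. A minimal sketch of one fix, assuming the flow signature is yours to change; the flow name here is hypothetical. Alternatively, pass a concrete run_name when triggering the run.)
from typing import Optional
from prefect import flow

@flow
def my_databricks_flow(run_name: Optional[str] = None):  # None is now an allowed value for run_name
    ...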
Ha Pham
09/26/2022, 9:05 AM
Andreas Nord
09/26/2022, 9:44 AM
Hedgar
09/26/2022, 10:14 AM
In the serverless.yml file I don't understand the KVPs PREFECT_API_KEY and PREFECT_API_URL: are the values supposed to be variables or secrets, and from where?
Andreas Nord
09/26/2022, 10:20 AM
Erick Joca
09/26/2022, 11:28 AM
Aisha Merhebi
09/26/2022, 11:56 AM
Sylvain Hazard
09/26/2022, 12:14 PM
default_params that fill required parameters? I want to make sure users can't run the flow without specifying some arguments, but also schedule it.
Blake Stefansen
09/26/2022, 2:32 PM
Xavier Babu
09/26/2022, 2:36 PM
Philip MacMenamin
09/26/2022, 3:19 PM
@task(name="Task A")
def task_a(val):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occurred.{val}")

@task
def send_log(val):
    logger.info(f"hello {val}")

l = [1, 2, 3, 4]

with Flow("Trigger example") as flow:
    ta_s = task_a.map(val=l)
    sl_s = send_log.map(val=l, upstream_tasks=[ta_s])
    # flow run should be success if ANY of task_a's succeeded.
    flow.set_reference_tasks([something])
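(A sketch of one possible approach, assuming Prefect 1.x and reusing task_a, send_log, and l from the snippet above; reduce_results is a hypothetical helper added here. The idea is to hang a reduce task with an any_successful trigger off the mapped task_a runs and make it the flow's reference task, so the flow finishes as Success whenever at least one task_a child succeeded.)
from prefect import task, Flow
from prefect.triggers import any_successful

@task(trigger=any_successful)  # runs as long as at least one upstream mapped run succeeded
def reduce_results(results):
    return results

with Flow("Trigger example") as flow:
    ta_s = task_a.map(val=l)
    sl_s = send_log.map(val=l, upstream_tasks=[ta_s])
    summary = reduce_results(ta_s)

    # flow state is now driven by `summary` alone
    flow.set_reference_tasks([summary])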
Matt Delacour
09/26/2022, 3:38 PM
William Wolfe-McGuire
09/26/2022, 4:14 PM
Traceback (most recent call last):
  File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/utilities/executors.py", line 681, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2022-09-26 15:42:45+0000] ERROR - prefect.ml-graph-energy | Unexpected error occured in FlowRunner: KeyError(0)
I wasn't able to find other examples of a similar error online and was hoping someone could give me some hints about how to start debugging this. Thanks!
Chris Gunderson
09/26/2022, 4:27 PM
    aws_credentials_block = AwsCredentials.load("****-user")
    s3_block = S3.load("***-s3")
    s3_bucket_block = S3Bucket(
        bucket_name=s3_block.bucket_path,
        aws_credentials=aws_credentials_block,
        basepath=f"Allocations/{date.today().year}"
    )
    output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
    bytes_to_write = df_alloc.to_csv(None).encode()
    csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
    logging.info('Filtering for **** Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))
    return csv_file

@flow(name="***** Allocations")
def ****AllocationsFlow():
    try:
        slack_webhook_block = SlackWebhook.load("****-webhook")
        state = allocations_process(return_state=True)
        excel_file = state.result()
        if 'Completed' == state.name:
            slack_webhook_block.notify("**** Allocations was successful")
        else:
            slack_webhook_block.notify("***** Allocations failed")
Error:
cannot pickle 'coroutine' object
11:23:31.736 | INFO | Task run 'Fidelity Allocations-e728df66-0' - Crash detected! Execution was interrupted by an unexpected exception.
11:23:32.443 | ERROR | Flow run 'visionary-cat' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
    return state.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
TypeError: cannot pickle 'coroutine' object
Michael Hadorn
09/26/2022, 4:40 PM
Keith
09/26/2022, 4:56 PM
from prefect2_flow import extract_load_transform
from prefect.deployments import Deployment
from prefect.infrastructure import KubernetesJob
from prefect.orion.schemas.schedules import CronSchedule

schedule = CronSchedule(cron="15 * * * *", timezone="UTC")

deployment = Deployment().build_from_flow(
    flow=extract_load_transform,
    name="test_hourly_elt",
    parameters={
        'collection_duration': '1h',
        'uri': 'https://google.com',
    },
    skip_upload=True,
    schedule=schedule,
    tags=["test"],
    version=2,
    work_queue_name="test-kubernetes",
    infrastructure=KubernetesJob(
        finished_job_ttl=30,
        image="us-central1-docker.pkg.dev/.../prefect2-flows/elt:latest",
        image_pull_policy="Always",
        namespace="test-prefect2",
        pod_watch_timeout_seconds=180,
        job={
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"labels": {}},
            "spec": {
                "template": {
                    "spec": {
                        "parallelism": 1,
                        "completions": 1,
                        "restartPolicy": "Never",
                        "containers": [
                            {
                                "name": "prefect-job",
                                "env": [],
                                "resources": {
                                    "requests": {
                                        "memory": "5Gi",
                                        "cpu": "2",
                                        "ephemeral-storage": "1Gi",
                                    }
                                }
                            }
                        ],
                    }
                }
            },
        }
    )
)

if __name__ == "__main__":
    deployment.apply()
1. Is it okay to set skip_upload to true? Since we are using Docker, I'm not sure what the benefit of uploading our project to GCS is.
2. The Prefect API has a parameter for is_schedule_active, but it doesn't look like that parameter has made it over to the Python API, because when I try to add it above it gives me an error about not including more parameters than needed. Is this something I can contribute to or add functionality for?
3. The k8s job configuration is pretty verbose just to request resources above the default; is there a better way to set these that I am missing? (See the sketch after this list.)
Thank you for the time and any help provided!
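(On question 3, a sketch of one alternative, not verified against this exact Prefect release: instead of spelling out the whole Job manifest, pass KubernetesJob a customizations list of JSON 6902 patch operations that adds just the resource requests to the default base job.)
from prefect.infrastructure import KubernetesJob

infrastructure = KubernetesJob(
    finished_job_ttl=30,
    image="us-central1-docker.pkg.dev/.../prefect2-flows/elt:latest",
    image_pull_policy="Always",
    namespace="test-prefect2",
    pod_watch_timeout_seconds=180,
    # assumption: this Prefect version exposes `customizations` as a JSON-patch list
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {
                "requests": {"memory": "5Gi", "cpu": "2", "ephemeral-storage": "1Gi"}
            },
        }
    ],
)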
Nathaniel Russell
09/26/2022, 5:10 PM
FROM prefecthq/prefect:2.4.2-python3.9
COPY service.py .
COPY requirements.txt .
RUN pip install -r requirements.txt
CMD [ "service.handler" ]
For example, this doesn't work because I am not using AWS' base image, so CMD [ "service.handler" ] isn't properly setting the handler.
Roger Webb
09/26/2022, 7:28 PM
Leon Kozlowski
09/26/2022, 7:40 PM
Sean Turner
09/26/2022, 7:42 PM
RemoteFileSystem.
rfs = RemoteFileSystem(basepath="s3://bucket-name/some/path/on/bucket")
cfg = rfs.read_path("base_config.yaml")
I end up with cfg being a coroutine. I imagine this returns bytes, but there aren't any helper methods and it's not clear how I'm supposed to proceed. I am interested in reading this file into memory.
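(A minimal sketch of one way to get the file contents, assuming this is called from plain synchronous code with no event loop already running; inside an async flow you would await rfs.read_path(...) directly instead.)
import asyncio

from prefect.filesystems import RemoteFileSystem

rfs = RemoteFileSystem(basepath="s3://bucket-name/some/path/on/bucket")

# read_path hands back a coroutine here, so drive it to completion explicitly;
# it resolves to the raw bytes of the object.
cfg_bytes = asyncio.run(rfs.read_path("base_config.yaml"))
cfg_text = cfg_bytes.decode("utf-8")  # decode to str before parsing the YAML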