Naimesh Chaudhari
04/12/2022, 3:56 PM

Anders Segerberg
04/12/2022, 4:20 PM
I use create_flow_run, with the idempotency_key set to this file path.
Running the flow the first time will write this key. Running it a second time won't -- as expected, because of the idempotency behavior.
What I don't understand is that the second flow run has every task state set to 'success' -- wouldn't I expect them to be 'cached' or something else, indicating that the flow isn't being re-run?
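(For reference, a minimal sketch of the pattern described above, assuming Prefect 1.x's create_flow_run task; the flow/project names and the file_path variable are hypothetical:)

from prefect.tasks.prefect import create_flow_run

create_flow_run.run(
    flow_name="my-child-flow",    # hypothetical
    project_name="my-project",    # hypothetical
    idempotency_key=file_path,    # a second call with the same key creates no new run
)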
Prasanth Kothuri
04/12/2022, 4:31 PM
  File "/usr/local/lib/python3.8/dist-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/dist-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
To make it simple, it has just one task, as below:
@task(log_stdout=True)
def get_file_names():
    files = s3.Bucket(s3_bucket).objects.all()
    file_names = []
    for my_bucket_object in files:
        file_name = my_bucket_object.key
        regex = re.search(r".ctl", str(file_name))
        if regex is not None:
            file_names.append(file_name)
    return file_names
and the flow:
# flow to chain the tasks
with Flow("my_flow", storage=storage, schedule=schedule) as f:
    ctl_files = get_file_names()
any ideas why prefect is unable to serialize / pickle ???
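(The usual culprit for "cannot pickle '_thread.lock'" is a module-level boto3 client/resource that gets pickled along with the flow. A sketch of one fix, creating the resource inside the task -- here s3_bucket is assumed to be passed in rather than global:)

import re
import boto3
from prefect import task

@task(log_stdout=True)
def get_file_names(s3_bucket):
    # build the boto3 resource inside the task so it never needs to be pickled
    s3 = boto3.resource("s3")
    file_names = []
    for my_bucket_object in s3.Bucket(s3_bucket).objects.all():
        # note: escaped dot, to match a literal ".ctl" suffix
        if re.search(r"\.ctl", str(my_bucket_object.key)):
            file_names.append(my_bucket_object.key)
    return file_names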
Josh
04/12/2022, 6:00 PM
My flow has been stuck with the "Flow run is cancelling…" message for 2 weeks. Is there anything I can do to let Prefect Cloud know that the flow is actually dead and never succeeded?

Atsushi Saito
04/12/2022, 6:18 PM

kiran
04/12/2022, 7:33 PM
I've been using try/except/else/finally less often than before (in my main code) because I figure prefect will catch (and log) things. I still find uses for it inside actual tasks/functions. Is this what other people have done, or am I thinking about it the wrong way? Thanks!
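(A sketch of that division of labor -- the open_connection/insert helpers are hypothetical: let unexpected exceptions propagate so Prefect marks the task Failed and logs the traceback, and keep try/finally inside the task for cleanup:)

from prefect import task

@task
def load_rows(rows):
    conn = open_connection()  # hypothetical helper
    try:
        conn.insert(rows)     # if this raises, Prefect marks the task Failed and logs it
    finally:
        conn.close()          # try/finally still earns its keep for cleanup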
Patrick Tan
04/12/2022, 7:54 PM

David Yang
04/12/2022, 8:12 PM

Greg Kennedy
04/13/2022, 12:08 AM

Carlos Cueto
04/13/2022, 2:09 AM
When I try to log a task's return value, all I get is <Task: fetch_data>. This is what I'm trying to do:
@task
def fetch_data():
    return {"data": "random data"}

with Flow('Get-Data') as flow:
    flow.run_config = LocalRun()
    data = fetch_data()
    logger.info(data)

flow.run()
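(Inside the Flow block, data is still a Task object -- the flow hasn't run yet, so there is no return value to log. A sketch of one fix, moving the logging into a task and using Prefect's context logger:)

import prefect
from prefect import task, Flow
from prefect.run_configs import LocalRun

@task
def fetch_data():
    return {"data": "random data"}

@task
def log_data(data):
    # at run time `data` is the real return value, so this logs the dict
    prefect.context.get("logger").info(data)

with Flow('Get-Data') as flow:
    flow.run_config = LocalRun()
    log_data(fetch_data())

flow.run()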
Bihag Kashikar
04/13/2022, 5:40 AM

Leanna Morinishi
04/13/2022, 6:27 AM

Jacob Blanco
04/13/2022, 7:01 AM

Trung Đô Trần
04/13/2022, 8:24 AM

Olivér Atanaszov
04/13/2022, 9:19 AM

Stephen Lloyd
04/13/2022, 11:04 AM
with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    creds = get_credentials()
    conn = get_connection(creds)
    tables = get_tables()
    save_data = load_to_s3.map(tables, conn=unmapped(conn))

    conn.set_upstream(creds)
    save_data.set_upstream(tables)
    save_data.set_upstream(conn)
It’s failing on get_connection, and the relevant code is:
@task
def get_credentials():
    return PrefectSecret(PREFECT_FLOW_NAME).run()

@task
def get_connection(creds):
    return connectors.get_redshift_connector(creds)

# from another file...
import redshift_connector

def get_redshift_connector(creds: dict) -> object:
    conn = redshift_connector.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['user'],
        password=creds['password']
    )
    return conn
When I move to running in ECS, it fails with the following trace…
Unexpected error: TypeError("cannot pickle 'Struct' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
    binary_data = new.serializer.serialize(new.value)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'Struct' object
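(The S3 result handler pickles every task's return value, and a live Redshift connection isn't picklable -- hence the failure only appears once results are written to S3 on ECS. A sketch of one workaround, opening the connection inside the mapped task so it never crosses a task boundary; the load body is hypothetical:)

from prefect import task, Flow, unmapped

@task
def load_to_s3(table, creds):
    # the connection lives and dies inside this task, so it is never pickled
    conn = connectors.get_redshift_connector(creds)
    try:
        ...  # copy `table` to S3 using conn (hypothetical body)
    finally:
        conn.close()

with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    creds = get_credentials()
    tables = get_tables()
    save_data = load_to_s3.map(tables, creds=unmapped(creds))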
Malthe Karbo
04/13/2022, 12:21 PM
Running with Prefect Cloud, the flow run crashes with:
14:17:55.129 | ERROR | Flow run 'arrogant-yak' - Crash detected! Execution was interrupted by an unexpected exception.
followed by prefect.exceptions.Abort: This run has already terminated.
This happens regardless of task success/failure. Again, running without cloud this works fine.

Stephen Herron
04/13/2022, 12:40 PM
We're using the invokeLambda task. I think we need to pass it some extra config through boto_kwargs. Has anyone come across needing to do this before?
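(A sketch of what that might look like, assuming Prefect 1.x's LambdaInvoke task; the function name is hypothetical, and the example raises botocore's read timeout -- a common reason to reach for boto_kwargs with long-running lambdas:)

from botocore.config import Config
from prefect.tasks.aws.lambda_function import LambdaInvoke

invoke_lambda = LambdaInvoke(
    function_name="my-function",  # hypothetical
    boto_kwargs={
        # passed through to the boto3 client constructor
        "config": Config(read_timeout=900, retries={"max_attempts": 0}),
    },
)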
Tom Klein
04/13/2022, 1:39 PM
Is there a way for Prefect to treat a failure of a RunNamespacedJob (within the job itself) as a failure? Or is it impossible because the command is async?

Matthew Seligson
04/13/2022, 2:07 PM

Tom Klein
04/13/2022, 3:31 PM
We took the RunNamespacedJob example (from https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py) -- we implemented it and got it to work, but it seems that it's now failing on:
VALIDATIONFAIL signal raised: VALIDATIONFAIL('More than one dummy pod')
because there seem to be many pod "residues" of previous runs:
['prefect-agent-7745fb9694-6fwk4', 'prefect-job-47d072a8-4pbsf', 'seg-pred-test-cm54l', 'seg-pred-test-doron', 'seg-pred-test-l2j5l', 'seg-pred-test-zvwld']
So wouldn't k8s keep the pods around, given that we set "delete_job_after_completion" = False? And even if the job is deleted successfully, wouldn't it keep the pods around? Or are the pods supposed to be deleted automatically if the job is deleted?
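(On the last point: deleting a Job normally garbage-collects its pods too, unless the delete orphans its dependents, while delete_job_after_completion=False leaves both job and pods in place until something removes them. A hypothetical cleanup sketch using the kubernetes Python client and the job-name label that the Job controller stamps on its pods -- the namespace and job name are assumptions:)

from kubernetes import client, config

config.load_incluster_config()  # or load_kube_config() outside the cluster
v1 = client.CoreV1Api()

# delete leftover pods from previous runs of the job
pods = v1.list_namespaced_pod("default", label_selector="job-name=seg-pred-test")
for pod in pods.items:
    v1.delete_namespaced_pod(pod.metadata.name, "default")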
David Yang
04/13/2022, 3:49 PM

Chris Reuter
04/13/2022, 4:00 PM

Kevin Mullins
04/13/2022, 5:02 PM
I'm using AzureResult to store task results for my flows. I've already got separate storage accounts per environment (dev, qa, prod). I'm curious whether it would be recommended to use separate containers for different Prefect projects and/or flows, or if it's ok to just store all the results in the same blob container for the environment.
My hesitation about further separation is that AzureResult appears to require the container to already exist, so I would need to orchestrate something to create containers for each Prefect project. Not a big deal, but just trying to get a feel for good practices.
Any thoughts appreciated.
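(One middle ground, sketched assuming Prefect 1.x's AzureResult; the container and secret names are hypothetical: keep a single container per environment and separate projects/flows with a templated location rather than separate containers:)

from prefect.engine.results import AzureResult

result = AzureResult(
    container="prefect-results",                    # hypothetical, one per environment
    connection_string_secret="AZURE_STORAGE_CONN",  # hypothetical secret name
    # context values are filled in at runtime, giving per-flow "folders"
    location="{flow_name}/{task_name}/{date:%Y-%m-%d}/{task_run_id}.prefect",
)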
Matt Alhonte
04/13/2022, 6:39 PM

Chris Reuter
04/13/2022, 6:45 PM

Zach Munro
04/13/2022, 7:03 PM

Zach Munro
04/13/2022, 7:03 PM

Zach Munro
04/13/2022, 7:13 PM

Rajan Subramanian
04/13/2022, 10:15 PM
I'm getting a "temporary failure in name resolution" error. I'm not even sure whom to ask this, but since I was using prefect I figured someone here knows about this.
I raised it here, https://stackoverflow.com/questions/71864208/unable-to-connect-to-redis-elasticcache-from-fargate, curious if someone had any suggestions? My Fargate profile has the same 4 subnets as my ElastiCache cluster; they also have the same security group.
Anna Geller
04/14/2022, 12:36 AM

Rajan Subramanian
04/14/2022, 5:41 AM

Anna Geller
04/14/2022, 9:09 AM
records = [
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "BTC"}],
        "MeasureName": "Price",
        "MeasureValue": str(btc),
        "MeasureValueType": "DOUBLE",
    },
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "ETH"}],
        "MeasureName": "Price",
        "MeasureValue": str(eth),
        "MeasureValueType": "DOUBLE",
    },
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "DASH"}],
        "MeasureName": "Price",
        "MeasureValue": str(dash),
        "MeasureValueType": "DOUBLE",
    },
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "REP"}],
        "MeasureName": "Price",
        "MeasureValue": str(rep),
        "MeasureValueType": "DOUBLE",
    },
]
and then to write the records:
import boto3

write_client = boto3.client("timestream-write")
rejected_records = write_client.write_records(
    DatabaseName="demo", TableName="data", Records=records, CommonAttributes={}
)
print(rejected_records)
this blog post shows more details, and this one has a full example incl. a guide on how to build Grafana dashboards for that 🙂

Rajan Subramanian
04/14/2022, 6:43 PM

Anna Geller
04/14/2022, 6:49 PM

Rajan Subramanian
04/14/2022, 7:03 PM

Anna Geller
04/14/2022, 7:17 PM